mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-22 00:59:07 -05:00
improvement: run filters as separate queries, to reduce complexity on SQLite query planner
This commit is contained in:
parent
3979a94726
commit
2557c7f69c
|
@ -332,119 +332,121 @@ impl NostrRepo for SqliteRepo {
|
||||||
}
|
}
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let mut row_count: usize = 0;
|
let mut row_count: usize = 0;
|
||||||
// generate SQL query
|
|
||||||
let (q, p, idxs) = query_from_sub(&sub);
|
|
||||||
let sql_gen_elapsed = start.elapsed();
|
|
||||||
|
|
||||||
if sql_gen_elapsed > Duration::from_millis(10) {
|
|
||||||
debug!("SQL (slow) generated in {:?}", start.elapsed());
|
|
||||||
}
|
|
||||||
// cutoff for displaying slow queries
|
// cutoff for displaying slow queries
|
||||||
let slow_cutoff = Duration::from_millis(2000);
|
let slow_cutoff = Duration::from_millis(2000);
|
||||||
// any client that doesn't cause us to generate new rows in 5
|
let mut filter_count = 0;
|
||||||
// seconds gets dropped.
|
for filter in sub.filters.iter() {
|
||||||
let abort_cutoff = Duration::from_secs(5);
|
filter_count += 1;
|
||||||
let start = Instant::now();
|
let (q, p, idx) = query_from_filter(&filter);
|
||||||
let mut slow_first_event;
|
let sql_gen_elapsed = start.elapsed();
|
||||||
let mut last_successful_send = Instant::now();
|
|
||||||
if let Ok(mut conn) = self.read_pool.get() {
|
|
||||||
// execute the query.
|
|
||||||
// make the actual SQL query (with parameters inserted) available
|
|
||||||
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
|
|
||||||
let mut stmt = conn.prepare_cached(&q)?;
|
|
||||||
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
|
||||||
|
|
||||||
let mut first_result = true;
|
if sql_gen_elapsed > Duration::from_millis(10) {
|
||||||
while let Some(row) = event_rows.next()? {
|
debug!("SQL (slow) generated in {:?}", start.elapsed());
|
||||||
let first_event_elapsed = start.elapsed();
|
}
|
||||||
slow_first_event = first_event_elapsed >= slow_cutoff;
|
// any client that doesn't cause us to generate new rows in 5
|
||||||
if first_result {
|
// seconds gets dropped.
|
||||||
debug!(
|
let abort_cutoff = Duration::from_secs(5);
|
||||||
"first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}]",
|
let mut slow_first_event;
|
||||||
first_event_elapsed, client_id, sub.id, idxs
|
let mut last_successful_send = Instant::now();
|
||||||
);
|
if let Ok(mut conn) = self.read_pool.get() {
|
||||||
first_result = false;
|
// execute the query.
|
||||||
}
|
// make the actual SQL query (with parameters inserted) available
|
||||||
// logging for slow queries; show sub and SQL.
|
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
|
||||||
// to reduce logging; only show 1/16th of clients (leading 0)
|
let mut stmt = conn.prepare_cached(&q)?;
|
||||||
if row_count == 0 && slow_first_event && client_id.starts_with('0') {
|
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
||||||
debug!(
|
|
||||||
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
let mut first_result = true;
|
||||||
sub, client_id, sub.id
|
while let Some(row) = event_rows.next()? {
|
||||||
);
|
let first_event_elapsed = start.elapsed();
|
||||||
}
|
slow_first_event = first_event_elapsed >= slow_cutoff;
|
||||||
// check if a checkpoint is trying to run, and abort
|
if first_result {
|
||||||
if row_count % 100 == 0 {
|
debug!(
|
||||||
{
|
"first result in {:?} (cid: {}, sub: {:?}, filter: {}) [used index: {:?}]",
|
||||||
|
first_event_elapsed, client_id, sub.id, filter_count, idx
|
||||||
|
);
|
||||||
|
first_result = false;
|
||||||
|
}
|
||||||
|
// logging for slow queries; show sub and SQL.
|
||||||
|
// to reduce logging; only show 1/16th of clients (leading 0)
|
||||||
|
if row_count == 0 && slow_first_event && client_id.starts_with('0') {
|
||||||
|
debug!(
|
||||||
|
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
||||||
|
sub, client_id, sub.id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// check if a checkpoint is trying to run, and abort
|
||||||
|
if row_count % 100 == 0 {
|
||||||
|
{
|
||||||
|
if self.checkpoint_in_progress.try_lock().is_err() {
|
||||||
|
// lock was held, abort this query
|
||||||
|
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
||||||
|
metrics.query_aborts.inc();
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if this is still active; every 100 rows
|
||||||
|
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
|
||||||
|
debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
row_count += 1;
|
||||||
|
let event_json = row.get(0)?;
|
||||||
|
loop {
|
||||||
|
if query_tx.capacity() != 0 {
|
||||||
|
// we have capacity to add another item
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// the queue is full
|
||||||
|
trace!("db reader thread is stalled");
|
||||||
|
if last_successful_send + abort_cutoff < Instant::now() {
|
||||||
|
// the queue has been full for too long, abort
|
||||||
|
info!("aborting database query due to slow client (cid: {}, sub: {:?})",
|
||||||
|
client_id, sub.id);
|
||||||
|
metrics.query_aborts.inc();
|
||||||
|
let ok: Result<()> = Ok(());
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
// check if a checkpoint is trying to run, and abort
|
||||||
if self.checkpoint_in_progress.try_lock().is_err() {
|
if self.checkpoint_in_progress.try_lock().is_err() {
|
||||||
// lock was held, abort this query
|
// lock was held, abort this query
|
||||||
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
||||||
metrics.query_aborts.inc();
|
metrics.query_aborts.inc();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
// give the queue a chance to clear before trying again
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
// TODO: we could use try_send, but we'd have to juggle
|
||||||
|
// getting the query result back as part of the error
|
||||||
|
// result.
|
||||||
|
query_tx
|
||||||
|
.blocking_send(QueryResult {
|
||||||
|
sub_id: sub.get_id(),
|
||||||
|
event: event_json,
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
|
last_successful_send = Instant::now();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
// check if this is still active; every 100 rows
|
warn!("Could not get a database connection for querying");
|
||||||
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
|
|
||||||
debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
row_count += 1;
|
|
||||||
let event_json = row.get(0)?;
|
|
||||||
loop {
|
|
||||||
if query_tx.capacity() != 0 {
|
|
||||||
// we have capacity to add another item
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// the queue is full
|
|
||||||
trace!("db reader thread is stalled");
|
|
||||||
if last_successful_send + abort_cutoff < Instant::now() {
|
|
||||||
// the queue has been full for too long, abort
|
|
||||||
info!("aborting database query due to slow client (cid: {}, sub: {:?})",
|
|
||||||
client_id, sub.id);
|
|
||||||
metrics.query_aborts.inc();
|
|
||||||
let ok: Result<()> = Ok(());
|
|
||||||
return ok;
|
|
||||||
}
|
|
||||||
// check if a checkpoint is trying to run, and abort
|
|
||||||
if self.checkpoint_in_progress.try_lock().is_err() {
|
|
||||||
// lock was held, abort this query
|
|
||||||
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
|
||||||
metrics.query_aborts.inc();
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
// give the queue a chance to clear before trying again
|
|
||||||
thread::sleep(Duration::from_millis(100));
|
|
||||||
}
|
|
||||||
// TODO: we could use try_send, but we'd have to juggle
|
|
||||||
// getting the query result back as part of the error
|
|
||||||
// result.
|
|
||||||
query_tx
|
|
||||||
.blocking_send(QueryResult {
|
|
||||||
sub_id: sub.get_id(),
|
|
||||||
event: event_json,
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
last_successful_send = Instant::now();
|
|
||||||
}
|
}
|
||||||
query_tx
|
|
||||||
.blocking_send(QueryResult {
|
|
||||||
sub_id: sub.get_id(),
|
|
||||||
event: "EOSE".to_string(),
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
debug!(
|
|
||||||
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
|
|
||||||
pre_spawn_start.elapsed(),
|
|
||||||
client_id,
|
|
||||||
sub.id,
|
|
||||||
start.elapsed(),
|
|
||||||
row_count
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
warn!("Could not get a database connection for querying");
|
|
||||||
}
|
}
|
||||||
|
debug!(
|
||||||
|
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
|
||||||
|
pre_spawn_start.elapsed(),
|
||||||
|
client_id,
|
||||||
|
sub.id,
|
||||||
|
start.elapsed(),
|
||||||
|
row_count
|
||||||
|
);
|
||||||
|
query_tx
|
||||||
|
.blocking_send(QueryResult {
|
||||||
|
sub_id: sub.get_id(),
|
||||||
|
event: "EOSE".to_string(),
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
metrics
|
metrics
|
||||||
.query_sub
|
.query_sub
|
||||||
.observe(pre_spawn_start.elapsed().as_secs_f64());
|
.observe(pre_spawn_start.elapsed().as_secs_f64());
|
||||||
|
@ -668,7 +670,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
||||||
|
|
||||||
// if the filter is malformed, don't return anything.
|
// if the filter is malformed, don't return anything.
|
||||||
if f.force_no_match {
|
if f.force_no_match {
|
||||||
let empty_query = "SELECT e.content, e.created_at FROM event e WHERE 1=0".to_owned();
|
let empty_query = "SELECT e.content FROM event e WHERE 1=0".to_owned();
|
||||||
// query parameters for SQLite
|
// query parameters for SQLite
|
||||||
let empty_params: Vec<Box<dyn ToSql>> = vec![];
|
let empty_params: Vec<Box<dyn ToSql>> = vec![];
|
||||||
return (empty_query, empty_params, None);
|
return (empty_query, empty_params, None);
|
||||||
|
@ -677,7 +679,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
||||||
// check if the index needs to be overriden
|
// check if the index needs to be overriden
|
||||||
let idx_name = override_index(f);
|
let idx_name = override_index(f);
|
||||||
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}",i));
|
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}",i));
|
||||||
let mut query = format!("SELECT e.content, e.created_at FROM event e {}", idx_stmt);
|
let mut query = format!("SELECT e.content FROM event e {}", idx_stmt);
|
||||||
// query parameters for SQLite
|
// query parameters for SQLite
|
||||||
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
||||||
|
|
||||||
|
@ -815,7 +817,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a dynamic SQL query string and params from a subscription.
|
/// Create a dynamic SQL query string and params from a subscription.
|
||||||
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<String>) {
|
fn _query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<String>) {
|
||||||
// build a dynamic SQL query for an entire subscription, based on
|
// build a dynamic SQL query for an entire subscription, based on
|
||||||
// SQL subqueries for filters.
|
// SQL subqueries for filters.
|
||||||
let mut subqueries: Vec<String> = Vec::new();
|
let mut subqueries: Vec<String> = Vec::new();
|
||||||
|
|
|
@ -719,7 +719,7 @@ async fn nostr_server(
|
||||||
match parsed {
|
match parsed {
|
||||||
Ok(e) => {
|
Ok(e) => {
|
||||||
let id_prefix:String = e.id.chars().take(8).collect();
|
let id_prefix:String = e.id.chars().take(8).collect();
|
||||||
debug!("successfully parsed/validated event: {:?} (cid: {})", id_prefix, cid);
|
debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
|
||||||
// check if the event is too far in the future.
|
// check if the event is too far in the future.
|
||||||
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
|
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
|
||||||
// Write this to the database.
|
// Write this to the database.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user