diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs
index 8abe959..eae684c 100644
--- a/src/repo/sqlite.rs
+++ b/src/repo/sqlite.rs
@@ -332,119 +332,121 @@ impl NostrRepo for SqliteRepo {
         }
         let start = Instant::now();
         let mut row_count: usize = 0;
-        // generate SQL query
-        let (q, p, idxs) = query_from_sub(&sub);
-        let sql_gen_elapsed = start.elapsed();
-
-        if sql_gen_elapsed > Duration::from_millis(10) {
-            debug!("SQL (slow) generated in {:?}", start.elapsed());
-        }
         // cutoff for displaying slow queries
         let slow_cutoff = Duration::from_millis(2000);
-        // any client that doesn't cause us to generate new rows in 5
-        // seconds gets dropped.
-        let abort_cutoff = Duration::from_secs(5);
-        let start = Instant::now();
-        let mut slow_first_event;
-        let mut last_successful_send = Instant::now();
-        if let Ok(mut conn) = self.read_pool.get() {
-            // execute the query.
-            // make the actual SQL query (with parameters inserted) available
-            conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
-            let mut stmt = conn.prepare_cached(&q)?;
-            let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
+        let mut filter_count = 0;
+        for filter in sub.filters.iter() {
+            filter_count += 1;
+            let (q, p, idx) = query_from_filter(&filter);
+            let sql_gen_elapsed = start.elapsed();
 
-            let mut first_result = true;
-            while let Some(row) = event_rows.next()? {
-                let first_event_elapsed = start.elapsed();
-                slow_first_event = first_event_elapsed >= slow_cutoff;
-                if first_result {
-                    debug!(
-                        "first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}]",
-                        first_event_elapsed, client_id, sub.id, idxs
-                    );
-                    first_result = false;
-                }
-                // logging for slow queries; show sub and SQL.
-                // to reduce logging; only show 1/16th of clients (leading 0)
-                if row_count == 0 && slow_first_event && client_id.starts_with('0') {
-                    debug!(
-                        "query req (slow): {:?} (cid: {}, sub: {:?})",
-                        sub, client_id, sub.id
-                    );
-                }
-                // check if a checkpoint is trying to run, and abort
-                if row_count % 100 == 0 {
-                    {
+            if sql_gen_elapsed > Duration::from_millis(10) {
+                debug!("SQL (slow) generated in {:?}", start.elapsed());
+            }
+            // any client that doesn't cause us to generate new rows in 5
+            // seconds gets dropped.
+            let abort_cutoff = Duration::from_secs(5);
+            let mut slow_first_event;
+            let mut last_successful_send = Instant::now();
+            if let Ok(mut conn) = self.read_pool.get() {
+                // execute the query.
+                // make the actual SQL query (with parameters inserted) available
+                conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
+                let mut stmt = conn.prepare_cached(&q)?;
+                let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
+
+                let mut first_result = true;
+                while let Some(row) = event_rows.next()? {
+                    let first_event_elapsed = start.elapsed();
+                    slow_first_event = first_event_elapsed >= slow_cutoff;
+                    if first_result {
+                        debug!(
+                            "first result in {:?} (cid: {}, sub: {:?}, filter: {}) [used index: {:?}]",
+                            first_event_elapsed, client_id, sub.id, filter_count, idx
+                        );
+                        first_result = false;
+                    }
+                    // logging for slow queries; show sub and SQL.
+                    // to reduce logging; only show 1/16th of clients (leading 0)
+                    if row_count == 0 && slow_first_event && client_id.starts_with('0') {
+                        debug!(
+                            "query req (slow): {:?} (cid: {}, sub: {:?})",
+                            sub, client_id, sub.id
+                        );
+                    }
+                    // check if a checkpoint is trying to run, and abort
+                    if row_count % 100 == 0 {
+                        {
+                            if self.checkpoint_in_progress.try_lock().is_err() {
+                                // lock was held, abort this query
+                                debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
+                                metrics.query_aborts.inc();
+                                return Ok(());
+                            }
+                        }
+                    }
+
+                    // check if this is still active; every 100 rows
+                    if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
+                        debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
+                        return Ok(());
+                    }
+                    row_count += 1;
+                    let event_json = row.get(0)?;
+                    loop {
+                        if query_tx.capacity() != 0 {
+                            // we have capacity to add another item
+                            break;
+                        }
+                        // the queue is full
+                        trace!("db reader thread is stalled");
+                        if last_successful_send + abort_cutoff < Instant::now() {
+                            // the queue has been full for too long, abort
+                            info!("aborting database query due to slow client (cid: {}, sub: {:?})",
+                                  client_id, sub.id);
+                            metrics.query_aborts.inc();
+                            let ok: Result<()> = Ok(());
+                            return ok;
+                        }
+                        // check if a checkpoint is trying to run, and abort
                         if self.checkpoint_in_progress.try_lock().is_err() {
                             // lock was held, abort this query
                             debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
                             metrics.query_aborts.inc();
                             return Ok(());
                         }
+                        // give the queue a chance to clear before trying again
+                        thread::sleep(Duration::from_millis(100));
                     }
-                query_tx
-                    .blocking_send(QueryResult {
-                        sub_id: sub.get_id(),
-                        event: event_json,
-                    })
-                    .ok();
-                last_successful_send = Instant::now();
+                    // TODO: we could use try_send, but we'd have to juggle
+                    // getting the query result back as part of the error
+                    // result.
+                    query_tx
+                        .blocking_send(QueryResult {
+                            sub_id: sub.get_id(),
+                            event: event_json,
+                        })
+                        .ok();
+                    last_successful_send = Instant::now();
+                }
+            } else {
+                warn!("Could not get a database connection for querying");
             }
-            query_tx
-                .blocking_send(QueryResult {
-                    sub_id: sub.get_id(),
-                    event: "EOSE".to_string(),
-                })
-                .ok();
-            debug!(
-                "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
-                pre_spawn_start.elapsed(),
-                client_id,
-                sub.id,
-                start.elapsed(),
-                row_count
-            );
-        } else {
-            warn!("Could not get a database connection for querying");
         }
+        debug!(
+            "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
+            pre_spawn_start.elapsed(),
+            client_id,
+            sub.id,
+            start.elapsed(),
+            row_count
+        );
+        query_tx
+            .blocking_send(QueryResult {
+                sub_id: sub.get_id(),
+                event: "EOSE".to_string(),
+            })
+            .ok();
         metrics
             .query_sub
             .observe(pre_spawn_start.elapsed().as_secs_f64());
@@ -668,7 +670,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
         let empty_params: Vec<Box<dyn ToSql>> = vec![];
         return (empty_query, empty_params, None);
@@ -677,7 +679,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
     let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -815,7 +817,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
-fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<String>) {
+fn _query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<String>) {
     // build a dynamic SQL query for an entire subscription, based on
     // SQL subqueries for filters.
     let mut subqueries: Vec<String> = Vec::new();
diff --git a/src/server.rs b/src/server.rs
index 31e5fc6..56a18d8 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -719,7 +719,7 @@ async fn nostr_server(
                 match parsed {
                     Ok(e) => {
                         let id_prefix:String = e.id.chars().take(8).collect();
-                        debug!("successfully parsed/validated event: {:?} (cid: {})", id_prefix, cid);
+                        debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
                         // check if the event is too far in the future.
                         if e.is_valid_timestamp(settings.options.reject_future_seconds) {
                             // Write this to the database.
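The sqlite.rs hunk is mostly re-indentation; the structural change is that the reader no longer builds one combined SQL statement for the whole subscription (`query_from_sub`, kept around as `_query_from_sub`). Instead it loops over `sub.filters`, generates one statement per filter with `query_from_filter`, and sends the single `"EOSE"` marker only after the last filter's rows have drained. Each per-filter statement is simple enough for SQLite to plan against one index, which is why the first-result log line changes from `[used indexes: {:?}]` to a per-filter `[used index: {:?}]`. A minimal sketch of that control-flow shape, with stand-in types and a toy query generator; nothing here beyond the `query_from_filter` name is the relay's API:

```rust
// Stand-ins for the relay's ReqFilter/Subscription; illustrative only.
struct Filter {
    kinds: Vec<u64>,
}
struct Subscription {
    id: String,
    filters: Vec<Filter>,
}

/// Hypothetical analogue of query_from_filter: one SQL string plus its
/// parameters per filter, instead of one combined query per subscription.
fn query_from_filter(f: &Filter) -> (String, Vec<String>) {
    if f.kinds.is_empty() {
        return ("SELECT e.content FROM event e".into(), vec![]);
    }
    let marks = vec!["?"; f.kinds.len()].join(",");
    (
        format!("SELECT e.content FROM event e WHERE e.kind IN ({marks})"),
        f.kinds.iter().map(|k| k.to_string()).collect(),
    )
}

fn run_subscription(sub: &Subscription, mut send: impl FnMut(&str)) {
    for (n, filter) in sub.filters.iter().enumerate() {
        let (sql, params) = query_from_filter(filter);
        // each filter becomes its own small statement that the planner can
        // satisfy with a single index, matching the per-filter log line
        println!("filter {}: {sql} {params:?}", n + 1);
        // ...execute `sql` here, calling send(row_json) per row...
    }
    // EOSE is sent exactly once, after the last filter has drained
    send("EOSE");
}

fn main() {
    let sub = Subscription {
        id: "sub-1".into(),
        filters: vec![Filter { kinds: vec![1, 7] }, Filter { kinds: vec![] }],
    };
    run_subscription(&sub, |msg| println!("{} -> {msg}", sub.id));
}
```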
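The inner `loop` ahead of each `blocking_send` is the slow-client guard: when `query_tx.capacity()` reports the channel full, the reader sleeps 100 ms and retries, aborting the query outright once the queue has stayed full past `abort_cutoff`, or sooner if a checkpoint takes `checkpoint_in_progress`. The TODO in the patch notes that `try_send` could replace the capacity probe if the unsent row were recovered from the error value. A sketch of that variant, using std's bounded `sync_channel` as a stand-in for the relay's tokio channel; the function name and error strings are ours, and the patch measures the stall from the last successful send rather than, as here, from the first failed attempt:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

// Never block forever on a full queue: retry with a short sleep, and give
// up once the consumer has stalled past abort_cutoff (5 s, as in the patch).
fn send_with_abort(tx: &SyncSender<String>, mut row: String) -> Result<(), String> {
    let abort_cutoff = Duration::from_secs(5);
    let stalled_since = Instant::now();
    loop {
        match tx.try_send(row) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Full(returned)) => {
                if stalled_since.elapsed() > abort_cutoff {
                    // the queue has been full for too long, abort
                    return Err("aborting query: client too slow".into());
                }
                // take the unsent row back out of the error and retry;
                // this is the "juggling" the patch's TODO refers to
                row = returned;
                thread::sleep(Duration::from_millis(100));
            }
            Err(TrySendError::Disconnected(_)) => return Err("client went away".into()),
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<String>(2); // tiny buffer to exercise the Full path
    let producer = thread::spawn(move || {
        for i in 0..5 {
            if let Err(e) = send_with_abort(&tx, format!("event {i}")) {
                eprintln!("{e}");
                break;
            }
        }
    });
    // a deliberately slow consumer
    for msg in rx {
        thread::sleep(Duration::from_millis(50));
        println!("got {msg}");
    }
    producer.join().unwrap();
}
```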
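Separately, the `row_count % 100 == 0` probe of `checkpoint_in_progress` exists so a pending SQLite checkpoint is never starved by a long read: the checkpointer holds that mutex for the duration of its work, and readers treat a failed `try_lock()` as a signal to abort rather than keep their read transaction open. A toy harness showing that handshake; the `Mutex<()>` payload, timings, and threads are ours, not the relay's:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let checkpoint_in_progress = Arc::new(Mutex::new(()));

    let writer_lock = Arc::clone(&checkpoint_in_progress);
    let writer = thread::spawn(move || {
        let _guard = writer_lock.lock().unwrap(); // checkpoint begins
        thread::sleep(Duration::from_millis(200)); // ...checkpoint work...
    });

    thread::sleep(Duration::from_millis(50)); // let the checkpoint start
    for batch in 0.. {
        // equivalent of the `row_count % 100 == 0` poll in the query loop
        if checkpoint_in_progress.try_lock().is_err() {
            println!("query aborted due to checkpoint (batch {batch})");
            break;
        }
        thread::sleep(Duration::from_millis(10)); // ...read another 100 rows...
    }
    writer.join().unwrap();
}
```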