diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index de2a4fd..1991be2 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -209,6 +209,7 @@ ON CONFLICT (id) DO NOTHING"#, ) -> Result<()> { let start = Instant::now(); let mut row_count: usize = 0; + let metrics = &self.metrics; for filter in sub.filters.iter() { let start = Instant::now(); @@ -272,7 +273,7 @@ ON CONFLICT (id) DO NOTHING"#, // check if this is still active; every 100 rows if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { - debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); + debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id); return Ok(()); } @@ -288,6 +289,7 @@ ON CONFLICT (id) DO NOTHING"#, if last_successful_send + abort_cutoff < Instant::now() { // the queue has been full for too long, abort info!("aborting database query due to slow client"); + metrics.query_aborts.inc(); return Ok(()); } // give the queue a chance to clear before trying again diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index 717542b..bdd21ac 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -379,6 +379,7 @@ impl NostrRepo for SqliteRepo { if self.checkpoint_in_progress.try_lock().is_err() { // lock was held, abort this query debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); + metrics.query_aborts.inc(); return Ok(()); } } @@ -386,7 +387,7 @@ impl NostrRepo for SqliteRepo { // check if this is still active; every 100 rows if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { - debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); + debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id); return Ok(()); } row_count += 1; @@ -402,6 +403,7 @@ impl NostrRepo for SqliteRepo { // the queue has been full for too long, abort info!("aborting database query due to slow client (cid: {}, sub: {:?})", client_id, sub.id); + metrics.query_aborts.inc(); let ok: Result<()> = Ok(()); return ok; } @@ -409,6 +411,7 @@ impl NostrRepo for SqliteRepo { if self.checkpoint_in_progress.try_lock().is_err() { // lock was held, abort this query debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); + metrics.query_aborts.inc(); return Ok(()); } // give the queue a chance to clear before trying again diff --git a/src/server.rs b/src/server.rs index c6f3324..9fffd07 100644 --- a/src/server.rs +++ b/src/server.rs @@ -325,13 +325,19 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul "connections", "New connections" )).unwrap(); + let query_aborts = IntCounter::with_opts(Opts::new( + "query_abort", + "Aborted queries" + )).unwrap(); registry.register(Box::new(query_sub.clone())).unwrap(); registry.register(Box::new(write_events.clone())).unwrap(); registry.register(Box::new(connections.clone())).unwrap(); + registry.register(Box::new(query_aborts.clone())).unwrap(); let metrics = NostrMetrics { query_sub, write_events, connections, + query_aborts, }; // build a repository for events let repo = db::build_repo(&settings, metrics.clone()).await; @@ -778,4 +784,5 @@ pub struct NostrMetrics { pub query_sub: Histogram, pub write_events: Histogram, pub connections: IntCounter, + pub query_aborts: IntCounter, }