improvement: prometheus metrics for aborted queries

This commit is contained in:
Greg Heartsfield 2023-01-28 16:05:58 -06:00
parent 0859e535ed
commit 34db91940c
3 changed files with 14 additions and 2 deletions

View File

@ -209,6 +209,7 @@ ON CONFLICT (id) DO NOTHING"#,
) -> Result<()> { ) -> Result<()> {
let start = Instant::now(); let start = Instant::now();
let mut row_count: usize = 0; let mut row_count: usize = 0;
let metrics = &self.metrics;
for filter in sub.filters.iter() { for filter in sub.filters.iter() {
let start = Instant::now(); let start = Instant::now();
@ -272,7 +273,7 @@ ON CONFLICT (id) DO NOTHING"#,
// check if this is still active; every 100 rows // check if this is still active; every 100 rows
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(()); return Ok(());
} }
@ -288,6 +289,7 @@ ON CONFLICT (id) DO NOTHING"#,
if last_successful_send + abort_cutoff < Instant::now() { if last_successful_send + abort_cutoff < Instant::now() {
// the queue has been full for too long, abort // the queue has been full for too long, abort
info!("aborting database query due to slow client"); info!("aborting database query due to slow client");
metrics.query_aborts.inc();
return Ok(()); return Ok(());
} }
// give the queue a chance to clear before trying again // give the queue a chance to clear before trying again

View File

@ -379,6 +379,7 @@ impl NostrRepo for SqliteRepo {
if self.checkpoint_in_progress.try_lock().is_err() { if self.checkpoint_in_progress.try_lock().is_err() {
// lock was held, abort this query // lock was held, abort this query
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
metrics.query_aborts.inc();
return Ok(()); return Ok(());
} }
} }
@ -386,7 +387,7 @@ impl NostrRepo for SqliteRepo {
// check if this is still active; every 100 rows // check if this is still active; every 100 rows
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(()); return Ok(());
} }
row_count += 1; row_count += 1;
@ -402,6 +403,7 @@ impl NostrRepo for SqliteRepo {
// the queue has been full for too long, abort // the queue has been full for too long, abort
info!("aborting database query due to slow client (cid: {}, sub: {:?})", info!("aborting database query due to slow client (cid: {}, sub: {:?})",
client_id, sub.id); client_id, sub.id);
metrics.query_aborts.inc();
let ok: Result<()> = Ok(()); let ok: Result<()> = Ok(());
return ok; return ok;
} }
@ -409,6 +411,7 @@ impl NostrRepo for SqliteRepo {
if self.checkpoint_in_progress.try_lock().is_err() { if self.checkpoint_in_progress.try_lock().is_err() {
// lock was held, abort this query // lock was held, abort this query
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
metrics.query_aborts.inc();
return Ok(()); return Ok(());
} }
// give the queue a chance to clear before trying again // give the queue a chance to clear before trying again

View File

@ -325,13 +325,19 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
"connections", "connections",
"New connections" "New connections"
)).unwrap(); )).unwrap();
let query_aborts = IntCounter::with_opts(Opts::new(
"query_abort",
"Aborted queries"
)).unwrap();
registry.register(Box::new(query_sub.clone())).unwrap(); registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap(); registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap(); registry.register(Box::new(connections.clone())).unwrap();
registry.register(Box::new(query_aborts.clone())).unwrap();
let metrics = NostrMetrics { let metrics = NostrMetrics {
query_sub, query_sub,
write_events, write_events,
connections, connections,
query_aborts,
}; };
// build a repository for events // build a repository for events
let repo = db::build_repo(&settings, metrics.clone()).await; let repo = db::build_repo(&settings, metrics.clone()).await;
@ -778,4 +784,5 @@ pub struct NostrMetrics {
pub query_sub: Histogram, pub query_sub: Histogram,
pub write_events: Histogram, pub write_events: Histogram,
pub connections: IntCounter, pub connections: IntCounter,
pub query_aborts: IntCounter,
} }