perf: hold database handle through all filters when querying

This commit is contained in:
Greg Heartsfield 2023-02-01 18:09:30 -06:00
parent f7550b4c61
commit bca5614a82
2 changed files with 35 additions and 25 deletions

View File

@ -350,24 +350,24 @@ impl NostrRepo for SqliteRepo {
let slow_cutoff = Duration::from_millis(250);
let mut filter_count = 0;
// remove duplicates from the filter list.
for filter in sub.filters.iter() {
let filter_start = Instant::now();
filter_count += 1;
let (q, p, idx) = query_from_filter(&filter);
let sql_gen_elapsed = start.elapsed();
if let Ok(mut conn) = self.read_pool.get() {
for filter in sub.filters.iter() {
let filter_start = Instant::now();
filter_count += 1;
let (q, p, idx) = query_from_filter(&filter);
let sql_gen_elapsed = start.elapsed();
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
}
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let mut slow_first_event;
let mut last_successful_send = Instant::now();
if let Ok(mut conn) = self.read_pool.get() {
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
}
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let mut slow_first_event;
let mut last_successful_send = Instant::now();
// execute the query.
// make the actual SQL query (with parameters inserted) available
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
conn.trace(Some(|x| {debug!("SQL trace: {:?}", x)}));
let mut stmt = conn.prepare_cached(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
@ -445,17 +445,20 @@ impl NostrRepo for SqliteRepo {
.ok();
last_successful_send = Instant::now();
}
} else {
warn!("Could not get a database connection for querying");
}
// if the filter took too much db_time, print out the JSON.
if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') {
debug!(
"query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})",
serde_json::to_string(&filter)?, client_id, sub.id, filter_count
);
}
metrics
.query_db
.observe(filter_start.elapsed().as_secs_f64());
// if the filter took too much db_time, print out the JSON.
if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') {
debug!(
"query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})",
serde_json::to_string(&filter)?, client_id, sub.id, filter_count
);
}
}
} else {
warn!("Could not get a database connection for querying");
}
drop(sem); // new query can begin
debug!(

View File

@ -232,6 +232,10 @@ fn create_metrics() -> (Registry, NostrMetrics) {
"nostr_query_seconds",
"Subscription response times",
)).unwrap();
let query_db = Histogram::with_opts(HistogramOpts::new(
"nostr_filter_seconds",
"Filter SQL query times",
)).unwrap();
let write_events = Histogram::with_opts(HistogramOpts::new(
"nostr_events_write_seconds",
"Event writing response times",
@ -265,6 +269,7 @@ fn create_metrics() -> (Registry, NostrMetrics) {
vec!["reason"].as_slice(),
).unwrap();
registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(query_db.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(sent_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap();
@ -275,6 +280,7 @@ fn create_metrics() -> (Registry, NostrMetrics) {
registry.register(Box::new(disconnects.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
query_db,
write_events,
sent_events,
connections,
@ -830,6 +836,7 @@ async fn nostr_server(
#[derive(Clone)]
pub struct NostrMetrics {
pub query_sub: Histogram, // response time of successful subscriptions
pub query_db: Histogram, // individual database query execution time
pub write_events: Histogram, // response time of event writes
pub sent_events: IntCounterVec, // count of events sent to clients
pub connections: IntCounter, // count of websocket connections