improvement: move db pool operations closer to query, do not panic on failure

This commit is contained in:
Greg Heartsfield 2022-02-23 16:38:16 -06:00
parent 9c1b21cbfe
commit 53990672ae
2 changed files with 35 additions and 31 deletions

View File

@ -503,7 +503,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// query is immediately aborted. /// query is immediately aborted.
pub async fn db_query( pub async fn db_query(
sub: Subscription, sub: Subscription,
conn: PooledConnection, pool: SqlitePool,
query_tx: tokio::sync::mpsc::Sender<QueryResult>, query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) { ) {
@ -514,36 +514,42 @@ pub async fn db_query(
// generate SQL query // generate SQL query
let (q, p) = query_from_sub(&sub); let (q, p) = query_from_sub(&sub);
debug!("SQL generated in {:?}", start.elapsed()); debug!("SQL generated in {:?}", start.elapsed());
// show pool stats
debug!("DB pool stats: {:?}", pool.state());
let start = Instant::now(); let start = Instant::now();
// execute the query. Don't cache, since queries vary so much. if let Ok(conn) = pool.get() {
let mut stmt = conn.prepare(&q)?; // execute the query. Don't cache, since queries vary so much.
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; let mut stmt = conn.prepare(&q)?;
let mut first_result = true; let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
while let Some(row) = event_rows.next()? { let mut first_result = true;
if first_result { while let Some(row) = event_rows.next()? {
debug!("time to first result: {:?}", start.elapsed()); if first_result {
first_result = false; debug!("time to first result: {:?}", start.elapsed());
first_result = false;
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
debug!("query aborted");
return Ok(());
}
row_count += 1;
let event_json = row.get(0)?;
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
} }
// check if this is still active debug!(
// TODO: check every N rows "query completed ({} rows) in {:?}",
if abandon_query_rx.try_recv().is_ok() { row_count,
debug!("query aborted"); start.elapsed()
return Ok(()); );
} } else {
row_count += 1; warn!("Could not get a database connection for querying");
let event_json = row.get(0)?;
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
} }
debug!(
"query completed ({} rows) in {:?}",
row_count,
start.elapsed()
);
let ok: Result<()> = Ok(()); let ok: Result<()> = Ok(());
ok ok
}); });

View File

@ -519,9 +519,7 @@ async fn nostr_server(
previous_query.send(()).ok(); previous_query.send(()).ok();
} }
// start a database query // start a database query
// show pool stats db::db_query(s, pool.clone(), query_tx.clone(), abandon_query_rx).await;
debug!("DB pool stats: {:?}", pool.state());
db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await;
}, },
Err(e) => { Err(e) => {
info!("Subscription error: {}", e); info!("Subscription error: {}", e);