diff --git a/src/db.rs b/src/db.rs index 4f5e1da..6913cce 100644 --- a/src/db.rs +++ b/src/db.rs @@ -655,8 +655,12 @@ pub async fn db_query( log_pool_stats(&pool); // cutoff for displaying slow queries let slow_cutoff = Duration::from_millis(2000); + // any client that doesn't cause us to generate new rows in 5 + // seconds gets dropped. + let abort_cutoff = Duration::from_secs(5); let start = Instant::now(); let mut slow_first_event; + let mut last_successful_send = Instant::now(); if let Ok(conn) = pool.get() { // execute the query. Don't cache, since queries vary so much. let mut stmt = conn.prepare(&q)?; @@ -704,12 +708,33 @@ pub async fn db_query( } row_count += 1; let event_json = row.get(0)?; + loop { + if query_tx.capacity() != 0 { + // we have capacity to add another item + break; + } else { + // the queue is full + trace!("db reader thread is stalled"); + if last_successful_send + abort_cutoff < Instant::now() { + // the queue has been full for too long, abort + info!("aborting database query due to slow client"); + let ok: Result<()> = Ok(()); + return ok; + } + // give the queue a chance to clear before trying again + thread::sleep(Duration::from_millis(100)); + } + } + // TODO: we could use try_send, but we'd have to juggle + // getting the query result back as part of the error + // result. query_tx .blocking_send(QueryResult { sub_id: sub.get_id(), event: event_json, }) .ok(); + last_successful_send = Instant::now(); } query_tx .blocking_send(QueryResult {