diff --git a/src/db.rs b/src/db.rs index a922a94..15b594b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -96,7 +96,9 @@ pub fn build_pool( /// Perform normal maintenance pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> { + let start = Instant::now(); conn.execute_batch("PRAGMA optimize;")?; + info!("optimize ran in {:?}", start.elapsed()); Ok(()) } #[derive(Debug)] @@ -171,9 +173,6 @@ pub async fn db_writer( let rps_setting = settings.limits.messages_per_sec; let mut most_recent_rate_limit = Instant::now(); let mut lim_opt = None; - // Keep rough track of events so we can run maintenance - // eventually. - let mut last_maintenance = Instant::now(); // Constant writing has interfered with online backups. Keep // track of how long since we've given the backups a chance to // run. @@ -318,15 +317,6 @@ pub async fn db_writer( thread::sleep(Duration::from_millis(500)); backup_pause_counter = 0 } - - // Use this as a trigger to do optimization & checkpointing - if last_maintenance.elapsed() > Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC) { - last_maintenance = Instant::now(); - debug!("running database optimizer"); - optimize_db(&mut pool.get()?).ok(); - debug!("running wal_checkpoint(TRUNCATE)"); - checkpoint_db(&mut pool.get()?).ok(); - } } // use rate limit, if defined, and if an event was actually written. @@ -686,6 +676,26 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) { ); } +/// Perform database maintenance on a regular basis +pub async fn db_maintenance(pool: SqlitePool) { + tokio::task::spawn(async move { + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => { + if let Ok(mut conn) = pool.get() { + // set the busy timeout to a larger value (default is 5 seconds). + conn.busy_timeout(Duration::from_secs(60)).ok(); + debug!("running database optimizer"); + optimize_db(&mut conn).ok(); + debug!("running wal_checkpoint(TRUNCATE)"); + checkpoint_db(&mut conn).ok(); + } + } + }; + } + }); +} + /// Perform a database query using a subscription. /// /// The [`Subscription`] is converted into a SQL query. Each result diff --git a/src/server.rs b/src/server.rs index 644566c..43ea09b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,6 +22,7 @@ use hyper::upgrade::Upgraded; use hyper::{ header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode, }; +use rusqlite::OpenFlags; use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; @@ -310,6 +311,17 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result } } } + // build a connection pool for DB maintenance + let maintenance_pool = db::build_pool( + "maintenance writer", + &settings, + OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, + 1, + 1, + false, + ); + db::db_maintenance(maintenance_pool).await; + // listen for (external to tokio) shutdown request let controlled_shutdown = invoke_shutdown.clone(); tokio::spawn(async move {