feat: perform regular database maintenance (60sec), without blocking main writer thread

This commit is contained in:
Greg Heartsfield 2022-12-22 15:16:21 -06:00
parent 0f68c4e5c2
commit 8ea732cbe5
2 changed files with 34 additions and 12 deletions

View File

@ -96,7 +96,9 @@ pub fn build_pool(
/// Perform normal maintenance /// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> { pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
let start = Instant::now();
conn.execute_batch("PRAGMA optimize;")?; conn.execute_batch("PRAGMA optimize;")?;
info!("optimize ran in {:?}", start.elapsed());
Ok(()) Ok(())
} }
#[derive(Debug)] #[derive(Debug)]
@ -171,9 +173,6 @@ pub async fn db_writer(
let rps_setting = settings.limits.messages_per_sec; let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now(); let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None; let mut lim_opt = None;
// Keep rough track of events so we can run maintenance
// eventually.
let mut last_maintenance = Instant::now();
// Constant writing has interfered with online backups. Keep // Constant writing has interfered with online backups. Keep
// track of how long since we've given the backups a chance to // track of how long since we've given the backups a chance to
// run. // run.
@ -318,15 +317,6 @@ pub async fn db_writer(
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
backup_pause_counter = 0 backup_pause_counter = 0
} }
// Use this as a trigger to do optimization & checkpointing
if last_maintenance.elapsed() > Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC) {
last_maintenance = Instant::now();
debug!("running database optimizer");
optimize_db(&mut pool.get()?).ok();
debug!("running wal_checkpoint(TRUNCATE)");
checkpoint_db(&mut pool.get()?).ok();
}
} }
// use rate limit, if defined, and if an event was actually written. // use rate limit, if defined, and if an event was actually written.
@ -686,6 +676,26 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) {
); );
} }
/// Perform database maintenance on a regular basis
pub async fn db_maintenance(pool: SqlitePool) {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
if let Ok(mut conn) = pool.get() {
// set the busy timeout to a larger value (default is 5 seconds).
conn.busy_timeout(Duration::from_secs(60)).ok();
debug!("running database optimizer");
optimize_db(&mut conn).ok();
debug!("running wal_checkpoint(TRUNCATE)");
checkpoint_db(&mut conn).ok();
}
}
};
}
});
}
/// Perform a database query using a subscription. /// Perform a database query using a subscription.
/// ///
/// The [`Subscription`] is converted into a SQL query. Each result /// The [`Subscription`] is converted into a SQL query. Each result

View File

@ -22,6 +22,7 @@ use hyper::upgrade::Upgraded;
use hyper::{ use hyper::{
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode, header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
}; };
use rusqlite::OpenFlags;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::collections::HashMap; use std::collections::HashMap;
@ -310,6 +311,17 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
} }
} }
} }
// build a connection pool for DB maintenance
let maintenance_pool = db::build_pool(
"maintenance writer",
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
1,
false,
);
db::db_maintenance(maintenance_pool).await;
// listen for (external to tokio) shutdown request // listen for (external to tokio) shutdown request
let controlled_shutdown = invoke_shutdown.clone(); let controlled_shutdown = invoke_shutdown.clone();
tokio::spawn(async move { tokio::spawn(async move {