From b23b3ce8ec4316005fda0c7e8fa3eec17ec7561c Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Tue, 27 Dec 2022 09:48:07 -0600 Subject: [PATCH] improvement: block new readers when WAL is large --- src/db.rs | 10 ++++++++-- src/server.rs | 22 +++++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/db.rs b/src/db.rs index 703fced..66718d1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -19,8 +19,10 @@ use r2d2_sqlite::SqliteConnectionManager; use rusqlite::params; use rusqlite::types::ToSql; use rusqlite::OpenFlags; +use tokio::sync::{Mutex, MutexGuard}; use std::fmt::Write as _; use std::path::Path; +use std::sync::Arc; use std::thread; use std::time::Duration; use std::time::Instant; @@ -691,7 +693,7 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) { /// Perform database maintenance on a regular basis -pub async fn db_optimize(pool: SqlitePool) { +pub async fn db_optimize_task(pool: SqlitePool) { tokio::task::spawn(async move { loop { tokio::select! { @@ -710,7 +712,7 @@ pub async fn db_optimize(pool: SqlitePool) { } /// Perform database WAL checkpoint on a regular basis -pub async fn db_checkpoint(pool: SqlitePool) { +pub async fn db_checkpoint_task(pool: SqlitePool, safe_to_read: Arc>) { tokio::task::spawn(async move { // WAL size in pages. let mut current_wal_size = 0; @@ -724,6 +726,7 @@ pub async fn db_checkpoint(pool: SqlitePool) { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => { if let Ok(mut conn) = pool.get() { + let mut _guard:Option> = None; // the busy timer will block writers, so don't set // this any higher than you want max latency for event // writes. @@ -732,6 +735,9 @@ pub async fn db_checkpoint(pool: SqlitePool) { } else { // if the wal size has exceeded a threshold, increase the busy timeout. conn.busy_timeout(busy_wait_default_long).ok(); + // take a lock that will prevent new readers. + info!("blocking new readers to perform wal_checkpoint"); + _guard = Some(safe_to_read.lock().await); } debug!("running wal_checkpoint(TRUNCATE)"); if let Ok(new_size) = checkpoint_db(&mut conn) { diff --git a/src/server.rs b/src/server.rs index 4b9b4ea..ee9e078 100644 --- a/src/server.rs +++ b/src/server.rs @@ -25,10 +25,12 @@ use hyper::{ use rusqlite::OpenFlags; use serde::{Deserialize, Serialize}; use serde_json::json; +use tokio::sync::Mutex; use std::collections::HashMap; use std::convert::Infallible; use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver as MpscReceiver; use std::time::Duration; @@ -54,6 +56,7 @@ async fn handle_web_request( broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, shutdown: Receiver<()>, + safe_to_read: Arc>, ) -> Result, Infallible> { match ( request.uri().path(), @@ -114,6 +117,7 @@ async fn handle_web_request( broadcast, event_tx, shutdown, + safe_to_read, )); } // todo: trace, don't print... @@ -328,8 +332,13 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result 2, false, ); - db::db_optimize(maintenance_pool.clone()).await; - db::db_checkpoint(maintenance_pool).await; + + // Create a mutex that will block readers, so that a + // checkpoint can be performed quickly. + let safe_to_read = Arc::new(Mutex::new(0)); + + db::db_optimize_task(maintenance_pool.clone()).await; + db::db_checkpoint_task(maintenance_pool, safe_to_read.clone()).await; // listen for (external to tokio) shutdown request let controlled_shutdown = invoke_shutdown.clone(); @@ -378,6 +387,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result let event = event_tx.clone(); let stop = invoke_shutdown.clone(); let settings = settings.clone(); + let safe_to_read = safe_to_read.clone(); async move { // service_fn converts our function into a `Service` Ok::<_, Infallible>(service_fn(move |request: Request| { @@ -389,6 +399,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result bcast.clone(), event.clone(), stop.subscribe(), + safe_to_read.clone(), ) })) } @@ -465,6 +476,7 @@ async fn nostr_server( broadcast: Sender, event_tx: mpsc::Sender, mut shutdown: Receiver<()>, + safe_to_read: Arc>, ) { // the time this websocket nostr server started let orig_start = Instant::now(); @@ -674,7 +686,11 @@ async fn nostr_server( if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) { previous_query.send(()).ok(); } - if s.needs_historical_events() { + if s.needs_historical_events() { + { + // acquire and immediately release lock; this ensures we do not start new queries during a wal checkpoint. + let _ = safe_to_read.lock().await; + } // start a database query. this spawns a blocking database query on a worker thread. db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; }