perf: prevent sqlite readers from capturing worker thread pool and impacting writer latency

This commit is contained in:
Greg Heartsfield 2023-01-31 18:09:23 -06:00
parent 214f152c5d
commit 111eb4a10c

View File

@ -16,7 +16,7 @@ use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::params; use rusqlite::params;
use rusqlite::types::ToSql; use rusqlite::types::ToSql;
use rusqlite::OpenFlags; use rusqlite::OpenFlags;
use tokio::sync::{Mutex, MutexGuard}; use tokio::sync::{Mutex, MutexGuard, Semaphore};
use std::fmt::Write as _; use std::fmt::Write as _;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -48,6 +48,8 @@ pub struct SqliteRepo {
checkpoint_in_progress: Arc<Mutex<u64>>, checkpoint_in_progress: Arc<Mutex<u64>>,
/// Flag to limit writer concurrency /// Flag to limit writer concurrency
write_in_progress: Arc<Mutex<u64>>, write_in_progress: Arc<Mutex<u64>>,
/// Semaphore for readers to acquire blocking threads
reader_threads_ready: Arc<Semaphore>,
} }
impl SqliteRepo { impl SqliteRepo {
@ -83,7 +85,10 @@ impl SqliteRepo {
// SQLite can only effectively write single threaded, so don't // SQLite can only effectively write single threaded, so don't
// block multiple worker threads unnecessarily. // block multiple worker threads unnecessarily.
let write_in_progress = Arc::new(Mutex::new(0)); let write_in_progress = Arc::new(Mutex::new(0));
// configure the number of worker threads that can be spawned
// to match the number of database reader connections.
let max_conn = settings.database.max_conn as usize;
let reader_threads_ready = Arc::new(Semaphore::new(max_conn));
SqliteRepo { SqliteRepo {
metrics, metrics,
read_pool, read_pool,
@ -91,6 +96,7 @@ impl SqliteRepo {
maint_pool, maint_pool,
checkpoint_in_progress, checkpoint_in_progress,
write_in_progress, write_in_progress,
reader_threads_ready,
} }
} }
@ -309,6 +315,11 @@ impl NostrRepo for SqliteRepo {
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<()> { ) -> Result<()> {
let pre_spawn_start = Instant::now(); let pre_spawn_start = Instant::now();
// if we let every request spawn a thread, we'll exhaust the
// thread pool waiting for queries to finish under high load.
// Instead, don't bother spawning threads when they will just
// block on a database connection.
let sem = self.reader_threads_ready.clone().acquire_owned().await.unwrap();
let self=self.clone(); let self=self.clone();
let metrics=self.metrics.clone(); let metrics=self.metrics.clone();
task::spawn_blocking(move || { task::spawn_blocking(move || {
@ -437,6 +448,7 @@ impl NostrRepo for SqliteRepo {
warn!("Could not get a database connection for querying"); warn!("Could not get a database connection for querying");
} }
} }
drop(sem); // new query can begin
debug!( debug!(
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})", "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
pre_spawn_start.elapsed(), pre_spawn_start.elapsed(),
@ -896,7 +908,7 @@ pub fn build_pool(
/// Perform database WAL checkpoint on a regular basis /// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> { pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
tokio::task::spawn(async move { tokio::task::spawn(async move {
// WAL size in pages. // WAL size in pages.
let mut current_wal_size = 0; let mut current_wal_size = 0;