//! Event persistence and querying //use crate::config::SETTINGS; use crate::config::Settings; use crate::db::QueryResult; use crate::error::Result; use crate::error::Error::SqlError; use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05::{Nip05Name, VerificationRecord}; use crate::repo::sqlite_migration::{upgrade_db, STARTUP_SQL}; use crate::server::NostrMetrics; use crate::subscription::{ReqFilter, Subscription}; use crate::utils::{is_hex, unix_time}; use async_trait::async_trait; use hex; use r2d2; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::params; use rusqlite::types::ToSql; use rusqlite::OpenFlags; use std::fmt::Write as _; use std::path::Path; use std::sync::Arc; use std::thread; use std::time::Duration; use std::time::Instant; use tokio::sync::{Mutex, MutexGuard, Semaphore}; use tokio::task; use tracing::{debug, info, trace, warn}; use crate::repo::{now_jitter, NostrRepo}; pub type SqlitePool = r2d2::Pool; pub type PooledConnection = r2d2::PooledConnection; pub const DB_FILE: &str = "nostr.db"; #[derive(Clone)] pub struct SqliteRepo { /// Metrics metrics: NostrMetrics, /// Pool for reading events and NIP-05 status read_pool: SqlitePool, /// Pool for writing events and NIP-05 verification write_pool: SqlitePool, /// Pool for performing checkpoints/optimization maint_pool: SqlitePool, /// Flag to indicate a checkpoint is underway checkpoint_in_progress: Arc>, /// Flag to limit writer concurrency write_in_progress: Arc>, /// Semaphore for readers to acquire blocking threads reader_threads_ready: Arc, } impl SqliteRepo { // build all the pools needed #[must_use] pub fn new(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo { let write_pool = build_pool( "writer", settings, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, 1, 2, false, ); let maint_pool = build_pool( "maintenance", settings, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, 1, 2, true, ); let read_pool = build_pool( "reader", settings, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, settings.database.min_conn, settings.database.max_conn, true, ); // this is used to block new reads during critical checkpoints let checkpoint_in_progress = Arc::new(Mutex::new(0)); // SQLite can only effectively write single threaded, so don't // block multiple worker threads unnecessarily. let write_in_progress = Arc::new(Mutex::new(0)); // configure the number of worker threads that can be spawned // to match the number of database reader connections. let max_conn = settings.database.max_conn as usize; let reader_threads_ready = Arc::new(Semaphore::new(max_conn)); SqliteRepo { metrics, read_pool, write_pool, maint_pool, checkpoint_in_progress, write_in_progress, reader_threads_ready, } } /// Persist an event to the database, returning rows added. pub fn persist_event(conn: &mut PooledConnection, e: &Event) -> Result { // enable auto vacuum conn.execute_batch("pragma auto_vacuum = FULL")?; // start transaction let tx = conn.transaction()?; // get relevant fields from event and convert to blobs. let id_blob = hex::decode(&e.id).ok(); let pubkey_blob: Option> = hex::decode(&e.pubkey).ok(); let delegator_blob: Option> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); let event_str = serde_json::to_string(&e).ok(); // check for replaceable events that would hide this one; we won't even attempt to insert these. if e.is_replaceable() { let repl_count = tx.query_row( "SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at >= ? LIMIT 1;", params![pubkey_blob, e.kind, e.created_at], |row| row.get::(0)); if repl_count.ok().is_some() { return Ok(0); } } // check for parameterized replaceable events that would be hidden; don't insert these either. if let Some(d_tag) = e.distinct_param() { let repl_count = tx.query_row( "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;", params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::(0)); // if any rows were returned, then some newer event with // the same author/kind/tag value exist, and we can ignore // this event. if repl_count.ok().is_some() { return Ok(0); } } // ignore if the event hash is a duplicate. let mut ins_count = tx.execute( "INSERT OR IGNORE INTO event (event_hash, created_at, expires_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s','now'), FALSE);", params![id_blob, e.created_at, e.expiration(), e.kind, pubkey_blob, delegator_blob, event_str] )? as u64; if ins_count == 0 { // if the event was a duplicate, no need to insert event or // pubkey references. tx.rollback().ok(); return Ok(ins_count); } // remember primary key of the event most recently inserted. let ev_id = tx.last_insert_rowid(); // add all tags to the tag table for tag in &e.tags { // ensure we have 2 values. if tag.len() >= 2 { let tagname = &tag[0]; let tagval = &tag[1]; // only single-char tags are searchable let tagchar_opt = single_char_tagname(tagname); match &tagchar_opt { Some(_) => { tx.execute( "INSERT OR IGNORE INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5)", params![ev_id, &tagname, &tagval, e.kind, e.created_at], )?; } None => {} } } } // if this event is replaceable update, remove other replaceable // event with the same kind from the same author that was issued // earlier than this. if e.is_replaceable() { let author = hex::decode(&e.pubkey).ok(); // this is a backwards check - hide any events that were older. let update_count = tx.execute( "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)", params![e.kind, author, e.kind, author], )?; if update_count > 0 { info!( "removed {} older replaceable kind {} events for author: {:?}", update_count, e.kind, e.get_author_prefix() ); } } // if this event is parameterized replaceable, remove other events. if let Some(d_tag) = e.distinct_param() { let update_count = tx.execute( "DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY t.created_at DESC LIMIT -1 OFFSET 1);", params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?; if update_count > 0 { info!( "removed {} older parameterized replaceable kind {} events for author: {:?}", update_count, e.kind, e.get_author_prefix() ); } } // if this event is a deletion, hide the referenced events from the same author. if e.kind == 5 { let event_candidates = e.tag_values_by_name("e"); // first parameter will be author let mut params: Vec> = vec![Box::new(hex::decode(&e.pubkey)?)]; event_candidates .iter() .filter(|x| is_hex(x) && x.len() == 64) .filter_map(|x| hex::decode(x).ok()) .for_each(|x| params.push(Box::new(x))); let query = format!( "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})", repeat_vars(params.len() - 1) ); let mut stmt = tx.prepare(&query)?; let update_count = stmt.execute(rusqlite::params_from_iter(params))?; info!( "hid {} deleted events for author {:?}", update_count, e.get_author_prefix() ); } else { // check if a deletion has already been recorded for this event. // Only relevant for non-deletion events let del_count = tx.query_row( "SELECT e.id FROM event e WHERE e.author=? AND e.id IN (SELECT t.event_id FROM tag t WHERE t.name='e' AND t.kind=5 AND t.value=?) LIMIT 1;", params![pubkey_blob, e.id], |row| row.get::(0)); // check if a the query returned a result, meaning we should // hid the current event if del_count.ok().is_some() { // a deletion already existed, mark original event as hidden. info!( "hid event: {:?} due to existing deletion by author: {:?}", e.get_event_id_prefix(), e.get_author_prefix() ); let _update_count = tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?; // event was deleted, so let caller know nothing new // arrived, preventing this from being sent to active // subscriptions ins_count = 0; } } tx.commit()?; Ok(ins_count) } } #[async_trait] impl NostrRepo for SqliteRepo { async fn start(&self) -> Result<()> { db_checkpoint_task( self.maint_pool.clone(), Duration::from_secs(60), self.write_in_progress.clone(), self.checkpoint_in_progress.clone() ).await?; cleanup_expired( self.maint_pool.clone(), Duration::from_secs(600), self.write_in_progress.clone() ).await } async fn migrate_up(&self) -> Result { let _write_guard = self.write_in_progress.lock().await; let mut conn = self.write_pool.get()?; task::spawn_blocking(move || upgrade_db(&mut conn)).await? } /// Persist event to database async fn write_event(&self, e: &Event) -> Result { let start = Instant::now(); let max_write_attempts = 10; let mut attempts = 0; let _write_guard = self.write_in_progress.lock().await; // spawn a blocking thread //let mut conn = self.write_pool.get()?; let pool = self.write_pool.clone(); let e = e.clone(); let event_count = task::spawn_blocking(move || { let mut conn = pool.get()?; // this could fail because the database was busy; try // multiple times before giving up. loop { attempts+=1; let wr = SqliteRepo::persist_event(&mut conn, &e); match wr { Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => { // this basically means that NIP-05 or another // writer was using the database between us // reading and promoting the connection to a // write lock. info!("event write failed, DB locked (attempt: {}); sqlite err: {}", attempts, e.extended_code); }, _ => {return wr;}, } if attempts >= max_write_attempts { return wr; } } }).await?; self.metrics .write_events .observe(start.elapsed().as_secs_f64()); event_count } /// Perform a database query using a subscription. /// /// The [`Subscription`] is converted into a SQL query. Each result /// is published on the `query_tx` channel as it is returned. If a /// message becomes available on the `abandon_query_rx` channel, the /// query is immediately aborted. async fn query_subscription( &self, sub: Subscription, client_id: String, query_tx: tokio::sync::mpsc::Sender, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<()> { let pre_spawn_start = Instant::now(); // if we let every request spawn a thread, we'll exhaust the // thread pool waiting for queries to finish under high load. // Instead, don't bother spawning threads when they will just // block on a database connection. let sem = self .reader_threads_ready .clone() .acquire_owned() .await .unwrap(); let self = self.clone(); let metrics = self.metrics.clone(); task::spawn_blocking(move || { { // if we are waiting on a checkpoint, stop until it is complete let _x = self.checkpoint_in_progress.blocking_lock(); } let db_queue_time = pre_spawn_start.elapsed(); // if the queue time was very long (>5 seconds), spare the DB and abort. if db_queue_time > Duration::from_secs(5) { info!( "shedding DB query load queued for {:?} (cid: {}, sub: {:?})", db_queue_time, client_id, sub.id ); metrics.query_aborts.with_label_values(&["loadshed"]).inc(); return Ok(()); } // otherwise, report queuing time if it is slow else if db_queue_time > Duration::from_secs(1) { debug!( "(slow) DB query queued for {:?} (cid: {}, sub: {:?})", db_queue_time, client_id, sub.id ); } // check before getting a DB connection if the client still wants the results if abandon_query_rx.try_recv().is_ok() { debug!( "query cancelled by client (before execution) (cid: {}, sub: {:?})", client_id, sub.id ); return Ok(()); } let start = Instant::now(); let mut row_count: usize = 0; // cutoff for displaying slow queries let slow_cutoff = Duration::from_millis(250); let mut filter_count = 0; // remove duplicates from the filter list. if let Ok(mut conn) = self.read_pool.get() { { let pool_state = self.read_pool.state(); metrics .db_connections .set((pool_state.connections - pool_state.idle_connections).into()); } for filter in sub.filters.iter() { let filter_start = Instant::now(); filter_count += 1; let sql_gen_elapsed = filter_start.elapsed(); let (q, p, idx) = query_from_filter(filter); if sql_gen_elapsed > Duration::from_millis(10) { debug!("SQL (slow) generated in {:?}", filter_start.elapsed()); } // any client that doesn't cause us to generate new rows in 2 // seconds gets dropped. let abort_cutoff = Duration::from_secs(2); let mut slow_first_event; let mut last_successful_send = Instant::now(); // execute the query. // make the actual SQL query (with parameters inserted) available conn.trace(Some(|x| trace!("SQL trace: {:?}", x))); let mut stmt = conn.prepare_cached(&q)?; let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; let mut first_result = true; while let Some(row) = event_rows.next()? { let first_event_elapsed = filter_start.elapsed(); slow_first_event = first_event_elapsed >= slow_cutoff; if first_result { debug!( "first result in {:?} (cid: {}, sub: {:?}, filter: {}) [used index: {:?}]", first_event_elapsed, client_id, sub.id, filter_count, idx ); // logging for slow queries; show filter and SQL. // to reduce logging; only show 1/16th of clients (leading 0) if slow_first_event && client_id.starts_with('0') { debug!( "filter first result in {:?} (slow): {} (cid: {}, sub: {:?})", first_event_elapsed, serde_json::to_string(&filter)?, client_id, sub.id ); } first_result = false; } // check if a checkpoint is trying to run, and abort if row_count % 100 == 0 { { if self.checkpoint_in_progress.try_lock().is_err() { // lock was held, abort this query debug!( "query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id ); metrics .query_aborts .with_label_values(&["checkpoint"]) .inc(); return Ok(()); } } } // check if this is still active; every 100 rows if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { debug!( "query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id ); return Ok(()); } row_count += 1; let event_json = row.get(0)?; loop { if query_tx.capacity() != 0 { // we have capacity to add another item break; } // the queue is full trace!("db reader thread is stalled"); if last_successful_send + abort_cutoff < Instant::now() { // the queue has been full for too long, abort info!("aborting database query due to slow client (cid: {}, sub: {:?})", client_id, sub.id); metrics .query_aborts .with_label_values(&["slowclient"]) .inc(); let ok: Result<()> = Ok(()); return ok; } // check if a checkpoint is trying to run, and abort if self.checkpoint_in_progress.try_lock().is_err() { // lock was held, abort this query debug!( "query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id ); metrics .query_aborts .with_label_values(&["checkpoint"]) .inc(); return Ok(()); } // give the queue a chance to clear before trying again debug!( "query thread sleeping due to full query_tx (cid: {}, sub: {:?})", client_id, sub.id ); thread::sleep(Duration::from_millis(500)); } // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error // result. query_tx .blocking_send(QueryResult { sub_id: sub.get_id(), event: event_json, }) .ok(); last_successful_send = Instant::now(); } metrics .query_db .observe(filter_start.elapsed().as_secs_f64()); // if the filter took too much db_time, print out the JSON. if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') { debug!( "query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})", serde_json::to_string(&filter)?, client_id, sub.id, filter_count ); } } } else { warn!("Could not get a database connection for querying"); } drop(sem); // new query can begin debug!( "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})", pre_spawn_start.elapsed(), client_id, sub.id, start.elapsed(), row_count ); query_tx .blocking_send(QueryResult { sub_id: sub.get_id(), event: "EOSE".to_string(), }) .ok(); metrics .query_sub .observe(pre_spawn_start.elapsed().as_secs_f64()); let ok: Result<()> = Ok(()); ok }); Ok(()) } /// Perform normal maintenance async fn optimize_db(&self) -> Result<()> { let conn = self.write_pool.get()?; task::spawn_blocking(move || { let start = Instant::now(); conn.execute_batch("PRAGMA optimize;").ok(); info!("optimize ran in {:?}", start.elapsed()); }) .await?; Ok(()) } /// Create a new verification record connected to a specific event async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> { let e = hex::decode(event_id).ok(); let n = name.to_owned(); let mut conn = self.write_pool.get()?; let _write_guard = self.write_in_progress.lock().await; tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; { // if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest. let query = "INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now'));"; let mut stmt = tx.prepare(query)?; stmt.execute(params![e, n])?; // get the row ID let v_id = tx.last_insert_rowid(); // delete everything else by this name let del_query = "DELETE FROM user_verification WHERE name = ? AND id != ?;"; let mut del_stmt = tx.prepare(del_query)?; let count = del_stmt.execute(params![n,v_id])?; if count > 0 { info!("removed {} old verification records for ({:?})", count, n); } } tx.commit()?; info!("saved new verification record for ({:?})", n); let ok: Result<()> = Ok(()); ok }).await? } /// Update verification timestamp async fn update_verification_timestamp(&self, id: u64) -> Result<()> { let mut conn = self.write_pool.get()?; let _write_guard = self.write_in_progress.lock().await; tokio::task::spawn_blocking(move || { // add some jitter to the verification to prevent everything from stacking up together. let verif_time = now_jitter(600); let tx = conn.transaction()?; { // update verification time and reset any failure count let query = "UPDATE user_verification SET verified_at=?, failure_count=0 WHERE id=?"; let mut stmt = tx.prepare(query)?; stmt.execute(params![verif_time, id])?; } tx.commit()?; let ok: Result<()> = Ok(()); ok }) .await? } /// Update verification record as failed async fn fail_verification(&self, id: u64) -> Result<()> { let mut conn = self.write_pool.get()?; let _write_guard = self.write_in_progress.lock().await; tokio::task::spawn_blocking(move || { // add some jitter to the verification to prevent everything from stacking up together. let fail_time = now_jitter(600); let tx = conn.transaction()?; { let query = "UPDATE user_verification SET failed_at=?, failure_count=failure_count+1 WHERE id=?"; let mut stmt = tx.prepare(query)?; stmt.execute(params![fail_time, id])?; } tx.commit()?; let ok: Result<()> = Ok(()); ok }) .await? } /// Delete verification record async fn delete_verification(&self, id: u64) -> Result<()> { let mut conn = self.write_pool.get()?; let _write_guard = self.write_in_progress.lock().await; tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; { let query = "DELETE FROM user_verification WHERE id=?;"; let mut stmt = tx.prepare(query)?; stmt.execute(params![id])?; } tx.commit()?; let ok: Result<()> = Ok(()); ok }) .await? } /// Get the latest verification record for a given pubkey. async fn get_latest_user_verification(&self, pub_key: &str) -> Result { let mut conn = self.read_pool.get()?; let pub_key = pub_key.to_owned(); tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; let query = "SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1;"; let mut stmt = tx.prepare_cached(query)?; let fields = stmt.query_row(params![hex::decode(&pub_key).ok()], |r| { let rowid: u64 = r.get(0)?; let rowname: String = r.get(1)?; let eventid: Vec = r.get(2)?; let created_at: u64 = r.get(3)?; // create a tuple since we can't throw non-rusqlite errors in this closure Ok(( rowid, rowname, eventid, created_at, r.get(4).ok(), r.get(5).ok(), r.get(6)?, )) })?; Ok(VerificationRecord { rowid: fields.0, name: Nip05Name::try_from(&fields.1[..])?, address: pub_key, event: hex::encode(fields.2), event_created: fields.3, last_success: fields.4, last_failure: fields.5, failure_count: fields.6, }) }).await? } /// Get oldest verification before timestamp async fn get_oldest_user_verification(&self, before: u64) -> Result { let mut conn = self.read_pool.get()?; tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;"; let mut stmt = tx.prepare_cached(query)?; let fields = stmt.query_row(params![before, before], |r| { let rowid: u64 = r.get(0)?; let rowname: String = r.get(1)?; let eventid: Vec = r.get(2)?; let pubkey: Vec = r.get(3)?; let created_at: u64 = r.get(4)?; // create a tuple since we can't throw non-rusqlite errors in this closure Ok(( rowid, rowname, eventid, pubkey, created_at, r.get(5).ok(), r.get(6).ok(), r.get(7)?, )) })?; let vr = VerificationRecord { rowid: fields.0, name: Nip05Name::try_from(&fields.1[..])?, address: hex::encode(fields.3), event: hex::encode(fields.2), event_created: fields.4, last_success: fields.5, last_failure: fields.6, failure_count: fields.7, }; Ok(vr) }).await? } } /// Decide if there is an index that should be used explicitly fn override_index(f: &ReqFilter) -> Option { if f.ids.is_some() { return Some("event_hash_index".into()); } // queries for multiple kinds default to kind_index, which is // significantly slower than kind_created_at_index. if let Some(ks) = &f.kinds { if f.ids.is_none() && ks.len() > 1 && f.since.is_none() && f.until.is_none() && f.tags.is_none() && f.authors.is_none() { return Some("kind_created_at_index".into()); } } // if there is an author, it is much better to force the authors index. if f.authors.is_some() { if f.since.is_none() && f.until.is_none() && f.limit.is_none() { if f.kinds.is_none() { // with no use of kinds/created_at, just author return Some("author_index".into()); } // prefer author_kind if there are kinds return Some("author_kind_index".into()); } // finally, prefer author_created_at if time is provided return Some("author_created_at_index".into()); } None } /// Create a dynamic SQL subquery and params from a subscription filter (and optional explicit index used) fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option) { // build a dynamic SQL query. all user-input is either an integer // (sqli-safe), or a string that is filtered to only contain // hexadecimal characters. Strings that require escaping (tag // names/values) use parameters. // if the filter is malformed, don't return anything. if f.force_no_match { let empty_query = "SELECT e.content FROM event e WHERE 1=0".to_owned(); // query parameters for SQLite let empty_params: Vec> = vec![]; return (empty_query, empty_params, None); } // check if the index needs to be overriden let idx_name = override_index(f); let idx_stmt = idx_name .as_ref() .map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {i}")); let mut query = format!("SELECT e.content FROM event e {idx_stmt}"); // query parameters for SQLite let mut params: Vec> = vec![]; // individual filter components (single conditions such as an author or event ID) let mut filter_components: Vec = Vec::new(); // Query for "authors", allowing prefix matches if let Some(authvec) = &f.authors { // take each author and convert to a hexsearch let mut auth_searches: Vec = vec![]; for auth in authvec { match hex_range(auth) { Some(HexSearch::Exact(ex)) => { auth_searches.push("author=?".to_owned()); params.push(Box::new(ex)); } Some(HexSearch::Range(lower, upper)) => { auth_searches.push("(author>? AND author { auth_searches.push("author>?".to_owned()); params.push(Box::new(lower)); } None => { info!("Could not parse hex range from author {:?}", auth); } } } if !authvec.is_empty() { let auth_clause = format!("({})", auth_searches.join(" OR ")); filter_components.push(auth_clause); } else { filter_components.push("false".to_owned()); } } // Query for Kind if let Some(ks) = &f.kinds { // kind is number, no escaping needed let str_kinds: Vec = ks.iter().map(std::string::ToString::to_string).collect(); let kind_clause = format!("kind IN ({})", str_kinds.join(", ")); filter_components.push(kind_clause); } // Query for event, allowing prefix matches if let Some(idvec) = &f.ids { // take each author and convert to a hexsearch let mut id_searches: Vec = vec![]; for id in idvec { match hex_range(id) { Some(HexSearch::Exact(ex)) => { id_searches.push("event_hash=?".to_owned()); params.push(Box::new(ex)); } Some(HexSearch::Range(lower, upper)) => { id_searches.push("(event_hash>? AND event_hash { id_searches.push("event_hash>?".to_owned()); params.push(Box::new(lower)); } None => { info!("Could not parse hex range from id {:?}", id); } } } if idvec.is_empty() { // if the ids list was empty, we should never return // any results. filter_components.push("false".to_owned()); } else { let id_clause = format!("({})", id_searches.join(" OR ")); filter_components.push(id_clause); } } // Query for tags if let Some(map) = &f.tags { for (key, val) in map.iter() { let mut str_vals: Vec> = vec![]; for v in val { str_vals.push(Box::new(v.clone())); } // create clauses with "?" params for each tag value being searched let str_clause = format!("AND value IN ({})", repeat_vars(str_vals.len())); // find evidence of the target tag name/value existing for this event. // Query for Kind/Since/Until additionally, to reduce the number of tags that come back. let kind_clause; let since_clause; let until_clause; if let Some(ks) = &f.kinds { // kind is number, no escaping needed let str_kinds: Vec = ks.iter().map(std::string::ToString::to_string).collect(); kind_clause = format!("AND kind IN ({})", str_kinds.join(", ")); } else { kind_clause = format!(""); }; if f.since.is_some() { since_clause = format!("AND created_at > {}", f.since.unwrap()); } else { since_clause = format!(""); }; // Query for timestamp if f.until.is_some() { until_clause = format!("AND created_at < {}", f.until.unwrap()); } else { until_clause = format!(""); }; let tag_clause = format!( "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? {str_clause} {kind_clause} {since_clause} {until_clause}))" ); // add the tag name as the first parameter params.push(Box::new(key.to_string())); // add all tag values that are blobs as params params.append(&mut str_vals); filter_components.push(tag_clause); } } // Query for timestamp if f.since.is_some() { let created_clause = format!("created_at > {}", f.since.unwrap()); filter_components.push(created_clause); } // Query for timestamp if f.until.is_some() { let until_clause = format!("created_at < {}", f.until.unwrap()); filter_components.push(until_clause); } // never display hidden events query.push_str(" WHERE hidden!=TRUE"); // never display hidden events filter_components.push("(expires_at IS NULL OR expires_at > ?)".to_string()); params.push(Box::new(unix_time())); // build filter component conditions if !filter_components.is_empty() { query.push_str(" AND "); query.push_str(&filter_components.join(" AND ")); } // Apply per-filter limit to this subquery. // The use of a LIMIT implies a DESC order, to capture only the most recent events. if let Some(lim) = f.limit { let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {lim}"); } else { query.push_str(" ORDER BY e.created_at ASC"); } (query, params, idx_name) } /// Create a dynamic SQL query string and params from a subscription. fn _query_from_sub(sub: &Subscription) -> (String, Vec>, Vec) { // build a dynamic SQL query for an entire subscription, based on // SQL subqueries for filters. let mut subqueries: Vec = Vec::new(); let mut indexes = vec![]; // subquery params let mut params: Vec> = vec![]; // for every filter in the subscription, generate a subquery for f in &sub.filters { let (f_subquery, mut f_params, index) = query_from_filter(f); if let Some(i) = index { indexes.push(i); } subqueries.push(f_subquery); params.append(&mut f_params); } // encapsulate subqueries into select statements let subqueries_selects: Vec = subqueries .iter() .map(|s| format!("SELECT distinct content, created_at FROM ({s})")) .collect(); let query: String = subqueries_selects.join(" UNION "); (query, params, indexes) } /// Build a database connection pool. /// # Panics /// /// Will panic if the pool could not be created. #[must_use] pub fn build_pool( name: &str, settings: &Settings, flags: OpenFlags, min_size: u32, max_size: u32, wait_for_db: bool, ) -> SqlitePool { let db_dir = &settings.database.data_directory; let full_path = Path::new(db_dir).join(DB_FILE); // small hack; if the database doesn't exist yet, that means the // writer thread hasn't finished. Give it a chance to work. This // is only an issue with the first time we run. if !settings.database.in_memory { while !full_path.exists() && wait_for_db { debug!("Database reader pool is waiting on the database to be created..."); thread::sleep(Duration::from_millis(500)); } } let manager = if settings.database.in_memory { SqliteConnectionManager::file("file::memory:?cache=shared") .with_flags(flags) .with_init(|c| c.execute_batch(STARTUP_SQL)) } else { SqliteConnectionManager::file(&full_path) .with_flags(flags) .with_init(|c| c.execute_batch(STARTUP_SQL)) }; let pool: SqlitePool = r2d2::Pool::builder() .test_on_check_out(true) // no noticeable performance hit .min_idle(Some(min_size)) .max_size(max_size) .max_lifetime(Some(Duration::from_secs(30))) .build(manager) .unwrap(); info!( "Built a connection pool {:?} (min={}, max={})", name, min_size, max_size ); pool } /// Cleanup expired events on a regular basis async fn cleanup_expired(pool: SqlitePool, frequency: Duration, write_in_progress: Arc>) -> Result<()> { tokio::task::spawn(async move { loop { tokio::select! { _ = tokio::time::sleep(frequency) => { if let Ok(mut conn) = pool.get() { let mut _guard:Option> = None; // take a write lock to prevent event writes // from proceeding while we are deleting // events. This isn't necessary, but // minimizes the chances of forcing event // persistence to be retried. _guard = Some(write_in_progress.lock().await); let start = Instant::now(); let exp_res = tokio::task::spawn_blocking(move || { delete_expired(&mut conn) }).await; match exp_res { Ok(Ok(count)) => { if count > 0 { info!("removed {} expired events in: {:?}", count, start.elapsed()); } }, _ => { // either the task or underlying query failed info!("there was an error cleaning up expired events: {:?}", exp_res); } } } } }; } }); Ok(()) } /// Execute a query to delete all expired events pub fn delete_expired(conn: &mut PooledConnection) -> Result { let tx = conn.transaction()?; let update_count = tx.execute( "DELETE FROM event WHERE expires_at <= ?", params![unix_time()], )?; tx.commit()?; Ok(update_count) } /// Perform database WAL checkpoint on a regular basis pub async fn db_checkpoint_task( pool: SqlitePool, frequency: Duration, write_in_progress: Arc>, checkpoint_in_progress: Arc>) -> Result<()> { // TODO; use acquire_many on the reader semaphore to stop them from interrupting this. tokio::task::spawn(async move { // WAL size in pages. let mut current_wal_size = 0; // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB) let wal_threshold = 1000 * 10; // default threshold for the busy timer let busy_wait_default = Duration::from_secs(1); // if the WAL file is getting too big, switch to this let busy_wait_default_long = Duration::from_secs(10); loop { tokio::select! { _ = tokio::time::sleep(frequency) => { if let Ok(mut conn) = pool.get() { // block all other writers let _write_guard = write_in_progress.lock().await; let mut _guard:Option> = None; // the busy timer will block writers, so don't set // this any higher than you want max latency for event // writes. if current_wal_size <= wal_threshold { conn.busy_timeout(busy_wait_default).ok(); } else { // if the wal size has exceeded a threshold, increase the busy timeout. conn.busy_timeout(busy_wait_default_long).ok(); // take a lock that will prevent new readers. info!("blocking new readers to perform wal_checkpoint"); _guard = Some(checkpoint_in_progress.lock().await); } debug!("running wal_checkpoint(TRUNCATE)"); if let Ok(new_size) = checkpoint_db(&mut conn) { current_wal_size = new_size; } } } }; } }); Ok(()) } #[derive(Debug)] enum SqliteStatus { Ok, Busy, Error, Other(u64), } /// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining. pub fn checkpoint_db(conn: &mut PooledConnection) -> Result { let query = "PRAGMA wal_checkpoint(TRUNCATE);"; let start = Instant::now(); let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| { let checkpoint_result: u64 = row.get(0)?; let wal_size: u64 = row.get(1)?; let frames_checkpointed: u64 = row.get(2)?; Ok((checkpoint_result, wal_size, frames_checkpointed)) })?; let result = match cp_result { 0 => SqliteStatus::Ok, 1 => SqliteStatus::Busy, 2 => SqliteStatus::Error, x => SqliteStatus::Other(x), }; info!( "checkpoint ran in {:?} (result: {:?}, WAL size: {})", start.elapsed(), result, wal_size ); Ok(wal_size as usize) } /// Produce a arbitrary list of '?' parameters. fn repeat_vars(count: usize) -> String { if count == 0 { return "".to_owned(); } let mut s = "?,".repeat(count); // Remove trailing comma s.pop(); s } /// Display database pool stats every 1 minute pub async fn monitor_pool(name: &str, pool: SqlitePool) { let sleep_dur = Duration::from_secs(60); loop { log_pool_stats(name, &pool); tokio::time::sleep(sleep_dur).await; } } /// Log pool stats fn log_pool_stats(name: &str, pool: &SqlitePool) { let state: r2d2::State = pool.state(); let in_use_cxns = state.connections - state.idle_connections; debug!( "DB pool {:?} usage (in_use: {}, available: {}, max: {})", name, in_use_cxns, state.connections, pool.max_size() ); } /// Check if the pool is fully utilized fn _pool_at_capacity(pool: &SqlitePool) -> bool { let state: r2d2::State = pool.state(); state.idle_connections == 0 }