2023-01-22 10:49:49 -05:00
//! Event persistence and querying
//use crate::config::SETTINGS;
use crate ::config ::Settings ;
use crate ::error ::Result ;
use crate ::event ::{ single_char_tagname , Event } ;
use crate ::hexrange ::hex_range ;
use crate ::hexrange ::HexSearch ;
use crate ::repo ::sqlite_migration ::{ STARTUP_SQL , upgrade_db } ;
2023-02-12 15:33:40 -05:00
use crate ::utils ::{ is_hex } ;
2023-01-22 10:49:49 -05:00
use crate ::nip05 ::{ Nip05Name , VerificationRecord } ;
use crate ::subscription ::{ ReqFilter , Subscription } ;
2023-01-22 12:08:12 -05:00
use crate ::server ::NostrMetrics ;
2023-01-22 10:49:49 -05:00
use hex ;
use r2d2 ;
use r2d2_sqlite ::SqliteConnectionManager ;
use rusqlite ::params ;
use rusqlite ::types ::ToSql ;
use rusqlite ::OpenFlags ;
2023-01-31 19:09:23 -05:00
use tokio ::sync ::{ Mutex , MutexGuard , Semaphore } ;
2023-01-22 10:49:49 -05:00
use std ::fmt ::Write as _ ;
use std ::path ::Path ;
use std ::sync ::Arc ;
use std ::thread ;
use std ::time ::Duration ;
use std ::time ::Instant ;
use tokio ::task ;
use tracing ::{ debug , info , trace , warn } ;
use async_trait ::async_trait ;
use crate ::db ::QueryResult ;
use crate ::repo ::{ now_jitter , NostrRepo } ;
pub type SqlitePool = r2d2 ::Pool < r2d2_sqlite ::SqliteConnectionManager > ;
pub type PooledConnection = r2d2 ::PooledConnection < r2d2_sqlite ::SqliteConnectionManager > ;
pub const DB_FILE : & str = " nostr.db " ;
#[ derive(Clone) ]
pub struct SqliteRepo {
2023-01-22 12:08:12 -05:00
/// Metrics
metrics : NostrMetrics ,
2023-01-22 10:49:49 -05:00
/// Pool for reading events and NIP-05 status
read_pool : SqlitePool ,
/// Pool for writing events and NIP-05 verification
write_pool : SqlitePool ,
/// Pool for performing checkpoints/optimization
maint_pool : SqlitePool ,
/// Flag to indicate a checkpoint is underway
checkpoint_in_progress : Arc < Mutex < u64 > > ,
/// Flag to limit writer concurrency
write_in_progress : Arc < Mutex < u64 > > ,
2023-01-31 19:09:23 -05:00
/// Semaphore for readers to acquire blocking threads
reader_threads_ready : Arc < Semaphore > ,
2023-01-22 10:49:49 -05:00
}
impl SqliteRepo {
// build all the pools needed
2023-01-22 12:08:12 -05:00
#[ must_use ] pub fn new ( settings : & Settings , metrics : NostrMetrics ) -> SqliteRepo {
2023-01-28 15:02:20 -05:00
let write_pool = build_pool (
" writer " ,
settings ,
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
1 ,
2 ,
false ,
) ;
2023-01-22 10:49:49 -05:00
let maint_pool = build_pool (
" maintenance " ,
settings ,
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
1 ,
2 ,
true ,
) ;
let read_pool = build_pool (
" reader " ,
settings ,
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
settings . database . min_conn ,
settings . database . max_conn ,
true ,
) ;
2023-01-22 11:06:44 -05:00
// this is used to block new reads during critical checkpoints
let checkpoint_in_progress = Arc ::new ( Mutex ::new ( 0 ) ) ;
// SQLite can only effectively write single threaded, so don't
// block multiple worker threads unnecessarily.
let write_in_progress = Arc ::new ( Mutex ::new ( 0 ) ) ;
2023-01-31 19:09:23 -05:00
// configure the number of worker threads that can be spawned
// to match the number of database reader connections.
let max_conn = settings . database . max_conn as usize ;
let reader_threads_ready = Arc ::new ( Semaphore ::new ( max_conn ) ) ;
2023-01-22 10:49:49 -05:00
SqliteRepo {
2023-01-28 15:02:20 -05:00
metrics ,
2023-01-22 10:49:49 -05:00
read_pool ,
2023-01-22 11:06:44 -05:00
write_pool ,
maint_pool ,
checkpoint_in_progress ,
write_in_progress ,
2023-01-31 19:09:23 -05:00
reader_threads_ready ,
2023-01-22 10:49:49 -05:00
}
}
/// Persist an event to the database, returning rows added.
pub fn persist_event ( conn : & mut PooledConnection , e : & Event ) -> Result < u64 > {
2023-01-22 11:06:44 -05:00
// enable auto vacuum
conn . execute_batch ( " pragma auto_vacuum = FULL " ) ? ;
// start transaction
let tx = conn . transaction ( ) ? ;
// get relevant fields from event and convert to blobs.
let id_blob = hex ::decode ( & e . id ) . ok ( ) ;
let pubkey_blob : Option < Vec < u8 > > = hex ::decode ( & e . pubkey ) . ok ( ) ;
let delegator_blob : Option < Vec < u8 > > = e . delegated_by . as_ref ( ) . and_then ( | d | hex ::decode ( d ) . ok ( ) ) ;
let event_str = serde_json ::to_string ( & e ) . ok ( ) ;
// check for replaceable events that would hide this one; we won't even attempt to insert these.
if e . is_replaceable ( ) {
2023-01-22 10:49:49 -05:00
let repl_count = tx . query_row (
2023-01-24 09:04:42 -05:00
" SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at >= ? LIMIT 1; " ,
2023-01-22 11:06:44 -05:00
params! [ pubkey_blob , e . kind , e . created_at ] , | row | row . get ::< usize , usize > ( 0 ) ) ;
if repl_count . ok ( ) . is_some ( ) {
return Ok ( 0 ) ;
}
}
2023-01-24 09:04:42 -05:00
// check for parameterized replaceable events that would be hidden; don't insert these either.
if let Some ( d_tag ) = e . distinct_param ( ) {
2023-02-12 15:33:40 -05:00
let repl_count = tx . query_row (
" SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1; " ,
params! [ pubkey_blob , e . kind , d_tag , e . created_at ] , | row | row . get ::< usize , usize > ( 0 ) ) ;
2023-01-24 09:04:42 -05:00
// if any rows were returned, then some newer event with
// the same author/kind/tag value exist, and we can ignore
// this event.
if repl_count . ok ( ) . is_some ( ) {
return Ok ( 0 )
}
}
2023-01-22 11:06:44 -05:00
// ignore if the event hash is a duplicate.
let mut ins_count = tx . execute (
2023-01-22 10:49:49 -05:00
" INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE); " ,
params! [ id_blob , e . created_at , e . kind , pubkey_blob , delegator_blob , event_str ]
2023-01-22 11:06:44 -05:00
) ? as u64 ;
if ins_count = = 0 {
2023-01-22 10:49:49 -05:00
// if the event was a duplicate, no need to insert event or
// pubkey references.
tx . rollback ( ) . ok ( ) ;
return Ok ( ins_count ) ;
2023-01-22 11:06:44 -05:00
}
// remember primary key of the event most recently inserted.
let ev_id = tx . last_insert_rowid ( ) ;
// add all tags to the tag table
for tag in & e . tags {
2023-01-22 10:49:49 -05:00
// ensure we have 2 values.
if tag . len ( ) > = 2 {
2023-01-22 11:06:44 -05:00
let tagname = & tag [ 0 ] ;
let tagval = & tag [ 1 ] ;
// only single-char tags are searchable
let tagchar_opt = single_char_tagname ( tagname ) ;
match & tagchar_opt {
2023-01-22 10:49:49 -05:00
Some ( _ ) = > {
2023-02-12 15:33:40 -05:00
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5) " ,
params! [ ev_id , & tagname , & tagval , e . kind , e . created_at ] ,
) ? ;
2023-01-22 10:49:49 -05:00
}
None = > { }
2023-01-22 11:06:44 -05:00
}
2023-01-22 10:49:49 -05:00
}
2023-01-22 11:06:44 -05:00
}
// if this event is replaceable update, remove other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
if e . is_replaceable ( ) {
let author = hex ::decode ( & e . pubkey ) . ok ( ) ;
// this is a backwards check - hide any events that were older.
2023-01-22 10:49:49 -05:00
let update_count = tx . execute (
2023-01-22 11:06:44 -05:00
" DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1) " ,
params! [ e . kind , author , e . kind , author ] ,
2023-01-22 10:49:49 -05:00
) ? ;
if update_count > 0 {
2023-01-22 11:06:44 -05:00
info! (
2023-01-22 10:49:49 -05:00
" removed {} older replaceable kind {} events for author: {:?} " ,
update_count ,
e . kind ,
e . get_author_prefix ( )
2023-01-22 11:06:44 -05:00
) ;
2023-01-22 10:49:49 -05:00
}
2023-01-22 11:06:44 -05:00
}
2023-01-24 09:04:42 -05:00
// if this event is parameterized replaceable, remove other events.
if let Some ( d_tag ) = e . distinct_param ( ) {
2023-02-12 15:33:40 -05:00
let update_count = tx . execute (
" DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1); " ,
params! [ e . kind , pubkey_blob , e . kind , pubkey_blob , d_tag ] ) ? ;
2023-01-24 09:04:42 -05:00
if update_count > 0 {
info! (
" removed {} older parameterized replaceable kind {} events for author: {:?} " ,
update_count ,
e . kind ,
e . get_author_prefix ( )
) ;
}
}
2023-01-22 11:06:44 -05:00
// if this event is a deletion, hide the referenced events from the same author.
if e . kind = = 5 {
2023-01-22 10:49:49 -05:00
let event_candidates = e . tag_values_by_name ( " e " ) ;
// first parameter will be author
let mut params : Vec < Box < dyn ToSql > > = vec! [ Box ::new ( hex ::decode ( & e . pubkey ) ? ) ] ;
event_candidates
2023-01-22 11:06:44 -05:00
. iter ( )
. filter ( | x | is_hex ( x ) & & x . len ( ) = = 64 )
. filter_map ( | x | hex ::decode ( x ) . ok ( ) )
. for_each ( | x | params . push ( Box ::new ( x ) ) ) ;
2023-01-22 10:49:49 -05:00
let query = format! (
2023-01-22 11:06:44 -05:00
" UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({}) " ,
repeat_vars ( params . len ( ) - 1 )
2023-01-22 10:49:49 -05:00
) ;
let mut stmt = tx . prepare ( & query ) ? ;
let update_count = stmt . execute ( rusqlite ::params_from_iter ( params ) ) ? ;
info! (
2023-01-22 11:06:44 -05:00
" hid {} deleted events for author {:?} " ,
update_count ,
e . get_author_prefix ( )
2023-01-22 10:49:49 -05:00
) ;
2023-01-22 11:06:44 -05:00
} else {
2023-01-22 10:49:49 -05:00
// check if a deletion has already been recorded for this event.
// Only relevant for non-deletion events
let del_count = tx . query_row (
2023-02-12 16:43:22 -05:00
" SELECT e.id FROM event e WHERE e.author=? AND e.id IN (SELECT t.event_id FROM tag t WHERE t.name='e' AND t.kind=5 AND t.value=?) LIMIT 1; " ,
2023-02-12 15:33:40 -05:00
params! [ pubkey_blob , e . id ] , | row | row . get ::< usize , usize > ( 0 ) ) ;
2023-01-22 10:49:49 -05:00
// check if a the query returned a result, meaning we should
// hid the current event
if del_count . ok ( ) . is_some ( ) {
2023-01-22 11:06:44 -05:00
// a deletion already existed, mark original event as hidden.
info! (
2023-01-22 10:49:49 -05:00
" hid event: {:?} due to existing deletion by author: {:?} " ,
e . get_event_id_prefix ( ) ,
e . get_author_prefix ( )
2023-01-22 11:06:44 -05:00
) ;
let _update_count =
2023-01-22 10:49:49 -05:00
tx . execute ( " UPDATE event SET hidden=TRUE WHERE id=? " , params! [ ev_id ] ) ? ;
2023-01-22 11:06:44 -05:00
// event was deleted, so let caller know nothing new
// arrived, preventing this from being sent to active
// subscriptions
ins_count = 0 ;
2023-01-22 10:49:49 -05:00
}
2023-01-22 11:06:44 -05:00
}
tx . commit ( ) ? ;
Ok ( ins_count )
2023-01-22 10:49:49 -05:00
}
}
#[ async_trait ]
impl NostrRepo for SqliteRepo {
async fn start ( & self ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
db_checkpoint_task ( self . maint_pool . clone ( ) , Duration ::from_secs ( 60 ) , self . checkpoint_in_progress . clone ( ) ) . await
2023-01-22 10:49:49 -05:00
}
async fn migrate_up ( & self ) -> Result < usize > {
2023-01-22 11:06:44 -05:00
let _write_guard = self . write_in_progress . lock ( ) . await ;
let mut conn = self . write_pool . get ( ) ? ;
task ::spawn_blocking ( move | | {
upgrade_db ( & mut conn )
} ) . await ?
2023-01-22 10:49:49 -05:00
}
/// Persist event to database
async fn write_event ( & self , e : & Event ) -> Result < u64 > {
2023-01-28 15:02:20 -05:00
let start = Instant ::now ( ) ;
2023-01-22 11:06:44 -05:00
let _write_guard = self . write_in_progress . lock ( ) . await ;
// spawn a blocking thread
2023-01-30 19:02:40 -05:00
//let mut conn = self.write_pool.get()?;
let pool = self . write_pool . clone ( ) ;
2023-01-22 11:06:44 -05:00
let e = e . clone ( ) ;
2023-01-22 12:08:12 -05:00
let event_count = task ::spawn_blocking ( move | | {
2023-01-30 19:02:40 -05:00
let mut conn = pool . get ( ) ? ;
2023-01-22 11:06:44 -05:00
SqliteRepo ::persist_event ( & mut conn , & e )
2023-01-22 12:08:12 -05:00
} ) . await ? ;
2023-01-28 15:02:20 -05:00
self . metrics
2023-01-22 12:08:12 -05:00
. write_events
. observe ( start . elapsed ( ) . as_secs_f64 ( ) ) ;
2023-01-28 15:02:20 -05:00
event_count
2023-01-22 10:49:49 -05:00
}
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
/// is published on the `query_tx` channel as it is returned. If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
async fn query_subscription (
& self ,
sub : Subscription ,
client_id : String ,
query_tx : tokio ::sync ::mpsc ::Sender < QueryResult > ,
mut abandon_query_rx : tokio ::sync ::oneshot ::Receiver < ( ) > ,
) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let pre_spawn_start = Instant ::now ( ) ;
2023-01-31 19:09:23 -05:00
// if we let every request spawn a thread, we'll exhaust the
// thread pool waiting for queries to finish under high load.
// Instead, don't bother spawning threads when they will just
// block on a database connection.
let sem = self . reader_threads_ready . clone ( ) . acquire_owned ( ) . await . unwrap ( ) ;
2023-01-22 11:06:44 -05:00
let self = self . clone ( ) ;
2023-01-28 15:02:20 -05:00
let metrics = self . metrics . clone ( ) ;
2023-01-22 11:06:44 -05:00
task ::spawn_blocking ( move | | {
{
// if we are waiting on a checkpoint, stop until it is complete
let _x = self . checkpoint_in_progress . blocking_lock ( ) ;
}
2023-01-22 10:49:49 -05:00
let db_queue_time = pre_spawn_start . elapsed ( ) ;
// if the queue time was very long (>5 seconds), spare the DB and abort.
if db_queue_time > Duration ::from_secs ( 5 ) {
2023-01-22 11:06:44 -05:00
info! (
2023-01-22 10:49:49 -05:00
" shedding DB query load queued for {:?} (cid: {}, sub: {:?}) " ,
db_queue_time , client_id , sub . id
2023-01-22 11:06:44 -05:00
) ;
2023-01-30 19:40:47 -05:00
metrics . query_aborts . with_label_values ( & [ " loadshed " ] ) . inc ( ) ;
2023-01-22 11:06:44 -05:00
return Ok ( ( ) ) ;
2023-01-22 10:49:49 -05:00
}
// otherwise, report queuing time if it is slow
else if db_queue_time > Duration ::from_secs ( 1 ) {
2023-01-22 11:06:44 -05:00
debug! (
2023-01-22 10:49:49 -05:00
" (slow) DB query queued for {:?} (cid: {}, sub: {:?}) " ,
db_queue_time , client_id , sub . id
2023-01-22 11:06:44 -05:00
) ;
2023-01-22 10:49:49 -05:00
}
2023-02-01 19:09:30 -05:00
// check before getting a DB connection if the client still wants the results
if abandon_query_rx . try_recv ( ) . is_ok ( ) {
debug! ( " query cancelled by client (before execution) (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
return Ok ( ( ) ) ;
}
2023-01-22 10:49:49 -05:00
let start = Instant ::now ( ) ;
let mut row_count : usize = 0 ;
// cutoff for displaying slow queries
2023-01-31 19:09:43 -05:00
let slow_cutoff = Duration ::from_millis ( 250 ) ;
2023-01-30 19:02:40 -05:00
let mut filter_count = 0 ;
2023-01-31 19:09:43 -05:00
// remove duplicates from the filter list.
2023-02-01 19:09:30 -05:00
if let Ok ( mut conn ) = self . read_pool . get ( ) {
2023-02-01 19:09:30 -05:00
{
let pool_state = self . read_pool . state ( ) ;
metrics . db_connections . set ( ( pool_state . connections - pool_state . idle_connections ) . into ( ) ) ;
}
2023-02-01 19:09:30 -05:00
for filter in sub . filters . iter ( ) {
let filter_start = Instant ::now ( ) ;
filter_count + = 1 ;
2023-02-12 16:00:50 -05:00
let sql_gen_elapsed = filter_start . elapsed ( ) ;
2023-02-06 07:43:09 -05:00
let ( q , p , idx ) = query_from_filter ( filter ) ;
2023-02-01 19:09:30 -05:00
if sql_gen_elapsed > Duration ::from_millis ( 10 ) {
debug! ( " SQL (slow) generated in {:?} " , filter_start . elapsed ( ) ) ;
}
2023-02-01 19:09:30 -05:00
// any client that doesn't cause us to generate new rows in 2
2023-02-01 19:09:30 -05:00
// seconds gets dropped.
2023-02-01 19:09:30 -05:00
let abort_cutoff = Duration ::from_secs ( 2 ) ;
2023-02-01 19:09:30 -05:00
let mut slow_first_event ;
let mut last_successful_send = Instant ::now ( ) ;
2023-01-30 19:02:40 -05:00
// execute the query.
// make the actual SQL query (with parameters inserted) available
2023-02-01 19:09:30 -05:00
conn . trace ( Some ( | x | { trace! ( " SQL trace: {:?} " , x ) } ) ) ;
2023-01-30 19:02:40 -05:00
let mut stmt = conn . prepare_cached ( & q ) ? ;
let mut event_rows = stmt . query ( rusqlite ::params_from_iter ( p ) ) ? ;
let mut first_result = true ;
while let Some ( row ) = event_rows . next ( ) ? {
2023-01-30 19:02:40 -05:00
let first_event_elapsed = filter_start . elapsed ( ) ;
2023-01-30 19:02:40 -05:00
slow_first_event = first_event_elapsed > = slow_cutoff ;
if first_result {
debug! (
" first result in {:?} (cid: {}, sub: {:?}, filter: {}) [used index: {:?}] " ,
first_event_elapsed , client_id , sub . id , filter_count , idx
) ;
2023-01-31 19:09:43 -05:00
// logging for slow queries; show filter and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
if slow_first_event & & client_id . starts_with ( '0' ) {
debug! (
2023-02-01 19:09:30 -05:00
" filter first result in {:?} (slow): {} (cid: {}, sub: {:?}) " ,
first_event_elapsed , serde_json ::to_string ( & filter ) ? , client_id , sub . id
2023-01-31 19:09:43 -05:00
) ;
}
2023-01-30 19:02:40 -05:00
first_result = false ;
}
// check if a checkpoint is trying to run, and abort
if row_count % 100 = = 0 {
{
if self . checkpoint_in_progress . try_lock ( ) . is_err ( ) {
// lock was held, abort this query
debug! ( " query aborted due to checkpoint (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
2023-01-30 19:40:47 -05:00
metrics . query_aborts . with_label_values ( & [ " checkpoint " ] ) . inc ( ) ;
2023-01-30 19:02:40 -05:00
return Ok ( ( ) ) ;
}
}
}
// check if this is still active; every 100 rows
if row_count % 100 = = 0 & & abandon_query_rx . try_recv ( ) . is_ok ( ) {
debug! ( " query cancelled by client (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
return Ok ( ( ) ) ;
}
row_count + = 1 ;
let event_json = row . get ( 0 ) ? ;
loop {
if query_tx . capacity ( ) ! = 0 {
// we have capacity to add another item
break ;
}
// the queue is full
trace! ( " db reader thread is stalled " ) ;
if last_successful_send + abort_cutoff < Instant ::now ( ) {
// the queue has been full for too long, abort
info! ( " aborting database query due to slow client (cid: {}, sub: {:?}) " ,
client_id , sub . id ) ;
2023-01-30 19:40:47 -05:00
metrics . query_aborts . with_label_values ( & [ " slowclient " ] ) . inc ( ) ;
2023-01-30 19:02:40 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
return ok ;
}
// check if a checkpoint is trying to run, and abort
2023-01-22 11:06:44 -05:00
if self . checkpoint_in_progress . try_lock ( ) . is_err ( ) {
// lock was held, abort this query
debug! ( " query aborted due to checkpoint (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
2023-01-30 19:40:47 -05:00
metrics . query_aborts . with_label_values ( & [ " checkpoint " ] ) . inc ( ) ;
2023-01-22 11:06:44 -05:00
return Ok ( ( ) ) ;
}
2023-01-30 19:02:40 -05:00
// give the queue a chance to clear before trying again
2023-02-01 19:09:30 -05:00
debug! ( " query thread sleeping due to full query_tx (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
thread ::sleep ( Duration ::from_millis ( 500 ) ) ;
2023-01-22 11:06:44 -05:00
}
2023-01-30 19:02:40 -05:00
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : event_json ,
} )
. ok ( ) ;
last_successful_send = Instant ::now ( ) ;
2023-01-22 11:06:44 -05:00
}
2023-02-01 19:09:30 -05:00
metrics
. query_db
. observe ( filter_start . elapsed ( ) . as_secs_f64 ( ) ) ;
// if the filter took too much db_time, print out the JSON.
if filter_start . elapsed ( ) > slow_cutoff & & client_id . starts_with ( '0' ) {
debug! (
" query filter req (slow): {} (cid: {}, sub: {:?}, filter: {}) " ,
serde_json ::to_string ( & filter ) ? , client_id , sub . id , filter_count
) ;
}
2023-01-31 19:09:43 -05:00
2023-02-01 19:09:30 -05:00
}
} else {
warn! ( " Could not get a database connection for querying " ) ;
2023-01-22 10:49:49 -05:00
}
2023-01-31 19:09:23 -05:00
drop ( sem ) ; // new query can begin
2023-01-30 19:02:40 -05:00
debug! (
" query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {}) " ,
pre_spawn_start . elapsed ( ) ,
client_id ,
sub . id ,
start . elapsed ( ) ,
row_count
) ;
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : " EOSE " . to_string ( ) ,
} )
. ok ( ) ;
2023-01-28 15:02:20 -05:00
metrics
. query_sub
. observe ( pre_spawn_start . elapsed ( ) . as_secs_f64 ( ) ) ;
2023-01-22 10:49:49 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
ok
2023-01-22 11:06:44 -05:00
} ) ;
2023-01-28 15:02:20 -05:00
Ok ( ( ) )
2023-01-22 10:49:49 -05:00
}
/// Perform normal maintenance
async fn optimize_db ( & self ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let conn = self . write_pool . get ( ) ? ;
task ::spawn_blocking ( move | | {
let start = Instant ::now ( ) ;
conn . execute_batch ( " PRAGMA optimize; " ) . ok ( ) ;
info! ( " optimize ran in {:?} " , start . elapsed ( ) ) ;
} ) . await ? ;
Ok ( ( ) )
2023-01-22 10:49:49 -05:00
}
/// Create a new verification record connected to a specific event
async fn create_verification_record ( & self , event_id : & str , name : & str ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let e = hex ::decode ( event_id ) . ok ( ) ;
let n = name . to_owned ( ) ;
let mut conn = self . write_pool . get ( ) ? ;
tokio ::task ::spawn_blocking ( move | | {
2023-01-22 10:49:49 -05:00
let tx = conn . transaction ( ) ? ;
{
2023-01-22 11:06:44 -05:00
// if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest.
let query = " INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now')); " ;
let mut stmt = tx . prepare ( query ) ? ;
stmt . execute ( params! [ e , n ] ) ? ;
// get the row ID
let v_id = tx . last_insert_rowid ( ) ;
// delete everything else by this name
let del_query = " DELETE FROM user_verification WHERE name = ? AND id != ?; " ;
let mut del_stmt = tx . prepare ( del_query ) ? ;
let count = del_stmt . execute ( params! [ n , v_id ] ) ? ;
if count > 0 {
2023-01-22 10:49:49 -05:00
info! ( " removed {} old verification records for ({:?}) " , count , n ) ;
2023-01-22 11:06:44 -05:00
}
2023-01-22 10:49:49 -05:00
}
tx . commit ( ) ? ;
info! ( " saved new verification record for ({:?}) " , n ) ;
let ok : Result < ( ) > = Ok ( ( ) ) ;
ok
2023-01-22 11:06:44 -05:00
} ) . await ?
2023-01-22 10:49:49 -05:00
}
/// Update verification timestamp
async fn update_verification_timestamp ( & self , id : u64 ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let mut conn = self . write_pool . get ( ) ? ;
2023-01-22 10:49:49 -05:00
tokio ::task ::spawn_blocking ( move | | {
// add some jitter to the verification to prevent everything from stacking up together.
let verif_time = now_jitter ( 600 ) ;
let tx = conn . transaction ( ) ? ;
{
// update verification time and reset any failure count
let query =
" UPDATE user_verification SET verified_at=?, failure_count=0 WHERE id=? " ;
let mut stmt = tx . prepare ( query ) ? ;
stmt . execute ( params! [ verif_time , id ] ) ? ;
}
tx . commit ( ) ? ;
let ok : Result < ( ) > = Ok ( ( ) ) ;
ok
} )
. await ?
}
/// Update verification record as failed
async fn fail_verification ( & self , id : u64 ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let mut conn = self . write_pool . get ( ) ? ;
tokio ::task ::spawn_blocking ( move | | {
2023-01-22 10:49:49 -05:00
// add some jitter to the verification to prevent everything from stacking up together.
let fail_time = now_jitter ( 600 ) ;
let tx = conn . transaction ( ) ? ;
{
let query = " UPDATE user_verification SET failed_at=?, failure_count=failure_count+1 WHERE id=? " ;
let mut stmt = tx . prepare ( query ) ? ;
stmt . execute ( params! [ fail_time , id ] ) ? ;
}
tx . commit ( ) ? ;
let ok : Result < ( ) > = Ok ( ( ) ) ;
ok
} )
. await ?
}
/// Delete verification record
async fn delete_verification ( & self , id : u64 ) -> Result < ( ) > {
2023-01-22 11:06:44 -05:00
let mut conn = self . write_pool . get ( ) ? ;
tokio ::task ::spawn_blocking ( move | | {
2023-01-22 10:49:49 -05:00
let tx = conn . transaction ( ) ? ;
{
let query = " DELETE FROM user_verification WHERE id=?; " ;
let mut stmt = tx . prepare ( query ) ? ;
stmt . execute ( params! [ id ] ) ? ;
}
tx . commit ( ) ? ;
let ok : Result < ( ) > = Ok ( ( ) ) ;
ok
} )
. await ?
}
/// Get the latest verification record for a given pubkey.
async fn get_latest_user_verification ( & self , pub_key : & str ) -> Result < VerificationRecord > {
2023-01-22 11:06:44 -05:00
let mut conn = self . read_pool . get ( ) ? ;
let pub_key = pub_key . to_owned ( ) ;
tokio ::task ::spawn_blocking ( move | | {
let tx = conn . transaction ( ) ? ;
let query = " SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1; " ;
let mut stmt = tx . prepare_cached ( query ) ? ;
let fields = stmt . query_row ( params! [ hex ::decode ( & pub_key ) . ok ( ) ] , | r | {
let rowid : u64 = r . get ( 0 ) ? ;
let rowname : String = r . get ( 1 ) ? ;
let eventid : Vec < u8 > = r . get ( 2 ) ? ;
let created_at : u64 = r . get ( 3 ) ? ;
// create a tuple since we can't throw non-rusqlite errors in this closure
Ok ( (
rowid ,
rowname ,
eventid ,
created_at ,
r . get ( 4 ) . ok ( ) ,
r . get ( 5 ) . ok ( ) ,
r . get ( 6 ) ? ,
) )
} ) ? ;
Ok ( VerificationRecord {
rowid : fields . 0 ,
name : Nip05Name ::try_from ( & fields . 1 [ .. ] ) ? ,
address : pub_key ,
event : hex ::encode ( fields . 2 ) ,
event_created : fields . 3 ,
last_success : fields . 4 ,
last_failure : fields . 5 ,
failure_count : fields . 6 ,
} )
} ) . await ?
2023-01-22 10:49:49 -05:00
}
/// Get oldest verification before timestamp
async fn get_oldest_user_verification ( & self , before : u64 ) -> Result < VerificationRecord > {
2023-01-22 11:06:44 -05:00
let mut conn = self . read_pool . get ( ) ? ;
tokio ::task ::spawn_blocking ( move | | {
let tx = conn . transaction ( ) ? ;
let query = " SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1; " ;
let mut stmt = tx . prepare_cached ( query ) ? ;
let fields = stmt . query_row ( params! [ before , before ] , | r | {
let rowid : u64 = r . get ( 0 ) ? ;
let rowname : String = r . get ( 1 ) ? ;
let eventid : Vec < u8 > = r . get ( 2 ) ? ;
let pubkey : Vec < u8 > = r . get ( 3 ) ? ;
let created_at : u64 = r . get ( 4 ) ? ;
// create a tuple since we can't throw non-rusqlite errors in this closure
Ok ( (
rowid ,
rowname ,
eventid ,
pubkey ,
created_at ,
r . get ( 5 ) . ok ( ) ,
r . get ( 6 ) . ok ( ) ,
r . get ( 7 ) ? ,
) )
} ) ? ;
let vr = VerificationRecord {
rowid : fields . 0 ,
name : Nip05Name ::try_from ( & fields . 1 [ .. ] ) ? ,
address : hex ::encode ( fields . 3 ) ,
event : hex ::encode ( fields . 2 ) ,
event_created : fields . 4 ,
last_success : fields . 5 ,
last_failure : fields . 6 ,
failure_count : fields . 7 ,
} ;
Ok ( vr )
} ) . await ?
2023-01-22 10:49:49 -05:00
}
}
/// Decide if there is an index that should be used explicitly
fn override_index ( f : & ReqFilter ) -> Option < String > {
2023-02-01 07:46:35 -05:00
if f . ids . is_some ( ) {
return Some ( " event_hash_index " . into ( ) ) ;
}
2023-01-22 10:49:49 -05:00
// queries for multiple kinds default to kind_index, which is
// significantly slower than kind_created_at_index.
if let Some ( ks ) = & f . kinds {
2023-01-22 11:06:44 -05:00
if f . ids . is_none ( ) & &
ks . len ( ) > 1 & &
f . since . is_none ( ) & &
f . until . is_none ( ) & &
f . tags . is_none ( ) & &
f . authors . is_none ( ) {
return Some ( " kind_created_at_index " . into ( ) ) ;
}
2023-01-22 10:49:49 -05:00
}
// if there is an author, it is much better to force the authors index.
if f . authors . is_some ( ) {
2023-01-30 19:02:40 -05:00
if f . since . is_none ( ) & & f . until . is_none ( ) & & f . limit . is_none ( ) {
2023-01-22 11:06:44 -05:00
if f . kinds . is_none ( ) {
// with no use of kinds/created_at, just author
return Some ( " author_index " . into ( ) ) ;
}
// prefer author_kind if there are kinds
return Some ( " author_kind_index " . into ( ) ) ;
}
// finally, prefer author_created_at if time is provided
return Some ( " author_created_at_index " . into ( ) ) ;
2023-01-22 10:49:49 -05:00
}
None
}
/// Create a dynamic SQL subquery and params from a subscription filter (and optional explicit index used)
fn query_from_filter ( f : & ReqFilter ) -> ( String , Vec < Box < dyn ToSql > > , Option < String > ) {
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
// if the filter is malformed, don't return anything.
if f . force_no_match {
2023-01-30 19:02:40 -05:00
let empty_query = " SELECT e.content FROM event e WHERE 1=0 " . to_owned ( ) ;
2023-01-22 10:49:49 -05:00
// query parameters for SQLite
let empty_params : Vec < Box < dyn ToSql > > = vec! [ ] ;
return ( empty_query , empty_params , None ) ;
}
// check if the index needs to be overriden
let idx_name = override_index ( f ) ;
2023-02-06 07:43:09 -05:00
let idx_stmt = idx_name . as_ref ( ) . map_or_else ( | | " " . to_owned ( ) , | i | format! ( " INDEXED BY {i} " ) ) ;
let mut query = format! ( " SELECT e.content FROM event e {idx_stmt} " ) ;
2023-01-22 10:49:49 -05:00
// query parameters for SQLite
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
// individual filter components (single conditions such as an author or event ID)
let mut filter_components : Vec < String > = Vec ::new ( ) ;
// Query for "authors", allowing prefix matches
if let Some ( authvec ) = & f . authors {
// take each author and convert to a hexsearch
let mut auth_searches : Vec < String > = vec! [ ] ;
for auth in authvec {
match hex_range ( auth ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
auth_searches . push ( " author=? " . to_owned ( ) ) ;
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
auth_searches . push (
" (author>? AND author<?) " . to_owned ( ) ,
) ;
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
auth_searches . push ( " author>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from author {:?} " , auth ) ;
}
}
}
if ! authvec . is_empty ( ) {
2023-01-22 11:06:44 -05:00
let auth_clause = format! ( " ( {} ) " , auth_searches . join ( " OR " ) ) ;
2023-01-22 10:49:49 -05:00
filter_components . push ( auth_clause ) ;
} else {
2023-01-22 11:06:44 -05:00
filter_components . push ( " false " . to_owned ( ) ) ;
}
2023-01-22 10:49:49 -05:00
}
// Query for Kind
if let Some ( ks ) = & f . kinds {
// kind is number, no escaping needed
let str_kinds : Vec < String > = ks . iter ( ) . map ( std ::string ::ToString ::to_string ) . collect ( ) ;
let kind_clause = format! ( " kind IN ( {} ) " , str_kinds . join ( " , " ) ) ;
filter_components . push ( kind_clause ) ;
}
// Query for event, allowing prefix matches
if let Some ( idvec ) = & f . ids {
// take each author and convert to a hexsearch
let mut id_searches : Vec < String > = vec! [ ] ;
for id in idvec {
match hex_range ( id ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
id_searches . push ( " event_hash=? " . to_owned ( ) ) ;
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
id_searches . push ( " (event_hash>? AND event_hash<?) " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
id_searches . push ( " event_hash>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from id {:?} " , id ) ;
}
}
}
if idvec . is_empty ( ) {
2023-01-22 11:06:44 -05:00
// if the ids list was empty, we should never return
2023-01-22 10:49:49 -05:00
// any results.
filter_components . push ( " false " . to_owned ( ) ) ;
} else {
2023-01-22 11:06:44 -05:00
let id_clause = format! ( " ( {} ) " , id_searches . join ( " OR " ) ) ;
2023-01-22 10:49:49 -05:00
filter_components . push ( id_clause ) ;
}
}
// Query for tags
if let Some ( map ) = & f . tags {
for ( key , val ) in map . iter ( ) {
let mut str_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
for v in val {
2023-02-12 15:33:40 -05:00
str_vals . push ( Box ::new ( v . clone ( ) ) ) ;
2023-01-22 10:49:49 -05:00
}
2023-02-12 15:33:40 -05:00
// create clauses with "?" params for each tag value being searched
let str_clause = format! ( " AND value IN ( {} ) " , repeat_vars ( str_vals . len ( ) ) ) ;
// find evidence of the target tag name/value existing for this event.
// Query for Kind/Since/Until additionally, to reduce the number of tags that come back.
let kind_clause ;
let since_clause ;
let until_clause ;
if let Some ( ks ) = & f . kinds {
// kind is number, no escaping needed
let str_kinds : Vec < String > = ks . iter ( ) . map ( std ::string ::ToString ::to_string ) . collect ( ) ;
kind_clause = format! ( " AND kind IN ( {} ) " , str_kinds . join ( " , " ) ) ;
2023-02-01 08:13:29 -05:00
} else {
2023-02-12 15:33:40 -05:00
kind_clause = format! ( " " ) ;
} ;
if f . since . is_some ( ) {
since_clause = format! ( " AND created_at > {} " , f . since . unwrap ( ) ) ;
} else {
since_clause = format! ( " " ) ;
} ;
// Query for timestamp
if f . until . is_some ( ) {
until_clause = format! ( " AND created_at < {} " , f . until . unwrap ( ) ) ;
} else {
until_clause = format! ( " " ) ;
} ;
let tag_clause = format! (
" e.id IN (SELECT t.event_id FROM tag t WHERE (name=? {str_clause} {kind_clause} {since_clause} {until_clause})) "
) ;
// add the tag name as the first parameter
params . push ( Box ::new ( key . to_string ( ) ) ) ;
// add all tag values that are blobs as params
params . append ( & mut str_vals ) ;
filter_components . push ( tag_clause ) ;
2023-01-22 10:49:49 -05:00
}
}
// Query for timestamp
if f . since . is_some ( ) {
let created_clause = format! ( " created_at > {} " , f . since . unwrap ( ) ) ;
filter_components . push ( created_clause ) ;
}
// Query for timestamp
if f . until . is_some ( ) {
let until_clause = format! ( " created_at < {} " , f . until . unwrap ( ) ) ;
filter_components . push ( until_clause ) ;
}
// never display hidden events
query . push_str ( " WHERE hidden!=TRUE " ) ;
// build filter component conditions
if ! filter_components . is_empty ( ) {
query . push_str ( " AND " ) ;
query . push_str ( & filter_components . join ( " AND " ) ) ;
}
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some ( lim ) = f . limit {
2023-02-06 07:43:09 -05:00
let _ = write! ( query , " ORDER BY e.created_at DESC LIMIT {lim} " ) ;
2023-01-22 10:49:49 -05:00
} else {
query . push_str ( " ORDER BY e.created_at ASC " ) ;
}
( query , params , idx_name )
}
/// Create a dynamic SQL query string and params from a subscription.
2023-01-30 19:02:40 -05:00
fn _query_from_sub ( sub : & Subscription ) -> ( String , Vec < Box < dyn ToSql > > , Vec < String > ) {
2023-01-22 10:49:49 -05:00
// build a dynamic SQL query for an entire subscription, based on
// SQL subqueries for filters.
let mut subqueries : Vec < String > = Vec ::new ( ) ;
let mut indexes = vec! [ ] ;
// subquery params
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
// for every filter in the subscription, generate a subquery
for f in & sub . filters {
let ( f_subquery , mut f_params , index ) = query_from_filter ( f ) ;
2023-01-22 11:06:44 -05:00
if let Some ( i ) = index {
indexes . push ( i ) ;
}
2023-01-22 10:49:49 -05:00
subqueries . push ( f_subquery ) ;
params . append ( & mut f_params ) ;
}
// encapsulate subqueries into select statements
let subqueries_selects : Vec < String > = subqueries
. iter ( )
2023-02-06 07:43:09 -05:00
. map ( | s | format! ( " SELECT distinct content, created_at FROM ( {s} ) " ) )
2023-01-22 10:49:49 -05:00
. collect ( ) ;
let query : String = subqueries_selects . join ( " UNION " ) ;
( query , params , indexes )
}
/// Build a database connection pool.
/// # Panics
///
/// Will panic if the pool could not be created.
#[ must_use ]
pub fn build_pool (
name : & str ,
settings : & Settings ,
flags : OpenFlags ,
min_size : u32 ,
max_size : u32 ,
wait_for_db : bool ,
) -> SqlitePool {
let db_dir = & settings . database . data_directory ;
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2023-01-28 15:02:20 -05:00
2023-01-22 10:49:49 -05:00
// small hack; if the database doesn't exist yet, that means the
// writer thread hasn't finished. Give it a chance to work. This
// is only an issue with the first time we run.
if ! settings . database . in_memory {
while ! full_path . exists ( ) & & wait_for_db {
debug! ( " Database reader pool is waiting on the database to be created... " ) ;
thread ::sleep ( Duration ::from_millis ( 500 ) ) ;
}
}
let manager = if settings . database . in_memory {
SqliteConnectionManager ::memory ( )
. with_flags ( flags )
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
} else {
SqliteConnectionManager ::file ( & full_path )
. with_flags ( flags )
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
} ;
let pool : SqlitePool = r2d2 ::Pool ::builder ( )
. test_on_check_out ( true ) // no noticeable performance hit
. min_idle ( Some ( min_size ) )
. max_size ( max_size )
. max_lifetime ( Some ( Duration ::from_secs ( 30 ) ) )
. build ( manager )
. unwrap ( ) ;
info! (
" Built a connection pool {:?} (min={}, max={}) " ,
name , min_size , max_size
) ;
pool
}
/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task ( pool : SqlitePool , frequency : Duration , checkpoint_in_progress : Arc < Mutex < u64 > > ) -> Result < ( ) > {
2023-01-31 19:09:23 -05:00
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
2023-01-22 10:49:49 -05:00
tokio ::task ::spawn ( async move {
2023-01-22 11:06:44 -05:00
// WAL size in pages.
let mut current_wal_size = 0 ;
// WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
let wal_threshold = 1000 * 10 ;
// default threshold for the busy timer
let busy_wait_default = Duration ::from_secs ( 1 ) ;
// if the WAL file is getting too big, switch to this
let busy_wait_default_long = Duration ::from_secs ( 10 ) ;
2023-01-22 10:49:49 -05:00
loop {
tokio ::select! {
_ = tokio ::time ::sleep ( frequency ) = > {
if let Ok ( mut conn ) = pool . get ( ) {
let mut _guard :Option < MutexGuard < u64 > > = None ;
2023-01-22 11:06:44 -05:00
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
if current_wal_size < = wal_threshold {
conn . busy_timeout ( busy_wait_default ) . ok ( ) ;
} else {
// if the wal size has exceeded a threshold, increase the busy timeout.
conn . busy_timeout ( busy_wait_default_long ) . ok ( ) ;
// take a lock that will prevent new readers.
info! ( " blocking new readers to perform wal_checkpoint " ) ;
_guard = Some ( checkpoint_in_progress . lock ( ) . await ) ;
}
2023-01-22 10:49:49 -05:00
debug! ( " running wal_checkpoint(TRUNCATE) " ) ;
if let Ok ( new_size ) = checkpoint_db ( & mut conn ) {
2023-01-22 11:06:44 -05:00
current_wal_size = new_size ;
}
2023-01-22 10:49:49 -05:00
}
2023-01-22 11:06:44 -05:00
}
2023-01-22 10:49:49 -05:00
} ;
}
} ) ;
Ok ( ( ) )
}
#[ derive(Debug) ]
enum SqliteStatus {
Ok ,
Busy ,
Error ,
Other ( u64 ) ,
}
/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
pub fn checkpoint_db ( conn : & mut PooledConnection ) -> Result < usize > {
let query = " PRAGMA wal_checkpoint(TRUNCATE); " ;
let start = Instant ::now ( ) ;
let ( cp_result , wal_size , _frames_checkpointed ) = conn . query_row ( query , [ ] , | row | {
let checkpoint_result : u64 = row . get ( 0 ) ? ;
let wal_size : u64 = row . get ( 1 ) ? ;
let frames_checkpointed : u64 = row . get ( 2 ) ? ;
Ok ( ( checkpoint_result , wal_size , frames_checkpointed ) )
} ) ? ;
let result = match cp_result {
0 = > SqliteStatus ::Ok ,
1 = > SqliteStatus ::Busy ,
2 = > SqliteStatus ::Error ,
x = > SqliteStatus ::Other ( x ) ,
} ;
info! (
" checkpoint ran in {:?} (result: {:?}, WAL size: {}) " ,
start . elapsed ( ) ,
result ,
wal_size
) ;
Ok ( wal_size as usize )
}
/// Produce a arbitrary list of '?' parameters.
fn repeat_vars ( count : usize ) -> String {
if count = = 0 {
return " " . to_owned ( ) ;
}
let mut s = " ?, " . repeat ( count ) ;
// Remove trailing comma
s . pop ( ) ;
s
}
/// Display database pool stats every 1 minute
pub async fn monitor_pool ( name : & str , pool : SqlitePool ) {
let sleep_dur = Duration ::from_secs ( 60 ) ;
loop {
2023-01-22 11:06:44 -05:00
log_pool_stats ( name , & pool ) ;
tokio ::time ::sleep ( sleep_dur ) . await ;
2023-01-22 10:49:49 -05:00
}
}
/// Log pool stats
fn log_pool_stats ( name : & str , pool : & SqlitePool ) {
let state : r2d2 ::State = pool . state ( ) ;
let in_use_cxns = state . connections - state . idle_connections ;
debug! (
" DB pool {:?} usage (in_use: {}, available: {}, max: {}) " ,
name ,
in_use_cxns ,
state . connections ,
2023-01-22 11:06:44 -05:00
pool . max_size ( )
2023-01-22 10:49:49 -05:00
) ;
}
/// Check if the pool is fully utilized
fn _pool_at_capacity ( pool : & SqlitePool ) -> bool {
let state : r2d2 ::State = pool . state ( ) ;
state . idle_connections = = 0
}