2021-12-11 22:43:41 -05:00
//! Event persistence and querying
2022-09-06 07:12:07 -04:00
//use crate::config::SETTINGS;
use crate ::config ::Settings ;
use crate ::error ::{ Error , Result } ;
2022-08-17 19:34:11 -04:00
use crate ::event ::{ single_char_tagname , Event } ;
2022-02-12 10:29:38 -05:00
use crate ::hexrange ::hex_range ;
use crate ::hexrange ::HexSearch ;
2022-02-12 10:29:25 -05:00
use crate ::nip05 ;
2022-11-10 17:27:10 -05:00
use crate ::notice ::Notice ;
2022-02-12 10:58:42 -05:00
use crate ::schema ::{ upgrade_db , STARTUP_SQL } ;
2022-07-04 18:25:32 -04:00
use crate ::subscription ::ReqFilter ;
2021-12-11 16:48:59 -05:00
use crate ::subscription ::Subscription ;
2022-08-17 19:34:11 -04:00
use crate ::utils ::{ is_hex , is_lower_hex } ;
2021-12-30 22:07:21 -05:00
use governor ::clock ::Clock ;
use governor ::{ Quota , RateLimiter } ;
2021-12-11 16:48:59 -05:00
use hex ;
2022-01-25 21:39:24 -05:00
use r2d2 ;
use r2d2_sqlite ::SqliteConnectionManager ;
2022-02-12 10:29:25 -05:00
use rusqlite ::params ;
2022-01-22 22:29:15 -05:00
use rusqlite ::types ::ToSql ;
2022-02-12 10:29:25 -05:00
use rusqlite ::OpenFlags ;
2022-12-27 10:48:07 -05:00
use tokio ::sync ::{ Mutex , MutexGuard } ;
2022-09-02 13:38:31 -04:00
use std ::fmt ::Write as _ ;
2021-12-11 16:48:59 -05:00
use std ::path ::Path ;
2022-12-27 10:48:07 -05:00
use std ::sync ::Arc ;
2021-12-30 22:07:21 -05:00
use std ::thread ;
2022-01-26 22:39:03 -05:00
use std ::time ::Duration ;
2022-01-01 20:25:09 -05:00
use std ::time ::Instant ;
2021-12-11 16:48:59 -05:00
use tokio ::task ;
2022-09-28 08:19:59 -04:00
use tracing ::{ debug , info , trace , warn } ;
2021-12-11 16:48:59 -05:00
2022-01-25 21:39:24 -05:00
/// An r2d2 connection pool whose connections are produced by a
/// `SqliteConnectionManager`.
pub type SqlitePool = r2d2 ::Pool < r2d2_sqlite ::SqliteConnectionManager > ;
2022-02-12 10:29:25 -05:00
/// A single connection checked out of an r2d2 pool of SQLite connections.
pub type PooledConnection = r2d2 ::PooledConnection < r2d2_sqlite ::SqliteConnectionManager > ;
2022-02-13 10:35:54 -05:00
/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
/// The event the client submitted.
pub event : Event ,
2022-11-10 17:27:10 -05:00
/// Sending half of the channel on which `Notice` values about this
/// event are returned.
pub notice_tx : tokio ::sync ::mpsc ::Sender < Notice > ,
2022-02-13 10:35:54 -05:00
}
2021-12-11 22:43:41 -05:00
/// File name of the SQLite database; it is joined onto the configured
/// data directory to form the full database path.
2022-02-12 10:29:25 -05:00
pub const DB_FILE : & str = " nostr.db " ;
2022-12-22 12:40:17 -05:00
2022-12-26 11:03:51 -05:00
/// How frequently to attempt checkpointing, in seconds. This is the sleep
/// interval between WAL checkpoint attempts in `db_checkpoint_task`.
pub const CHECKPOINT_FREQ_SEC : u64 = 60 ;
2022-12-22 12:40:17 -05:00
2022-02-12 10:29:35 -05:00
/// Build a database connection pool.
2022-09-24 10:19:16 -04:00
/// # Panics
///
/// Will panic if the pool could not be created.
#[ must_use ]
2022-02-12 10:29:25 -05:00
pub fn build_pool (
name : & str ,
2022-09-24 10:19:16 -04:00
settings : & Settings ,
2022-02-12 10:29:25 -05:00
flags : OpenFlags ,
min_size : u32 ,
max_size : u32 ,
wait_for_db : bool ,
) -> SqlitePool {
let db_dir = & settings . database . data_directory ;
2022-01-25 21:39:24 -05:00
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-01-26 22:39:03 -05:00
// small hack; if the database doesn't exist yet, that means the
// writer thread hasn't finished. Give it a chance to work. This
// is only an issue with the first time we run.
2022-09-06 07:06:01 -04:00
if ! settings . database . in_memory {
while ! full_path . exists ( ) & & wait_for_db {
debug! ( " Database reader pool is waiting on the database to be created... " ) ;
thread ::sleep ( Duration ::from_millis ( 500 ) ) ;
}
2022-01-26 22:39:03 -05:00
}
2022-09-24 20:28:02 -04:00
let manager = if settings . database . in_memory {
SqliteConnectionManager ::memory ( )
2022-09-06 07:06:01 -04:00
. with_flags ( flags )
2022-09-24 20:28:02 -04:00
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
} else {
SqliteConnectionManager ::file ( & full_path )
2022-09-06 07:06:01 -04:00
. with_flags ( flags )
2022-09-24 20:28:02 -04:00
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
2022-09-06 07:06:01 -04:00
} ;
2022-01-25 21:39:24 -05:00
let pool : SqlitePool = r2d2 ::Pool ::builder ( )
. test_on_check_out ( true ) // no noticeable performance hit
2022-02-12 10:29:25 -05:00
. min_idle ( Some ( min_size ) )
. max_size ( max_size )
2022-12-22 14:01:12 -05:00
. max_lifetime ( Some ( Duration ::from_secs ( 30 ) ) )
2022-01-25 21:39:24 -05:00
. build ( manager )
. unwrap ( ) ;
2022-01-25 22:42:43 -05:00
info! (
2022-02-12 10:29:25 -05:00
" Built a connection pool {:?} (min={}, max={}) " ,
name , min_size , max_size
2022-01-25 22:42:43 -05:00
) ;
2022-02-12 10:29:25 -05:00
pool
}
2022-12-25 11:43:47 -05:00
/// Log usage statistics for `pool` once a minute, forever.
///
/// The stats are labelled with `name`.  This future never completes.
pub async fn monitor_pool(name: &str, pool: SqlitePool) {
    const REPORT_INTERVAL: Duration = Duration::from_secs(60);
    loop {
        log_pool_stats(name, &pool);
        tokio::time::sleep(REPORT_INTERVAL).await;
    }
}
2022-12-18 00:18:54 -05:00
/// Perform normal maintenance
pub fn optimize_db ( conn : & mut PooledConnection ) -> Result < ( ) > {
2022-12-22 16:16:21 -05:00
let start = Instant ::now ( ) ;
2022-12-18 00:18:54 -05:00
conn . execute_batch ( " PRAGMA optimize; " ) ? ;
2022-12-22 16:16:21 -05:00
info! ( " optimize ran in {:?} " , start . elapsed ( ) ) ;
2022-12-18 00:18:54 -05:00
Ok ( ( ) )
}
2022-12-22 11:11:05 -05:00
/// Interpretation of the first column returned by the WAL checkpoint
/// pragma, used only for logging in `checkpoint_db`.
#[ derive(Debug) ]
2022-12-24 11:29:47 -05:00
enum SqliteStatus {
/// Result code 0.
Ok ,
/// Result code 1.
Busy ,
/// Result code 2.
Error ,
/// Any other result code, carried verbatim.
Other ( u64 ) ,
2022-12-22 11:11:05 -05:00
}
2022-12-26 11:03:51 -05:00
/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
pub fn checkpoint_db ( conn : & mut PooledConnection ) -> Result < usize > {
2022-12-22 11:11:05 -05:00
let query = " PRAGMA wal_checkpoint(TRUNCATE); " ;
let start = Instant ::now ( ) ;
let ( cp_result , wal_size , _frames_checkpointed ) = conn . query_row ( query , [ ] , | row | {
let checkpoint_result : u64 = row . get ( 0 ) ? ;
let wal_size : u64 = row . get ( 1 ) ? ;
let frames_checkpointed : u64 = row . get ( 2 ) ? ;
Ok ( ( checkpoint_result , wal_size , frames_checkpointed ) )
} ) ? ;
let result = match cp_result {
2022-12-24 11:29:47 -05:00
0 = > SqliteStatus ::Ok ,
1 = > SqliteStatus ::Busy ,
2 = > SqliteStatus ::Error ,
x = > SqliteStatus ::Other ( x ) ,
2022-12-22 11:11:05 -05:00
} ;
info! (
" checkpoint ran in {:?} (result: {:?}, WAL size: {}) " ,
start . elapsed ( ) ,
result ,
wal_size
) ;
2022-12-26 11:03:51 -05:00
Ok ( wal_size as usize )
2022-12-22 11:11:05 -05:00
}
2022-12-18 00:18:54 -05:00
2021-12-11 16:48:59 -05:00
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer (
2022-09-06 07:12:07 -04:00
settings : Settings ,
2022-02-13 10:35:54 -05:00
mut event_rx : tokio ::sync ::mpsc ::Receiver < SubmittedEvent > ,
2021-12-12 11:20:23 -05:00
bcast_tx : tokio ::sync ::broadcast ::Sender < Event > ,
2022-02-12 10:29:25 -05:00
metadata_tx : tokio ::sync ::broadcast ::Sender < Event > ,
2021-12-30 22:07:21 -05:00
mut shutdown : tokio ::sync ::broadcast ::Receiver < ( ) > ,
2021-12-11 16:48:59 -05:00
) -> tokio ::task ::JoinHandle < Result < ( ) > > {
2022-02-12 10:29:25 -05:00
// are we performing NIP-05 checking?
let nip05_active = settings . verified_users . is_active ( ) ;
// are we requriing NIP-05 user verification?
let nip05_enabled = settings . verified_users . is_enabled ( ) ;
2021-12-11 16:48:59 -05:00
task ::spawn_blocking ( move | | {
2022-02-12 10:29:25 -05:00
let db_dir = & settings . database . data_directory ;
2021-12-31 12:51:57 -05:00
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-02-12 10:29:25 -05:00
// create a connection pool
let pool = build_pool (
" event writer " ,
2022-09-24 10:19:16 -04:00
& settings ,
2021-12-11 16:48:59 -05:00
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
2022-02-12 10:29:25 -05:00
1 ,
2022-12-18 23:32:31 -05:00
2 ,
2022-02-12 10:29:25 -05:00
false ,
) ;
2022-09-06 07:12:07 -04:00
if settings . database . in_memory {
info! ( " using in-memory database, this will not persist a restart! " ) ;
} else {
info! ( " opened database {:?} for writing " , full_path ) ;
}
2022-02-12 10:29:25 -05:00
upgrade_db ( & mut pool . get ( ) ? ) ? ;
2022-01-22 22:29:15 -05:00
2022-01-26 22:39:03 -05:00
// Make a copy of the whitelist
2022-02-12 10:29:25 -05:00
let whitelist = & settings . authorization . pubkey_whitelist . clone ( ) ;
2022-01-26 22:39:03 -05:00
2021-12-30 22:07:21 -05:00
// get rate limit settings
2022-02-12 10:29:25 -05:00
let rps_setting = settings . limits . messages_per_sec ;
2022-01-05 18:30:47 -05:00
let mut most_recent_rate_limit = Instant ::now ( ) ;
2021-12-30 22:07:21 -05:00
let mut lim_opt = None ;
let clock = governor ::clock ::QuantaClock ::default ( ) ;
if let Some ( rps ) = rps_setting {
if rps > 0 {
info! ( " Enabling rate limits for event creation ({}/sec) " , rps ) ;
let quota = core ::num ::NonZeroU32 ::new ( rps * 60 ) . unwrap ( ) ;
lim_opt = Some ( RateLimiter ::direct ( Quota ::per_minute ( quota ) ) ) ;
}
}
2021-12-11 16:48:59 -05:00
loop {
2021-12-31 16:19:35 -05:00
if shutdown . try_recv ( ) . is_ok ( ) {
2021-12-30 22:07:21 -05:00
info! ( " shutting down database writer " ) ;
break ;
}
2021-12-11 16:48:59 -05:00
// call blocking read on channel
let next_event = event_rx . blocking_recv ( ) ;
// if the channel has closed, we will never get work
if next_event . is_none ( ) {
break ;
}
2022-10-16 16:25:06 -04:00
// track if an event write occurred; this is used to
// update the rate limiter
2021-12-30 22:07:21 -05:00
let mut event_write = false ;
2022-02-13 10:35:54 -05:00
let subm_event = next_event . unwrap ( ) ;
let event = subm_event . event ;
let notice_tx = subm_event . notice_tx ;
2022-01-26 22:39:03 -05:00
// check if this event is authorized.
if let Some ( allowed_addrs ) = whitelist {
2022-10-16 16:25:06 -04:00
// TODO: incorporate delegated pubkeys
2022-01-26 22:39:03 -05:00
// if the event address is not in allowed_addrs.
if ! allowed_addrs . contains ( & event . pubkey ) {
info! (
" Rejecting event {}, unauthorized author " ,
event . get_event_id_prefix ( )
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" pubkey is not allowed to publish to this relay " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-01-26 22:39:03 -05:00
continue ;
}
}
2022-12-24 18:16:55 -05:00
// Check that event kind isn't blacklisted
let kinds_blacklist = & settings . limits . event_kind_blacklist . clone ( ) ;
if let Some ( event_kind_blacklist ) = kinds_blacklist {
if event_kind_blacklist . contains ( & event . kind ) {
info! (
" Rejecting event {}, blacklisted kind " ,
& event . get_event_id_prefix ( )
) ;
notice_tx
. try_send ( Notice ::blocked (
event . id ,
" event kind is blocked by relay "
) )
. ok ( ) ;
continue ;
}
}
2022-02-12 10:29:25 -05:00
// send any metadata events to the NIP-05 verifier
if nip05_active & & event . is_kind_metadata ( ) {
// we are sending this prior to even deciding if we
// persist it. this allows the nip05 module to
// inspect it, update if necessary, or persist a new
// event and broadcast it itself.
metadata_tx . send ( event . clone ( ) ) . ok ( ) ;
}
// check for NIP-05 verification
if nip05_enabled {
match nip05 ::query_latest_user_verification ( pool . get ( ) ? , event . pubkey . to_owned ( ) ) {
Ok ( uv ) = > {
2022-09-06 07:12:07 -04:00
if uv . is_valid ( & settings . verified_users ) {
2022-02-12 10:29:25 -05:00
info! (
" new event from verified author ({:?},{:?}) " ,
uv . name . to_string ( ) ,
event . get_author_prefix ( )
) ;
} else {
2022-12-22 16:15:45 -05:00
info! (
" rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain) " ,
uv . name . to_string ( ) ,
event . get_author_prefix ( )
2022-02-12 10:29:25 -05:00
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" NIP-05 verification is no longer valid (expired/wrong domain) " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-02-12 10:29:25 -05:00
continue ;
}
}
Err ( Error ::SqlError ( rusqlite ::Error ::QueryReturnedNoRows ) ) = > {
debug! (
" no verification records found for pubkey: {:?} " ,
event . get_author_prefix ( )
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" NIP-05 verification needed to publish events " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-02-12 10:29:25 -05:00
continue ;
}
Err ( e ) = > {
warn! ( " checking nip05 verification status failed: {:?} " , e ) ;
continue ;
}
}
}
// TODO: cache recent list of authors to remove a DB call.
2022-01-01 20:25:09 -05:00
let start = Instant ::now ( ) ;
2022-05-30 18:03:00 -04:00
if event . kind > = 20000 & & event . kind < 30000 {
2022-11-04 08:55:38 -04:00
bcast_tx . send ( event . clone ( ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
info! (
2022-11-05 17:11:20 -04:00
" published ephemeral event: {:?} from: {:?} in: {:?} " ,
2022-05-30 18:03:00 -04:00
event . get_event_id_prefix ( ) ,
event . get_author_prefix ( ) ,
start . elapsed ( )
) ;
event_write = true
} else {
2022-12-20 14:21:57 -05:00
log_pool_stats ( " writer " , & pool ) ;
2022-05-30 18:03:00 -04:00
match write_event ( & mut pool . get ( ) ? , & event ) {
Ok ( updated ) = > {
if updated = = 0 {
2022-10-08 13:12:41 -04:00
trace! ( " ignoring duplicate or deleted event " ) ;
2022-11-10 17:27:10 -05:00
notice_tx . try_send ( Notice ::duplicate ( event . id ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
} else {
info! (
2022-11-05 17:11:20 -04:00
" persisted event: {:?} from: {:?} in: {:?} " ,
2022-05-30 18:03:00 -04:00
event . get_event_id_prefix ( ) ,
event . get_author_prefix ( ) ,
start . elapsed ( )
) ;
event_write = true ;
// send this out to all clients
bcast_tx . send ( event . clone ( ) ) . ok ( ) ;
2022-11-10 17:27:10 -05:00
notice_tx . try_send ( Notice ::saved ( event . id ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
}
}
Err ( err ) = > {
warn! ( " event insert failed: {:?} " , err ) ;
2022-11-10 17:27:10 -05:00
let msg = " relay experienced an error trying to publish the latest event " ;
notice_tx . try_send ( Notice ::error ( event . id , msg ) ) . ok ( ) ;
2021-12-11 16:48:59 -05:00
}
}
}
2022-02-12 10:29:25 -05:00
2021-12-30 22:07:21 -05:00
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some ( ref lim ) = lim_opt {
if let Err ( n ) = lim . check ( ) {
2022-01-05 18:30:47 -05:00
let wait_for = n . wait_time_from ( clock . now ( ) ) ;
// check if we have recently logged rate
// limits, but print out a message only once
// per second.
2022-02-12 10:29:25 -05:00
if most_recent_rate_limit . elapsed ( ) . as_secs ( ) > 10 {
2022-01-05 18:30:47 -05:00
warn! (
2022-02-12 10:29:25 -05:00
" rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds) " ,
2022-01-05 18:30:47 -05:00
wait_for
) ;
// reset last rate limit message
most_recent_rate_limit = Instant ::now ( ) ;
}
// block event writes, allowing them to queue up
thread ::sleep ( wait_for ) ;
2021-12-30 22:07:21 -05:00
continue ;
}
}
}
2021-12-11 16:48:59 -05:00
}
info! ( " database connection closed " ) ;
Ok ( ( ) )
} )
}
2022-02-12 10:29:35 -05:00
/// Persist an event to the database, returning rows added.
2022-02-12 10:29:25 -05:00
pub fn write_event ( conn : & mut PooledConnection , e : & Event ) -> Result < usize > {
2023-01-03 18:32:55 -05:00
// enable auto vacuum
conn . execute_batch ( " pragma auto_vacuum = FULL " ) ? ;
2021-12-11 16:48:59 -05:00
// start transaction
let tx = conn . transaction ( ) ? ;
// get relevant fields from event and convert to blobs.
let id_blob = hex ::decode ( & e . id ) . ok ( ) ;
2022-10-16 16:25:06 -04:00
let pubkey_blob : Option < Vec < u8 > > = hex ::decode ( & e . pubkey ) . ok ( ) ;
let delegator_blob : Option < Vec < u8 > > = e . delegated_by . as_ref ( ) . and_then ( | d | hex ::decode ( d ) . ok ( ) ) ;
2021-12-11 16:48:59 -05:00
let event_str = serde_json ::to_string ( & e ) . ok ( ) ;
2023-01-15 10:18:53 -05:00
// check for replaceable events that would hide this one.
2021-12-11 22:43:41 -05:00
// ignore if the event hash is a duplicate.
2022-10-08 13:12:41 -04:00
let mut ins_count = tx . execute (
2022-10-16 16:25:06 -04:00
" INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE); " ,
params! [ id_blob , e . created_at , e . kind , pubkey_blob , delegator_blob , event_str ]
2021-12-11 16:48:59 -05:00
) ? ;
2021-12-11 22:43:41 -05:00
if ins_count = = 0 {
// if the event was a duplicate, no need to insert event or
2022-12-20 14:23:04 -05:00
// pubkey references.
tx . rollback ( ) . ok ( ) ;
2021-12-11 22:43:41 -05:00
return Ok ( ins_count ) ;
}
// remember primary key of the event most recently inserted.
2021-12-11 16:48:59 -05:00
let ev_id = tx . last_insert_rowid ( ) ;
2022-01-22 22:29:15 -05:00
// add all tags to the tag table
for tag in e . tags . iter ( ) {
// ensure we have 2 values.
if tag . len ( ) > = 2 {
let tagname = & tag [ 0 ] ;
let tagval = & tag [ 1 ] ;
2022-08-17 19:34:11 -04:00
// only single-char tags are searchable
let tagchar_opt = single_char_tagname ( tagname ) ;
match & tagchar_opt {
Some ( _ ) = > {
// if tagvalue is lowercase hex;
2022-09-02 11:30:51 -04:00
if is_lower_hex ( tagval ) & & ( tagval . len ( ) % 2 = = 0 ) {
2022-08-17 19:34:11 -04:00
tx . execute (
2022-12-22 16:15:45 -05:00
" INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3) " ,
params! [ ev_id , & tagname , hex ::decode ( tagval ) . ok ( ) ] ,
) ? ;
2022-08-17 19:34:11 -04:00
} else {
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3) " ,
params! [ ev_id , & tagname , & tagval ] ,
) ? ;
}
}
None = > { }
2022-01-22 22:29:15 -05:00
}
2021-12-11 16:48:59 -05:00
}
}
2022-05-30 18:03:00 -04:00
// if this event is replaceable update, hide every other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
2023-01-15 10:18:53 -05:00
if e . is_replaceable ( ) {
2023-01-02 18:18:11 -05:00
let author = hex ::decode ( & e . pubkey ) . ok ( ) ;
// this is a backwards check - hide any events that were older.
2021-12-30 16:45:03 -05:00
let update_count = tx . execute (
2023-01-02 18:18:11 -05:00
" UPDATE event SET hidden=TRUE WHERE hidden!=TRUE and kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1) " ,
params! [ e . kind , author , e . kind , author ] ,
2021-12-30 16:45:03 -05:00
) ? ;
if update_count > 0 {
2022-02-12 10:29:25 -05:00
info! (
2022-10-08 13:12:41 -04:00
" hid {} older replaceable kind {} events for author: {:?} " ,
2022-02-12 10:29:25 -05:00
update_count ,
2022-05-30 18:03:00 -04:00
e . kind ,
2022-02-12 10:29:25 -05:00
e . get_author_prefix ( )
) ;
2021-12-30 16:45:03 -05:00
}
}
2022-02-27 12:34:10 -05:00
// if this event is a deletion, hide the referenced events from the same author.
if e . kind = = 5 {
let event_candidates = e . tag_values_by_name ( " e " ) ;
2022-09-02 13:38:31 -04:00
// first parameter will be author
2022-09-02 11:30:51 -04:00
let mut params : Vec < Box < dyn ToSql > > = vec! [ Box ::new ( hex ::decode ( & e . pubkey ) ? ) ] ;
2022-02-27 12:34:10 -05:00
event_candidates
. iter ( )
. filter ( | x | is_hex ( x ) & & x . len ( ) = = 64 )
. filter_map ( | x | hex ::decode ( x ) . ok ( ) )
. for_each ( | x | params . push ( Box ::new ( x ) ) ) ;
let query = format! (
2022-10-08 09:02:16 -04:00
" UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({}) " ,
2022-02-27 12:34:10 -05:00
repeat_vars ( params . len ( ) - 1 )
) ;
let mut stmt = tx . prepare ( & query ) ? ;
let update_count = stmt . execute ( rusqlite ::params_from_iter ( params ) ) ? ;
info! (
" hid {} deleted events for author {:?} " ,
update_count ,
e . get_author_prefix ( )
) ;
2022-10-08 13:12:41 -04:00
} else {
// check if a deletion has already been recorded for this event.
// Only relevant for non-deletion events
let del_count = tx . query_row (
" SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1; " ,
params! [ pubkey_blob , id_blob ] , | row | row . get ::< usize , usize > ( 0 ) ) ;
// check if a the query returned a result, meaning we should
// hid the current event
if del_count . ok ( ) . is_some ( ) {
// a deletion already existed, mark original event as hidden.
info! (
" hid event: {:?} due to existing deletion by author: {:?} " ,
e . get_event_id_prefix ( ) ,
e . get_author_prefix ( )
) ;
let _update_count =
tx . execute ( " UPDATE event SET hidden=TRUE WHERE id=? " , params! [ ev_id ] ) ? ;
// event was deleted, so let caller know nothing new
// arrived, preventing this from being sent to active
// subscriptions
ins_count = 0 ;
}
2022-02-27 12:34:10 -05:00
}
2021-12-11 16:48:59 -05:00
tx . commit ( ) ? ;
Ok ( ins_count )
}
2022-02-12 10:29:35 -05:00
/// A serialized event paired with the subscription it was found for.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryResult {
    /// Identifier of the subscription this result belongs to.
    pub sub_id: String,
    /// The event, in serialized form.
    pub event: String,
}
2022-02-12 10:29:35 -05:00
/// Produce a comma-separated list of `count` `?` placeholders.
///
/// The result is meant to be spliced into an SQL `IN (...)` list: it has
/// no leading, trailing or embedded whitespace and no trailing comma.
/// A `count` of zero yields the empty string.
fn repeat_vars(count: usize) -> String {
    // Joining (rather than repeating "?," and trimming the tail) cannot
    // leave a stray separator behind, whatever the count.
    vec!["?"; count].join(",")
}
2023-01-06 13:17:30 -05:00
/// Decide if there is an index that should be used explicitly
fn override_index ( f : & ReqFilter ) -> Option < String > {
// queries for multiple kinds default to kind_index, which is
// significantly slower than kind_created_at_index.
if let Some ( ks ) = & f . kinds {
if f . ids . is_none ( ) & &
ks . len ( ) > 1 & &
f . since . is_none ( ) & &
f . until . is_none ( ) & &
2023-01-06 13:57:48 -05:00
f . tags . is_none ( ) & &
2023-01-06 13:17:30 -05:00
f . authors . is_none ( ) {
return Some ( " kind_created_at_index " . into ( ) ) ;
}
}
None
}
2023-01-06 13:39:40 -05:00
/// Create a dynamic SQL subquery and params from a subscription filter (and optional explicit index used)
fn query_from_filter ( f : & ReqFilter ) -> ( String , Vec < Box < dyn ToSql > > , Option < String > ) {
2021-12-11 16:48:59 -05:00
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
2022-01-22 22:29:15 -05:00
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
2022-08-07 11:15:36 -04:00
// if the filter is malformed, don't return anything.
if f . force_no_match {
2022-12-17 08:49:28 -05:00
let empty_query = " SELECT e.content, e.created_at FROM event e WHERE 1=0 " . to_owned ( ) ;
2022-08-07 11:15:36 -04:00
// query parameters for SQLite
let empty_params : Vec < Box < dyn ToSql > > = vec! [ ] ;
2023-01-06 13:57:48 -05:00
return ( empty_query , empty_params , None ) ;
2022-08-07 11:15:36 -04:00
}
2023-01-06 13:17:30 -05:00
// check if the index needs to be overriden
2023-01-09 23:12:04 -05:00
let idx_name = override_index ( f ) ;
2023-01-06 13:57:48 -05:00
let idx_stmt = idx_name . as_ref ( ) . map_or_else ( | | " " . to_owned ( ) , | i | format! ( " INDEXED BY {} " , i ) ) ;
2023-01-06 13:17:30 -05:00
let mut query = format! ( " SELECT e.content, e.created_at FROM event e {} " , idx_stmt ) ;
2022-07-04 18:25:32 -04:00
// query parameters for SQLite
2022-01-22 22:29:15 -05:00
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
2022-08-07 11:15:36 -04:00
2022-07-04 18:25:32 -04:00
// individual filter components (single conditions such as an author or event ID)
let mut filter_components : Vec < String > = Vec ::new ( ) ;
// Query for "authors", allowing prefix matches
if let Some ( authvec ) = & f . authors {
// take each author and convert to a hexsearch
let mut auth_searches : Vec < String > = vec! [ ] ;
for auth in authvec {
match hex_range ( auth ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
2023-01-04 17:51:17 -05:00
auth_searches . push ( " author=? " . to_owned ( ) ) ;
2022-07-04 18:25:32 -04:00
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
2022-10-16 16:25:06 -04:00
auth_searches . push (
2023-01-04 17:51:17 -05:00
" (author>? AND author<?) " . to_owned ( ) ,
2022-10-16 16:25:06 -04:00
) ;
2022-07-04 18:25:32 -04:00
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
2023-01-04 17:51:17 -05:00
auth_searches . push ( " author>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from author {:?} " , auth ) ;
}
}
}
2022-12-14 23:07:56 -05:00
if ! authvec . is_empty ( ) {
2023-01-04 17:51:22 -05:00
let auth_clause = format! ( " ( {} ) " , auth_searches . join ( " OR " ) ) ;
filter_components . push ( auth_clause ) ;
2022-11-19 11:00:38 -05:00
} else {
2023-01-04 17:51:17 -05:00
filter_components . push ( " false " . to_owned ( ) ) ;
}
2022-07-04 18:25:32 -04:00
}
// Query for Kind
if let Some ( ks ) = & f . kinds {
// kind is number, no escaping needed
let str_kinds : Vec < String > = ks . iter ( ) . map ( | x | x . to_string ( ) ) . collect ( ) ;
let kind_clause = format! ( " kind IN ( {} ) " , str_kinds . join ( " , " ) ) ;
filter_components . push ( kind_clause ) ;
}
// Query for event, allowing prefix matches
if let Some ( idvec ) = & f . ids {
// take each author and convert to a hexsearch
let mut id_searches : Vec < String > = vec! [ ] ;
for id in idvec {
match hex_range ( id ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
id_searches . push ( " event_hash=? " . to_owned ( ) ) ;
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
id_searches . push ( " (event_hash>? AND event_hash<?) " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
id_searches . push ( " event_hash>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from id {:?} " , id ) ;
2022-01-25 19:21:43 -05:00
}
}
2021-12-11 16:48:59 -05:00
}
2022-12-14 23:07:56 -05:00
if ! idvec . is_empty ( ) {
2022-11-19 11:35:00 -05:00
let id_clause = format! ( " ( {} ) " , id_searches . join ( " OR " ) ) ;
filter_components . push ( id_clause ) ;
} else {
// if the ids list was empty, we should never return
// any results.
filter_components . push ( " false " . to_owned ( ) ) ;
}
2022-07-04 18:25:32 -04:00
}
// Query for tags
if let Some ( map ) = & f . tags {
for ( key , val ) in map . iter ( ) {
let mut str_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
let mut blob_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
for v in val {
2022-08-21 12:10:19 -04:00
if ( v . len ( ) % 2 = = 0 ) & & is_lower_hex ( v ) {
2022-11-02 19:33:44 -04:00
if let Ok ( h ) = hex ::decode ( v ) {
2022-07-04 18:25:32 -04:00
blob_vals . push ( Box ::new ( h ) ) ;
2022-01-22 22:29:15 -05:00
}
2022-07-04 18:25:32 -04:00
} else {
str_vals . push ( Box ::new ( v . to_owned ( ) ) ) ;
2022-01-22 22:29:15 -05:00
}
}
2022-07-04 18:25:32 -04:00
// create clauses with "?" params for each tag value being searched
let str_clause = format! ( " value IN ( {} ) " , repeat_vars ( str_vals . len ( ) ) ) ;
let blob_clause = format! ( " value_hex IN ( {} ) " , repeat_vars ( blob_vals . len ( ) ) ) ;
2022-08-17 19:34:11 -04:00
// find evidence of the target tag name/value existing for this event.
2022-12-22 16:15:45 -05:00
let tag_clause = format! (
" e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {}))) " ,
str_clause , blob_clause
) ;
2022-07-04 18:25:32 -04:00
// add the tag name as the first parameter
2022-08-07 11:15:36 -04:00
params . push ( Box ::new ( key . to_string ( ) ) ) ;
2022-07-04 18:25:32 -04:00
// add all tag values that are plain strings as params
params . append ( & mut str_vals ) ;
// add all tag values that are blobs as params
params . append ( & mut blob_vals ) ;
filter_components . push ( tag_clause ) ;
2021-12-11 16:48:59 -05:00
}
}
2022-07-04 18:25:32 -04:00
// Query for timestamp
if f . since . is_some ( ) {
let created_clause = format! ( " created_at > {} " , f . since . unwrap ( ) ) ;
filter_components . push ( created_clause ) ;
}
// Query for timestamp
if f . until . is_some ( ) {
let until_clause = format! ( " created_at < {} " , f . until . unwrap ( ) ) ;
filter_components . push ( until_clause ) ;
}
2022-01-25 21:48:46 -05:00
// never display hidden events
2022-07-04 18:25:32 -04:00
query . push_str ( " WHERE hidden!=TRUE " ) ;
// build filter component conditions
if ! filter_components . is_empty ( ) {
query . push_str ( " AND " ) ;
query . push_str ( & filter_components . join ( " AND " ) ) ;
}
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some ( lim ) = f . limit {
2022-09-02 13:38:31 -04:00
let _ = write! ( query , " ORDER BY e.created_at DESC LIMIT {} " , lim ) ;
2022-07-04 18:25:32 -04:00
} else {
query . push_str ( " ORDER BY e.created_at ASC " )
2021-12-11 16:48:59 -05:00
}
2023-01-06 13:39:40 -05:00
( query , params , idx_name )
2022-07-04 18:25:32 -04:00
}
/// Create a dynamic SQL query string and params from a subscription.
2023-01-06 13:39:40 -05:00
fn query_from_sub ( sub : & Subscription ) -> ( String , Vec < Box < dyn ToSql > > , Vec < String > ) {
2022-07-04 18:25:32 -04:00
// build a dynamic SQL query for an entire subscription, based on
// SQL subqueries for filters.
let mut subqueries : Vec < String > = Vec ::new ( ) ;
2023-01-06 13:57:48 -05:00
let mut indexes = vec! [ ] ;
2022-07-04 18:25:32 -04:00
// subquery params
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
// for every filter in the subscription, generate a subquery
for f in sub . filters . iter ( ) {
2023-01-06 13:39:40 -05:00
let ( f_subquery , mut f_params , index ) = query_from_filter ( f ) ;
if let Some ( i ) = index {
indexes . push ( i ) ;
}
2022-07-04 18:25:32 -04:00
subqueries . push ( f_subquery ) ;
params . append ( & mut f_params ) ;
2022-05-09 16:39:49 -04:00
}
2022-07-04 18:25:32 -04:00
// encapsulate subqueries into select statements
let subqueries_selects : Vec < String > = subqueries
. iter ( )
2022-12-17 08:49:28 -05:00
. map ( | s | format! ( " SELECT distinct content, created_at FROM ( {} ) " , s ) )
2022-07-04 18:25:32 -04:00
. collect ( ) ;
let query : String = subqueries_selects . join ( " UNION " ) ;
2023-01-06 13:39:40 -05:00
( query , params , indexes )
2021-12-11 16:48:59 -05:00
}
2022-12-20 11:07:01 -05:00
/// Check if the pool is fully utilized
fn _pool_at_capacity ( pool : & SqlitePool ) -> bool {
let state : r2d2 ::State = pool . state ( ) ;
state . idle_connections = = 0
}
2022-12-22 16:47:33 -05:00
/// Log pool stats
2022-12-20 14:21:57 -05:00
fn log_pool_stats ( name : & str , pool : & SqlitePool ) {
2022-12-16 18:01:49 -05:00
let state : r2d2 ::State = pool . state ( ) ;
let in_use_cxns = state . connections - state . idle_connections ;
2022-12-25 11:43:47 -05:00
debug! (
" DB pool {:?} usage (in_use: {}, available: {}, max: {}) " ,
2022-12-22 16:47:33 -05:00
name ,
in_use_cxns ,
2022-12-25 11:43:47 -05:00
state . connections ,
pool . max_size ( )
2022-12-16 18:01:49 -05:00
) ;
}
2022-12-26 11:03:51 -05:00
2022-12-22 16:16:21 -05:00
/// Perform database maintenance on a regular basis
2022-12-27 10:48:07 -05:00
pub async fn db_optimize_task ( pool : SqlitePool ) {
2022-12-22 16:16:21 -05:00
tokio ::task ::spawn ( async move {
loop {
tokio ::select! {
2022-12-26 11:03:51 -05:00
_ = tokio ::time ::sleep ( Duration ::from_secs ( 60 * 60 ) ) = > {
2022-12-25 12:06:30 -05:00
if let Ok ( mut conn ) = pool . get ( ) {
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
2022-12-26 11:03:51 -05:00
info! ( " running database optimizer " ) ;
2022-12-25 12:06:30 -05:00
optimize_db ( & mut conn ) . ok ( ) ;
2022-12-26 11:03:51 -05:00
}
}
} ;
}
} ) ;
}
/// Perform database WAL checkpoint on a regular basis
2022-12-27 10:48:07 -05:00
pub async fn db_checkpoint_task ( pool : SqlitePool , safe_to_read : Arc < Mutex < u64 > > ) {
2022-12-26 11:03:51 -05:00
tokio ::task ::spawn ( async move {
// WAL size in pages.
let mut current_wal_size = 0 ;
// WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
let wal_threshold = 1000 * 10 ;
// default threshold for the busy timer
let busy_wait_default = Duration ::from_secs ( 1 ) ;
// if the WAL file is getting too big, switch to this
2022-12-27 11:13:14 -05:00
let busy_wait_default_long = Duration ::from_secs ( 10 ) ;
2022-12-26 11:03:51 -05:00
loop {
tokio ::select! {
_ = tokio ::time ::sleep ( Duration ::from_secs ( CHECKPOINT_FREQ_SEC ) ) = > {
if let Ok ( mut conn ) = pool . get ( ) {
2022-12-27 10:48:07 -05:00
let mut _guard :Option < MutexGuard < u64 > > = None ;
2022-12-26 11:03:51 -05:00
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
if current_wal_size < = wal_threshold {
conn . busy_timeout ( busy_wait_default ) . ok ( ) ;
} else {
// if the wal size has exceeded a threshold, increase the busy timeout.
conn . busy_timeout ( busy_wait_default_long ) . ok ( ) ;
2022-12-27 10:48:07 -05:00
// take a lock that will prevent new readers.
info! ( " blocking new readers to perform wal_checkpoint " ) ;
_guard = Some ( safe_to_read . lock ( ) . await ) ;
2022-12-26 11:03:51 -05:00
}
2022-12-25 12:06:30 -05:00
debug! ( " running wal_checkpoint(TRUNCATE) " ) ;
2022-12-26 11:03:51 -05:00
if let Ok ( new_size ) = checkpoint_db ( & mut conn ) {
current_wal_size = new_size ;
}
2022-12-25 12:06:30 -05:00
}
}
} ;
2022-12-22 16:16:21 -05:00
}
} ) ;
}
2021-12-11 22:43:41 -05:00
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
/// is published on the `query_tx` channel as it is returned. If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
2021-12-11 16:48:59 -05:00
pub async fn db_query (
sub : Subscription ,
2022-08-17 19:34:11 -04:00
client_id : String ,
2022-02-23 17:38:16 -05:00
pool : SqlitePool ,
2021-12-11 16:48:59 -05:00
query_tx : tokio ::sync ::mpsc ::Sender < QueryResult > ,
mut abandon_query_rx : tokio ::sync ::oneshot ::Receiver < ( ) > ,
2022-12-27 11:28:56 -05:00
safe_to_read : Arc < Mutex < u64 > > ,
2021-12-11 16:48:59 -05:00
) {
2022-12-18 23:11:46 -05:00
let pre_spawn_start = Instant ::now ( ) ;
2021-12-11 16:48:59 -05:00
task ::spawn_blocking ( move | | {
2022-12-27 11:28:56 -05:00
{
// if we are waiting on a checkpoint, stop until it is complete
let _ = safe_to_read . blocking_lock ( ) ;
}
2022-12-18 23:11:46 -05:00
let db_queue_time = pre_spawn_start . elapsed ( ) ;
2022-12-20 14:23:21 -05:00
// if the queue time was very long (>5 seconds), spare the DB and abort.
if db_queue_time > Duration ::from_secs ( 5 ) {
info! (
2023-01-03 22:24:46 -05:00
" shedding DB query load queued for {:?} (cid: {}, sub: {:?}) " ,
2022-12-20 14:23:21 -05:00
db_queue_time , client_id , sub . id
) ;
return Ok ( ( ) ) ;
}
// otherwise, report queuing time if it is slow
else if db_queue_time > Duration ::from_secs ( 1 ) {
2022-12-18 23:11:46 -05:00
debug! (
" (slow) DB query queued for {:?} (cid: {}, sub: {:?}) " ,
db_queue_time , client_id , sub . id
) ;
}
2022-01-01 20:25:09 -05:00
let start = Instant ::now ( ) ;
2022-12-18 14:42:31 -05:00
let mut row_count : usize = 0 ;
2021-12-11 22:43:41 -05:00
// generate SQL query
2023-01-06 13:39:40 -05:00
let ( q , p , idxs ) = query_from_sub ( & sub ) ;
2022-12-22 11:29:27 -05:00
let sql_gen_elapsed = start . elapsed ( ) ;
2023-01-06 13:39:40 -05:00
2022-12-22 11:29:27 -05:00
if sql_gen_elapsed > Duration ::from_millis ( 10 ) {
debug! ( " SQL (slow) generated in {:?} " , start . elapsed ( ) ) ;
}
2022-12-16 09:17:39 -05:00
// cutoff for displaying slow queries
2022-12-18 21:47:11 -05:00
let slow_cutoff = Duration ::from_millis ( 2000 ) ;
2022-12-19 01:02:28 -05:00
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration ::from_secs ( 5 ) ;
2022-02-21 10:03:05 -05:00
let start = Instant ::now ( ) ;
2022-12-18 14:42:31 -05:00
let mut slow_first_event ;
2022-12-19 01:02:28 -05:00
let mut last_successful_send = Instant ::now ( ) ;
2022-12-25 11:42:09 -05:00
if let Ok ( mut conn ) = pool . get ( ) {
// execute the query.
// make the actual SQL query (with parameters inserted) available
conn . trace ( Some ( | x | { trace! ( " SQL trace: {:?} " , x ) } ) ) ;
let mut stmt = conn . prepare_cached ( & q ) ? ;
2022-02-23 17:38:16 -05:00
let mut event_rows = stmt . query ( rusqlite ::params_from_iter ( p ) ) ? ;
2022-12-25 11:42:09 -05:00
2022-02-23 17:38:16 -05:00
let mut first_result = true ;
while let Some ( row ) = event_rows . next ( ) ? {
2022-12-18 14:42:31 -05:00
let first_event_elapsed = start . elapsed ( ) ;
slow_first_event = first_event_elapsed > = slow_cutoff ;
2022-02-23 17:38:16 -05:00
if first_result {
2022-08-17 19:34:11 -04:00
debug! (
2023-01-06 13:39:40 -05:00
" first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}] " ,
first_event_elapsed , client_id , sub . id , idxs
2022-08-17 19:34:11 -04:00
) ;
2022-02-23 17:38:16 -05:00
first_result = false ;
}
2022-12-18 21:47:11 -05:00
// logging for slow queries; show sub and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
2022-12-24 11:29:47 -05:00
if row_count = = 0 & & slow_first_event & & client_id . starts_with ( '0' ) {
2022-12-18 23:11:46 -05:00
debug! (
2022-12-18 14:42:31 -05:00
" query req (slow): {:?} (cid: {}, sub: {:?}) " ,
sub , client_id , sub . id
) ;
2022-12-25 11:45:27 -05:00
}
2022-12-27 16:24:10 -05:00
// check if a checkpoint is trying to run, and abort
if row_count % 100 = = 0 {
{
2023-01-09 23:12:04 -05:00
if safe_to_read . try_lock ( ) . is_err ( ) {
2022-12-27 16:24:10 -05:00
// lock was held, abort this query
debug! ( " query aborted due to checkpoint (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
return Ok ( ( ) ) ;
}
}
}
2022-12-18 21:47:11 -05:00
// check if this is still active; every 100 rows
if row_count % 100 = = 0 & & abandon_query_rx . try_recv ( ) . is_ok ( ) {
2022-12-16 16:22:27 -05:00
debug! ( " query aborted (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
2022-02-23 17:38:16 -05:00
return Ok ( ( ) ) ;
}
row_count + = 1 ;
let event_json = row . get ( 0 ) ? ;
2022-12-19 01:02:28 -05:00
loop {
if query_tx . capacity ( ) ! = 0 {
// we have capacity to add another item
break ;
} else {
// the queue is full
trace! ( " db reader thread is stalled " ) ;
if last_successful_send + abort_cutoff < Instant ::now ( ) {
// the queue has been full for too long, abort
2022-12-25 11:45:27 -05:00
info! ( " aborting database query due to slow client (cid: {}, sub: {:?}) " ,
client_id , sub . id ) ;
2022-12-19 01:02:28 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
return ok ;
}
2022-12-27 16:24:10 -05:00
// check if a checkpoint is trying to run, and abort
2023-01-09 23:12:04 -05:00
if safe_to_read . try_lock ( ) . is_err ( ) {
2022-12-27 16:24:10 -05:00
// lock was held, abort this query
debug! ( " query aborted due to checkpoint (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
return Ok ( ( ) ) ;
}
2022-12-19 01:02:28 -05:00
// give the queue a chance to clear before trying again
thread ::sleep ( Duration ::from_millis ( 100 ) ) ;
}
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
2022-02-23 17:38:16 -05:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : event_json ,
} )
. ok ( ) ;
2022-12-19 01:02:28 -05:00
last_successful_send = Instant ::now ( ) ;
2021-12-11 16:48:59 -05:00
}
2022-05-30 18:02:59 -04:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : " EOSE " . to_string ( ) ,
} )
. ok ( ) ;
2022-02-23 17:38:16 -05:00
debug! (
2022-12-18 23:11:46 -05:00
" query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {}) " ,
pre_spawn_start . elapsed ( ) ,
2022-08-21 12:10:19 -04:00
client_id ,
2022-12-16 16:22:27 -05:00
sub . id ,
2022-12-18 23:11:46 -05:00
start . elapsed ( ) ,
2022-12-16 16:22:27 -05:00
row_count
2022-02-23 17:38:16 -05:00
) ;
} else {
warn! ( " Could not get a database connection for querying " ) ;
2021-12-11 16:48:59 -05:00
}
2022-01-05 18:33:08 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
2022-01-22 22:29:15 -05:00
ok
2021-12-11 16:48:59 -05:00
} ) ;
2023-01-02 16:38:30 -05:00
}