2021-12-11 22:43:41 -05:00
//! Event persistence and querying
2022-09-06 07:12:07 -04:00
//use crate::config::SETTINGS;
use crate ::config ::Settings ;
use crate ::error ::{ Error , Result } ;
2022-08-17 19:34:11 -04:00
use crate ::event ::{ single_char_tagname , Event } ;
2022-02-12 10:29:38 -05:00
use crate ::hexrange ::hex_range ;
use crate ::hexrange ::HexSearch ;
2022-02-12 10:29:25 -05:00
use crate ::nip05 ;
2022-11-10 17:27:10 -05:00
use crate ::notice ::Notice ;
2022-02-12 10:58:42 -05:00
use crate ::schema ::{ upgrade_db , STARTUP_SQL } ;
2022-07-04 18:25:32 -04:00
use crate ::subscription ::ReqFilter ;
2021-12-11 16:48:59 -05:00
use crate ::subscription ::Subscription ;
2022-08-17 19:34:11 -04:00
use crate ::utils ::{ is_hex , is_lower_hex } ;
2021-12-30 22:07:21 -05:00
use governor ::clock ::Clock ;
use governor ::{ Quota , RateLimiter } ;
2021-12-11 16:48:59 -05:00
use hex ;
2022-01-25 21:39:24 -05:00
use r2d2 ;
use r2d2_sqlite ::SqliteConnectionManager ;
2022-02-12 10:29:25 -05:00
use rusqlite ::params ;
2022-01-22 22:29:15 -05:00
use rusqlite ::types ::ToSql ;
2022-02-12 10:29:25 -05:00
use rusqlite ::OpenFlags ;
2022-09-02 13:38:31 -04:00
use std ::fmt ::Write as _ ;
2021-12-11 16:48:59 -05:00
use std ::path ::Path ;
2021-12-30 22:07:21 -05:00
use std ::thread ;
2022-01-26 22:39:03 -05:00
use std ::time ::Duration ;
2022-01-01 20:25:09 -05:00
use std ::time ::Instant ;
2021-12-11 16:48:59 -05:00
use tokio ::task ;
2022-09-28 08:19:59 -04:00
use tracing ::{ debug , info , trace , warn } ;
2021-12-11 16:48:59 -05:00
2022-01-25 21:39:24 -05:00
/// Connection pool type used throughout this module: an r2d2 pool whose
/// connections are produced by the SQLite connection manager.
pub type SqlitePool = r2d2 ::Pool < r2d2_sqlite ::SqliteConnectionManager > ;
2022-02-12 10:29:25 -05:00
/// A single SQLite connection checked out of an r2d2 pool.
pub type PooledConnection = r2d2 ::PooledConnection < r2d2_sqlite ::SqliteConnectionManager > ;
2022-02-13 10:35:54 -05:00
/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
pub event : Event ,
2022-11-10 17:27:10 -05:00
pub notice_tx : tokio ::sync ::mpsc ::Sender < Notice > ,
2022-02-13 10:35:54 -05:00
}
2021-12-11 22:43:41 -05:00
/// Database file
2022-02-12 10:29:25 -05:00
// File name that is joined onto the configured data directory to locate
// the SQLite database.
// NOTE(review): the literal as it appears here carries a leading and a
// trailing space -- confirm that is intended and not an extraction artifact.
pub const DB_FILE : & str = " nostr.db " ;
2022-12-18 00:18:54 -05:00
/// Threshold for periodic database maintenance.
///
/// The database writer counts every attempt to persist a non-ephemeral
/// event; once that counter exceeds this value the writer resets it and
/// runs the optimizer.
pub const EVENT_COUNT_OPTIMIZE_TRIGGER : usize = 500 ;
2021-12-11 16:48:59 -05:00
2022-02-12 10:29:35 -05:00
/// Build a database connection pool.
2022-09-24 10:19:16 -04:00
/// # Panics
///
/// Will panic if the pool could not be created.
#[ must_use ]
2022-02-12 10:29:25 -05:00
pub fn build_pool (
name : & str ,
2022-09-24 10:19:16 -04:00
settings : & Settings ,
2022-02-12 10:29:25 -05:00
flags : OpenFlags ,
min_size : u32 ,
max_size : u32 ,
wait_for_db : bool ,
) -> SqlitePool {
let db_dir = & settings . database . data_directory ;
2022-01-25 21:39:24 -05:00
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-01-26 22:39:03 -05:00
// small hack; if the database doesn't exist yet, that means the
// writer thread hasn't finished. Give it a chance to work. This
// is only an issue with the first time we run.
2022-09-06 07:06:01 -04:00
if ! settings . database . in_memory {
while ! full_path . exists ( ) & & wait_for_db {
debug! ( " Database reader pool is waiting on the database to be created... " ) ;
thread ::sleep ( Duration ::from_millis ( 500 ) ) ;
}
2022-01-26 22:39:03 -05:00
}
2022-09-24 20:28:02 -04:00
let manager = if settings . database . in_memory {
SqliteConnectionManager ::memory ( )
2022-09-06 07:06:01 -04:00
. with_flags ( flags )
2022-09-24 20:28:02 -04:00
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
} else {
SqliteConnectionManager ::file ( & full_path )
2022-09-06 07:06:01 -04:00
. with_flags ( flags )
2022-09-24 20:28:02 -04:00
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) )
2022-09-06 07:06:01 -04:00
} ;
2022-01-25 21:39:24 -05:00
let pool : SqlitePool = r2d2 ::Pool ::builder ( )
. test_on_check_out ( true ) // no noticeable performance hit
2022-02-12 10:29:25 -05:00
. min_idle ( Some ( min_size ) )
. max_size ( max_size )
2022-12-17 11:47:35 -05:00
. max_lifetime ( Some ( Duration ::from_secs ( 60 ) ) )
2022-01-25 21:39:24 -05:00
. build ( manager )
. unwrap ( ) ;
2022-01-25 22:42:43 -05:00
info! (
2022-02-12 10:29:25 -05:00
" Built a connection pool {:?} (min={}, max={}) " ,
name , min_size , max_size
2022-01-25 22:42:43 -05:00
) ;
2022-02-12 10:29:25 -05:00
pool
}
2022-12-18 00:18:54 -05:00
/// Run routine maintenance by issuing `PRAGMA optimize` on `conn`.
///
/// # Errors
///
/// Propagates any error raised while executing the pragma.
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
    Ok(conn.execute_batch(" PRAGMA optimize; ")?)
}
2021-12-11 16:48:59 -05:00
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer (
2022-09-06 07:12:07 -04:00
settings : Settings ,
2022-02-13 10:35:54 -05:00
mut event_rx : tokio ::sync ::mpsc ::Receiver < SubmittedEvent > ,
2021-12-12 11:20:23 -05:00
bcast_tx : tokio ::sync ::broadcast ::Sender < Event > ,
2022-02-12 10:29:25 -05:00
metadata_tx : tokio ::sync ::broadcast ::Sender < Event > ,
2021-12-30 22:07:21 -05:00
mut shutdown : tokio ::sync ::broadcast ::Receiver < ( ) > ,
2021-12-11 16:48:59 -05:00
) -> tokio ::task ::JoinHandle < Result < ( ) > > {
2022-02-12 10:29:25 -05:00
// are we performing NIP-05 checking?
let nip05_active = settings . verified_users . is_active ( ) ;
// are we requriing NIP-05 user verification?
let nip05_enabled = settings . verified_users . is_enabled ( ) ;
2021-12-11 16:48:59 -05:00
task ::spawn_blocking ( move | | {
2022-02-12 10:29:25 -05:00
let db_dir = & settings . database . data_directory ;
2021-12-31 12:51:57 -05:00
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-02-12 10:29:25 -05:00
// create a connection pool
let pool = build_pool (
" event writer " ,
2022-09-24 10:19:16 -04:00
& settings ,
2021-12-11 16:48:59 -05:00
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
2022-02-12 10:29:25 -05:00
1 ,
2022-12-18 23:32:31 -05:00
2 ,
2022-02-12 10:29:25 -05:00
false ,
) ;
2022-09-06 07:12:07 -04:00
if settings . database . in_memory {
info! ( " using in-memory database, this will not persist a restart! " ) ;
} else {
info! ( " opened database {:?} for writing " , full_path ) ;
}
2022-02-12 10:29:25 -05:00
upgrade_db ( & mut pool . get ( ) ? ) ? ;
2022-01-22 22:29:15 -05:00
2022-01-26 22:39:03 -05:00
// Make a copy of the whitelist
2022-02-12 10:29:25 -05:00
let whitelist = & settings . authorization . pubkey_whitelist . clone ( ) ;
2022-01-26 22:39:03 -05:00
2021-12-30 22:07:21 -05:00
// get rate limit settings
2022-02-12 10:29:25 -05:00
let rps_setting = settings . limits . messages_per_sec ;
2022-01-05 18:30:47 -05:00
let mut most_recent_rate_limit = Instant ::now ( ) ;
2021-12-30 22:07:21 -05:00
let mut lim_opt = None ;
2022-12-18 00:18:54 -05:00
// Keep rough track of events so we can run optimize eventually.
let mut optimize_counter : usize = 0 ;
2021-12-30 22:07:21 -05:00
let clock = governor ::clock ::QuantaClock ::default ( ) ;
if let Some ( rps ) = rps_setting {
if rps > 0 {
info! ( " Enabling rate limits for event creation ({}/sec) " , rps ) ;
let quota = core ::num ::NonZeroU32 ::new ( rps * 60 ) . unwrap ( ) ;
lim_opt = Some ( RateLimiter ::direct ( Quota ::per_minute ( quota ) ) ) ;
}
}
2021-12-11 16:48:59 -05:00
loop {
2021-12-31 16:19:35 -05:00
if shutdown . try_recv ( ) . is_ok ( ) {
2021-12-30 22:07:21 -05:00
info! ( " shutting down database writer " ) ;
break ;
}
2021-12-11 16:48:59 -05:00
// call blocking read on channel
let next_event = event_rx . blocking_recv ( ) ;
// if the channel has closed, we will never get work
if next_event . is_none ( ) {
break ;
}
2022-10-16 16:25:06 -04:00
// track if an event write occurred; this is used to
// update the rate limiter
2021-12-30 22:07:21 -05:00
let mut event_write = false ;
2022-02-13 10:35:54 -05:00
let subm_event = next_event . unwrap ( ) ;
let event = subm_event . event ;
let notice_tx = subm_event . notice_tx ;
2022-01-26 22:39:03 -05:00
// check if this event is authorized.
if let Some ( allowed_addrs ) = whitelist {
2022-10-16 16:25:06 -04:00
// TODO: incorporate delegated pubkeys
2022-01-26 22:39:03 -05:00
// if the event address is not in allowed_addrs.
if ! allowed_addrs . contains ( & event . pubkey ) {
info! (
" Rejecting event {}, unauthorized author " ,
event . get_event_id_prefix ( )
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" pubkey is not allowed to publish to this relay " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-01-26 22:39:03 -05:00
continue ;
}
}
2022-02-12 10:29:25 -05:00
// send any metadata events to the NIP-05 verifier
if nip05_active & & event . is_kind_metadata ( ) {
// we are sending this prior to even deciding if we
// persist it. this allows the nip05 module to
// inspect it, update if necessary, or persist a new
// event and broadcast it itself.
metadata_tx . send ( event . clone ( ) ) . ok ( ) ;
}
// check for NIP-05 verification
if nip05_enabled {
match nip05 ::query_latest_user_verification ( pool . get ( ) ? , event . pubkey . to_owned ( ) ) {
Ok ( uv ) = > {
2022-09-06 07:12:07 -04:00
if uv . is_valid ( & settings . verified_users ) {
2022-02-12 10:29:25 -05:00
info! (
" new event from verified author ({:?},{:?}) " ,
uv . name . to_string ( ) ,
event . get_author_prefix ( )
) ;
} else {
info! ( " rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain) " ,
uv . name . to_string ( ) ,
event . get_author_prefix ( )
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" NIP-05 verification is no longer valid (expired/wrong domain) " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-02-12 10:29:25 -05:00
continue ;
}
}
Err ( Error ::SqlError ( rusqlite ::Error ::QueryReturnedNoRows ) ) = > {
debug! (
" no verification records found for pubkey: {:?} " ,
event . get_author_prefix ( )
) ;
2022-02-13 10:35:54 -05:00
notice_tx
2022-11-10 17:27:10 -05:00
. try_send ( Notice ::blocked (
event . id ,
" NIP-05 verification needed to publish events " ,
) )
2022-02-13 10:35:54 -05:00
. ok ( ) ;
2022-02-12 10:29:25 -05:00
continue ;
}
Err ( e ) = > {
warn! ( " checking nip05 verification status failed: {:?} " , e ) ;
continue ;
}
}
}
// TODO: cache recent list of authors to remove a DB call.
2022-01-01 20:25:09 -05:00
let start = Instant ::now ( ) ;
2022-05-30 18:03:00 -04:00
if event . kind > = 20000 & & event . kind < 30000 {
2022-11-04 08:55:38 -04:00
bcast_tx . send ( event . clone ( ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
info! (
2022-11-05 17:11:20 -04:00
" published ephemeral event: {:?} from: {:?} in: {:?} " ,
2022-05-30 18:03:00 -04:00
event . get_event_id_prefix ( ) ,
event . get_author_prefix ( ) ,
start . elapsed ( )
) ;
event_write = true
} else {
match write_event ( & mut pool . get ( ) ? , & event ) {
Ok ( updated ) = > {
if updated = = 0 {
2022-10-08 13:12:41 -04:00
trace! ( " ignoring duplicate or deleted event " ) ;
2022-11-10 17:27:10 -05:00
notice_tx . try_send ( Notice ::duplicate ( event . id ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
} else {
info! (
2022-11-05 17:11:20 -04:00
" persisted event: {:?} from: {:?} in: {:?} " ,
2022-05-30 18:03:00 -04:00
event . get_event_id_prefix ( ) ,
event . get_author_prefix ( ) ,
start . elapsed ( )
) ;
event_write = true ;
// send this out to all clients
bcast_tx . send ( event . clone ( ) ) . ok ( ) ;
2022-11-10 17:27:10 -05:00
notice_tx . try_send ( Notice ::saved ( event . id ) ) . ok ( ) ;
2022-05-30 18:03:00 -04:00
}
}
Err ( err ) = > {
warn! ( " event insert failed: {:?} " , err ) ;
2022-11-10 17:27:10 -05:00
let msg = " relay experienced an error trying to publish the latest event " ;
notice_tx . try_send ( Notice ::error ( event . id , msg ) ) . ok ( ) ;
2021-12-11 16:48:59 -05:00
}
}
2022-12-18 00:18:54 -05:00
// Use this as a trigger to do optimization
optimize_counter + = 1 ;
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
info! ( " running database optimizer " ) ;
optimize_counter = 0 ;
optimize_db ( & mut pool . get ( ) ? ) . ok ( ) ;
}
2021-12-11 16:48:59 -05:00
}
2022-02-12 10:29:25 -05:00
2021-12-30 22:07:21 -05:00
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some ( ref lim ) = lim_opt {
if let Err ( n ) = lim . check ( ) {
2022-01-05 18:30:47 -05:00
let wait_for = n . wait_time_from ( clock . now ( ) ) ;
// check if we have recently logged rate
// limits, but print out a message only once
// per second.
2022-02-12 10:29:25 -05:00
if most_recent_rate_limit . elapsed ( ) . as_secs ( ) > 10 {
2022-01-05 18:30:47 -05:00
warn! (
2022-02-12 10:29:25 -05:00
" rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds) " ,
2022-01-05 18:30:47 -05:00
wait_for
) ;
// reset last rate limit message
most_recent_rate_limit = Instant ::now ( ) ;
}
// block event writes, allowing them to queue up
thread ::sleep ( wait_for ) ;
2021-12-30 22:07:21 -05:00
continue ;
}
}
}
2021-12-11 16:48:59 -05:00
}
info! ( " database connection closed " ) ;
Ok ( ( ) )
} )
}
2022-02-12 10:29:35 -05:00
/// Persist an event to the database, returning rows added.
2022-02-12 10:29:25 -05:00
pub fn write_event ( conn : & mut PooledConnection , e : & Event ) -> Result < usize > {
2021-12-11 16:48:59 -05:00
// start transaction
let tx = conn . transaction ( ) ? ;
// get relevant fields from event and convert to blobs.
let id_blob = hex ::decode ( & e . id ) . ok ( ) ;
2022-10-16 16:25:06 -04:00
let pubkey_blob : Option < Vec < u8 > > = hex ::decode ( & e . pubkey ) . ok ( ) ;
let delegator_blob : Option < Vec < u8 > > = e . delegated_by . as_ref ( ) . and_then ( | d | hex ::decode ( d ) . ok ( ) ) ;
2021-12-11 16:48:59 -05:00
let event_str = serde_json ::to_string ( & e ) . ok ( ) ;
2021-12-11 22:43:41 -05:00
// ignore if the event hash is a duplicate.
2022-10-08 13:12:41 -04:00
let mut ins_count = tx . execute (
2022-10-16 16:25:06 -04:00
" INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE); " ,
params! [ id_blob , e . created_at , e . kind , pubkey_blob , delegator_blob , event_str ]
2021-12-11 16:48:59 -05:00
) ? ;
2021-12-11 22:43:41 -05:00
if ins_count = = 0 {
// if the event was a duplicate, no need to insert event or
2022-10-08 13:12:41 -04:00
// pubkey references. This will abort the txn.
2021-12-11 22:43:41 -05:00
return Ok ( ins_count ) ;
}
// remember primary key of the event most recently inserted.
2021-12-11 16:48:59 -05:00
let ev_id = tx . last_insert_rowid ( ) ;
2022-01-22 22:29:15 -05:00
// add all tags to the tag table
for tag in e . tags . iter ( ) {
// ensure we have 2 values.
if tag . len ( ) > = 2 {
let tagname = & tag [ 0 ] ;
let tagval = & tag [ 1 ] ;
2022-08-17 19:34:11 -04:00
// only single-char tags are searchable
let tagchar_opt = single_char_tagname ( tagname ) ;
match & tagchar_opt {
Some ( _ ) = > {
// if tagvalue is lowercase hex;
2022-09-02 11:30:51 -04:00
if is_lower_hex ( tagval ) & & ( tagval . len ( ) % 2 = = 0 ) {
2022-08-17 19:34:11 -04:00
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3) " ,
2022-11-02 19:33:44 -04:00
params! [ ev_id , & tagname , hex ::decode ( tagval ) . ok ( ) ] ,
2022-08-17 19:34:11 -04:00
) ? ;
} else {
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3) " ,
params! [ ev_id , & tagname , & tagval ] ,
) ? ;
}
}
None = > { }
2022-01-22 22:29:15 -05:00
}
2021-12-11 16:48:59 -05:00
}
}
2022-05-30 18:03:00 -04:00
// if this event is replaceable update, hide every other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
if e . kind = = 0 | | e . kind = = 3 | | ( e . kind > = 10000 & & e . kind < 20000 ) {
2021-12-30 16:45:03 -05:00
let update_count = tx . execute (
2022-05-30 18:03:00 -04:00
" UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE " ,
params! [ ev_id , e . kind , hex ::decode ( & e . pubkey ) . ok ( ) , e . created_at ] ,
2021-12-30 16:45:03 -05:00
) ? ;
if update_count > 0 {
2022-02-12 10:29:25 -05:00
info! (
2022-10-08 13:12:41 -04:00
" hid {} older replaceable kind {} events for author: {:?} " ,
2022-02-12 10:29:25 -05:00
update_count ,
2022-05-30 18:03:00 -04:00
e . kind ,
2022-02-12 10:29:25 -05:00
e . get_author_prefix ( )
) ;
2021-12-30 16:45:03 -05:00
}
}
2022-02-27 12:34:10 -05:00
// if this event is a deletion, hide the referenced events from the same author.
if e . kind = = 5 {
let event_candidates = e . tag_values_by_name ( " e " ) ;
2022-09-02 13:38:31 -04:00
// first parameter will be author
2022-09-02 11:30:51 -04:00
let mut params : Vec < Box < dyn ToSql > > = vec! [ Box ::new ( hex ::decode ( & e . pubkey ) ? ) ] ;
2022-02-27 12:34:10 -05:00
event_candidates
. iter ( )
. filter ( | x | is_hex ( x ) & & x . len ( ) = = 64 )
. filter_map ( | x | hex ::decode ( x ) . ok ( ) )
. for_each ( | x | params . push ( Box ::new ( x ) ) ) ;
let query = format! (
2022-10-08 09:02:16 -04:00
" UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({}) " ,
2022-02-27 12:34:10 -05:00
repeat_vars ( params . len ( ) - 1 )
) ;
let mut stmt = tx . prepare ( & query ) ? ;
let update_count = stmt . execute ( rusqlite ::params_from_iter ( params ) ) ? ;
info! (
" hid {} deleted events for author {:?} " ,
update_count ,
e . get_author_prefix ( )
) ;
2022-10-08 13:12:41 -04:00
} else {
// check if a deletion has already been recorded for this event.
// Only relevant for non-deletion events
let del_count = tx . query_row (
" SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1; " ,
params! [ pubkey_blob , id_blob ] , | row | row . get ::< usize , usize > ( 0 ) ) ;
// check if a the query returned a result, meaning we should
// hid the current event
if del_count . ok ( ) . is_some ( ) {
// a deletion already existed, mark original event as hidden.
info! (
" hid event: {:?} due to existing deletion by author: {:?} " ,
e . get_event_id_prefix ( ) ,
e . get_author_prefix ( )
) ;
let _update_count =
tx . execute ( " UPDATE event SET hidden=TRUE WHERE id=? " , params! [ ev_id ] ) ? ;
// event was deleted, so let caller know nothing new
// arrived, preventing this from being sent to active
// subscriptions
ins_count = 0 ;
}
2022-02-27 12:34:10 -05:00
}
2021-12-11 16:48:59 -05:00
tx . commit ( ) ? ;
Ok ( ins_count )
}
2022-02-12 10:29:35 -05:00
/// One query result: a serialized event tagged with the subscription
/// request it was produced for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryResult {
    /// Identifier of the subscription this result answers
    pub sub_id: String,
    /// The event in serialized form
    pub event: String,
}
2022-02-12 10:29:35 -05:00
/// Produce a comma-separated list of `count` SQL `?` placeholders.
///
/// Returns an empty string for `count == 0`, `"?"` for one, `"?,?"` for
/// two, and so on.  The list is assembled with `join`, so it never ends
/// in a dangling separator regardless of how the pieces are spelled.
fn repeat_vars(count: usize) -> String {
    vec!["?"; count].join(",")
}
2022-07-04 18:25:32 -04:00
/// Create a dynamic SQL subquery and params from a subscription filter.
fn query_from_filter ( f : & ReqFilter ) -> ( String , Vec < Box < dyn ToSql > > ) {
2021-12-11 16:48:59 -05:00
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
2022-01-22 22:29:15 -05:00
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
2022-08-07 11:15:36 -04:00
// if the filter is malformed, don't return anything.
if f . force_no_match {
2022-12-17 08:49:28 -05:00
let empty_query = " SELECT e.content, e.created_at FROM event e WHERE 1=0 " . to_owned ( ) ;
2022-08-07 11:15:36 -04:00
// query parameters for SQLite
let empty_params : Vec < Box < dyn ToSql > > = vec! [ ] ;
return ( empty_query , empty_params ) ;
}
2022-12-17 08:49:28 -05:00
let mut query = " SELECT e.content, e.created_at FROM event e " . to_owned ( ) ;
2022-07-04 18:25:32 -04:00
// query parameters for SQLite
2022-01-22 22:29:15 -05:00
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
2022-08-07 11:15:36 -04:00
2022-07-04 18:25:32 -04:00
// individual filter components (single conditions such as an author or event ID)
let mut filter_components : Vec < String > = Vec ::new ( ) ;
// Query for "authors", allowing prefix matches
if let Some ( authvec ) = & f . authors {
// take each author and convert to a hexsearch
let mut auth_searches : Vec < String > = vec! [ ] ;
for auth in authvec {
match hex_range ( auth ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
2022-10-16 16:25:06 -04:00
auth_searches . push ( " author=? OR delegated_by=? " . to_owned ( ) ) ;
params . push ( Box ::new ( ex . clone ( ) ) ) ;
2022-07-04 18:25:32 -04:00
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
2022-10-16 16:25:06 -04:00
auth_searches . push (
" (author>? AND author<?) OR (delegated_by>? AND delegated_by<?) " . to_owned ( ) ,
) ;
params . push ( Box ::new ( lower . clone ( ) ) ) ;
params . push ( Box ::new ( upper . clone ( ) ) ) ;
2022-07-04 18:25:32 -04:00
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
2022-10-16 16:25:06 -04:00
auth_searches . push ( " author>? OR delegated_by>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower . clone ( ) ) ) ;
2022-07-04 18:25:32 -04:00
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from author {:?} " , auth ) ;
2022-01-25 19:21:43 -05:00
}
}
2021-12-11 16:48:59 -05:00
}
2022-12-14 23:07:56 -05:00
if ! authvec . is_empty ( ) {
2022-11-19 11:00:38 -05:00
let authors_clause = format! ( " ( {} ) " , auth_searches . join ( " OR " ) ) ;
filter_components . push ( authors_clause ) ;
} else {
// if the authors list was empty, we should never return
// any results.
filter_components . push ( " false " . to_owned ( ) ) ;
}
2022-07-04 18:25:32 -04:00
}
// Query for Kind
if let Some ( ks ) = & f . kinds {
// kind is number, no escaping needed
let str_kinds : Vec < String > = ks . iter ( ) . map ( | x | x . to_string ( ) ) . collect ( ) ;
let kind_clause = format! ( " kind IN ( {} ) " , str_kinds . join ( " , " ) ) ;
filter_components . push ( kind_clause ) ;
}
// Query for event, allowing prefix matches
if let Some ( idvec ) = & f . ids {
// take each author and convert to a hexsearch
let mut id_searches : Vec < String > = vec! [ ] ;
for id in idvec {
match hex_range ( id ) {
Some ( HexSearch ::Exact ( ex ) ) = > {
id_searches . push ( " event_hash=? " . to_owned ( ) ) ;
params . push ( Box ::new ( ex ) ) ;
}
Some ( HexSearch ::Range ( lower , upper ) ) = > {
id_searches . push ( " (event_hash>? AND event_hash<?) " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
params . push ( Box ::new ( upper ) ) ;
}
Some ( HexSearch ::LowerOnly ( lower ) ) = > {
id_searches . push ( " event_hash>? " . to_owned ( ) ) ;
params . push ( Box ::new ( lower ) ) ;
}
None = > {
info! ( " Could not parse hex range from id {:?} " , id ) ;
2022-01-25 19:21:43 -05:00
}
}
2021-12-11 16:48:59 -05:00
}
2022-12-14 23:07:56 -05:00
if ! idvec . is_empty ( ) {
2022-11-19 11:35:00 -05:00
let id_clause = format! ( " ( {} ) " , id_searches . join ( " OR " ) ) ;
filter_components . push ( id_clause ) ;
} else {
// if the ids list was empty, we should never return
// any results.
filter_components . push ( " false " . to_owned ( ) ) ;
}
2022-07-04 18:25:32 -04:00
}
// Query for tags
if let Some ( map ) = & f . tags {
for ( key , val ) in map . iter ( ) {
let mut str_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
let mut blob_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
for v in val {
2022-08-21 12:10:19 -04:00
if ( v . len ( ) % 2 = = 0 ) & & is_lower_hex ( v ) {
2022-11-02 19:33:44 -04:00
if let Ok ( h ) = hex ::decode ( v ) {
2022-07-04 18:25:32 -04:00
blob_vals . push ( Box ::new ( h ) ) ;
2022-01-22 22:29:15 -05:00
}
2022-07-04 18:25:32 -04:00
} else {
str_vals . push ( Box ::new ( v . to_owned ( ) ) ) ;
2022-01-22 22:29:15 -05:00
}
}
2022-07-04 18:25:32 -04:00
// create clauses with "?" params for each tag value being searched
let str_clause = format! ( " value IN ( {} ) " , repeat_vars ( str_vals . len ( ) ) ) ;
let blob_clause = format! ( " value_hex IN ( {} ) " , repeat_vars ( blob_vals . len ( ) ) ) ;
2022-08-17 19:34:11 -04:00
// find evidence of the target tag name/value existing for this event.
2022-08-12 01:16:10 -04:00
let tag_clause = format! ( " e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ( {} OR {} ))) " , str_clause , blob_clause ) ;
2022-07-04 18:25:32 -04:00
// add the tag name as the first parameter
2022-08-07 11:15:36 -04:00
params . push ( Box ::new ( key . to_string ( ) ) ) ;
2022-07-04 18:25:32 -04:00
// add all tag values that are plain strings as params
params . append ( & mut str_vals ) ;
// add all tag values that are blobs as params
params . append ( & mut blob_vals ) ;
filter_components . push ( tag_clause ) ;
2021-12-11 16:48:59 -05:00
}
}
2022-07-04 18:25:32 -04:00
// Query for timestamp
if f . since . is_some ( ) {
let created_clause = format! ( " created_at > {} " , f . since . unwrap ( ) ) ;
filter_components . push ( created_clause ) ;
}
// Query for timestamp
if f . until . is_some ( ) {
let until_clause = format! ( " created_at < {} " , f . until . unwrap ( ) ) ;
filter_components . push ( until_clause ) ;
}
2022-01-25 21:48:46 -05:00
// never display hidden events
2022-07-04 18:25:32 -04:00
query . push_str ( " WHERE hidden!=TRUE " ) ;
// build filter component conditions
if ! filter_components . is_empty ( ) {
query . push_str ( " AND " ) ;
query . push_str ( & filter_components . join ( " AND " ) ) ;
}
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some ( lim ) = f . limit {
2022-09-02 13:38:31 -04:00
let _ = write! ( query , " ORDER BY e.created_at DESC LIMIT {} " , lim ) ;
2022-07-04 18:25:32 -04:00
} else {
query . push_str ( " ORDER BY e.created_at ASC " )
2021-12-11 16:48:59 -05:00
}
2022-07-04 18:25:32 -04:00
( query , params )
}
/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub ( sub : & Subscription ) -> ( String , Vec < Box < dyn ToSql > > ) {
// build a dynamic SQL query for an entire subscription, based on
// SQL subqueries for filters.
let mut subqueries : Vec < String > = Vec ::new ( ) ;
// subquery params
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
// for every filter in the subscription, generate a subquery
for f in sub . filters . iter ( ) {
2022-09-02 11:30:51 -04:00
let ( f_subquery , mut f_params ) = query_from_filter ( f ) ;
2022-07-04 18:25:32 -04:00
subqueries . push ( f_subquery ) ;
params . append ( & mut f_params ) ;
2022-05-09 16:39:49 -04:00
}
2022-07-04 18:25:32 -04:00
// encapsulate subqueries into select statements
let subqueries_selects : Vec < String > = subqueries
. iter ( )
2022-12-17 08:49:28 -05:00
. map ( | s | format! ( " SELECT distinct content, created_at FROM ( {} ) " , s ) )
2022-07-04 18:25:32 -04:00
. collect ( ) ;
let query : String = subqueries_selects . join ( " UNION " ) ;
2022-01-22 22:29:15 -05:00
( query , params )
2021-12-11 16:48:59 -05:00
}
2022-12-20 11:07:01 -05:00
/// Check if the pool is fully utilized
fn _pool_at_capacity ( pool : & SqlitePool ) -> bool {
let state : r2d2 ::State = pool . state ( ) ;
state . idle_connections = = 0
}
2022-12-16 18:01:49 -05:00
/// Emit a debug log line describing current pool usage.
///
/// "in_use" is the number of connections that are not idle.
// NOTE(review): the value logged under "available" is the pool's total
// connection count, not its idle count -- confirm the label is intended.
fn log_pool_stats(pool: &SqlitePool) {
    let snapshot: r2d2::State = pool.state();
    let total_cxns = snapshot.connections;
    let in_use_cxns = total_cxns - snapshot.idle_connections;
    debug!(
        " DB pool usage (in_use: {}, available: {}) ",
        in_use_cxns, total_cxns
    );
}
2021-12-11 22:43:41 -05:00
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
/// is published on the `query_tx` channel as it is returned. If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
2021-12-11 16:48:59 -05:00
pub async fn db_query (
sub : Subscription ,
2022-08-17 19:34:11 -04:00
client_id : String ,
2022-02-23 17:38:16 -05:00
pool : SqlitePool ,
2021-12-11 16:48:59 -05:00
query_tx : tokio ::sync ::mpsc ::Sender < QueryResult > ,
mut abandon_query_rx : tokio ::sync ::oneshot ::Receiver < ( ) > ,
) {
2022-12-18 23:11:46 -05:00
let pre_spawn_start = Instant ::now ( ) ;
2021-12-11 16:48:59 -05:00
task ::spawn_blocking ( move | | {
2022-12-18 23:11:46 -05:00
let db_queue_time = pre_spawn_start . elapsed ( ) ;
// report queuing time if it is slow
if db_queue_time > Duration ::from_secs ( 1 ) {
debug! (
" (slow) DB query queued for {:?} (cid: {}, sub: {:?}) " ,
db_queue_time , client_id , sub . id
) ;
}
2022-01-01 20:25:09 -05:00
let start = Instant ::now ( ) ;
2022-12-18 14:42:31 -05:00
let mut row_count : usize = 0 ;
2021-12-11 22:43:41 -05:00
// generate SQL query
2022-01-22 22:29:15 -05:00
let ( q , p ) = query_from_sub ( & sub ) ;
2022-12-18 00:18:54 -05:00
debug! ( " SQL generated in {:?} " , start . elapsed ( ) ) ;
2022-02-23 17:38:16 -05:00
// show pool stats
2022-12-16 18:01:49 -05:00
log_pool_stats ( & pool ) ;
2022-12-16 09:17:39 -05:00
// cutoff for displaying slow queries
2022-12-18 21:47:11 -05:00
let slow_cutoff = Duration ::from_millis ( 2000 ) ;
2022-12-19 01:02:28 -05:00
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration ::from_secs ( 5 ) ;
2022-02-21 10:03:05 -05:00
let start = Instant ::now ( ) ;
2022-12-18 14:42:31 -05:00
let mut slow_first_event ;
2022-12-19 01:02:28 -05:00
let mut last_successful_send = Instant ::now ( ) ;
2022-02-23 17:38:16 -05:00
if let Ok ( conn ) = pool . get ( ) {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn . prepare ( & q ) ? ;
let mut event_rows = stmt . query ( rusqlite ::params_from_iter ( p ) ) ? ;
let mut first_result = true ;
while let Some ( row ) = event_rows . next ( ) ? {
2022-12-18 14:42:31 -05:00
let first_event_elapsed = start . elapsed ( ) ;
slow_first_event = first_event_elapsed > = slow_cutoff ;
2022-02-23 17:38:16 -05:00
if first_result {
2022-08-17 19:34:11 -04:00
debug! (
2022-12-16 16:22:27 -05:00
" first result in {:?} (cid: {}, sub: {:?}) " ,
2022-12-18 14:42:31 -05:00
first_event_elapsed , client_id , sub . id
2022-08-17 19:34:11 -04:00
) ;
2022-02-23 17:38:16 -05:00
first_result = false ;
}
2022-12-18 21:47:11 -05:00
// logging for slow queries; show sub and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
2022-12-18 23:11:46 -05:00
if slow_first_event & & client_id . starts_with ( " 00 " ) {
debug! (
2022-12-18 14:42:31 -05:00
" query req (slow): {:?} (cid: {}, sub: {:?}) " ,
sub , client_id , sub . id
) ;
2022-12-18 23:11:46 -05:00
debug! (
2022-12-18 14:42:31 -05:00
" query string (slow): {} (cid: {}, sub: {:?}) " ,
q , client_id , sub . id
) ;
} else {
trace! (
" query req: {:?} (cid: {}, sub: {:?}) " ,
sub ,
client_id ,
sub . id
) ;
trace! (
" query string: {} (cid: {}, sub: {:?}) " ,
q ,
client_id ,
sub . id
) ;
}
2022-12-18 21:47:11 -05:00
// check if this is still active; every 100 rows
if row_count % 100 = = 0 & & abandon_query_rx . try_recv ( ) . is_ok ( ) {
2022-12-16 16:22:27 -05:00
debug! ( " query aborted (cid: {}, sub: {:?}) " , client_id , sub . id ) ;
2022-02-23 17:38:16 -05:00
return Ok ( ( ) ) ;
}
row_count + = 1 ;
let event_json = row . get ( 0 ) ? ;
2022-12-19 01:02:28 -05:00
loop {
if query_tx . capacity ( ) ! = 0 {
// we have capacity to add another item
break ;
} else {
// the queue is full
trace! ( " db reader thread is stalled " ) ;
if last_successful_send + abort_cutoff < Instant ::now ( ) {
// the queue has been full for too long, abort
info! ( " aborting database query due to slow client " ) ;
let ok : Result < ( ) > = Ok ( ( ) ) ;
return ok ;
}
// give the queue a chance to clear before trying again
thread ::sleep ( Duration ::from_millis ( 100 ) ) ;
}
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
2022-02-23 17:38:16 -05:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : event_json ,
} )
. ok ( ) ;
2022-12-19 01:02:28 -05:00
last_successful_send = Instant ::now ( ) ;
2021-12-11 16:48:59 -05:00
}
2022-05-30 18:02:59 -04:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : " EOSE " . to_string ( ) ,
} )
. ok ( ) ;
2022-02-23 17:38:16 -05:00
debug! (
2022-12-18 23:11:46 -05:00
" query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {}) " ,
pre_spawn_start . elapsed ( ) ,
2022-08-21 12:10:19 -04:00
client_id ,
2022-12-16 16:22:27 -05:00
sub . id ,
2022-12-18 23:11:46 -05:00
start . elapsed ( ) ,
2022-12-16 16:22:27 -05:00
row_count
2022-02-23 17:38:16 -05:00
) ;
} else {
warn! ( " Could not get a database connection for querying " ) ;
2021-12-11 16:48:59 -05:00
}
2022-01-05 18:33:08 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
2022-01-22 22:29:15 -05:00
ok
2021-12-11 16:48:59 -05:00
} ) ;
}