2021-12-11 22:43:41 -05:00
//! Event persistence and querying
2021-12-11 16:48:59 -05:00
use crate ::error ::Result ;
use crate ::event ::Event ;
use crate ::subscription ::Subscription ;
2021-12-30 22:07:21 -05:00
use governor ::clock ::Clock ;
use governor ::{ Quota , RateLimiter } ;
2021-12-11 16:48:59 -05:00
use hex ;
use log ::* ;
use rusqlite ::params ;
use rusqlite ::Connection ;
use rusqlite ::OpenFlags ;
2021-12-30 22:07:21 -05:00
//use std::num::NonZeroU32;
use crate ::config ::SETTINGS ;
2022-01-22 22:29:15 -05:00
use rusqlite ::limits ::Limit ;
use rusqlite ::types ::ToSql ;
2021-12-11 16:48:59 -05:00
use std ::path ::Path ;
2021-12-30 22:07:21 -05:00
use std ::thread ;
2022-01-01 20:25:09 -05:00
use std ::time ::Instant ;
2021-12-11 16:48:59 -05:00
use tokio ::task ;
2021-12-11 22:43:41 -05:00
/// Database file
2021-12-11 16:48:59 -05:00
const DB_FILE : & str = " nostr.db " ;
2021-12-30 14:55:05 -05:00
/// Startup DB Pragmas
const STARTUP_SQL : & str = r ##"
PRAGMA main . synchronous = NORMAL ;
PRAGMA foreign_keys = ON ;
pragma mmap_size = 536870912 ; - - 512 MB of mmap
" ##;
2021-12-11 22:43:41 -05:00
/// Schema definition
2021-12-11 16:48:59 -05:00
const INIT_SQL : & str = r ##"
2021-12-11 22:43:41 -05:00
- - Database settings
2021-12-11 16:48:59 -05:00
PRAGMA encoding = " UTF-8 " ;
PRAGMA journal_mode = WAL ;
PRAGMA main . synchronous = NORMAL ;
PRAGMA foreign_keys = ON ;
PRAGMA application_id = 1654008667 ;
2022-01-22 22:29:15 -05:00
PRAGMA user_version = 3 ;
2021-12-11 22:43:41 -05:00
- - Event Table
2021-12-11 16:48:59 -05:00
CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY ,
event_hash BLOB NOT NULL , - - 4 - byte hash
first_seen INTEGER NOT NULL , - - when the event was first seen ( not authored ! ) ( seconds since 1970 )
created_at INTEGER NOT NULL , - - when the event was authored
author BLOB NOT NULL , - - author pubkey
kind INTEGER NOT NULL , - - event kind
2021-12-30 14:55:05 -05:00
hidden INTEGER , - - relevant for queries
2021-12-11 16:48:59 -05:00
content TEXT NOT NULL - - serialized json of event object
) ;
2021-12-11 22:43:41 -05:00
- - Event Indexes
2021-12-11 16:48:59 -05:00
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event ( event_hash ) ;
CREATE INDEX IF NOT EXISTS created_at_index ON event ( created_at ) ;
CREATE INDEX IF NOT EXISTS author_index ON event ( author ) ;
CREATE INDEX IF NOT EXISTS kind_index ON event ( kind ) ;
2021-12-11 22:43:41 -05:00
2022-01-22 22:29:15 -05:00
- - Tag Table
- - Tag values are stored as either a BLOB ( if they come in as a
- - hex - string ) , or TEXT otherwise .
- - This means that searches need to select the appropriate column .
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY ,
event_id INTEGER NOT NULL , - - an event ID that contains a tag .
name TEXT , - - the tag name ( " p " , " e " , whatever )
value TEXT , - - the tag value , if not hex .
value_hex BLOB , - - the tag value , if it can be interpreted as a hex string .
FOREIGN KEY ( event_id ) REFERENCES event ( id ) ON UPDATE CASCADE ON DELETE CASCADE
) ;
CREATE INDEX IF NOT EXISTS tag_val_index ON tag ( value ) ;
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag ( value_hex ) ;
2021-12-11 22:43:41 -05:00
- - Event References Table
2021-12-11 16:48:59 -05:00
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY ,
event_id INTEGER NOT NULL , - - an event ID that contains an #e tag .
referenced_event BLOB NOT NULL , - - the event that is referenced .
FOREIGN KEY ( event_id ) REFERENCES event ( id ) ON UPDATE CASCADE ON DELETE CASCADE
) ;
2021-12-11 22:43:41 -05:00
- - Event References Index
2021-12-11 16:48:59 -05:00
CREATE INDEX IF NOT EXISTS event_ref_index ON event_ref ( referenced_event ) ;
2021-12-11 22:43:41 -05:00
- - Pubkey References Table
2021-12-11 16:48:59 -05:00
CREATE TABLE IF NOT EXISTS pubkey_ref (
id INTEGER PRIMARY KEY ,
event_id INTEGER NOT NULL , - - an event ID that contains an #p tag .
referenced_pubkey BLOB NOT NULL , - - the pubkey that is referenced .
FOREIGN KEY ( event_id ) REFERENCES event ( id ) ON UPDATE RESTRICT ON DELETE CASCADE
) ;
2021-12-11 22:43:41 -05:00
- - Pubkey References Index
2021-12-11 16:48:59 -05:00
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref ( referenced_pubkey ) ;
" ##;
2021-12-30 14:55:05 -05:00
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db ( conn : & mut Connection ) -> Result < ( ) > {
// check the version.
2022-01-22 22:29:15 -05:00
let mut curr_version = db_version ( conn ) ? ;
2021-12-30 14:55:05 -05:00
info! ( " DB version = {:?} " , curr_version ) ;
2022-01-22 22:29:15 -05:00
debug! (
" SQLite max query parameters: {} " ,
conn . limit ( Limit ::SQLITE_LIMIT_VARIABLE_NUMBER )
) ;
debug! (
" SQLite max table/blob/text length: {} MB " ,
( conn . limit ( Limit ::SQLITE_LIMIT_LENGTH ) as f64 / ( 1024 * 1024 ) as f64 ) . floor ( )
) ;
debug! (
" SQLite max SQL length: {} MB " ,
( conn . limit ( Limit ::SQLITE_LIMIT_SQL_LENGTH ) as f64 / ( 1024 * 1024 ) as f64 ) . floor ( )
) ;
2021-12-30 14:55:05 -05:00
// initialize from scratch
if curr_version = = 0 {
match conn . execute_batch ( INIT_SQL ) {
2022-01-22 22:29:15 -05:00
Ok ( ( ) ) = > {
info! ( " database pragma/schema initialized to v3, and ready " ) ;
//curr_version = 3;
}
2021-12-30 14:55:05 -05:00
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be initialized " ) ;
}
}
2022-01-22 22:29:15 -05:00
}
if curr_version = = 1 {
2021-12-30 14:55:05 -05:00
// only change is adding a hidden column to events.
let upgrade_sql = r ##"
ALTER TABLE event ADD hidden INTEGER ;
UPDATE event SET hidden = FALSE ;
PRAGMA user_version = 2 ;
" ##;
match conn . execute_batch ( upgrade_sql ) {
2022-01-22 22:29:15 -05:00
Ok ( ( ) ) = > {
info! ( " database schema upgraded v1 -> v2 " ) ;
curr_version = 2 ;
}
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be upgraded " ) ;
}
}
}
if curr_version = = 2 {
// this version lacks the tag column
debug! ( " database schema needs update from 2->3 " ) ;
let upgrade_sql = r ##"
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY ,
event_id INTEGER NOT NULL , - - an event ID that contains a tag .
name TEXT , - - the tag name ( " p " , " e " , whatever )
value TEXT , - - the tag value , if not hex .
value_hex BLOB , - - the tag value , if it can be interpreted as a hex string .
FOREIGN KEY ( event_id ) REFERENCES event ( id ) ON UPDATE CASCADE ON DELETE CASCADE
) ;
CREATE INDEX IF NOT EXISTS tag_val_index ON tag ( value ) ;
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag ( value_hex ) ;
PRAGMA user_version = 3 ;
" ##;
// TODO: load existing refs into tag table
match conn . execute_batch ( upgrade_sql ) {
Ok ( ( ) ) = > {
info! ( " database schema upgraded v2 -> v3 " ) ;
//curr_version = 3;
}
2021-12-30 14:55:05 -05:00
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be upgraded " ) ;
}
}
2022-01-22 22:29:15 -05:00
info! ( " Starting transaction " ) ;
// iterate over every event/pubkey tag
let tx = conn . transaction ( ) ? ;
{
let mut stmt = tx . prepare ( " select event_id, \" e \" , lower(hex(referenced_event)) from event_ref union select event_id, \" p \" , lower(hex(referenced_pubkey)) from pubkey_ref; " ) ? ;
let mut tag_rows = stmt . query ( [ ] ) ? ;
while let Some ( row ) = tag_rows . next ( ) ? {
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
let event_id : u64 = row . get ( 0 ) ? ;
let tag_name : String = row . get ( 1 ) ? ;
let tag_value : String = row . get ( 2 ) ? ;
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
if is_hex ( & tag_value ) {
tx . execute (
" INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3); " ,
params! [ event_id , tag_name , hex ::decode ( & tag_value ) . ok ( ) ] ,
) ? ;
}
}
}
tx . commit ( ) ? ;
info! ( " Upgrade complete " ) ;
} else if curr_version = = 3 {
2021-12-30 14:55:05 -05:00
debug! ( " Database version was already current " ) ;
2022-01-22 22:29:15 -05:00
} else if curr_version > 3 {
2021-12-30 14:55:05 -05:00
panic! ( " Database version is newer than supported by this executable " ) ;
}
// Setup PRAGMA
conn . execute_batch ( STARTUP_SQL ) ? ;
2022-01-22 22:29:15 -05:00
info! ( " Finished pragma " ) ;
2021-12-30 14:55:05 -05:00
Ok ( ( ) )
}
2021-12-11 16:48:59 -05:00
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer (
mut event_rx : tokio ::sync ::mpsc ::Receiver < Event > ,
2021-12-12 11:20:23 -05:00
bcast_tx : tokio ::sync ::broadcast ::Sender < Event > ,
2021-12-30 22:07:21 -05:00
mut shutdown : tokio ::sync ::broadcast ::Receiver < ( ) > ,
2021-12-11 16:48:59 -05:00
) -> tokio ::task ::JoinHandle < Result < ( ) > > {
task ::spawn_blocking ( move | | {
2021-12-31 12:51:57 -05:00
// get database configuration settings
let config = SETTINGS . read ( ) . unwrap ( ) ;
let db_dir = & config . database . data_directory ;
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
// create a connection
2021-12-11 16:48:59 -05:00
let mut conn = Connection ::open_with_flags (
2021-12-31 12:51:57 -05:00
& full_path ,
2021-12-11 16:48:59 -05:00
OpenFlags ::SQLITE_OPEN_READ_WRITE | OpenFlags ::SQLITE_OPEN_CREATE ,
) ? ;
2021-12-31 12:51:57 -05:00
info! ( " opened database {:?} for writing " , full_path ) ;
2021-12-30 14:55:05 -05:00
upgrade_db ( & mut conn ) ? ;
2022-01-22 22:29:15 -05:00
2021-12-30 22:07:21 -05:00
// get rate limit settings
let rps_setting = config . limits . messages_per_sec ;
2022-01-05 18:30:47 -05:00
let mut most_recent_rate_limit = Instant ::now ( ) ;
2021-12-30 22:07:21 -05:00
let mut lim_opt = None ;
let clock = governor ::clock ::QuantaClock ::default ( ) ;
if let Some ( rps ) = rps_setting {
if rps > 0 {
info! ( " Enabling rate limits for event creation ({}/sec) " , rps ) ;
let quota = core ::num ::NonZeroU32 ::new ( rps * 60 ) . unwrap ( ) ;
lim_opt = Some ( RateLimiter ::direct ( Quota ::per_minute ( quota ) ) ) ;
}
}
2021-12-11 16:48:59 -05:00
loop {
2021-12-31 16:19:35 -05:00
if shutdown . try_recv ( ) . is_ok ( ) {
2021-12-30 22:07:21 -05:00
info! ( " shutting down database writer " ) ;
break ;
}
2021-12-11 16:48:59 -05:00
// call blocking read on channel
let next_event = event_rx . blocking_recv ( ) ;
// if the channel has closed, we will never get work
if next_event . is_none ( ) {
break ;
}
2021-12-30 22:07:21 -05:00
let mut event_write = false ;
2021-12-11 16:48:59 -05:00
let event = next_event . unwrap ( ) ;
2022-01-01 20:25:09 -05:00
let start = Instant ::now ( ) ;
2021-12-11 16:48:59 -05:00
match write_event ( & mut conn , & event ) {
Ok ( updated ) = > {
if updated = = 0 {
2021-12-30 22:07:21 -05:00
debug! ( " ignoring duplicate event " ) ;
2021-12-11 16:48:59 -05:00
} else {
2022-01-01 20:25:09 -05:00
info! (
" persisted event: {} in {:?} " ,
event . get_event_id_prefix ( ) ,
start . elapsed ( )
) ;
2021-12-30 22:07:21 -05:00
event_write = true ;
2021-12-12 11:20:23 -05:00
// send this out to all clients
bcast_tx . send ( event . clone ( ) ) . ok ( ) ;
2021-12-11 16:48:59 -05:00
}
}
Err ( err ) = > {
2021-12-11 22:43:41 -05:00
warn! ( " event insert failed: {} " , err ) ;
2021-12-11 16:48:59 -05:00
}
}
2021-12-30 22:07:21 -05:00
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some ( ref lim ) = lim_opt {
if let Err ( n ) = lim . check ( ) {
2022-01-05 18:30:47 -05:00
let wait_for = n . wait_time_from ( clock . now ( ) ) ;
// check if we have recently logged rate
// limits, but print out a message only once
// per second.
if most_recent_rate_limit . elapsed ( ) . as_secs ( ) > 1 {
warn! (
" rate limit reached for event creation (sleep for {:?}) " ,
wait_for
) ;
// reset last rate limit message
most_recent_rate_limit = Instant ::now ( ) ;
}
// block event writes, allowing them to queue up
thread ::sleep ( wait_for ) ;
2021-12-30 22:07:21 -05:00
continue ;
}
}
}
2021-12-11 16:48:59 -05:00
}
conn . close ( ) . ok ( ) ;
info! ( " database connection closed " ) ;
Ok ( ( ) )
} )
}
2021-12-30 14:55:05 -05:00
pub fn db_version ( conn : & mut Connection ) -> Result < usize > {
let query = " PRAGMA user_version; " ;
let curr_version = conn . query_row ( query , [ ] , | row | row . get ( 0 ) ) ? ;
Ok ( curr_version )
}
2021-12-11 22:43:41 -05:00
/// Persist an event to the database.
2021-12-11 16:48:59 -05:00
pub fn write_event ( conn : & mut Connection , e : & Event ) -> Result < usize > {
// start transaction
let tx = conn . transaction ( ) ? ;
// get relevant fields from event and convert to blobs.
let id_blob = hex ::decode ( & e . id ) . ok ( ) ;
let pubkey_blob = hex ::decode ( & e . pubkey ) . ok ( ) ;
let event_str = serde_json ::to_string ( & e ) . ok ( ) ;
2021-12-11 22:43:41 -05:00
// ignore if the event hash is a duplicate.
2021-12-11 16:48:59 -05:00
let ins_count = tx . execute (
2021-12-30 14:55:05 -05:00
" INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE); " ,
2021-12-11 16:48:59 -05:00
params! [ id_blob , e . created_at , e . kind , pubkey_blob , event_str ]
) ? ;
2021-12-11 22:43:41 -05:00
if ins_count = = 0 {
// if the event was a duplicate, no need to insert event or
// pubkey references.
return Ok ( ins_count ) ;
}
// remember primary key of the event most recently inserted.
2021-12-11 16:48:59 -05:00
let ev_id = tx . last_insert_rowid ( ) ;
2022-01-22 22:29:15 -05:00
// add all tags to the tag table
for tag in e . tags . iter ( ) {
// ensure we have 2 values.
if tag . len ( ) > = 2 {
let tagname = & tag [ 0 ] ;
let tagval = & tag [ 1 ] ;
// if tagvalue is hex;
if is_hex ( tagval ) {
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3) " ,
params! [ ev_id , & tagname , hex ::decode ( & tagval ) . ok ( ) ] ,
) ? ;
} else {
tx . execute (
" INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3) " ,
params! [ ev_id , & tagname , & tagval ] ,
) ? ;
}
2021-12-11 16:48:59 -05:00
}
}
2021-12-30 14:55:05 -05:00
// if this event is for a metadata update, hide every other kind=0
2021-12-30 16:45:03 -05:00
// event from the same author that was issued earlier than this.
2021-12-30 14:55:05 -05:00
if e . kind = = 0 {
let update_count = tx . execute (
2021-12-30 16:45:03 -05:00
" UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE " ,
2021-12-30 14:55:05 -05:00
params! [ ev_id , hex ::decode ( & e . pubkey ) . ok ( ) , e . created_at ] ,
) ? ;
if update_count > 0 {
info! ( " hid {} older metadata events " , update_count ) ;
}
}
2021-12-30 16:45:03 -05:00
// if this event is for a contact update, hide every other kind=3
// event from the same author that was issued earlier than this.
if e . kind = = 3 {
let update_count = tx . execute (
" UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE " ,
params! [ ev_id , hex ::decode ( & e . pubkey ) . ok ( ) , e . created_at ] ,
) ? ;
if update_count > 0 {
info! ( " hid {} older contact events " , update_count ) ;
}
}
2021-12-11 16:48:59 -05:00
tx . commit ( ) ? ;
Ok ( ins_count )
}
2021-12-11 22:43:41 -05:00
/// Event resulting from a specific subscription request
2021-12-11 16:48:59 -05:00
#[ derive(PartialEq, Debug, Clone) ]
pub struct QueryResult {
2021-12-11 22:43:41 -05:00
/// Subscription identifier
2021-12-11 16:48:59 -05:00
pub sub_id : String ,
2021-12-11 22:43:41 -05:00
/// Serialized event
2021-12-11 16:48:59 -05:00
pub event : String ,
}
2021-12-11 22:43:41 -05:00
/// Check if a string contains only hex characters.
fn is_hex ( s : & str ) -> bool {
2021-12-11 16:48:59 -05:00
s . chars ( ) . all ( | x | char ::is_ascii_hexdigit ( & x ) )
}
2022-01-22 22:29:15 -05:00
fn repeat_vars ( count : usize ) -> String {
if count = = 0 {
return " " . to_owned ( ) ;
}
let mut s = " ?, " . repeat ( count ) ;
// Remove trailing comma
s . pop ( ) ;
s
}
/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub ( sub : & Subscription ) -> ( String , Vec < Box < dyn ToSql > > ) {
2021-12-11 16:48:59 -05:00
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
2022-01-22 22:29:15 -05:00
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
2021-12-11 16:48:59 -05:00
let mut query =
2022-01-22 22:29:15 -05:00
" SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id " . to_owned ( ) ;
// parameters
let mut params : Vec < Box < dyn ToSql > > = vec! [ ] ;
2021-12-11 16:48:59 -05:00
// for every filter in the subscription, generate a where clause
let mut filter_clauses : Vec < String > = Vec ::new ( ) ;
for f in sub . filters . iter ( ) {
// individual filter components
let mut filter_components : Vec < String > = Vec ::new ( ) ;
// Query for "authors"
if f . authors . is_some ( ) {
let authors_escaped : Vec < String > = f
. authors
. as_ref ( )
. unwrap ( )
. iter ( )
2021-12-11 22:43:41 -05:00
. filter ( | & x | is_hex ( x ) )
2021-12-11 16:48:59 -05:00
. map ( | x | format! ( " x' {} ' " , x ) )
. collect ( ) ;
let authors_clause = format! ( " author IN ( {} ) " , authors_escaped . join ( " , " ) ) ;
filter_components . push ( authors_clause ) ;
}
// Query for Kind
2022-01-01 19:38:52 -05:00
if let Some ( ks ) = & f . kinds {
2021-12-11 16:48:59 -05:00
// kind is number, no escaping needed
2022-01-01 19:38:52 -05:00
let str_kinds : Vec < String > = ks . iter ( ) . map ( | x | x . to_string ( ) ) . collect ( ) ;
let kind_clause = format! ( " kind IN ( {} ) " , str_kinds . join ( " , " ) ) ;
2021-12-11 16:48:59 -05:00
filter_components . push ( kind_clause ) ;
}
// Query for event
2022-01-01 19:38:52 -05:00
if f . ids . is_some ( ) {
let ids_escaped : Vec < String > = f
. ids
. as_ref ( )
. unwrap ( )
. iter ( )
. filter ( | & x | is_hex ( x ) )
. map ( | x | format! ( " x' {} ' " , x ) )
. collect ( ) ;
let id_clause = format! ( " event_hash IN ( {} ) " , ids_escaped . join ( " , " ) ) ;
filter_components . push ( id_clause ) ;
2021-12-11 16:48:59 -05:00
}
2022-01-22 22:29:15 -05:00
// Query for tags
if let Some ( map ) = & f . tags {
for ( key , val ) in map . iter ( ) {
let mut str_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
let mut blob_vals : Vec < Box < dyn ToSql > > = vec! [ ] ;
for v in val {
if is_hex ( v ) {
if let Ok ( h ) = hex ::decode ( & v ) {
blob_vals . push ( Box ::new ( h ) ) ;
}
} else {
str_vals . push ( Box ::new ( v . to_owned ( ) ) ) ;
}
}
// create clauses with "?" params for each tag value being searched
let str_clause = format! ( " value IN ( {} ) " , repeat_vars ( str_vals . len ( ) ) ) ;
let blob_clause = format! ( " value_hex IN ( {} ) " , repeat_vars ( blob_vals . len ( ) ) ) ;
let tag_clause = format! ( " (name=? AND ( {} OR {} )) " , str_clause , blob_clause ) ;
// add the tag name as the first parameter
params . push ( Box ::new ( key . to_owned ( ) ) ) ;
// add all tag values that are plain strings as params
params . append ( & mut str_vals ) ;
// add all tag values that are blobs as params
params . append ( & mut blob_vals ) ;
filter_components . push ( tag_clause ) ;
}
2021-12-11 16:48:59 -05:00
}
// Query for timestamp
if f . since . is_some ( ) {
let created_clause = format! ( " created_at > {} " , f . since . unwrap ( ) ) ;
filter_components . push ( created_clause ) ;
}
2021-12-23 22:38:32 -05:00
// Query for timestamp
if f . until . is_some ( ) {
let until_clause = format! ( " created_at < {} " , f . until . unwrap ( ) ) ;
filter_components . push ( until_clause ) ;
}
2021-12-11 16:48:59 -05:00
// combine all clauses, and add to filter_clauses
2021-12-11 22:56:52 -05:00
if ! filter_components . is_empty ( ) {
2021-12-11 16:48:59 -05:00
let mut fc = " ( " . to_owned ( ) ;
fc . push_str ( & filter_components . join ( " AND " ) ) ;
fc . push_str ( " ) " ) ;
filter_clauses . push ( fc ) ;
2021-12-12 15:52:55 -05:00
} else {
2021-12-30 14:55:05 -05:00
// never display hidden events
2021-12-31 17:34:10 -05:00
filter_clauses . push ( " hidden!=TRUE " . to_owned ( ) ) ;
2021-12-11 16:48:59 -05:00
}
}
// combine all filters with OR clauses, if any exist
2021-12-11 22:56:52 -05:00
if ! filter_clauses . is_empty ( ) {
2021-12-11 16:48:59 -05:00
query . push_str ( " WHERE " ) ;
query . push_str ( & filter_clauses . join ( " OR " ) ) ;
}
2021-12-23 22:36:46 -05:00
// add order clause
query . push_str ( " ORDER BY created_at ASC " ) ;
2021-12-12 11:03:28 -05:00
debug! ( " query string: {} " , query ) ;
2022-01-22 22:29:15 -05:00
( query , params )
2021-12-11 16:48:59 -05:00
}
2021-12-11 22:43:41 -05:00
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
/// is published on the `query_tx` channel as it is returned. If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
2021-12-11 16:48:59 -05:00
pub async fn db_query (
sub : Subscription ,
query_tx : tokio ::sync ::mpsc ::Sender < QueryResult > ,
mut abandon_query_rx : tokio ::sync ::oneshot ::Receiver < ( ) > ,
) {
task ::spawn_blocking ( move | | {
2021-12-31 12:51:57 -05:00
let config = SETTINGS . read ( ) . unwrap ( ) ;
let db_dir = & config . database . data_directory ;
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-01-05 18:33:08 -05:00
let conn = Connection ::open_with_flags ( & full_path , OpenFlags ::SQLITE_OPEN_READ_ONLY ) ? ;
2021-12-12 11:03:28 -05:00
debug! ( " opened database for reading " ) ;
debug! ( " going to query for: {:?} " , sub ) ;
2022-01-01 20:25:09 -05:00
let mut row_count : usize = 0 ;
let start = Instant ::now ( ) ;
2021-12-11 22:43:41 -05:00
// generate SQL query
2022-01-22 22:29:15 -05:00
let ( q , p ) = query_from_sub ( & sub ) ;
2021-12-11 22:43:41 -05:00
// execute the query
2022-01-05 18:33:08 -05:00
let mut stmt = conn . prepare ( & q ) ? ;
2022-01-22 22:29:15 -05:00
let mut event_rows = stmt . query ( rusqlite ::params_from_iter ( p ) ) ? ;
2022-01-05 18:33:08 -05:00
while let Some ( row ) = event_rows . next ( ) ? {
2021-12-11 16:48:59 -05:00
// check if this is still active (we could do this every N rows)
if abandon_query_rx . try_recv ( ) . is_ok ( ) {
2021-12-11 22:43:41 -05:00
debug! ( " query aborted " ) ;
2022-01-05 18:33:08 -05:00
return Ok ( ( ) ) ;
2021-12-11 16:48:59 -05:00
}
2022-01-01 20:25:09 -05:00
row_count + = 1 ;
2022-01-05 18:33:08 -05:00
let event_json = row . get ( 0 ) ? ;
2021-12-11 16:48:59 -05:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : event_json ,
} )
. ok ( ) ;
}
2022-01-01 20:25:09 -05:00
debug! (
" query completed ({} rows) in {:?} " ,
row_count ,
start . elapsed ( )
) ;
2022-01-05 18:33:08 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
2022-01-22 22:29:15 -05:00
ok
2021-12-11 16:48:59 -05:00
} ) ;
}