2021-12-11 22:43:41 -05:00
//! Event persistence and querying
2022-02-12 10:29:25 -05:00
use crate ::config ::SETTINGS ;
use crate ::error ::Error ;
2021-12-11 16:48:59 -05:00
use crate ::error ::Result ;
use crate ::event ::Event ;
2022-02-12 10:29:25 -05:00
use crate ::nip05 ;
2021-12-11 16:48:59 -05:00
use crate ::subscription ::Subscription ;
2021-12-30 22:07:21 -05:00
use governor ::clock ::Clock ;
use governor ::{ Quota , RateLimiter } ;
2021-12-11 16:48:59 -05:00
use hex ;
use log ::* ;
2022-01-25 21:39:24 -05:00
use r2d2 ;
use r2d2_sqlite ::SqliteConnectionManager ;
2022-01-22 22:29:15 -05:00
use rusqlite ::limits ::Limit ;
2022-02-12 10:29:25 -05:00
use rusqlite ::params ;
2022-01-22 22:29:15 -05:00
use rusqlite ::types ::ToSql ;
2022-02-12 10:29:25 -05:00
use rusqlite ::Connection ;
use rusqlite ::OpenFlags ;
2021-12-11 16:48:59 -05:00
use std ::path ::Path ;
2021-12-30 22:07:21 -05:00
use std ::thread ;
2022-01-26 22:39:03 -05:00
use std ::time ::Duration ;
2022-01-01 20:25:09 -05:00
use std ::time ::Instant ;
2021-12-11 16:48:59 -05:00
use tokio ::task ;
2022-01-25 21:39:24 -05:00
pub type SqlitePool = r2d2 ::Pool < r2d2_sqlite ::SqliteConnectionManager > ;
2022-02-12 10:29:25 -05:00
pub type PooledConnection = r2d2 ::PooledConnection < r2d2_sqlite ::SqliteConnectionManager > ;
2021-12-11 22:43:41 -05:00
/// Database file name, relative to the configured data directory
/// (`settings.database.data_directory`).
pub const DB_FILE: &str = "nostr.db";
2021-12-11 16:48:59 -05:00
2021-12-30 14:55:05 -05:00
/// Startup DB Pragmas
///
/// Applied to every new connection (see `build_pool`'s `with_init`) and
/// again at the end of `upgrade_db`.  These tune durability/performance
/// only; they make no schema changes.
const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
pragma mmap_size = 536870912; -- 512MB of mmap
"##;
2021-12-11 22:43:41 -05:00
/// Schema definition
///
/// Creates the full, current (v4) schema from scratch, including the
/// `PRAGMA user_version = 4` stamp that `db_version` reads.  Databases at
/// older versions are migrated incrementally by `upgrade_db` instead.
const INIT_SQL: &str = r##"
-- Database settings
PRAGMA encoding = "UTF-8";
PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 4;

-- Event Table
CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
);

-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);

-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
-- hex-string), or TEXT otherwise.
-- This means that searches need to select the appropriate column.
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);

-- Event References Table
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #e tag.
referenced_event BLOB NOT NULL, -- the event that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);

-- Event References Index
CREATE INDEX IF NOT EXISTS event_ref_index ON event_ref(referenced_event);

-- Pubkey References Table
CREATE TABLE IF NOT EXISTS pubkey_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #p tag.
referenced_pubkey BLOB NOT NULL, -- the pubkey that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
);

-- Pubkey References Index
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);

-- NIP-05 User Validation.
-- This represents the validation of a user.
-- cases;
--  we query, and find a valid result.  update verified_at, and proceed.
--  we query, and get a 404/503/host down.  update failed_at, and we are done.
--  we query, and get a 200, but the local part is not present with the given address.  wipe out verified_at, update failed_at.
-- we need to know how often to query failing validations.
-- two cases, either we get a NIP-05 metadata event regularly that we can use to restart validation.
-- or, we simply get lots of non-metadata events, but the user fixed their NIP-05 host.
-- what should trigger a new attempt?  what should trigger cleaning?
-- we will never write anything to the table if it is not valid at least once.
-- we will keep trying at frequency X to re-validate the already-valid nip05s.
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
"##;
2022-02-12 10:29:25 -05:00
// TODO: drop the pubkey_ref and event_ref tables
2022-02-12 10:29:35 -05:00
/// Build a database connection pool.
2022-02-12 10:29:25 -05:00
pub fn build_pool (
name : & str ,
flags : OpenFlags ,
min_size : u32 ,
max_size : u32 ,
wait_for_db : bool ,
) -> SqlitePool {
let settings = SETTINGS . read ( ) . unwrap ( ) ;
let db_dir = & settings . database . data_directory ;
2022-01-25 21:39:24 -05:00
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
2022-01-26 22:39:03 -05:00
// small hack; if the database doesn't exist yet, that means the
// writer thread hasn't finished. Give it a chance to work. This
// is only an issue with the first time we run.
2022-02-12 10:29:25 -05:00
while ! full_path . exists ( ) & & wait_for_db {
2022-01-26 22:39:03 -05:00
debug! ( " Database reader pool is waiting on the database to be created... " ) ;
thread ::sleep ( Duration ::from_millis ( 500 ) ) ;
}
2022-01-25 21:39:24 -05:00
let manager = SqliteConnectionManager ::file ( & full_path )
2022-02-12 10:29:25 -05:00
. with_flags ( flags )
2022-01-25 21:39:24 -05:00
. with_init ( | c | c . execute_batch ( STARTUP_SQL ) ) ;
let pool : SqlitePool = r2d2 ::Pool ::builder ( )
. test_on_check_out ( true ) // no noticeable performance hit
2022-02-12 10:29:25 -05:00
. min_idle ( Some ( min_size ) )
. max_size ( max_size )
2022-01-25 21:39:24 -05:00
. build ( manager )
. unwrap ( ) ;
2022-01-25 22:42:43 -05:00
info! (
2022-02-12 10:29:25 -05:00
" Built a connection pool {:?} (min={}, max={}) " ,
name , min_size , max_size
2022-01-25 22:42:43 -05:00
) ;
2022-02-12 10:29:25 -05:00
pool
}
/// Build a single database connection, with provided flags
pub fn build_conn ( flags : OpenFlags ) -> Result < Connection > {
let settings = SETTINGS . read ( ) . unwrap ( ) ;
let db_dir = & settings . database . data_directory ;
let full_path = Path ::new ( db_dir ) . join ( DB_FILE ) ;
// create a connection
Ok ( Connection ::open_with_flags ( & full_path , flags ) ? )
2022-01-25 21:39:24 -05:00
}
2021-12-30 14:55:05 -05:00
/// Upgrade DB to latest version, and execute pragma settings
2022-02-12 10:29:25 -05:00
pub fn upgrade_db ( conn : & mut PooledConnection ) -> Result < ( ) > {
2021-12-30 14:55:05 -05:00
// check the version.
2022-01-22 22:29:15 -05:00
let mut curr_version = db_version ( conn ) ? ;
2021-12-30 14:55:05 -05:00
info! ( " DB version = {:?} " , curr_version ) ;
2022-01-22 22:29:15 -05:00
debug! (
" SQLite max query parameters: {} " ,
conn . limit ( Limit ::SQLITE_LIMIT_VARIABLE_NUMBER )
) ;
debug! (
" SQLite max table/blob/text length: {} MB " ,
( conn . limit ( Limit ::SQLITE_LIMIT_LENGTH ) as f64 / ( 1024 * 1024 ) as f64 ) . floor ( )
) ;
debug! (
" SQLite max SQL length: {} MB " ,
( conn . limit ( Limit ::SQLITE_LIMIT_SQL_LENGTH ) as f64 / ( 1024 * 1024 ) as f64 ) . floor ( )
) ;
2021-12-30 14:55:05 -05:00
// initialize from scratch
if curr_version = = 0 {
match conn . execute_batch ( INIT_SQL ) {
2022-01-22 22:29:15 -05:00
Ok ( ( ) ) = > {
2022-02-12 10:29:25 -05:00
info! ( " database pragma/schema initialized to v4, and ready " ) ;
2022-01-22 22:29:15 -05:00
}
2021-12-30 14:55:05 -05:00
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be initialized " ) ;
}
}
2022-01-22 22:29:15 -05:00
}
if curr_version = = 1 {
2021-12-30 14:55:05 -05:00
// only change is adding a hidden column to events.
let upgrade_sql = r ##"
ALTER TABLE event ADD hidden INTEGER ;
UPDATE event SET hidden = FALSE ;
PRAGMA user_version = 2 ;
" ##;
match conn . execute_batch ( upgrade_sql ) {
2022-01-22 22:29:15 -05:00
Ok ( ( ) ) = > {
info! ( " database schema upgraded v1 -> v2 " ) ;
curr_version = 2 ;
}
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be upgraded " ) ;
}
}
}
if curr_version = = 2 {
// this version lacks the tag column
debug! ( " database schema needs update from 2->3 " ) ;
let upgrade_sql = r ##"
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY ,
event_id INTEGER NOT NULL , - - an event ID that contains a tag .
name TEXT , - - the tag name ( " p " , " e " , whatever )
value TEXT , - - the tag value , if not hex .
value_hex BLOB , - - the tag value , if it can be interpreted as a hex string .
FOREIGN KEY ( event_id ) REFERENCES event ( id ) ON UPDATE CASCADE ON DELETE CASCADE
) ;
PRAGMA user_version = 3 ;
" ##;
// TODO: load existing refs into tag table
match conn . execute_batch ( upgrade_sql ) {
Ok ( ( ) ) = > {
info! ( " database schema upgraded v2 -> v3 " ) ;
2022-02-12 10:29:25 -05:00
curr_version = 3 ;
2022-01-22 22:29:15 -05:00
}
2021-12-30 14:55:05 -05:00
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be upgraded " ) ;
}
}
2022-01-22 22:29:15 -05:00
info! ( " Starting transaction " ) ;
// iterate over every event/pubkey tag
let tx = conn . transaction ( ) ? ;
{
let mut stmt = tx . prepare ( " select event_id, \" e \" , lower(hex(referenced_event)) from event_ref union select event_id, \" p \" , lower(hex(referenced_pubkey)) from pubkey_ref; " ) ? ;
let mut tag_rows = stmt . query ( [ ] ) ? ;
while let Some ( row ) = tag_rows . next ( ) ? {
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
let event_id : u64 = row . get ( 0 ) ? ;
let tag_name : String = row . get ( 1 ) ? ;
let tag_value : String = row . get ( 2 ) ? ;
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
if is_hex ( & tag_value ) {
tx . execute (
" INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3); " ,
params! [ event_id , tag_name , hex ::decode ( & tag_value ) . ok ( ) ] ,
) ? ;
}
}
}
tx . commit ( ) ? ;
info! ( " Upgrade complete " ) ;
2022-02-12 10:29:25 -05:00
}
if curr_version = = 3 {
debug! ( " database schema needs update from 3->4 " ) ;
let upgrade_sql = r ##"
- - incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY ,
metadata_event INTEGER NOT NULL , - - the metadata event used for this validation .
name TEXT NOT NULL , - - the nip05 field value ( user @ domain ) .
verified_at INTEGER , - - timestamp this author / nip05 was most recently verified .
failed_at INTEGER , - - timestamp a verification attempt failed ( host down ) .
failure_count INTEGER DEFAULT 0 , - - number of consecutive failures .
FOREIGN KEY ( metadata_event ) REFERENCES event ( id ) ON UPDATE CASCADE ON DELETE CASCADE
) ;
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification ( author ) ;
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification ( author ) ;
PRAGMA user_version = 4 ;
" ##;
// TODO: load existing refs into tag table
match conn . execute_batch ( upgrade_sql ) {
Ok ( ( ) ) = > {
info! ( " database schema upgraded v3 -> v4 " ) ;
//curr_version = 4;
}
Err ( err ) = > {
error! ( " update failed: {} " , err ) ;
panic! ( " database could not be upgraded " ) ;
}
}
} else if curr_version = = 4 {
2021-12-30 14:55:05 -05:00
debug! ( " Database version was already current " ) ;
2022-01-22 22:29:15 -05:00
} else if curr_version > 3 {
2021-12-30 14:55:05 -05:00
panic! ( " Database version is newer than supported by this executable " ) ;
}
// Setup PRAGMA
conn . execute_batch ( STARTUP_SQL ) ? ;
2022-02-12 10:29:25 -05:00
debug! ( " SQLite PRAGMA startup completed " ) ;
2021-12-30 14:55:05 -05:00
Ok ( ( ) )
}
2021-12-11 16:48:59 -05:00
/// Spawn a database writer that persists events to the SQLite store.
///
/// Returns a blocking-task handle; the task loops on `event_rx`,
/// filtering each event through the pubkey whitelist and (optionally)
/// NIP-05 verification before persisting it with `write_event`.
/// Successfully written events are rebroadcast on `bcast_tx`; metadata
/// (kind 0) events are also forwarded to the NIP-05 verifier via
/// `metadata_tx`.  The loop exits when `shutdown` fires or `event_rx`
/// closes.
pub async fn db_writer(
    mut event_rx: tokio::sync::mpsc::Receiver<Event>,
    bcast_tx: tokio::sync::broadcast::Sender<Event>,
    metadata_tx: tokio::sync::broadcast::Sender<Event>,
    mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
    let settings = SETTINGS.read().unwrap();
    // are we performing NIP-05 checking?
    let nip05_active = settings.verified_users.is_active();
    // are we requiring NIP-05 user verification?
    let nip05_enabled = settings.verified_users.is_enabled();
    task::spawn_blocking(move || {
        // get database configuration settings
        let settings = SETTINGS.read().unwrap();
        let db_dir = &settings.database.data_directory;
        let full_path = Path::new(db_dir).join(DB_FILE);
        // create a connection pool (small: this is the sole writer)
        let pool = build_pool(
            "event writer",
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
            1,
            4,
            false,
        );
        info!("opened database {:?} for writing", full_path);
        // ensure the schema is current before accepting any events.
        upgrade_db(&mut pool.get()?)?;

        // Make a copy of the whitelist
        let whitelist = &settings.authorization.pubkey_whitelist.clone();

        // get rate limit settings
        let rps_setting = settings.limits.messages_per_sec;
        let mut most_recent_rate_limit = Instant::now();
        let mut lim_opt = None;
        let clock = governor::clock::QuantaClock::default();
        if let Some(rps) = rps_setting {
            if rps > 0 {
                info!("Enabling rate limits for event creation ({}/sec)", rps);
                // quota is expressed per-minute (rps * 60) so short bursts
                // above the per-second rate are tolerated.
                let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
                lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
            }
        }
        loop {
            if shutdown.try_recv().is_ok() {
                info!("shutting down database writer");
                break;
            }
            // call blocking read on channel
            let next_event = event_rx.blocking_recv();
            // if the channel has closed, we will never get work
            if next_event.is_none() {
                break;
            }
            // tracks whether this iteration actually wrote a new event,
            // which is what the rate limiter below charges for.
            let mut event_write = false;
            let event = next_event.unwrap();
            // check if this event is authorized.
            if let Some(allowed_addrs) = whitelist {
                debug!("Checking against pubkey whitelist");
                // if the event address is not in allowed_addrs.
                if !allowed_addrs.contains(&event.pubkey) {
                    info!(
                        "Rejecting event {}, unauthorized author",
                        event.get_event_id_prefix()
                    );
                    // TODO: define a channel that can send NOTICEs back to the client.
                    continue;
                }
            }
            // send any metadata events to the NIP-05 verifier
            if nip05_active && event.is_kind_metadata() {
                // we are sending this prior to even deciding if we
                // persist it.  this allows the nip05 module to
                // inspect it, update if necessary, or persist a new
                // event and broadcast it itself.
                metadata_tx.send(event.clone()).ok();
            }
            // check for NIP-05 verification: only authors with a valid,
            // unexpired verification record may publish.
            if nip05_enabled {
                match nip05::query_latest_user_verification(pool.get()?, event.pubkey.to_owned()) {
                    Ok(uv) => {
                        if uv.is_valid() {
                            info!(
                                "new event from verified author ({:?},{:?})",
                                uv.name.to_string(),
                                event.get_author_prefix()
                            );
                        } else {
                            info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
                                uv.name.to_string(),
                                event.get_author_prefix()
                            );
                            continue;
                        }
                    }
                    // no record at all: the author was never verified.
                    Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
                        debug!(
                            "no verification records found for pubkey: {:?}",
                            event.get_author_prefix()
                        );
                        continue;
                    }
                    Err(e) => {
                        warn!("checking nip05 verification status failed: {:?}", e);
                        continue;
                    }
                }
            }
            // TODO: cache recent list of authors to remove a DB call.
            let start = Instant::now();
            match write_event(&mut pool.get()?, &event) {
                Ok(updated) => {
                    // write_event returns 0 when the event hash already existed.
                    if updated == 0 {
                        trace!("ignoring duplicate event");
                    } else {
                        info!(
                            "persisted event {:?} from {:?} in {:?}",
                            event.get_event_id_prefix(),
                            event.get_author_prefix(),
                            start.elapsed()
                        );
                        event_write = true;
                        // send this out to all clients
                        bcast_tx.send(event.clone()).ok();
                    }
                }
                Err(err) => {
                    warn!("event insert failed: {:?}", err);
                }
            }

            // use rate limit, if defined, and if an event was actually written.
            if event_write {
                if let Some(ref lim) = lim_opt {
                    if let Err(n) = lim.check() {
                        let wait_for = n.wait_time_from(clock.now());
                        // check if we have recently logged rate
                        // limits, and print a message at most once
                        // every 10 seconds to avoid log spam.
                        if most_recent_rate_limit.elapsed().as_secs() > 10 {
                            warn!(
                                "rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)",
                                wait_for
                            );
                            // reset last rate limit message
                            most_recent_rate_limit = Instant::now();
                        }
                        // block event writes, allowing them to queue up
                        thread::sleep(wait_for);
                        continue;
                    }
                }
            }
        }
        info!("database connection closed");
        Ok(())
    })
}
2022-02-12 10:29:35 -05:00
/// Determine the current application database schema version.
2021-12-30 14:55:05 -05:00
pub fn db_version ( conn : & mut Connection ) -> Result < usize > {
let query = " PRAGMA user_version; " ;
let curr_version = conn . query_row ( query , [ ] , | row | row . get ( 0 ) ) ? ;
Ok ( curr_version )
}
2022-02-12 10:29:35 -05:00
/// Persist an event to the database, returning rows added.
///
/// Runs everything in a single transaction: inserts the event row,
/// indexes its tags, and hides superseded kind-0 (metadata) and kind-3
/// (contact list) events from the same author.  Returns 0 when the
/// event hash was already present (duplicate), 1 otherwise.
pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
    // start transaction
    let tx = conn.transaction()?;
    // get relevant fields from event and convert to blobs.
    // NOTE(review): a non-hex id/pubkey would decode to None and be
    // stored as NULL here — presumably validated upstream; confirm.
    let id_blob = hex::decode(&e.id).ok();
    let pubkey_blob = hex::decode(&e.pubkey).ok();
    let event_str = serde_json::to_string(&e).ok();
    // ignore if the event hash is a duplicate.
    let ins_count = tx.execute(
        "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
        params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
    )?;
    if ins_count == 0 {
        // if the event was a duplicate, no need to insert event or
        // pubkey references.  (transaction is dropped without commit)
        return Ok(ins_count);
    }
    // remember primary key of the event most recently inserted.
    let ev_id = tx.last_insert_rowid();
    // add all tags to the tag table
    for tag in e.tags.iter() {
        // ensure we have 2 values (name + value); extra elements ignored.
        if tag.len() >= 2 {
            let tagname = &tag[0];
            let tagval = &tag[1];
            // hex values go in the BLOB column, everything else in TEXT;
            // queries must check the matching column (see query_from_sub).
            if is_hex(tagval) {
                tx.execute(
                    "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
                    params![ev_id, &tagname, hex::decode(&tagval).ok()],
                )?;
            } else {
                tx.execute(
                    "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
                    params![ev_id, &tagname, &tagval],
                )?;
            }
        }
    }
    // if this event is for a metadata update, hide every other kind=0
    // event from the same author that was issued earlier than this.
    if e.kind == 0 {
        let update_count = tx.execute(
            "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
            params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
        )?;
        if update_count > 0 {
            info!(
                "hid {} older metadata events for author {:?}",
                update_count,
                e.get_author_prefix()
            );
        }
    }
    // if this event is for a contact update, hide every other kind=3
    // event from the same author that was issued earlier than this.
    if e.kind == 3 {
        let update_count = tx.execute(
            "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
            params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
        )?;
        if update_count > 0 {
            info!(
                "hid {} older contact events for author {:?}",
                update_count,
                e.get_author_prefix()
            );
        }
    }
    tx.commit()?;
    Ok(ins_count)
}
2022-02-12 10:29:35 -05:00
/// Serialized event associated with a specific subscription request.
///
/// Produced by `db_query` and sent back to the client handler, which
/// pairs the pre-serialized event JSON with the subscription it matched.
#[derive(PartialEq, Debug, Clone)]
pub struct QueryResult {
    /// Subscription identifier
    pub sub_id: String,
    /// Serialized event
    pub event: String,
}
2021-12-11 22:43:41 -05:00
/// Check if a string contains only hex characters.
///
/// Accepts both upper- and lower-case digits; an empty string is
/// (vacuously) considered hex.
fn is_hex(s: &str) -> bool {
    // any non-ASCII byte fails the hexdigit test, so byte-wise
    // iteration is equivalent to the char-wise check.
    s.bytes().all(|b| b.is_ascii_hexdigit())
}
2022-01-25 19:21:43 -05:00
/// Check if a string contains only 'f'/'F' characters.
///
/// Used by `hex_range` to detect a prefix with no upper bound; an
/// empty string is (vacuously) all-fs.
fn is_all_fs(s: &str) -> bool {
    s.chars().all(|c| matches!(c, 'f' | 'F'))
}
2022-02-12 10:29:35 -05:00
/// Types of hexadecimal queries.
///
/// Produced by `hex_range` from a (possibly partial) hex prefix, and
/// translated into SQL comparisons on BLOB columns by `query_from_sub`.
#[derive(PartialEq, Debug, Clone)]
enum HexSearch {
    // when no range is needed, exact 32-byte
    Exact(Vec<u8>),
    // lower (inclusive) and upper range (exclusive)
    Range(Vec<u8>, Vec<u8>),
    // lower bound only, upper bound is MAX inclusive
    LowerOnly(Vec<u8>),
}
/// Find the next hex sequence greater than the argument.
///
/// Converts a hex prefix (up to 64 chars) into a `HexSearch` usable for
/// BLOB range comparisons: a full 64-char string becomes `Exact`, an
/// all-f prefix becomes `LowerOnly` (no upper bound exists), and any
/// other prefix becomes `Range(lower, upper)` where `upper` is the
/// prefix incremented by one unit in its last nibble/byte, with carry.
/// Returns `None` for non-hex input or strings longer than 64 chars.
fn hex_range(s: &str) -> Option<HexSearch> {
    // handle special cases
    if !is_hex(s) || s.len() > 64 {
        return None;
    }
    if s.len() == 64 {
        return Some(HexSearch::Exact(hex::decode(s).ok()?));
    }
    // if s is odd, add a zero
    let mut hash_base = s.to_owned();
    let mut odd = hash_base.len() % 2 != 0;
    if odd {
        // extend the string to make it even
        hash_base.push('0');
    }
    let base = hex::decode(hash_base).ok()?;
    // check for all ff's
    if is_all_fs(s) {
        // there is no higher bound, we only want to search for blobs greater than this.
        return Some(HexSearch::LowerOnly(base));
    }
    // return a range
    let mut upper = base.clone();
    let mut byte_len = upper.len();
    // for odd strings, we made them longer, but we want to increment the upper char (+16).
    // we know we can do this without overflowing because we explicitly set the bottom half to 0's.
    while byte_len > 0 {
        byte_len -= 1;
        // check if byte can be incremented, or if we need to carry.
        let b = upper[byte_len];
        if b == u8::MAX {
            // reset and carry
            upper[byte_len] = 0;
        } else if odd {
            // odd-length prefix: the last byte's low nibble is padding,
            // so increment the high nibble (one unit = 16).
            // check if first char in this byte is NOT 'f'
            if b < 240 {
                upper[byte_len] = b + 16; // bump up the first character in this byte
                // increment done, stop iterating through the vec
                break;
            } else {
                // if it is 'f', reset the byte to 0 and do a carry
                // reset and carry
                upper[byte_len] = 0;
            }
            // done with odd logic, so don't repeat this
            odd = false;
        } else {
            // bump up the first character in this byte
            upper[byte_len] = b + 1;
            // increment done, stop iterating
            break;
        }
    }
    Some(HexSearch::Range(base, upper))
}
2022-02-12 10:29:35 -05:00
/// Produce an arbitrary-length, comma-separated list of '?' SQL
/// placeholders ("?,?,?").  Returns an empty string for `count == 0`.
fn repeat_vars(count: usize) -> String {
    // join handles the "no trailing comma" concern for us, including
    // the empty case.
    (0..count).map(|_| "?").collect::<Vec<&str>>().join(",")
}
/// Create a dynamic SQL query string and params from a subscription.
///
/// Each filter in the subscription becomes a parenthesized AND-group of
/// clauses (authors, kinds, ids, tags, since/until); the groups are
/// OR'd together, restricted to non-hidden events, and ordered by
/// `created_at`.  Returns the SQL text plus the boxed parameter values
/// in positional order.
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
    // build a dynamic SQL query.  all user-input is either an integer
    // (sqli-safe), or a string that is filtered to only contain
    // hexadecimal characters.  Strings that require escaping (tag
    // names/values) use parameters.
    let mut query =
        "SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
    // parameters
    let mut params: Vec<Box<dyn ToSql>> = vec![];
    // for every filter in the subscription, generate a where clause
    let mut filter_clauses: Vec<String> = Vec::new();
    for f in sub.filters.iter() {
        // individual filter components
        let mut filter_components: Vec<String> = Vec::new();
        // Query for "authors", allowing prefix matches
        if let Some(authvec) = &f.authors {
            // take each author and convert to a hexsearch
            let mut auth_searches: Vec<String> = vec![];
            for auth in authvec {
                match hex_range(auth) {
                    Some(HexSearch::Exact(ex)) => {
                        auth_searches.push("author=?".to_owned());
                        params.push(Box::new(ex));
                    }
                    Some(HexSearch::Range(lower, upper)) => {
                        auth_searches.push("(author>? AND author<?)".to_owned());
                        params.push(Box::new(lower));
                        params.push(Box::new(upper));
                    }
                    Some(HexSearch::LowerOnly(lower)) => {
                        auth_searches.push("author>?".to_owned());
                        params.push(Box::new(lower));
                    }
                    None => {
                        // unparseable author prefix: skipped, not an error.
                        info!("Could not parse hex range from author {:?}", auth);
                    }
                }
            }
            let authors_clause = format!("({})", auth_searches.join(" OR "));
            filter_components.push(authors_clause);
        }
        // Query for Kind
        if let Some(ks) = &f.kinds {
            // kind is number, no escaping needed
            let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
            let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
            filter_components.push(kind_clause);
        }
        // Query for event id, allowing prefix matches
        if let Some(idvec) = &f.ids {
            // take each id and convert to a hexsearch
            let mut id_searches: Vec<String> = vec![];
            for id in idvec {
                match hex_range(id) {
                    Some(HexSearch::Exact(ex)) => {
                        id_searches.push("event_hash=?".to_owned());
                        params.push(Box::new(ex));
                    }
                    Some(HexSearch::Range(lower, upper)) => {
                        id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
                        params.push(Box::new(lower));
                        params.push(Box::new(upper));
                    }
                    Some(HexSearch::LowerOnly(lower)) => {
                        id_searches.push("event_hash>?".to_owned());
                        params.push(Box::new(lower));
                    }
                    None => {
                        info!("Could not parse hex range from id {:?}", id);
                    }
                }
            }
            let id_clause = format!("({})", id_searches.join(" OR "));
            filter_components.push(id_clause);
        }
        // Query for tags: values are split into TEXT vs BLOB matches to
        // mirror how write_event stores them (value vs value_hex).
        if let Some(map) = &f.tags {
            for (key, val) in map.iter() {
                let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
                let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
                for v in val {
                    if is_hex(v) {
                        if let Ok(h) = hex::decode(&v) {
                            blob_vals.push(Box::new(h));
                        }
                    } else {
                        str_vals.push(Box::new(v.to_owned()));
                    }
                }
                // create clauses with "?" params for each tag value being searched
                let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
                let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
                let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
                // add the tag name as the first parameter
                params.push(Box::new(key.to_owned()));
                // add all tag values that are plain strings as params
                params.append(&mut str_vals);
                // add all tag values that are blobs as params
                params.append(&mut blob_vals);
                filter_components.push(tag_clause);
            }
        }
        // Query for timestamp (lower bound, exclusive)
        if f.since.is_some() {
            let created_clause = format!("created_at > {}", f.since.unwrap());
            filter_components.push(created_clause);
        }
        // Query for timestamp (upper bound, exclusive)
        if f.until.is_some() {
            let until_clause = format!("created_at < {}", f.until.unwrap());
            filter_components.push(until_clause);
        }
        // combine all clauses, and add to filter_clauses
        if !filter_components.is_empty() {
            let mut fc = "(".to_owned();
            fc.push_str(&filter_components.join(" AND "));
            fc.push_str(")");
            filter_clauses.push(fc);
        }
    }
    // never display hidden events
    query.push_str(" WHERE hidden!=TRUE ");
    // combine all filters with OR clauses, if any exist
    if !filter_clauses.is_empty() {
        query.push_str(" AND (");
        query.push_str(&filter_clauses.join(" OR "));
        query.push_str(")");
    }
    // add order clause
    query.push_str(" ORDER BY created_at ASC");
    debug!("query string: {}", query);
    (query, params)
}
2021-12-11 22:43:41 -05:00
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
/// is published on the `query_tx` channel as it is returned. If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
2021-12-11 16:48:59 -05:00
pub async fn db_query (
sub : Subscription ,
2022-02-12 10:29:25 -05:00
conn : PooledConnection ,
2021-12-11 16:48:59 -05:00
query_tx : tokio ::sync ::mpsc ::Sender < QueryResult > ,
mut abandon_query_rx : tokio ::sync ::oneshot ::Receiver < ( ) > ,
) {
task ::spawn_blocking ( move | | {
2021-12-12 11:03:28 -05:00
debug! ( " going to query for: {:?} " , sub ) ;
2022-01-01 20:25:09 -05:00
let mut row_count : usize = 0 ;
let start = Instant ::now ( ) ;
2021-12-11 22:43:41 -05:00
// generate SQL query
2022-01-22 22:29:15 -05:00
let ( q , p ) = query_from_sub ( & sub ) ;
2022-01-25 21:39:24 -05:00
// execute the query. Don't cache, since queries vary so much.
2022-01-05 18:33:08 -05:00
let mut stmt = conn . prepare ( & q ) ? ;
2022-01-22 22:29:15 -05:00
let mut event_rows = stmt . query ( rusqlite ::params_from_iter ( p ) ) ? ;
2022-01-05 18:33:08 -05:00
while let Some ( row ) = event_rows . next ( ) ? {
2021-12-11 16:48:59 -05:00
// check if this is still active (we could do this every N rows)
if abandon_query_rx . try_recv ( ) . is_ok ( ) {
2021-12-11 22:43:41 -05:00
debug! ( " query aborted " ) ;
2022-01-05 18:33:08 -05:00
return Ok ( ( ) ) ;
2021-12-11 16:48:59 -05:00
}
2022-01-01 20:25:09 -05:00
row_count + = 1 ;
2022-01-05 18:33:08 -05:00
let event_json = row . get ( 0 ) ? ;
2021-12-11 16:48:59 -05:00
query_tx
. blocking_send ( QueryResult {
sub_id : sub . get_id ( ) ,
event : event_json ,
} )
. ok ( ) ;
}
2022-01-01 20:25:09 -05:00
debug! (
" query completed ({} rows) in {:?} " ,
row_count ,
start . elapsed ( )
) ;
2022-01-05 18:33:08 -05:00
let ok : Result < ( ) > = Ok ( ( ) ) ;
2022-01-22 22:29:15 -05:00
ok
2021-12-11 16:48:59 -05:00
} ) ;
}
2022-01-25 19:21:43 -05:00
#[cfg(test)]
mod tests {
    use super::*;

    /// Decode an expected hex value, panicking if the test literal is bad.
    fn h(s: &str) -> Vec<u8> {
        hex::decode(s).expect("invalid hex")
    }

    #[test]
    fn hex_range_exact() -> Result<()> {
        // A full-length (64-char / 32-byte) hex id yields an exact match.
        let hex = "abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00";
        assert_eq!(hex_range(hex), Some(HexSearch::Exact(h(hex))));
        Ok(())
    }

    #[test]
    fn hex_full_range() -> Result<()> {
        // An even-length prefix becomes a [lower, upper) range whose upper
        // bound is the prefix incremented by one.
        assert_eq!(
            hex_range("aaaa"),
            Some(HexSearch::Range(h("aaaa"), h("aaab")))
        );
        Ok(())
    }

    #[test]
    fn hex_full_range_odd() -> Result<()> {
        // An odd-length prefix is extended with a zero nibble on both bounds.
        assert_eq!(
            hex_range("abc"),
            Some(HexSearch::Range(h("abc0"), h("abd0")))
        );
        Ok(())
    }

    #[test]
    fn hex_full_range_odd_end_f() -> Result<()> {
        // Incrementing past an 'f' nibble carries into the preceding digit.
        assert_eq!(
            hex_range("abf"),
            Some(HexSearch::Range(h("abf0"), h("ac00")))
        );
        Ok(())
    }

    #[test]
    fn hex_no_upper() -> Result<()> {
        // An all-'f' prefix has no representable upper bound: lower-only search.
        assert_eq!(hex_range("ffff"), Some(HexSearch::LowerOnly(h("ffff"))));
        Ok(())
    }

    #[test]
    fn hex_no_upper_odd() -> Result<()> {
        // Odd-length all-'f' prefix: zero-padded, still lower-bound only.
        assert_eq!(hex_range("fff"), Some(HexSearch::LowerOnly(h("fff0"))));
        Ok(())
    }
}