diff --git a/src/bin/bulkloader.rs b/src/bin/bulkloader.rs
index cac6b7e..af276ea 100644
--- a/src/bin/bulkloader.rs
+++ b/src/bin/bulkloader.rs
@@ -1,16 +1,16 @@
+use nostr_rs_relay::config;
+use nostr_rs_relay::error::{Error, Result};
+use nostr_rs_relay::event::{single_char_tagname, Event};
+use nostr_rs_relay::repo::sqlite::{build_pool, PooledConnection};
+use nostr_rs_relay::repo::sqlite_migration::{curr_db_version, DB_VERSION};
+use nostr_rs_relay::utils::is_lower_hex;
+use rusqlite::params;
+use rusqlite::{OpenFlags, Transaction};
 use std::io;
 use std::path::Path;
-use nostr_rs_relay::utils::is_lower_hex;
-use tracing::info;
-use nostr_rs_relay::config;
-use nostr_rs_relay::event::{Event,single_char_tagname};
-use nostr_rs_relay::error::{Error, Result};
-use nostr_rs_relay::repo::sqlite::{PooledConnection, build_pool};
-use nostr_rs_relay::repo::sqlite_migration::{curr_db_version, DB_VERSION};
-use rusqlite::{OpenFlags, Transaction};
 use std::sync::mpsc;
 use std::thread;
-use rusqlite::params;
+use tracing::info;

 /// Bulk load JSONL data from STDIN to the database specified in config.toml (or ./nostr.db as a default).
 /// The database must already exist; this will not create a new one.
@@ -26,89 +26,95 @@ pub fn main() -> Result<()> {
         return Err(Error::DatabaseDirError);
     }
     // Get a database pool
-    let pool = build_pool("bulk-loader", &settings, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, 1,4,false);
+    let pool = build_pool(
+        "bulk-loader",
+        &settings,
+        OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
+        1,
+        4,
+        false,
+    );
     {
-	// check for database schema version
-	let mut conn: PooledConnection = pool.get()?;
-	let version = curr_db_version(&mut conn)?;
-	info!("current version is: {:?}", version);
-	// ensure the schema version is current.
-	if version != DB_VERSION {
-	    info!("version is not current, exiting");
-	    panic!("cannot write to schema other than v{DB_VERSION}");
-	}
+        // check for database schema version
+        let mut conn: PooledConnection = pool.get()?;
+        let version = curr_db_version(&mut conn)?;
+        info!("current version is: {:?}", version);
+        // ensure the schema version is current.
+        if version != DB_VERSION {
+            info!("version is not current, exiting");
+            panic!("cannot write to schema other than v{DB_VERSION}");
+        }
     }
     // this channel will contain parsed events ready to be inserted
     let (event_tx, event_rx) = mpsc::sync_channel(100_000);
     // Thread for reading events
     let _stdin_reader_handler = thread::spawn(move || {
-	let stdin = io::stdin();
-	for readline in stdin.lines() {
-	    if let Ok(line) = readline {
-		// try to parse a nostr event
-		let eres: Result<Event, serde_json::Error> = serde_json::from_str(&line);
-		if let Ok(mut e) = eres {
-		    if let Ok(()) = e.validate() {
-			e.build_index();
-			//debug!("Event: {:?}", e);
-			event_tx.send(Some(e)).ok();
-		    } else {
-			info!("could not validate event");
-		    }
-		} else {
-		    info!("error reading event: {:?}", eres);
-		}
-	    } else {
-		// error reading
-		info!("error reading: {:?}", readline);
-	    }
-	}
-	info!("finished parsing events");
-	event_tx.send(None).ok();
-	let ok: Result<()> = Ok(());
+        let stdin = io::stdin();
+        for readline in stdin.lines() {
+            if let Ok(line) = readline {
+                // try to parse a nostr event
+                let eres: Result<Event, serde_json::Error> = serde_json::from_str(&line);
+                if let Ok(mut e) = eres {
+                    if let Ok(()) = e.validate() {
+                        e.build_index();
+                        //debug!("Event: {:?}", e);
+                        event_tx.send(Some(e)).ok();
+                    } else {
+                        info!("could not validate event");
+                    }
+                } else {
+                    info!("error reading event: {:?}", eres);
+                }
+            } else {
+                // error reading
+                info!("error reading: {:?}", readline);
+            }
+        }
+        info!("finished parsing events");
+        event_tx.send(None).ok();
+        let ok: Result<()> = Ok(());
         ok
     });
     let mut conn: PooledConnection = pool.get()?;
     let mut events_read = 0;
-    let event_batch_size =50_000;
+    let event_batch_size = 50_000;
     let mut new_events = 0;
     let mut has_more_events = true;
     while has_more_events {
-	// begin a transaction
-	let tx = conn.transaction()?;
-	// read in batch_size events and commit
-	for _ in 0..event_batch_size {
-	    match event_rx.recv() {
-		Ok(Some(e)) => {
-		    events_read += 1;
-		    // ignore ephemeral events
-		    if !(e.kind >= 20000 && e.kind < 30000) {
-			match write_event(&tx, e) {
-			    Ok(c) => {
-				new_events += c;
-			    },
-			    Err(e) => {
-				info!("error inserting event: {:?}", e);
-			    }
-			}
-		    }
-		},
-		Ok(None) => {
-		    // signal that the sender will never produce more
-		    // events
-		    has_more_events=false;
-		    break;
-		},
-		Err(_) => {
-		    info!("sender is closed");
-		    // sender is done
-		}
-	    }
-	}
-	info!("committed {} events...", new_events);
-	tx.commit()?;
-	conn.execute_batch("pragma wal_checkpoint(truncate)")?;
-
+        // begin a transaction
+        let tx = conn.transaction()?;
+        // read in batch_size events and commit
+        for _ in 0..event_batch_size {
+            match event_rx.recv() {
+                Ok(Some(e)) => {
+                    events_read += 1;
+                    // ignore ephemeral events
+                    if !(e.kind >= 20000 && e.kind < 30000) {
+                        match write_event(&tx, e) {
+                            Ok(c) => {
+                                new_events += c;
+                            }
+                            Err(e) => {
+                                info!("error inserting event: {:?}", e);
+                            }
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // signal that the sender will never produce more
+                    // events
+                    has_more_events = false;
+                    break;
+                }
+                Err(_) => {
+                    info!("sender is closed");
+                    // sender is done
+                }
+            }
+        }
+        info!("committed {} events...", new_events);
+        tx.commit()?;
+        conn.execute_batch("pragma wal_checkpoint(truncate)")?;
     }
     info!("processed {} events", events_read);
     info!("stored {} new events", new_events);
@@ -131,7 +137,7 @@ fn write_event(tx: &Transaction, e: Event) -> Result<usize> {
         params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
     )?;
     if ins_count == 0 {
-	return Ok(0);
+        return Ok(0);
     }
     // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
     let event_id = tx.last_insert_rowid();
@@ -140,30 +146,30 @@ fn write_event(tx: &Transaction, e: Event) -> Result<usize> {
         let tagname = t.get(0).unwrap();
         let tagnamechar_opt = single_char_tagname(tagname);
         if tagnamechar_opt.is_none() {
-	    continue;
+            continue;
        }
        // safe because len was > 1
        let tagval = t.get(1).unwrap();
        // insert as BLOB if we can restore it losslessly.
        // this means it needs to be even length and lowercase.
        if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
-	    tx.execute(
+            tx.execute(
                 "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
                 params![event_id, tagname, hex::decode(tagval).ok()],
-	    )?;
+            )?;
         } else {
-	    // otherwise, insert as text
-	    tx.execute(
+            // otherwise, insert as text
+            tx.execute(
                 "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
                 params![event_id, tagname, &tagval],
             )?;
         }
     }
     if e.is_replaceable() {
-	//let query = "SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1;";
-	//let count: usize = tx.query_row(query, params![e.kind, pubkey_blob], |row| row.get(0))?;
-	//info!("found {} rows that /would/ be preserved", count);
-	match tx.execute(
+        //let query = "SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1;";
+        //let count: usize = tx.query_row(query, params![e.kind, pubkey_blob], |row| row.get(0))?;
+        //info!("found {} rows that /would/ be preserved", count);
+        match tx.execute(
            "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1);",
            params![e.kind, pubkey_blob, e.kind, pubkey_blob],
        ) {
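The bulk loader above pairs a bounded `sync_channel` (parser thread blocks when the writer falls behind) with large per-transaction batches. A minimal, self-contained sketch of that pattern, not part of the patch — `store` is a hypothetical stand-in for the transaction-plus-`write_event` loop, and the capacities are scaled down from the real 100_000/50_000:

```rust
use std::sync::mpsc;
use std::thread;

fn store(batch: &[u64]) {
    // the real loader opens a transaction, writes each event, commits,
    // then truncates the WAL
    println!("committing {} items", batch.len());
}

fn main() {
    // bounded channel: backpressure caps memory if parsing outruns writes
    let (tx, rx) = mpsc::sync_channel::<Option<u64>>(100);
    let producer = thread::spawn(move || {
        for i in 0..1_000u64 {
            tx.send(Some(i)).ok(); // a parsed, validated event
        }
        tx.send(None).ok(); // end-of-stream sentinel, as in the loader
    });
    let mut done = false;
    while !done {
        let mut batch = Vec::with_capacity(50);
        for _ in 0..50 {
            match rx.recv() {
                Ok(Some(e)) => batch.push(e),
                Ok(None) | Err(_) => {
                    done = true;
                    break;
                }
            }
        }
        store(&batch); // one commit per batch amortizes fsync cost
    }
    producer.join().ok();
}
```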
diff --git a/src/cli.rs b/src/cli.rs
index 49b616e..15a6f14 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -7,14 +7,14 @@ pub struct CLIArgs {
         short,
         long,
         help = "Use the as the location of the database",
-        required = false,
+        required = false
     )]
     pub db: Option<String>,
     #[arg(
         short,
         long,
         help = "Use the as the location of the config file",
-        required = false,
+        required = false
     )]
     pub config: Option<String>,
 }
diff --git a/src/config.rs b/src/config.rs
index 8aa0667..442c6bb 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -70,7 +70,7 @@ pub struct Limits {
     pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
     pub event_persist_buffer: usize, // events to buffer for database commits (block senders if database writes are too slow)
     pub event_kind_blacklist: Option<Vec<u64>>,
-    pub event_kind_allowlist: Option<Vec<u64>>
+    pub event_kind_allowlist: Option<Vec<u64>>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -178,18 +178,20 @@ impl Settings {
         }
     }

-
-    fn new_from_default(default: &Settings, config_file_name: &Option<String>) -> Result<Self, ConfigError> {
+    fn new_from_default(
+        default: &Settings,
+        config_file_name: &Option<String>,
+    ) -> Result<Self, ConfigError> {
         let default_config_file_name = "config.toml".to_string();
         let config: &String = match config_file_name {
             Some(value) => value,
-            None => &default_config_file_name
+            None => &default_config_file_name,
         };
         let builder = Config::builder();
         let config: Config = builder
-	    // use defaults
+            // use defaults
             .add_source(Config::try_from(default)?)
-	    // override with file contents
+            // override with file contents
             .add_source(File::with_name(config))
             .build()?;
         let mut settings: Settings = config.try_deserialize()?;
@@ -229,7 +231,7 @@ impl Default for Settings {
                 in_memory: false,
                 min_conn: 4,
                 max_conn: 8,
-		connection: "".to_owned(),
+                connection: "".to_owned(),
             },
             grpc: Grpc {
                 event_admission_server: None,
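`new_from_default` layers the `config` crate's sources: serialized defaults first, then the TOML file overrides them. A hedged sketch of the same layering with a cut-down `Settings` struct (field and file name are illustrative, not the relay's full config):

```rust
use config::{Config, File};
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
struct Settings {
    data_directory: String,
}

fn load(path: &str) -> Result<Settings, config::ConfigError> {
    Config::builder()
        // start from the struct's defaults ...
        .add_source(Config::try_from(&Settings::default())?)
        // ... then let the file override whatever it sets
        .add_source(File::with_name(path))
        .build()?
        .try_deserialize()
}

fn main() {
    // expects e.g. a config.toml next to the binary; errors otherwise
    match load("config") {
        Ok(s) => println!("data dir: {}", s.data_directory),
        Err(e) => eprintln!("config error: {e}"),
    }
}
```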
diff --git a/src/db.rs b/src/db.rs
index d24f31c..9d478aa 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -3,20 +3,20 @@ use crate::config::Settings;
 use crate::error::{Error, Result};
 use crate::event::Event;
 use crate::notice::Notice;
+use crate::repo::postgres::{PostgresPool, PostgresRepo};
+use crate::repo::sqlite::SqliteRepo;
+use crate::repo::NostrRepo;
 use crate::server::NostrMetrics;
 use crate::nauthz;
 use governor::clock::Clock;
 use governor::{Quota, RateLimiter};
 use r2d2;
-use std::sync::Arc;
-use std::thread;
 use sqlx::pool::PoolOptions;
 use sqlx::postgres::PgConnectOptions;
 use sqlx::ConnectOptions;
-use crate::repo::sqlite::SqliteRepo;
-use crate::repo::postgres::{PostgresRepo,PostgresPool};
-use crate::repo::NostrRepo;
-use std::time::{Instant, Duration};
+use std::sync::Arc;
+use std::thread;
+use std::time::{Duration, Instant};
 use tracing::log::LevelFilter;
 use tracing::{debug, info, trace, warn};

@@ -42,8 +42,8 @@ pub const DB_FILE: &str = "nostr.db";
 /// Will panic if the pool could not be created.
 pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc<dyn NostrRepo> {
     match settings.database.engine.as_str() {
-        "sqlite" => {Arc::new(build_sqlite_pool(settings, metrics).await)},
-        "postgres" => {Arc::new(build_postgres_pool(settings, metrics).await)},
+        "sqlite" => Arc::new(build_sqlite_pool(settings, metrics).await),
+        "postgres" => Arc::new(build_postgres_pool(settings, metrics).await),
         _ => panic!("Unknown database engine"),
     }
 }
@@ -165,10 +165,7 @@ pub async fn db_writer(
                     &event.kind
                 );
                 notice_tx
-                    .try_send(Notice::blocked(
-                        event.id,
-                        "event kind is blocked by relay"
-                    ))
+                    .try_send(Notice::blocked(event.id, "event kind is blocked by relay"))
                     .ok();
                 continue;
             }
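`build_repo` hides the storage engine behind a trait object, so the rest of the relay never knows which backend it talks to. A minimal sketch of that dispatch shape (the trait here is trivialized; the real `NostrRepo` is async):

```rust
use std::sync::Arc;

trait NostrRepo {
    fn name(&self) -> &'static str;
}

struct SqliteRepo;
struct PostgresRepo;

impl NostrRepo for SqliteRepo {
    fn name(&self) -> &'static str { "sqlite" }
}
impl NostrRepo for PostgresRepo {
    fn name(&self) -> &'static str { "postgres" }
}

// the config string picks the concrete type; callers only see Arc<dyn ...>
fn build_repo(engine: &str) -> Arc<dyn NostrRepo> {
    match engine {
        "sqlite" => Arc::new(SqliteRepo),
        "postgres" => Arc::new(PostgresRepo),
        _ => panic!("Unknown database engine"),
    }
}

fn main() {
    println!("engine: {}", build_repo("sqlite").name());
}
```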
diff --git a/src/delegation.rs b/src/delegation.rs
index 8d06a74..cf7bd4e 100644
--- a/src/delegation.rs
+++ b/src/delegation.rs
@@ -84,7 +84,8 @@ pub struct ConditionQuery {
 }

 impl ConditionQuery {
-    #[must_use] pub fn allows_event(&self, event: &Event) -> bool {
+    #[must_use]
+    pub fn allows_event(&self, event: &Event) -> bool {
         // check each condition, to ensure that the event complies
         // with the restriction.
         for c in &self.conditions {
@@ -101,7 +102,8 @@ impl ConditionQuery {
 }

 // Verify that the delegator approved the delegation; return a ConditionQuery if so.
-#[must_use] pub fn validate_delegation(
+#[must_use]
+pub fn validate_delegation(
     delegator: &str,
     delegatee: &str,
     cond_query: &str,
@@ -144,7 +146,8 @@ pub struct Condition {

 impl Condition {
     /// Check if this condition allows the given event to be delegated
-    #[must_use] pub fn allows_event(&self, event: &Event) -> bool {
+    #[must_use]
+    pub fn allows_event(&self, event: &Event) -> bool {
         // determine what the right-hand side of the operator is
         let resolved_field = match &self.field {
             Field::Kind => event.kind,
diff --git a/src/event.rs b/src/event.rs
index 0aff15f..5af9c3b 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -1,6 +1,9 @@
 //! Event parsing and validation
 use crate::delegation::validate_delegation;
-use crate::error::Error::{CommandUnknownError, EventCouldNotCanonicalize, EventInvalidId, EventInvalidSignature, EventMalformedPubkey};
+use crate::error::Error::{
+    CommandUnknownError, EventCouldNotCanonicalize, EventInvalidId, EventInvalidSignature,
+    EventMalformedPubkey,
+};
 use crate::error::Result;
 use crate::nip05;
 use crate::utils::unix_time;
@@ -30,7 +33,8 @@ pub struct EventCmd {
 }

 impl EventCmd {
-    #[must_use] pub fn event_id(&self) -> &str {
+    #[must_use]
+    pub fn event_id(&self) -> &str {
         &self.event.id
     }
 }
@@ -67,7 +71,8 @@ where
 }

 /// Attempt to form a single-char tag name.
-#[must_use] pub fn single_char_tagname(tagname: &str) -> Option<char> {
+#[must_use]
+pub fn single_char_tagname(tagname: &str) -> Option<char> {
     // We return the tag character if and only if the tagname consists
     // of a single char.
     let mut tagnamechars = tagname.chars();
@@ -114,7 +119,8 @@ impl From<EventCmd> for Result<EventWrapper> {

 impl Event {
     #[cfg(test)]
-    #[must_use] pub fn simple_event() -> Event {
+    #[must_use]
+    pub fn simple_event() -> Event {
         Event {
             id: "0".to_owned(),
             pubkey: "0".to_owned(),
@@ -128,12 +134,14 @@ impl Event {
         }
     }

-    #[must_use] pub fn is_kind_metadata(&self) -> bool {
+    #[must_use]
+    pub fn is_kind_metadata(&self) -> bool {
         self.kind == 0
     }

     /// Should this event be persisted?
-    #[must_use] pub fn is_ephemeral(&self) -> bool {
+    #[must_use]
+    pub fn is_ephemeral(&self) -> bool {
         self.kind >= 20000 && self.kind < 30000
     }

@@ -160,29 +168,37 @@ impl Event {
     }

     /// Should this event be replaced with newer timestamps from same author?
-    #[must_use] pub fn is_replaceable(&self) -> bool {
-        self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000)
+    #[must_use]
+    pub fn is_replaceable(&self) -> bool {
+        self.kind == 0
+            || self.kind == 3
+            || self.kind == 41
+            || (self.kind >= 10000 && self.kind < 20000)
     }

     /// Should this event be replaced with newer timestamps from same author, for distinct `d` tag values?
-    #[must_use] pub fn is_param_replaceable(&self) -> bool {
+    #[must_use]
+    pub fn is_param_replaceable(&self) -> bool {
         self.kind >= 30000 && self.kind < 40000
     }

     /// Should this event be replaced with newer timestamps from same author, for distinct `d` tag values?
-    #[must_use] pub fn distinct_param(&self) -> Option<String> {
+    #[must_use]
+    pub fn distinct_param(&self) -> Option<String> {
         if self.is_param_replaceable() {
             let default = "".to_string();
-            let dvals:Vec<&String> = self.tags
+            let dvals: Vec<&String> = self
+                .tags
                 .iter()
                 .filter(|x| !x.is_empty())
                 .filter(|x| x.get(0).unwrap() == "d")
-                .map(|x| x.get(1).unwrap_or(&default)).take(1)
+                .map(|x| x.get(1).unwrap_or(&default))
+                .take(1)
                 .collect();
             let dval_first = dvals.get(0);
             match dval_first {
-                Some(_) => {dval_first.map(|x| x.to_string())},
-                None => Some(default)
+                Some(_) => dval_first.map(|x| x.to_string()),
+                None => Some(default),
             }
         } else {
             None
@@ -190,7 +206,8 @@ impl Event {
     }

     /// Pull a NIP-05 Name out of the event, if one exists
-    #[must_use] pub fn get_nip05_addr(&self) -> Option<nip05::Nip05Name> {
+    #[must_use]
+    pub fn get_nip05_addr(&self) -> Option<nip05::Nip05Name> {
         if self.is_kind_metadata() {
             // very quick check if we should attempt to parse this json
             if self.content.contains("\"nip05\"") {
@@ -207,7 +224,8 @@ impl Event {
     // is this event delegated (properly)?
     // does the signature match, and are conditions valid?
     // if so, return an alternate author for the event
-    #[must_use] pub fn delegated_author(&self) -> Option<String> {
+    #[must_use]
+    pub fn delegated_author(&self) -> Option<String> {
         // is there a delegation tag?
         let delegation_tag: Vec<String> = self
             .tags
@@ -215,7 +233,8 @@ impl Event {
             .filter(|x| x.len() == 4)
             .filter(|x| x.get(0).unwrap() == "delegation")
             .take(1)
-            .next()?.clone(); // get first tag
+            .next()?
+            .clone(); // get first tag

         //let delegation_tag = self.tag_values_by_name("delegation");
         // delegation tags should have exactly 3 elements after the name (pubkey, condition, sig)
@@ -275,15 +294,18 @@ impl Event {
     }

     /// Create a short event identifier, suitable for logging.
-    #[must_use] pub fn get_event_id_prefix(&self) -> String {
+    #[must_use]
+    pub fn get_event_id_prefix(&self) -> String {
         self.id.chars().take(8).collect()
     }
-    #[must_use] pub fn get_author_prefix(&self) -> String {
+
+    #[must_use]
+    pub fn get_author_prefix(&self) -> String {
         self.pubkey.chars().take(8).collect()
     }

     /// Retrieve tag initial values across all tags matching the name
-    #[must_use] pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
+    #[must_use]
+    pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
         self.tags
             .iter()
             .filter(|x| x.len() > 1)
@@ -292,7 +314,8 @@ impl Event {
             .collect()
     }

-    #[must_use] pub fn is_valid_timestamp(&self, reject_future_seconds: Option<usize>) -> bool {
+    #[must_use]
+    pub fn is_valid_timestamp(&self, reject_future_seconds: Option<usize>) -> bool {
         if let Some(allowable_future) = reject_future_seconds {
             let curr_time = unix_time();
             // calculate difference, plus how far future we allow
@@ -384,7 +407,8 @@ impl Event {
     }

     /// Determine if the given tag and value set intersect with tags in this event.
-    #[must_use] pub fn generic_tag_val_intersect(&self, tagname: char, check: &HashSet<String>) -> bool {
+    #[must_use]
+    pub fn generic_tag_val_intersect(&self, tagname: char, check: &HashSet<String>) -> bool {
         match &self.tagidx {
             // check if this is indexable tagname
             Some(idx) => match idx.get(&tagname) {
@@ -423,7 +447,7 @@ mod tests {
     fn empty_event_tag_match() {
         let event = Event::simple_event();
         assert!(!event
-		.generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
+            .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
     }

     #[test]
@@ -571,28 +595,28 @@ mod tests {
     #[test]
     fn ephemeral_event() {
         let mut event = Event::simple_event();
-        event.kind=20000;
+        event.kind = 20000;
         assert!(event.is_ephemeral());
-        event.kind=29999;
+        event.kind = 29999;
         assert!(event.is_ephemeral());
-        event.kind=30000;
+        event.kind = 30000;
         assert!(!event.is_ephemeral());
-        event.kind=19999;
+        event.kind = 19999;
         assert!(!event.is_ephemeral());
     }

     #[test]
     fn replaceable_event() {
         let mut event = Event::simple_event();
-        event.kind=0;
+        event.kind = 0;
         assert!(event.is_replaceable());
-        event.kind=3;
+        event.kind = 3;
         assert!(event.is_replaceable());
-        event.kind=10000;
+        event.kind = 10000;
         assert!(event.is_replaceable());
-        event.kind=19999;
+        event.kind = 19999;
         assert!(event.is_replaceable());
-        event.kind=20000;
+        event.kind = 20000;
         assert!(!event.is_replaceable());
     }

@@ -614,8 +638,7 @@ mod tests {
         // NIP case #1: "tags":[["d",""]]
         let mut event = Event::simple_event();
         event.kind = 30000;
-        event.tags = vec![
-            vec!["d".to_owned(), "".to_owned()]];
+        event.tags = vec![vec!["d".to_owned(), "".to_owned()]];
         assert_eq!(event.distinct_param(), Some("".to_string()));
     }

@@ -632,8 +655,7 @@ mod tests {
         // NIP case #3: "tags":[["d"]]: implicit empty value ""
         let mut event = Event::simple_event();
         event.kind = 30000;
-        event.tags = vec![
-            vec!["d".to_owned()]];
+        event.tags = vec![vec!["d".to_owned()]];
         assert_eq!(event.distinct_param(), Some("".to_string()));
     }

@@ -644,7 +666,7 @@ mod tests {
         // NIP case #4: "tags":[["d",""],["d","not empty"]]
         let mut event = Event::simple_event();
         event.kind = 30000;
         event.tags = vec![
             vec!["d".to_owned(), "".to_string()],
-            vec!["d".to_owned(), "not empty".to_string()]
+            vec!["d".to_owned(), "not empty".to_string()],
         ];
         assert_eq!(event.distinct_param(), Some("".to_string()));
     }

@@ -657,7 +679,7 @@ mod tests {
         event.kind = 30000;
         event.tags = vec![
             vec!["d".to_owned(), "not empty".to_string()],
-            vec!["d".to_owned(), "".to_string()]
+            vec!["d".to_owned(), "".to_string()],
         ];
         assert_eq!(event.distinct_param(), Some("not empty".to_string()));
     }

@@ -670,7 +692,7 @@ mod tests {
         event.tags = vec![
             vec!["d".to_owned()],
             vec!["d".to_owned(), "second value".to_string()],
-            vec!["d".to_owned(), "third value".to_string()]
+            vec!["d".to_owned(), "third value".to_string()],
         ];
         assert_eq!(event.distinct_param(), Some("".to_string()));
     }

@@ -680,9 +702,7 @@ mod tests {
         // NIP case #6: "tags":[["e"]]: same as no tags
         let mut event = Event::simple_event();
         event.kind = 30000;
-        event.tags = vec![
-            vec!["e".to_owned()],
-        ];
+        event.tags = vec![vec!["e".to_owned()]];
         assert_eq!(event.distinct_param(), Some("".to_string()));
     }
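The kind-range predicates reformatted above (`is_ephemeral`, `is_replaceable`, `is_param_replaceable`) are the heart of the relay's persistence policy. A free-standing restatement that mirrors the tests in the diff — functions take a bare `kind` instead of `&self`, otherwise the logic is the same:

```rust
fn is_ephemeral(kind: u64) -> bool {
    // ephemeral events (20000..30000) are broadcast but never stored
    (20000..30000).contains(&kind)
}

fn is_replaceable(kind: u64) -> bool {
    // metadata, contacts, channel metadata, and the 10000..20000 range
    kind == 0 || kind == 3 || kind == 41 || (10000..20000).contains(&kind)
}

fn is_param_replaceable(kind: u64) -> bool {
    // replaceable per distinct `d` tag value
    (30000..40000).contains(&kind)
}

fn main() {
    assert!(is_ephemeral(20000) && is_ephemeral(29999));
    assert!(!is_ephemeral(30000) && !is_ephemeral(19999));
    assert!(is_replaceable(19999) && !is_replaceable(20000));
    assert!(is_param_replaceable(30000));
    println!("kind ranges behave as the diff's tests expect");
}
```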
diff --git a/src/hexrange.rs b/src/hexrange.rs
index 3020faa..a9e3e83 100644
--- a/src/hexrange.rs
+++ b/src/hexrange.rs
@@ -1,5 +1,5 @@
 //! Utilities for searching hexadecimal
-use crate::utils::{is_hex};
+use crate::utils::is_hex;
 use hex;

 /// Types of hexadecimal queries.
@@ -19,7 +19,8 @@ fn is_all_fs(s: &str) -> bool {
 }

 /// Find the next hex sequence greater than the argument.
-#[must_use] pub fn hex_range(s: &str) -> Option<HexSearch> {
+#[must_use]
+pub fn hex_range(s: &str) -> Option<HexSearch> {
     let mut hash_base = s.to_owned();
     if !is_hex(&hash_base) || hash_base.len() > 64 {
         return None;
@@ -56,9 +57,9 @@ fn is_all_fs(s: &str) -> bool {
         } else if odd {
             // check if first char in this byte is NOT 'f'
             if b < 240 {
-		// bump up the first character in this byte
+                // bump up the first character in this byte
                 upper[byte_len] = b + 16;
-		// increment done, stop iterating through the vec
+                // increment done, stop iterating through the vec
                 break;
             }
             // if it is 'f', reset the byte to 0 and do a carry
diff --git a/src/main.rs b/src/main.rs
index d4221fa..712c902 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
 //! Server process
 use clap::Parser;
+use console_subscriber::ConsoleLayer;
 use nostr_rs_relay::cli::CLIArgs;
 use nostr_rs_relay::config;
 use nostr_rs_relay::server::start_server;
@@ -7,7 +8,6 @@ use std::sync::mpsc as syncmpsc;
 use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender};
 use std::thread;
 use tracing::info;
-use console_subscriber::ConsoleLayer;

 /// Start running a Nostr relay server.
 fn main() {
diff --git a/src/nip05.rs b/src/nip05.rs
index 00f1689..6849b36 100644
--- a/src/nip05.rs
+++ b/src/nip05.rs
@@ -8,11 +8,11 @@ use crate::config::VerifiedUsers;
 use crate::error::{Error, Result};
 use crate::event::Event;
 use crate::repo::NostrRepo;
-use std::sync::Arc;
 use hyper::body::HttpBody;
 use hyper::client::connect::HttpConnector;
 use hyper::Client;
 use hyper_tls::HttpsConnector;
+use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
 use std::time::SystemTime;
@@ -48,7 +48,8 @@ pub struct Nip05Name {

 impl Nip05Name {
     /// Does this name represent the entire domain?
-    #[must_use] pub fn is_domain_only(&self) -> bool {
+    #[must_use]
+    pub fn is_domain_only(&self) -> bool {
         self.local == "_"
     }

@@ -58,8 +59,8 @@ impl Nip05Name {
             "https://{}/.well-known/nostr.json?name={}",
             self.domain, self.local
         )
-            .parse::<http::Uri>()
-            .ok()
+        .parse::<http::Uri>()
+        .ok()
     }
 }

@@ -73,7 +74,10 @@ impl std::convert::TryFrom<&str> for Nip05Name {
         // check if local name is valid
         let local = components[0];
         let domain = components[1];
-        if local.chars().all(|x| x.is_alphanumeric() || x == '_' || x == '-' || x == '.') {
+        if local
+            .chars()
+            .all(|x| x.is_alphanumeric() || x == '_' || x == '-' || x == '.')
+        {
             if domain
                 .chars()
                 .all(|x| x.is_alphanumeric() || x == '-' || x == '.')
@@ -349,38 +353,34 @@ impl Verifier {
                     UserWebVerificationStatus::Verified => {
                         // freshly verified account, update the
                         // timestamp.
-                        self.repo.update_verification_timestamp(v.rowid)
-                            .await?;
+                        self.repo.update_verification_timestamp(v.rowid).await?;
                         info!("verification updated for {}", v.to_string());
-                    }
+                    }
                     UserWebVerificationStatus::DomainNotAllowed
-                        | UserWebVerificationStatus::Unknown => {
-                        // server may be offline, or temporarily
-                        // blocked by the config file.  Note the
-                        // failure so we can process something
-                        // else.
+                    | UserWebVerificationStatus::Unknown => {
+                        // server may be offline, or temporarily
+                        // blocked by the config file.  Note the
+                        // failure so we can process something
+                        // else.

-                        // have we had enough failures to give up?
-                        if v.failure_count >= max_failures as u64 {
-                            info!(
-                                "giving up on verifying {:?} after {} failures",
-                                v.name, v.failure_count
-                            );
-                            self.repo.delete_verification(v.rowid)
-                                .await?;
-                        } else {
-                            // record normal failure, incrementing failure count
-                            info!("verification failed for {}", v.to_string());
-                            self.repo.fail_verification(v.rowid).await?;
-                        }
-                    }
+                        // have we had enough failures to give up?
+                        if v.failure_count >= max_failures as u64 {
+                            info!(
+                                "giving up on verifying {:?} after {} failures",
+                                v.name, v.failure_count
+                            );
+                            self.repo.delete_verification(v.rowid).await?;
+                        } else {
+                            // record normal failure, incrementing failure count
+                            info!("verification failed for {}", v.to_string());
+                            self.repo.fail_verification(v.rowid).await?;
+                        }
+                    }
                     UserWebVerificationStatus::Unverified => {
                         // domain has removed the verification, drop
                         // the record on our side.
                         info!("verification rescinded for {}", v.to_string());
-                        self.repo.delete_verification(v.rowid)
-                            .await?;
+                        self.repo.delete_verification(v.rowid).await?;
                     }
                 }
             }
@@ -433,7 +433,9 @@ impl Verifier {
             }
         }
         // write the verification record
-        self.repo.create_verification_record(&event.id, name).await?;
+        self.repo
+            .create_verification_record(&event.id, name)
+            .await?;
         Ok(())
     }
 }
@@ -463,7 +465,8 @@ pub struct VerificationRecord {

 /// Check with settings to determine if a given domain is allowed to
 /// publish.
-#[must_use] pub fn is_domain_allowed(
+#[must_use]
+pub fn is_domain_allowed(
     domain: &str,
     whitelist: &Option<Vec<String>>,
     blacklist: &Option<Vec<String>>,
@@ -483,7 +486,8 @@ pub struct VerificationRecord {
 impl VerificationRecord {
     /// Check if the record is recent enough to be considered valid,
     /// and the domain is allowed.
-    #[must_use] pub fn is_valid(&self, verified_users_settings: &VerifiedUsers) -> bool {
+    #[must_use]
+    pub fn is_valid(&self, verified_users_settings: &VerifiedUsers) -> bool {
         //let settings = SETTINGS.read().unwrap();
         // how long a verification record is good for
         let nip05_expiration = &verified_users_settings.verify_expiration_duration;
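`hex_range` exists so prefix filters can become range scans (`author >= lower AND author < upper`). A simplified, string-based sketch of the core idea — increment the last non-`f` nibble to get the exclusive upper bound; the real code works on padded byte vectors, so treat this as an illustration only:

```rust
fn hex_upper_bound(prefix: &str) -> Option<String> {
    let mut chars: Vec<char> = prefix.chars().collect();
    for i in (0..chars.len()).rev() {
        let v = chars[i].to_digit(16)?; // None on non-hex input
        if v < 15 {
            // bump this nibble and drop everything after it
            chars[i] = char::from_digit(v + 1, 16)?;
            chars.truncate(i + 1);
            return Some(chars.into_iter().collect());
        }
        // nibble is 'f': carry into the previous position
    }
    None // all 'f': no finite exclusive upper bound
}

fn main() {
    assert_eq!(hex_upper_bound("abc").as_deref(), Some("abd"));
    assert_eq!(hex_upper_bound("abff").as_deref(), Some("ac"));
    assert_eq!(hex_upper_bound("ff"), None);
}
```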
diff --git a/src/notice.rs b/src/notice.rs
index 0dd8202..39c9602 100644
--- a/src/notice.rs
+++ b/src/notice.rs
@@ -20,14 +20,16 @@ pub enum Notice {
 }

 impl EventResultStatus {
-    #[must_use] pub fn to_bool(&self) -> bool {
+    #[must_use]
+    pub fn to_bool(&self) -> bool {
         match self {
             Self::Duplicate | Self::Saved => true,
-            Self::Invalid |Self::Blocked | Self::RateLimited | Self::Error => false,
+            Self::Invalid | Self::Blocked | Self::RateLimited | Self::Error => false,
         }
     }

-    #[must_use] pub fn prefix(&self) -> &'static str {
+    #[must_use]
+    pub fn prefix(&self) -> &'static str {
         match self {
             Self::Saved => "saved",
             Self::Duplicate => "duplicate",
@@ -44,7 +46,8 @@ impl Notice {
     //     Notice::err_msg(format!("{}", err), id)
     //}

-    #[must_use] pub fn message(msg: String) -> Notice {
+    #[must_use]
+    pub fn message(msg: String) -> Notice {
         Notice::Message(msg)
     }

@@ -53,27 +56,33 @@ impl Notice {
         Notice::EventResult(EventResult { id, msg, status })
     }

-    #[must_use] pub fn invalid(id: String, msg: &str) -> Notice {
+    #[must_use]
+    pub fn invalid(id: String, msg: &str) -> Notice {
         Notice::prefixed(id, msg, EventResultStatus::Invalid)
     }

-    #[must_use] pub fn blocked(id: String, msg: &str) -> Notice {
+    #[must_use]
+    pub fn blocked(id: String, msg: &str) -> Notice {
         Notice::prefixed(id, msg, EventResultStatus::Blocked)
     }

-    #[must_use] pub fn rate_limited(id: String, msg: &str) -> Notice {
+    #[must_use]
+    pub fn rate_limited(id: String, msg: &str) -> Notice {
         Notice::prefixed(id, msg, EventResultStatus::RateLimited)
     }

-    #[must_use] pub fn duplicate(id: String) -> Notice {
+    #[must_use]
+    pub fn duplicate(id: String) -> Notice {
         Notice::prefixed(id, "", EventResultStatus::Duplicate)
     }

-    #[must_use] pub fn error(id: String, msg: &str) -> Notice {
+    #[must_use]
+    pub fn error(id: String, msg: &str) -> Notice {
         Notice::prefixed(id, msg, EventResultStatus::Error)
     }

-    #[must_use] pub fn saved(id: String) -> Notice {
+    #[must_use]
+    pub fn saved(id: String) -> Notice {
         Notice::EventResult(EventResult {
             id,
             msg: "".into(),
diff --git a/src/repo/mod.rs b/src/repo/mod.rs
index a74c665..a0ddea6 100644
--- a/src/repo/mod.rs
+++ b/src/repo/mod.rs
@@ -7,10 +7,10 @@ use crate::utils::unix_time;
 use async_trait::async_trait;
 use rand::Rng;

-pub mod sqlite;
-pub mod sqlite_migration;
 pub mod postgres;
 pub mod postgres_migration;
+pub mod sqlite;
+pub mod sqlite_migration;

 #[async_trait]
 pub trait NostrRepo: Send + Sync {
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index f9b85ca..3ade449 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -8,10 +8,11 @@ use async_std::stream::StreamExt;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
 use sqlx::postgres::PgRow;
+use sqlx::Error::RowNotFound;
 use sqlx::{Error, Execute, FromRow, Postgres, QueryBuilder, Row};
 use std::time::{Duration, Instant};

-use sqlx::Error::RowNotFound;
+use crate::error;
 use crate::hexrange::{hex_range, HexSearch};
 use crate::repo::postgres_migration::run_migrations;
 use crate::server::NostrMetrics;
@@ -19,8 +20,7 @@ use crate::utils::{is_hex, is_lower_hex, self};
 use tokio::sync::mpsc::Sender;
 use tokio::sync::oneshot::Receiver;
 use tracing::log::trace;
-use tracing::{debug, error, warn, info};
-use crate::error;
+use tracing::{debug, info, warn, error};

 pub type PostgresPool = sqlx::pool::Pool<Postgres>;

@@ -78,7 +78,6 @@ async fn delete_expired(conn: PostgresPool) -> Result<u64> {

 #[async_trait]
 impl NostrRepo for PostgresRepo {
-
     async fn start(&self) -> Result<()> {
         // begin a cleanup task for expired events.
         cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
@@ -116,7 +115,7 @@ impl NostrRepo for PostgresRepo {
             }
         }
         if let Some(d_tag) = e.distinct_param() {
-            let repl_count:i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+            let repl_count: i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
                 sqlx::query_scalar(
                     "SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value_hex=$3 AND e.created_at >= $4 LIMIT 1;")
                     .bind(hex::decode(&e.pubkey).ok())
@@ -139,7 +138,7 @@ impl NostrRepo for PostgresRepo {
             // the same author/kind/tag value exist, and we can ignore
             // this event.
             if repl_count > 0 {
-                return Ok(0)
+                return Ok(0);
             }
         }
         // ignore if the event hash is a duplicate.
@@ -159,7 +158,6 @@ ON CONFLICT (id) DO NOTHING"#,
         .execute(&mut tx)
         .await?
         .rows_affected();
-
         if ins_count == 0 {
             // if the event was a duplicate, no need to insert event or
             // pubkey references. This will abort the txn.
@@ -276,10 +274,10 @@ ON CONFLICT (id) DO NOTHING"#,
                 LEFT JOIN tag t ON e.id = t.event_id \
                 WHERE e.pub_key = $1 AND t.\"name\" = 'e' AND e.kind = 5 AND t.value = $2 LIMIT 1",
             )
-                .bind(&pubkey_blob)
-                .bind(&id_blob)
-                .fetch_optional(&mut tx)
-                .await?;
+            .bind(&pubkey_blob)
+            .bind(&id_blob)
+            .fetch_optional(&mut tx)
+            .await?;

             // check if the query returned a result, meaning we should
             // hide the current event
@@ -380,7 +378,10 @@ ON CONFLICT (id) DO NOTHING"#,

             // check if this is still active; every 100 rows
             if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
-                debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
+                debug!(
+                    "query cancelled by client (cid: {}, sub: {:?})",
+                    client_id, sub.id
+                );
                 return Ok(());
             }

@@ -396,7 +397,10 @@ ON CONFLICT (id) DO NOTHING"#,
                 if last_successful_send + abort_cutoff < Instant::now() {
                     // the queue has been full for too long, abort
                     info!("aborting database query due to slow client");
-                    metrics.query_aborts.with_label_values(&["slowclient"]).inc();
+                    metrics
+                        .query_aborts
+                        .with_label_values(&["slowclient"])
+                        .inc();
                     return Ok(());
                 }
                 // give the queue a chance to clear before trying again
@@ -467,9 +471,7 @@ ON CONFLICT (id) DO NOTHING"#,
         let verify_time = now_jitter(600);

         // update verification time and reset any failure count
-        sqlx::query(
-            "UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2",
-        )
+        sqlx::query("UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2")
             .bind(Utc.timestamp_opt(verify_time as i64, 0).unwrap())
             .bind(id as i64)
             .execute(&self.conn)
@@ -767,8 +769,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {

 impl FromRow<'_, PgRow> for VerificationRecord {
     fn from_row(row: &'_ PgRow) -> std::result::Result<Self, Error> {
-        let name =
-            Nip05Name::try_from(row.get::<'_, &str, &str>("name")).or(Err(RowNotFound))?;
+        let name = Nip05Name::try_from(row.get::<'_, &str, &str>("name")).or(Err(RowNotFound))?;
         Ok(VerificationRecord {
             rowid: row.get::<'_, i64, &str>("id") as u64,
             name,
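Both repositories decide between the `value_hex` and `value` tag columns the same way: a value is stored as a compact binary column only if it can be restored losslessly, i.e. it is even-length lowercase hex. A dependency-free sketch of that decision (the real code uses `hex::decode` and issues SQL; `Stored` is a hypothetical enum for illustration):

```rust
fn is_lower_hex(s: &str) -> bool {
    // same predicate as utils::is_lower_hex: hex digits, no uppercase
    s.chars()
        .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
}

enum Stored {
    Hex(Vec<u8>), // BLOB / bytea column
    Text(String), // plain text column
}

fn classify(tagval: &str) -> Stored {
    if tagval.len() % 2 == 0 && is_lower_hex(tagval) {
        // manual decode keeps this sketch free of the hex crate
        let bytes = (0..tagval.len())
            .step_by(2)
            .map(|i| u8::from_str_radix(&tagval[i..i + 2], 16).unwrap())
            .collect();
        Stored::Hex(bytes)
    } else {
        Stored::Text(tagval.to_owned())
    }
}

fn main() {
    assert!(matches!(classify("deadbeef"), Stored::Hex(_)));
    assert!(matches!(classify("DEADBEEF"), Stored::Text(_))); // uppercase
    assert!(matches!(classify("abc"), Stored::Text(_))); // odd length
}
```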
diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs
index cf8bf59..a892e38 100644
--- a/src/repo/sqlite.rs
+++ b/src/repo/sqlite.rs
@@ -1,32 +1,33 @@
 //! Event persistence and querying
 //use crate::config::SETTINGS;
 use crate::config::Settings;
-use crate::error::{Result,Error::SqlError};
+use crate::db::QueryResult;
+use crate::error::Result;
+use crate::error::Error::SqlError;
 use crate::event::{single_char_tagname, Event};
 use crate::hexrange::hex_range;
 use crate::hexrange::HexSearch;
-use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
-use crate::utils::{is_hex,unix_time};
 use crate::nip05::{Nip05Name, VerificationRecord};
-use crate::subscription::{ReqFilter, Subscription};
+use crate::repo::sqlite_migration::{upgrade_db, STARTUP_SQL};
 use crate::server::NostrMetrics;
+use crate::subscription::{ReqFilter, Subscription};
+use crate::utils::{is_hex, unix_time};
+use async_trait::async_trait;
 use hex;
 use r2d2;
 use r2d2_sqlite::SqliteConnectionManager;
 use rusqlite::params;
 use rusqlite::types::ToSql;
 use rusqlite::OpenFlags;
-use tokio::sync::{Mutex, MutexGuard, Semaphore};
 use std::fmt::Write as _;
 use std::path::Path;
 use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 use std::time::Instant;
+use tokio::sync::{Mutex, MutexGuard, Semaphore};
 use tokio::task;
 use tracing::{debug, info, trace, warn};
-use async_trait::async_trait;
-use crate::db::QueryResult;

 use crate::repo::{now_jitter, NostrRepo};

@@ -54,7 +55,8 @@ pub struct SqliteRepo {

 impl SqliteRepo {
     // build all the pools needed
-    #[must_use] pub fn new(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo {
+    #[must_use]
+    pub fn new(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo {
         let write_pool = build_pool(
             "writer",
             settings,
@@ -110,7 +112,8 @@ impl SqliteRepo {
         // get relevant fields from event and convert to blobs.
         let id_blob = hex::decode(&e.id).ok();
         let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
-        let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
+        let delegator_blob: Option<Vec<u8>> =
+            e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
         let event_str = serde_json::to_string(&e).ok();
         // check for replaceable events that would hide this one; we won't even attempt to insert these.
         if e.is_replaceable() {
@@ -130,7 +133,7 @@ impl SqliteRepo {
             // the same author/kind/tag value exist, and we can ignore
             // this event.
             if repl_count.ok().is_some() {
-                return Ok(0)
+                return Ok(0);
             }
         }
         // ignore if the event hash is a duplicate.
@@ -249,21 +252,24 @@ impl SqliteRepo {

 #[async_trait]
 impl NostrRepo for SqliteRepo {
-
     async fn start(&self) -> Result<()> {
-        db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60),
-                           self.write_in_progress.clone(),
-                           self.checkpoint_in_progress.clone()).await?;
-        cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600),
-                        self.write_in_progress.clone()).await
+        db_checkpoint_task(
+            self.maint_pool.clone(),
+            Duration::from_secs(60),
+            self.write_in_progress.clone(),
+            self.checkpoint_in_progress.clone()
+        ).await?;
+        cleanup_expired(
+            self.maint_pool.clone(),
+            Duration::from_secs(600),
+            self.write_in_progress.clone()
+        ).await
     }

     async fn migrate_up(&self) -> Result<usize> {
         let _write_guard = self.write_in_progress.lock().await;
         let mut conn = self.write_pool.get()?;
-        task::spawn_blocking(move || {
-            upgrade_db(&mut conn)
-        }).await?
+        task::spawn_blocking(move || upgrade_db(&mut conn)).await?
     }
     /// Persist event to database
     async fn write_event(&self, e: &Event) -> Result<u64> {
@@ -322,9 +328,14 @@ impl NostrRepo for SqliteRepo {
        // thread pool waiting for queries to finish under high load.
        // Instead, don't bother spawning threads when they will just
        // block on a database connection.
-        let sem = self.reader_threads_ready.clone().acquire_owned().await.unwrap();
-        let self=self.clone();
-        let metrics=self.metrics.clone();
+        let sem = self
+            .reader_threads_ready
+            .clone()
+            .acquire_owned()
+            .await
+            .unwrap();
+        let self = self.clone();
+        let metrics = self.metrics.clone();
         task::spawn_blocking(move || {
             {
                 // if we are waiting on a checkpoint, stop until it is complete
@@ -349,7 +360,10 @@ impl NostrRepo for SqliteRepo {
             }
             // check before getting a DB connection if the client still wants the results
             if abandon_query_rx.try_recv().is_ok() {
-                debug!("query cancelled by client (before execution) (cid: {}, sub: {:?})", client_id, sub.id);
+                debug!(
+                    "query cancelled by client (before execution) (cid: {}, sub: {:?})",
+                    client_id, sub.id
+                );
                 return Ok(());
             }

@@ -362,7 +376,9 @@ impl NostrRepo for SqliteRepo {
             if let Ok(mut conn) = self.read_pool.get() {
                 {
                     let pool_state = self.read_pool.state();
-                    metrics.db_connections.set((pool_state.connections - pool_state.idle_connections).into());
+                    metrics
+                        .db_connections
+                        .set((pool_state.connections - pool_state.idle_connections).into());
                 }
                 for filter in sub.filters.iter() {
                     let filter_start = Instant::now();
@@ -379,7 +395,7 @@ impl NostrRepo for SqliteRepo {
                     let mut last_successful_send = Instant::now();
                     // execute the query.
                     // make the actual SQL query (with parameters inserted) available
-                    conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
+                    conn.trace(Some(|x| trace!("SQL trace: {:?}", x)));
                     let mut stmt = conn.prepare_cached(&q)?;
                     let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;

@@ -397,7 +413,10 @@ impl NostrRepo for SqliteRepo {
                             if slow_first_event && client_id.starts_with('0') {
                                 debug!(
                                     "filter first result in {:?} (slow): {} (cid: {}, sub: {:?})",
-                                    first_event_elapsed, serde_json::to_string(&filter)?, client_id, sub.id
+                                    first_event_elapsed,
+                                    serde_json::to_string(&filter)?,
+                                    client_id,
+                                    sub.id
                                 );
                             }
                             first_result = false;
@@ -407,8 +426,14 @@ impl NostrRepo for SqliteRepo {
                         {
                             if self.checkpoint_in_progress.try_lock().is_err() {
                                 // lock was held, abort this query
-                                debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
-                                metrics.query_aborts.with_label_values(&["checkpoint"]).inc();
+                                debug!(
+                                    "query aborted due to checkpoint (cid: {}, sub: {:?})",
+                                    client_id, sub.id
+                                );
+                                metrics
+                                    .query_aborts
+                                    .with_label_values(&["checkpoint"])
+                                    .inc();
                                 return Ok(());
                             }
                         }

@@ -416,7 +441,10 @@ impl NostrRepo for SqliteRepo {
                         // check if this is still active; every 100 rows
                         if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
-                            debug!("query cancelled by client (cid: {}, sub: {:?})", client_id, sub.id);
+                            debug!(
+                                "query cancelled by client (cid: {}, sub: {:?})",
+                                client_id, sub.id
+                            );
                             return Ok(());
                         }
                         row_count += 1;
@@ -432,19 +460,31 @@ impl NostrRepo for SqliteRepo {
                                 // the queue has been full for too long, abort
                                 info!("aborting database query due to slow client (cid: {}, sub: {:?})",
                                       client_id, sub.id);
-                                metrics.query_aborts.with_label_values(&["slowclient"]).inc();
+                                metrics
+                                    .query_aborts
+                                    .with_label_values(&["slowclient"])
+                                    .inc();
                                 let ok: Result<()> = Ok(());
                                 return ok;
                             }
                             // check if a checkpoint is trying to run, and abort
                             if self.checkpoint_in_progress.try_lock().is_err() {
                                 // lock was held, abort this query
-                                debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
-                                metrics.query_aborts.with_label_values(&["checkpoint"]).inc();
+                                debug!(
+                                    "query aborted due to checkpoint (cid: {}, sub: {:?})",
+                                    client_id, sub.id
+                                );
+                                metrics
+                                    .query_aborts
+                                    .with_label_values(&["checkpoint"])
+                                    .inc();
                                 return Ok(());
                             }
                             // give the queue a chance to clear before trying again
-                            debug!("query thread sleeping due to full query_tx (cid: {}, sub: {:?})", client_id, sub.id);
+                            debug!(
+                                "query thread sleeping due to full query_tx (cid: {}, sub: {:?})",
+                                client_id, sub.id
+                            );
                             thread::sleep(Duration::from_millis(500));
                         }
                         // TODO: we could use try_send, but we'd have to juggle
@@ -465,10 +505,12 @@ impl NostrRepo for SqliteRepo {
                     if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') {
                         debug!(
                             "query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})",
-                            serde_json::to_string(&filter)?, client_id, sub.id, filter_count
+                            serde_json::to_string(&filter)?,
+                            client_id,
+                            sub.id,
+                            filter_count
                         );
                     }
-
                 }
             } else {
                 warn!("Could not get a database connection for querying");
@@ -504,7 +546,8 @@ impl NostrRepo for SqliteRepo {
             let start = Instant::now();
             conn.execute_batch("PRAGMA optimize;").ok();
             info!("optimize ran in {:?}", start.elapsed());
-        }).await?;
+        })
+        .await?;
         Ok(())
     }

@@ -557,8 +600,7 @@ impl NostrRepo for SqliteRepo {
             let ok: Result<()> = Ok(());
             ok
         })
-            .await?
-
+        .await?
     }

     /// Update verification record as failed
@@ -596,7 +638,7 @@ impl NostrRepo for SqliteRepo {
             let ok: Result<()> = Ok(());
             ok
         })
-            .await?
+        .await?
     }

     /// Get the latest verification record for a given pubkey.
@@ -684,14 +726,15 @@ fn override_index(f: &ReqFilter) -> Option<String> {
     // queries for multiple kinds default to kind_index, which is
     // significantly slower than kind_created_at_index.
     if let Some(ks) = &f.kinds {
-        if f.ids.is_none() &&
-            ks.len() > 1 &&
-            f.since.is_none() &&
-            f.until.is_none() &&
-            f.tags.is_none() &&
-            f.authors.is_none() {
-                return Some("kind_created_at_index".into());
-            }
+        if f.ids.is_none()
+            && ks.len() > 1
+            && f.since.is_none()
+            && f.until.is_none()
+            && f.tags.is_none()
+            && f.authors.is_none()
+        {
+            return Some("kind_created_at_index".into());
+        }
     }
     // if there is an author, it is much better to force the authors index.
     if f.authors.is_some() {
@@ -726,7 +769,9 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
     // parameters
     let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -744,9 +789,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
             HexSearch::Range(lower, upper) => {
-                auth_searches.push(
-                    "(author>? AND author<?)".to_owned(),
-                );
+                auth_searches.push("(author>? AND author<?)".to_owned());

 /// Perform database WAL checkpoint on a regular basis
-pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
+pub async fn db_checkpoint_task(
+    pool: SqlitePool,
+    frequency: Duration,
+    write_in_progress: Arc<Mutex<u64>>,
+    checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
     // TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
     tokio::task::spawn(async move {
         // WAL size in pages.
         let mut current_wal_size = 0;
         // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
-        let wal_threshold = 1000*10;
+        let wal_threshold = 1000 * 10;
         // default threshold for the busy timer
         let busy_wait_default = Duration::from_secs(1);
         // if the WAL file is getting too big, switch to this
@@ -1079,7 +1126,6 @@ pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
     Ok(wal_size as usize)
 }

-
 /// Produce an arbitrary list of '?' parameters.
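The reformatted `query` method acquires a reader-semaphore permit *before* `spawn_blocking`, so the relay never parks more blocking threads than there are read connections to serve them. A hedged sketch of just that gating pattern (assumes tokio with the `rt-multi-thread`, `sync`, and `macros` features; the closure body stands in for the real SQL loop):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task;

#[tokio::main]
async fn main() {
    // permits sized to the read pool, mirroring reader_threads_ready
    let readers = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();
    for i in 0..8 {
        // waits here instead of inside a parked blocking thread
        let permit = readers.clone().acquire_owned().await.unwrap();
        handles.push(task::spawn_blocking(move || {
            // the blocking SQL query would run here
            let _held = permit; // dropped (released) when the query ends
            i
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```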
 fn repeat_vars(count: usize) -> String {
     if count == 0 {
@@ -1113,7 +1159,6 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) {
     );
 }

-
 /// Check if the pool is fully utilized
 fn _pool_at_capacity(pool: &SqlitePool) -> bool {
     let state: r2d2::State = pool.state();
diff --git a/src/repo/sqlite_migration.rs b/src/repo/sqlite_migration.rs
index a139d67..c0c4f1c 100644
--- a/src/repo/sqlite_migration.rs
+++ b/src/repo/sqlite_migration.rs
@@ -253,8 +253,8 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
         let mut stmt = tx.prepare("select id, content from event order by id;")?;
         let mut tag_rows = stmt.query([])?;
         while let Some(row) = tag_rows.next()? {
-            if (events_processed as f32)/(count as f32) > percent_done {
-                info!("Tag update {}% complete...", (100.0*percent_done).round());
+            if (events_processed as f32) / (count as f32) > percent_done {
+                info!("Tag update {}% complete...", (100.0 * percent_done).round());
                 percent_done += update_each_percent;
             }
             // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
@@ -293,8 +293,6 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
     Ok(())
 }

-
-
 //// Migration Scripts

 fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> {
@@ -586,11 +584,17 @@ fn mig_11_to_12(conn: &mut PooledConnection) -> Result<usize> {
         tx.execute("PRAGMA user_version = 12;", [])?;
     }
     tx.commit()?;
-    info!("database schema upgraded v11 -> v12 in {:?}", start.elapsed());
+    info!(
+        "database schema upgraded v11 -> v12 in {:?}",
+        start.elapsed()
+    );
     // vacuum after large table modification
     let start = Instant::now();
     conn.execute("VACUUM;", [])?;
-    info!("vacuumed DB after hidden event cleanup in {:?}", start.elapsed());
+    info!(
+        "vacuumed DB after hidden event cleanup in {:?}",
+        start.elapsed()
+    );
     Ok(12)
 }

@@ -656,7 +660,7 @@ PRAGMA user_version = 15;
     match conn.execute_batch(clear_hidden_sql) {
         Ok(()) => {
             info!("all hidden events removed");
-        },
+        }
         Err(err) => {
             error!("delete failed: {}", err);
             panic!("could not remove hidden events");
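The migrations above all follow one pattern: the schema version lives in SQLite's `user_version` pragma, and each step runs in its own transaction and bumps it. A small runnable sketch of that bookkeeping — the table and the `mig_1_to_2` body here are hypothetical, not the relay's actual schema:

```rust
use rusqlite::{Connection, Result};

fn curr_db_version(conn: &Connection) -> Result<u32> {
    conn.query_row("PRAGMA user_version;", [], |row| row.get(0))
}

fn mig_1_to_2(conn: &mut Connection) -> Result<u32> {
    // DDL and the version bump commit (or fail) together
    let tx = conn.transaction()?;
    tx.execute_batch(
        "ALTER TABLE event ADD hidden INTEGER;\n\
         PRAGMA user_version = 2;",
    )?;
    tx.commit()?;
    Ok(2)
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE event (id INTEGER);")?;
    if curr_db_version(&conn)? < 2 {
        mig_1_to_2(&mut conn)?;
    }
    assert_eq!(curr_db_version(&conn)?, 2);
    Ok(())
}
```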
diff --git a/src/server.rs b/src/server.rs
index d1e52ce..7a8dffa 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -3,7 +3,6 @@ use crate::close::Close;
 use crate::close::CloseCmd;
 use crate::config::{Settings, VerifiedUsersMode};
 use crate::conn;
-use crate::repo::NostrRepo;
 use crate::db;
 use crate::db::SubmittedEvent;
 use crate::error::{Error, Result};
@@ -14,10 +13,8 @@ use crate::event::EventCmd;
 use crate::info::RelayInfo;
 use crate::nip05;
 use crate::notice::Notice;
+use crate::repo::NostrRepo;
 use crate::subscription::Subscription;
-use prometheus::IntCounterVec;
-use prometheus::IntGauge;
-use prometheus::{Encoder, Histogram, IntCounter, HistogramOpts, Opts, Registry, TextEncoder};
 use futures::SinkExt;
 use futures::StreamExt;
 use governor::{Jitter, Quota, RateLimiter};
@@ -28,6 +25,9 @@ use hyper::upgrade::Upgraded;
 use hyper::{
     header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
 };
+use prometheus::IntCounterVec;
+use prometheus::IntGauge;
+use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder};
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use std::collections::HashMap;
@@ -37,9 +37,9 @@ use std::io::BufReader;
 use std::io::Read;
 use std::net::SocketAddr;
 use std::path::Path;
-use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::sync::mpsc::Receiver as MpscReceiver;
+use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
 use tokio::runtime::Builder;
@@ -103,7 +103,7 @@ async fn handle_web_request(
                             tokio_tungstenite::tungstenite::protocol::Role::Server,
                             Some(config),
                         )
-                            .await;
+                        .await;
                         let origin = get_header_string("origin", request.headers());
                         let user_agent = get_header_string("user-agent", request.headers());
                         // determine the remote IP from headers if they exist
@@ -167,19 +167,19 @@ async fn handle_web_request(
                     let rinfo = RelayInfo::from(settings);
                     let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
                     return Ok(Response::builder()
-                                  .status(200)
-                                  .header("Content-Type", "application/nostr+json")
-                                  .header("Access-Control-Allow-Origin", "*")
-                                  .body(b)
-                                  .unwrap());
+                        .status(200)
+                        .header("Content-Type", "application/nostr+json")
+                        .header("Access-Control-Allow-Origin", "*")
+                        .body(b)
+                        .unwrap());
                 }
             }
         }
         Ok(Response::builder()
-               .status(200)
-               .header("Content-Type", "text/plain")
-               .body(Body::from("Please use a Nostr client to connect."))
-               .unwrap())
+            .status(200)
+            .header("Content-Type", "text/plain")
+            .body(Body::from("Please use a Nostr client to connect."))
+            .unwrap())
     }
     ("/metrics", false) => {
         let mut buffer = vec![];
@@ -188,10 +188,10 @@ async fn handle_web_request(
         encoder.encode(&metric_families, &mut buffer).unwrap();

         Ok(Response::builder()
-               .status(StatusCode::OK)
-               .header("Content-Type", "text/plain")
-               .body(Body::from(buffer))
-               .unwrap())
+            .status(StatusCode::OK)
+            .header("Content-Type", "text/plain")
+            .body(Body::from(buffer))
+            .unwrap())
     }
     ("/favicon.ico", false) => {
         if let Some(favicon_bytes) = favicon {
@@ -213,9 +213,9 @@ async fn handle_web_request(
     (_, _) => {
         //handle any other url
         Ok(Response::builder()
-               .status(StatusCode::NOT_FOUND)
-               .body(Body::from("Nothing here."))
-               .unwrap())
+            .status(StatusCode::NOT_FOUND)
+            .body(Body::from("Nothing here."))
+            .unwrap())
     }
 }
}
@@ -256,50 +256,42 @@ fn create_metrics() -> (Registry, NostrMetrics) {
     let query_sub = Histogram::with_opts(HistogramOpts::new(
         "nostr_query_seconds",
         "Subscription response times",
-    )).unwrap();
+    ))
+    .unwrap();
     let query_db = Histogram::with_opts(HistogramOpts::new(
         "nostr_filter_seconds",
         "Filter SQL query times",
-    )).unwrap();
+    ))
+    .unwrap();
     let write_events = Histogram::with_opts(HistogramOpts::new(
         "nostr_events_write_seconds",
         "Event writing response times",
-    )).unwrap();
+    ))
+    .unwrap();
     let sent_events = IntCounterVec::new(
-        Opts::new("nostr_events_sent_total", "Events sent to clients"),
-        vec!["source"].as_slice(),
-    ).unwrap();
-    let connections = IntCounter::with_opts(Opts::new(
-        "nostr_connections_total",
-        "New connections",
-    )).unwrap();
+        Opts::new("nostr_events_sent_total", "Events sent to clients"),
+        vec!["source"].as_slice(),
+    )
+    .unwrap();
+    let connections =
+        IntCounter::with_opts(Opts::new("nostr_connections_total", "New connections")).unwrap();
     let db_connections = IntGauge::with_opts(Opts::new(
-        "nostr_db_connections", "Active database connections"
-    )).unwrap();
+        "nostr_db_connections",
+        "Active database connections",
+    ))
+    .unwrap();
     let query_aborts = IntCounterVec::new(
         Opts::new("nostr_query_abort_total", "Aborted queries"),
         vec!["reason"].as_slice(),
     ).unwrap();
-    let cmd_req = IntCounter::with_opts(Opts::new(
-        "nostr_cmd_req_total",
-        "REQ commands",
-    )).unwrap();
-    let cmd_event = IntCounter::with_opts(Opts::new(
-        "nostr_cmd_event_total",
-        "EVENT commands",
-    )).unwrap();
-    let cmd_close = IntCounter::with_opts(Opts::new(
-        "nostr_cmd_close_total",
-        "CLOSE commands",
-    )).unwrap();
-    let cmd_auth = IntCounter::with_opts(Opts::new(
-        "nostr_cmd_auth_total",
-        "AUTH commands",
-    )).unwrap();
-    let disconnects = IntCounterVec::new(
-        Opts::new("nostr_disconnects_total", "Client disconnects"),
-        vec!["reason"].as_slice(),
-    ).unwrap();
+    let cmd_req = IntCounter::with_opts(Opts::new("nostr_cmd_req_total", "REQ commands")).unwrap();
+    let cmd_event =
+        IntCounter::with_opts(Opts::new("nostr_cmd_event_total", "EVENT commands")).unwrap();
+    let cmd_close =
+        IntCounter::with_opts(Opts::new("nostr_cmd_close_total", "CLOSE commands")).unwrap();
+    let cmd_auth = IntCounter::with_opts(Opts::new("nostr_cmd_auth_total", "AUTH commands")).unwrap();
+    let disconnects = IntCounterVec::new(Opts::new("nostr_disconnects_total", "Client disconnects"),
                                          vec!["reason"].as_slice()).unwrap();
     registry.register(Box::new(query_sub.clone())).unwrap();
     registry.register(Box::new(query_db.clone())).unwrap();
     registry.register(Box::new(write_events.clone())).unwrap();
@@ -319,14 +311,14 @@ fn create_metrics() -> (Registry, NostrMetrics) {
         sent_events,
         connections,
         db_connections,
-        disconnects,
+        disconnects,
         query_aborts,
         cmd_req,
         cmd_event,
         cmd_close,
         cmd_auth,
     };
-    (registry,metrics)
+    (registry, metrics)
 }

 fn file_bytes(path: &str) -> Result<Vec<u8>> {
@@ -383,11 +375,12 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<()> {
         .enable_all()
         .thread_name_fn(|| {
             // give each thread a unique numeric name
-            static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
-            let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
+            static ATOMIC_ID: std::sync::atomic::AtomicUsize =
+                std::sync::atomic::AtomicUsize::new(0);
+            let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
             format!("tokio-ws-{id}")
         })
-	// limit concurrent SQLite blocking threads
+        // limit concurrent SQLite blocking threads
         .max_blocking_threads(settings.limits.max_blocking_threads)
         .on_thread_start(|| {
             trace!("started new thread: {:?}", std::thread::current().name());
@@ -425,27 +418,30 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<()> {
         // metadata events.
         let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);

-	let (registry, metrics) = create_metrics();
+        let (registry, metrics) = create_metrics();
         // build a repository for events
         let repo = db::build_repo(&settings, metrics.clone()).await;
         // start the database writer task.  Give it a channel for
         // writing events, and for publishing events that have been
        // written (to all connected clients).
-        tokio::task::spawn(
-            db::db_writer(
-                repo.clone(),
-                settings.clone(),
-                event_rx,
-                bcast_tx.clone(),
-                metadata_tx.clone(),
-                shutdown_listen,
-            ));
+        tokio::task::spawn(db::db_writer(
+            repo.clone(),
+            settings.clone(),
+            event_rx,
+            bcast_tx.clone(),
+            metadata_tx.clone(),
+            shutdown_listen,
+        ));
         info!("db writer created");

         // create a nip-05 verifier thread; if enabled.
         if settings.verified_users.mode != VerifiedUsersMode::Disabled {
-            let verifier_opt =
-                nip05::Verifier::new(repo.clone(), metadata_rx, bcast_tx.clone(), settings.clone());
+            let verifier_opt = nip05::Verifier::new(
+                repo.clone(),
+                metadata_rx,
+                bcast_tx.clone(),
+                settings.clone(),
+            );
             if let Ok(mut v) = verifier_opt {
                 if verified_users_active {
                     tokio::task::spawn(async move {
@@ -464,7 +460,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<()> {
                 Ok(()) => {
                     info!("control message requesting shutdown");
                     controlled_shutdown.send(()).ok();
-                },
+                }
                 Err(std::sync::mpsc::RecvError) => {
                     trace!("shutdown requestor is disconnected (this is normal)");
                 }
@@ -545,12 +541,13 @@ pub enum NostrMessage {

 /// Convert Message to `NostrMessage`
 fn convert_to_msg(msg: &str, max_bytes: Option<usize>) -> Result<NostrMessage> {
-    let parsed_res: Result<NostrMessage> = serde_json::from_str(msg).map_err(std::convert::Into::into);
+    let parsed_res: Result<NostrMessage> =
+        serde_json::from_str(msg).map_err(std::convert::Into::into);
     match parsed_res {
         Ok(m) => {
             if let NostrMessage::SubMsg(_) = m {
                 // note; this only prints the first 16k of a REQ and then truncates.
-                trace!("REQ: {:?}",msg);
+                trace!("REQ: {:?}", msg);
             };
             if let NostrMessage::EventMsg(_) = m {
                 if let Some(max_size) = max_bytes {
@@ -675,7 +672,7 @@ async fn nostr_server(
     loop {
         tokio::select! {
             _ = shutdown.recv() => {
-		metrics.disconnects.with_label_values(&["shutdown"]).inc();
+                metrics.disconnects.with_label_values(&["shutdown"]).inc();
                 info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
                 // server shutting down, exit loop
                 break;
@@ -685,7 +682,7 @@ async fn nostr_server(
                 // if it has been too long, disconnect
                 if last_message_time.elapsed() > max_quiet_time {
                     debug!("ending connection due to lack of client ping response");
-		    metrics.disconnects.with_label_values(&["timeout"]).inc();
+                    metrics.disconnects.with_label_values(&["timeout"]).inc();
                     break;
                 }
                 // Send a ping
@@ -702,7 +699,7 @@ async fn nostr_server(
                     ws_stream.send(Message::Text(send_str)).await.ok();
                 } else {
                     client_received_event_count += 1;
-		    metrics.sent_events.with_label_values(&["db"]).inc();
+                    metrics.sent_events.with_label_values(&["db"]).inc();
                     // send a result
                     let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
                     ws_stream.send(Message::Text(send_str)).await.ok();
@@ -724,7 +721,7 @@ async fn nostr_server(
                               global_event.get_event_id_prefix());
                         // create an event response and send it
                         let subesc = s.replace('"', "");
-			metrics.sent_events.with_label_values(&["realtime"]).inc();
+                        metrics.sent_events.with_label_values(&["realtime"]).inc();
                         ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
                     } else {
                         warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
@@ -760,20 +757,20 @@ async fn nostr_server(
                             WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) => {
                         debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
-			metrics.disconnects.with_label_values(&["normal"]).inc();
+                        metrics.disconnects.with_label_values(&["normal"]).inc();
                         break;
                     },
                     Some(Err(WsError::Io(e))) => {
                         // IO errors are considered fatal
                         warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
-			metrics.disconnects.with_label_values(&["error"]).inc();
+                        metrics.disconnects.with_label_values(&["error"]).inc();
                         break;
                     }
                     x => {
                         // default condition on error is to close the client connection
                         info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
-			metrics.disconnects.with_label_values(&["error"]).inc();
+                        metrics.disconnects.with_label_values(&["error"]).inc();
                         break;
                     }
@@ -786,6 +783,7 @@ async fn nostr_server(
                         // handle each type of message
                         let evid = ec.event_id().to_owned();
                         let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
+                        metrics.cmd_event.inc();
                         match parsed {
                             Ok(WrappedEvent(e)) => {
                                 metrics.cmd_event.inc();
@@ -866,7 +864,7 @@ async fn nostr_server(
                         if conn.has_subscription(&s) {
                             info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
                         } else {
-			    metrics.cmd_req.inc();
+                            metrics.cmd_req.inc();
                             if let Some(ref lim) = sub_lim_opt {
                                 lim.until_ready_with_jitter(jitter).await;
                             }
@@ -893,7 +891,7 @@ async fn nostr_server(
                         // closing a request simply removes the subscription.
                         let parsed : Result<Close> = Result::<Close>::from(cc);
                         if let Ok(c) = parsed {
-			    metrics.cmd_close.inc();
+                            metrics.cmd_close.inc();
                             // check if a query is currently
                             // running, and remove it if so.
                             let stop_tx = running_queries.remove(&c.id);
@@ -943,17 +941,16 @@ async fn nostr_server(

 #[derive(Clone)]
 pub struct NostrMetrics {
-    pub query_sub: Histogram,       // response time of successful subscriptions
-    pub query_db: Histogram,        // individual database query execution time
-    pub db_connections: IntGauge,   // database connections in use
-    pub write_events: Histogram,    // response time of event writes
-    pub sent_events: IntCounterVec, // count of events sent to clients
-    pub connections: IntCounter,    // count of websocket connections
-    pub disconnects: IntCounterVec, // client disconnects
+    pub query_sub: Histogram,         // response time of successful subscriptions
+    pub query_db: Histogram,          // individual database query execution time
+    pub db_connections: IntGauge,     // database connections in use
+    pub write_events: Histogram,      // response time of event writes
+    pub sent_events: IntCounterVec,   // count of events sent to clients
+    pub connections: IntCounter,      // count of websocket connections
+    pub disconnects: IntCounterVec,   // client disconnects
     pub query_aborts: IntCounterVec, // count of queries aborted by server
-    pub cmd_req: IntCounter,        // count of REQ commands received
-    pub cmd_event: IntCounter,      // count of EVENT commands received
-    pub cmd_close: IntCounter,      // count of CLOSE commands received
-    pub cmd_auth: IntCounter,       // count of AUTH commands received
-
+    pub cmd_req: IntCounter,         // count of REQ commands received
+    pub cmd_event: IntCounter,       // count of EVENT commands received
+    pub cmd_close: IntCounter,       // count of CLOSE commands received
+    pub cmd_auth: IntCounter,        // count of AUTH commands received
 }
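`create_metrics` follows the standard prometheus-crate pattern: build each collector, register a clone with the registry, and keep the original for updates; the `/metrics` handler then encodes `registry.gather()`. A minimal end-to-end sketch using one of the counters from the diff:

```rust
use prometheus::{Encoder, IntCounterVec, Opts, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    let disconnects = IntCounterVec::new(
        Opts::new("nostr_disconnects_total", "Client disconnects"),
        &["reason"],
    )
    .unwrap();
    // register a clone; the original stays usable for increments
    registry.register(Box::new(disconnects.clone())).unwrap();

    disconnects.with_label_values(&["shutdown"]).inc();

    // this is essentially what the /metrics endpoint does
    let mut buffer = vec![];
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .unwrap();
    println!("{}", String::from_utf8(buffer).unwrap());
}
```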
{}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x); - metrics.disconnects.with_label_values(&["error"]).inc(); + metrics.disconnects.with_label_values(&["error"]).inc(); break; } @@ -786,6 +783,7 @@ async fn nostr_server( // handle each type of message let evid = ec.event_id().to_owned(); let parsed : Result = Result::::from(ec); + metrics.cmd_event.inc(); match parsed { Ok(WrappedEvent(e)) => { metrics.cmd_event.inc(); @@ -866,7 +864,7 @@ async fn nostr_server( if conn.has_subscription(&s) { info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id); } else { - metrics.cmd_req.inc(); + metrics.cmd_req.inc(); if let Some(ref lim) = sub_lim_opt { lim.until_ready_with_jitter(jitter).await; } @@ -893,7 +891,7 @@ async fn nostr_server( // closing a request simply removes the subscription. let parsed : Result = Result::::from(cc); if let Ok(c) = parsed { - metrics.cmd_close.inc(); + metrics.cmd_close.inc(); // check if a query is currently // running, and remove it if so. let stop_tx = running_queries.remove(&c.id); @@ -943,17 +941,16 @@ async fn nostr_server( #[derive(Clone)] pub struct NostrMetrics { - pub query_sub: Histogram, // response time of successful subscriptions - pub query_db: Histogram, // individual database query execution time - pub db_connections: IntGauge, // database connections in use - pub write_events: Histogram, // response time of event writes - pub sent_events: IntCounterVec, // count of events sent to clients - pub connections: IntCounter, // count of websocket connections - pub disconnects: IntCounterVec, // client disconnects + pub query_sub: Histogram, // response time of successful subscriptions + pub query_db: Histogram, // individual database query execution time + pub db_connections: IntGauge, // database connections in use + pub write_events: Histogram, // response time of event writes + pub sent_events: IntCounterVec, // count of events sent to clients + pub connections: IntCounter, // count of websocket connections + pub disconnects: IntCounterVec, // client disconnects pub query_aborts: IntCounterVec, // count of queries aborted by server - pub cmd_req: IntCounter, // count of REQ commands received - pub cmd_event: IntCounter, // count of EVENT commands received - pub cmd_close: IntCounter, // count of CLOSE commands received - pub cmd_auth: IntCounter, // count of AUTH commands received - + pub cmd_req: IntCounter, // count of REQ commands received + pub cmd_event: IntCounter, // count of EVENT commands received + pub cmd_close: IntCounter, // count of CLOSE commands received + pub cmd_auth: IntCounter, // count of AUTH commands received } diff --git a/src/subscription.rs b/src/subscription.rs index 55dd757..63d1f68 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -45,7 +45,8 @@ pub struct ReqFilter { impl Serialize for ReqFilter { fn serialize(&self, serializer: S) -> Result - where S:Serializer, + where + S: Serializer, { let mut map = serializer.serialize_map(None)?; if let Some(ids) = &self.ids { @@ -68,8 +69,8 @@ impl Serialize for ReqFilter { } // serialize tags if let Some(tags) = &self.tags { - for (k,v) in tags { - let vals:Vec<&String> = v.iter().collect(); + for (k, v) in tags { + let vals: Vec<&String> = v.iter().collect(); map.serialize_entry(&format!("#{k}"), &vals)?; } } @@ -105,15 +106,16 @@ impl<'de> Deserialize<'de> for ReqFilter { for (key, val) in filter { // ids if key == "ids" { - let raw_ids: Option>= Deserialize::deserialize(val).ok(); + let raw_ids: Option> = 
diff --git a/src/utils.rs b/src/utils.rs
index 3cceae7..c080aea 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -4,7 +4,8 @@ use std::time::SystemTime;
 use url::Url;
 
 /// Seconds since 1970.
-#[must_use] pub fn unix_time() -> u64 {
+#[must_use]
+pub fn unix_time() -> u64 {
     SystemTime::now()
         .duration_since(SystemTime::UNIX_EPOCH)
         .map(|x| x.as_secs())
@@ -12,7 +13,8 @@ use url::Url;
 }
 
 /// Check if a string contains only hex characters.
-#[must_use] pub fn is_hex(s: &str) -> bool {
+#[must_use]
+pub fn is_hex(s: &str) -> bool {
     s.chars().all(|x| char::is_ascii_hexdigit(&x))
 }
 
@@ -28,7 +30,8 @@ pub fn nip19_to_hex(s: &str) -> Result<String, bech32::Error> {
 }
 
 /// Check if a string contains only lower-case hex chars.
-#[must_use] pub fn is_lower_hex(s: &str) -> bool {
+#[must_use]
+pub fn is_lower_hex(s: &str) -> bool {
     s.chars().all(|x| {
         (char::is_ascii_lowercase(&x) || char::is_ascii_digit(&x)) && char::is_ascii_hexdigit(&x)
     })
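The distinction between `is_hex` and `is_lower_hex` is easy to miss: both require ASCII hex digits, but the latter additionally rejects uppercase letters. A quick check using the functions as defined above:

```rust
use nostr_rs_relay::utils::{is_hex, is_lower_hex};

fn main() {
    // Any ASCII hex digit passes `is_hex`, regardless of case.
    assert!(is_hex("DEADbeef01"));
    // `is_lower_hex` also requires lowercase letters (digits are fine),
    // so mixed-case input is rejected.
    assert!(!is_lower_hex("DEADbeef01"));
    assert!(is_lower_hex("deadbeef01"));
    // Non-hex characters fail both checks.
    assert!(!is_hex("xyz"));
}
```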
diff --git a/tests/cli.rs b/tests/cli.rs
index cfee3aa..95d334a 100644
--- a/tests/cli.rs
+++ b/tests/cli.rs
@@ -7,4 +7,4 @@ mod tests {
         use clap::CommandFactory;
         CLIArgs::command().debug_assert();
     }
-}
\ No newline at end of file
+}
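The `debug_assert` call in tests/cli.rs is clap's self-check for derived CLIs: it panics at test time if the derived `Command` definition is internally inconsistent. A standalone sketch of the same pattern, where `DemoArgs` and its flag are hypothetical stand-ins rather than the relay's real `CLIArgs`:

```rust
use clap::{CommandFactory, Parser};

/// Hypothetical CLI for illustration only.
#[derive(Parser)]
struct DemoArgs {
    /// Optional path to a config file (assumed flag, not the relay's).
    #[arg(short, long)]
    config: Option<String>,
}

#[test]
fn verify_cli() {
    // Panics if the derived command definition is misconfigured
    // (e.g., conflicting short flags), mirroring the check above.
    DemoArgs::command().debug_assert();
}
```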