diff --git a/src/config.rs b/src/config.rs index 47be07c..f2e5530 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,9 +170,9 @@ impl Settings { fn new_from_default(default: &Settings) -> Result { let builder = Config::builder(); let config: Config = builder - // use defaults + // use defaults .add_source(Config::try_from(default)?) - // override with file contents + // override with file contents .add_source(File::with_name("config.toml")) .build()?; let mut settings: Settings = config.try_deserialize()?; @@ -207,7 +207,7 @@ impl Default for Settings { diagnostics: Diagnostics { tracing: false }, database: Database { data_directory: ".".to_owned(), - engine: "sqlite".to_owned(), + engine: "sqlite".to_owned(), in_memory: false, min_conn: 4, max_conn: 8, diff --git a/src/db.rs b/src/db.rs index f09a5d5..8fc4ae6 100644 --- a/src/db.rs +++ b/src/db.rs @@ -117,7 +117,7 @@ pub async fn db_writer( debug!( "rejecting event: {}, blacklisted kind: {}", &event.get_event_id_prefix(), - &event.kind + &event.kind ); notice_tx .try_send(Notice::blocked( @@ -138,51 +138,51 @@ pub async fn db_writer( metadata_tx.send(event.clone()).ok(); } - // check for NIP-05 verification + // check for NIP-05 verification if nip05_enabled { - match repo.get_latest_user_verification(&event.pubkey).await { - Ok(uv) => { - if uv.is_valid(&settings.verified_users) { - info!( - "new event from verified author ({:?},{:?})", - uv.name.to_string(), - event.get_author_prefix() - ); - } else { - info!( - "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)", - uv.name.to_string(), - event.get_author_prefix() - ); - notice_tx - .try_send(Notice::blocked( - event.id, - "NIP-05 verification is no longer valid (expired/wrong domain)", - )) - .ok(); - continue; - } - } - Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => { - debug!( - "no verification records found for pubkey: {:?}", + match repo.get_latest_user_verification(&event.pubkey).await { + Ok(uv) => { + if uv.is_valid(&settings.verified_users) { + info!( + "new event from verified author ({:?},{:?})", + uv.name.to_string(), + event.get_author_prefix() + ); + } else { + info!( + "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)", + uv.name.to_string(), event.get_author_prefix() ); notice_tx .try_send(Notice::blocked( event.id, - "NIP-05 verification needed to publish events", + "NIP-05 verification is no longer valid (expired/wrong domain)", )) .ok(); continue; } - Err(e) => { - warn!("checking nip05 verification status failed: {:?}", e); - continue; - } + } + Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => { + debug!( + "no verification records found for pubkey: {:?}", + event.get_author_prefix() + ); + notice_tx + .try_send(Notice::blocked( + event.id, + "NIP-05 verification needed to publish events", + )) + .ok(); + continue; + } + Err(e) => { + warn!("checking nip05 verification status failed: {:?}", e); + continue; } } - // TODO: cache recent list of authors to remove a DB call. + } + // TODO: cache recent list of authors to remove a DB call. let start = Instant::now(); if event.kind >= 20000 && event.kind < 30000 { bcast_tx.send(event.clone()).ok(); @@ -203,7 +203,7 @@ pub async fn db_writer( info!( "persisted event: {:?} (kind: {}) from: {:?} in: {:?}", event.get_event_id_prefix(), - event.kind, + event.kind, event.get_author_prefix(), start.elapsed() ); diff --git a/src/event.rs b/src/event.rs index 88765c6..05d2f93 100644 --- a/src/event.rs +++ b/src/event.rs @@ -88,14 +88,14 @@ impl From for Result { fn from(ec: EventCmd) -> Result { // ensure command is correct if ec.cmd == "EVENT" { - ec.event.validate().map(|_| { + ec.event.validate().map(|_| { let mut e = ec.event; e.build_index(); e.update_delegation(); e }) } else { - Err(CommandUnknownError) + Err(CommandUnknownError) } } } @@ -122,7 +122,7 @@ impl Event { /// Should this event be replaced with newer timestamps from same author? #[must_use] pub fn is_replaceable(&self) -> bool { - self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000) + self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000) } /// Pull a NIP-05 Name out of the event, if one exists @@ -359,7 +359,7 @@ mod tests { fn empty_event_tag_match() { let event = Event::simple_event(); assert!(!event - .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()]))); + .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()]))); } #[test] @@ -506,13 +506,13 @@ mod tests { #[test] fn replaceable_event() { - let mut event = Event::simple_event(); - event.kind=0; - assert!(event.is_replaceable()); - event.kind=3; - assert!(event.is_replaceable()); - event.kind=12000; - assert!(event.is_replaceable()); + let mut event = Event::simple_event(); + event.kind=0; + assert!(event.is_replaceable()); + event.kind=3; + assert!(event.is_replaceable()); + event.kind=12000; + assert!(event.is_replaceable()); } diff --git a/src/hexrange.rs b/src/hexrange.rs index fa9742b..b0566ff 100644 --- a/src/hexrange.rs +++ b/src/hexrange.rs @@ -57,8 +57,9 @@ fn is_all_fs(s: &str) -> bool { } else if odd { // check if first char in this byte is NOT 'f' if b < 240 { - upper[byte_len] = b + 16; // bump up the first character in this byte - // increment done, stop iterating through the vec + // bump up the first character in this byte + upper[byte_len] = b + 16; + // increment done, stop iterating through the vec break; } // if it is 'f', reset the byte to 0 and do a carry diff --git a/src/main.rs b/src/main.rs index 35dc094..cb24783 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,11 +7,8 @@ use std::sync::mpsc as syncmpsc; use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender}; use std::thread; use tracing::info; - - use console_subscriber::ConsoleLayer; - /// Start running a Nostr relay server. fn main() { // configure settings from config.toml @@ -23,8 +20,8 @@ fn main() { // enable tracing with tokio-console ConsoleLayer::builder().with_default_env().init(); } else { - // standard logging - tracing_subscriber::fmt::try_init().unwrap(); + // standard logging + tracing_subscriber::fmt::try_init().unwrap(); } info!("Starting up from main"); diff --git a/src/nip05.rs b/src/nip05.rs index 3247be7..d5a1dd4 100644 --- a/src/nip05.rs +++ b/src/nip05.rs @@ -58,8 +58,8 @@ impl Nip05Name { "https://{}/.well-known/nostr.json?name={}", self.domain, self.local ) - .parse::() - .ok() + .parse::() + .ok() } } @@ -70,7 +70,7 @@ impl std::convert::TryFrom<&str> for Nip05Name { // break full name at the @ boundary. let components: Vec<&str> = inet.split('@').collect(); if components.len() == 2 { - // check if local name is valid + // check if local name is valid let local = components[0]; let domain = components[1]; if local.chars().all(|x| x.is_alphanumeric() || x == '_' || x == '-' || x == '.') { @@ -93,7 +93,7 @@ impl std::convert::TryFrom<&str> for Nip05Name { )) } } else { - Err(Error::CustomError("too many/few components".to_owned())) + Err(Error::CustomError("too many/few components".to_owned())) } } } @@ -122,7 +122,7 @@ fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes) impl Verifier { pub fn new( - repo: Arc, + repo: Arc, metadata_rx: tokio::sync::broadcast::Receiver, event_tx: tokio::sync::broadcast::Sender, settings: crate::config::Settings, @@ -143,7 +143,7 @@ impl Verifier { // duration. let reverify_interval = tokio::time::interval(http_wait_duration); Ok(Verifier { - repo, + repo, metadata_rx, event_tx, settings, @@ -343,35 +343,35 @@ impl Verifier { // timestamp. self.repo.update_verification_timestamp(v.rowid) .await?; - info!("verification updated for {}", v.to_string()); + info!("verification updated for {}", v.to_string()); } UserWebVerificationStatus::DomainNotAllowed - | UserWebVerificationStatus::Unknown => { - // server may be offline, or temporarily - // blocked by the config file. Note the - // failure so we can process something - // else. + | UserWebVerificationStatus::Unknown => { + // server may be offline, or temporarily + // blocked by the config file. Note the + // failure so we can process something + // else. - // have we had enough failures to give up? - if v.failure_count >= max_failures as u64 { - info!( - "giving up on verifying {:?} after {} failures", - v.name, v.failure_count - ); - self.repo.delete_verification(v.rowid) - .await?; - } else { - // record normal failure, incrementing failure count - info!("verification failed for {}", v.to_string()); - self.repo.fail_verification(v.rowid).await?; - } - } + // have we had enough failures to give up? + if v.failure_count >= max_failures as u64 { + info!( + "giving up on verifying {:?} after {} failures", + v.name, v.failure_count + ); + self.repo.delete_verification(v.rowid) + .await?; + } else { + // record normal failure, incrementing failure count + info!("verification failed for {}", v.to_string()); + self.repo.fail_verification(v.rowid).await?; + } + } UserWebVerificationStatus::Unverified => { // domain has removed the verification, drop // the record on our side. - info!("verification rescinded for {}", v.to_string()); - self.repo.delete_verification(v.rowid) + info!("verification rescinded for {}", v.to_string()); + self.repo.delete_verification(v.rowid) .await?; } } @@ -405,27 +405,27 @@ impl Verifier { // disabled/passive, the event has already been persisted. let should_write_event = self.settings.verified_users.is_enabled(); if should_write_event { - match self.repo.write_event(event).await { - Ok(updated) => { - if updated != 0 { - info!( - "persisted event (new verified pubkey): {:?} in {:?}", - event.get_event_id_prefix(), - start.elapsed() - ); - self.event_tx.send(event.clone()).ok(); - } - } - Err(err) => { - warn!("event insert failed: {:?}", err); - if let Error::SqlError(r) = err { - warn!("because: : {:?}", r); - } - } - } + match self.repo.write_event(event).await { + Ok(updated) => { + if updated != 0 { + info!( + "persisted event (new verified pubkey): {:?} in {:?}", + event.get_event_id_prefix(), + start.elapsed() + ); + self.event_tx.send(event.clone()).ok(); + } + } + Err(err) => { + warn!("event insert failed: {:?}", err); + if let Error::SqlError(r) = err { + warn!("because: : {:?}", r); + } + } + } } // write the verification record - self.repo.create_verification_record(&event.id, name).await?; + self.repo.create_verification_record(&event.id, name).await?; Ok(()) } } diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index c610812..2a3be02 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -75,147 +75,147 @@ impl SqliteRepo { false, ); - // this is used to block new reads during critical checkpoints - let checkpoint_in_progress = Arc::new(Mutex::new(0)); - // SQLite can only effectively write single threaded, so don't - // block multiple worker threads unnecessarily. - let write_in_progress = Arc::new(Mutex::new(0)); + // this is used to block new reads during critical checkpoints + let checkpoint_in_progress = Arc::new(Mutex::new(0)); + // SQLite can only effectively write single threaded, so don't + // block multiple worker threads unnecessarily. + let write_in_progress = Arc::new(Mutex::new(0)); SqliteRepo { read_pool, - write_pool, - maint_pool, - checkpoint_in_progress, - write_in_progress, + write_pool, + maint_pool, + checkpoint_in_progress, + write_in_progress, } } /// Persist an event to the database, returning rows added. pub fn persist_event(conn: &mut PooledConnection, e: &Event) -> Result { - // enable auto vacuum - conn.execute_batch("pragma auto_vacuum = FULL")?; + // enable auto vacuum + conn.execute_batch("pragma auto_vacuum = FULL")?; - // start transaction - let tx = conn.transaction()?; - // get relevant fields from event and convert to blobs. - let id_blob = hex::decode(&e.id).ok(); - let pubkey_blob: Option> = hex::decode(&e.pubkey).ok(); - let delegator_blob: Option> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); - let event_str = serde_json::to_string(&e).ok(); - // check for replaceable events that would hide this one; we won't even attempt to insert these. - if e.is_replaceable() { + // start transaction + let tx = conn.transaction()?; + // get relevant fields from event and convert to blobs. + let id_blob = hex::decode(&e.id).ok(); + let pubkey_blob: Option> = hex::decode(&e.pubkey).ok(); + let delegator_blob: Option> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); + let event_str = serde_json::to_string(&e).ok(); + // check for replaceable events that would hide this one; we won't even attempt to insert these. + if e.is_replaceable() { let repl_count = tx.query_row( - "SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at > ? LIMIT 1;", - params![pubkey_blob, e.kind, e.created_at], |row| row.get::(0)); - if repl_count.ok().is_some() { - return Ok(0); - } - } - // ignore if the event hash is a duplicate. - let mut ins_count = tx.execute( + "SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at > ? LIMIT 1;", + params![pubkey_blob, e.kind, e.created_at], |row| row.get::(0)); + if repl_count.ok().is_some() { + return Ok(0); + } + } + // ignore if the event hash is a duplicate. + let mut ins_count = tx.execute( "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);", params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str] - )? as u64; - if ins_count == 0 { + )? as u64; + if ins_count == 0 { // if the event was a duplicate, no need to insert event or // pubkey references. tx.rollback().ok(); return Ok(ins_count); - } - // remember primary key of the event most recently inserted. - let ev_id = tx.last_insert_rowid(); - // add all tags to the tag table - for tag in &e.tags { + } + // remember primary key of the event most recently inserted. + let ev_id = tx.last_insert_rowid(); + // add all tags to the tag table + for tag in &e.tags { // ensure we have 2 values. if tag.len() >= 2 { - let tagname = &tag[0]; - let tagval = &tag[1]; - // only single-char tags are searchable - let tagchar_opt = single_char_tagname(tagname); - match &tagchar_opt { + let tagname = &tag[0]; + let tagval = &tag[1]; + // only single-char tags are searchable + let tagchar_opt = single_char_tagname(tagname); + match &tagchar_opt { Some(_) => { - // if tagvalue is lowercase hex; - if is_lower_hex(tagval) && (tagval.len() % 2 == 0) { + // if tagvalue is lowercase hex; + if is_lower_hex(tagval) && (tagval.len() % 2 == 0) { tx.execute( - "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", - params![ev_id, &tagname, hex::decode(tagval).ok()], + "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, hex::decode(tagval).ok()], )?; - } else { + } else { tx.execute( - "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", - params![ev_id, &tagname, &tagval], + "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, &tagval], )?; - } + } } None => {} - } + } } - } - // if this event is replaceable update, remove other replaceable - // event with the same kind from the same author that was issued - // earlier than this. - if e.is_replaceable() { - let author = hex::decode(&e.pubkey).ok(); - // this is a backwards check - hide any events that were older. + } + // if this event is replaceable update, remove other replaceable + // event with the same kind from the same author that was issued + // earlier than this. + if e.is_replaceable() { + let author = hex::decode(&e.pubkey).ok(); + // this is a backwards check - hide any events that were older. let update_count = tx.execute( - "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)", - params![e.kind, author, e.kind, author], + "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)", + params![e.kind, author, e.kind, author], )?; if update_count > 0 { - info!( + info!( "removed {} older replaceable kind {} events for author: {:?}", update_count, e.kind, e.get_author_prefix() - ); + ); } - } - // if this event is a deletion, hide the referenced events from the same author. - if e.kind == 5 { + } + // if this event is a deletion, hide the referenced events from the same author. + if e.kind == 5 { let event_candidates = e.tag_values_by_name("e"); // first parameter will be author let mut params: Vec> = vec![Box::new(hex::decode(&e.pubkey)?)]; event_candidates - .iter() - .filter(|x| is_hex(x) && x.len() == 64) - .filter_map(|x| hex::decode(x).ok()) - .for_each(|x| params.push(Box::new(x))); + .iter() + .filter(|x| is_hex(x) && x.len() == 64) + .filter_map(|x| hex::decode(x).ok()) + .for_each(|x| params.push(Box::new(x))); let query = format!( - "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})", - repeat_vars(params.len() - 1) + "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})", + repeat_vars(params.len() - 1) ); let mut stmt = tx.prepare(&query)?; let update_count = stmt.execute(rusqlite::params_from_iter(params))?; info!( - "hid {} deleted events for author {:?}", - update_count, - e.get_author_prefix() + "hid {} deleted events for author {:?}", + update_count, + e.get_author_prefix() ); - } else { + } else { // check if a deletion has already been recorded for this event. // Only relevant for non-deletion events let del_count = tx.query_row( - "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;", - params![pubkey_blob, id_blob], |row| row.get::(0)); + "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;", + params![pubkey_blob, id_blob], |row| row.get::(0)); // check if a the query returned a result, meaning we should // hid the current event if del_count.ok().is_some() { - // a deletion already existed, mark original event as hidden. - info!( + // a deletion already existed, mark original event as hidden. + info!( "hid event: {:?} due to existing deletion by author: {:?}", e.get_event_id_prefix(), e.get_author_prefix() - ); - let _update_count = + ); + let _update_count = tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?; - // event was deleted, so let caller know nothing new - // arrived, preventing this from being sent to active - // subscriptions - ins_count = 0; + // event was deleted, so let caller know nothing new + // arrived, preventing this from being sent to active + // subscriptions + ins_count = 0; } - } - tx.commit()?; - Ok(ins_count) + } + tx.commit()?; + Ok(ins_count) } } @@ -223,25 +223,25 @@ impl SqliteRepo { impl NostrRepo for SqliteRepo { async fn start(&self) -> Result<()> { - db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await + db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await } async fn migrate_up(&self) -> Result { - let _write_guard = self.write_in_progress.lock().await; - let mut conn = self.write_pool.get()?; - task::spawn_blocking(move || { - upgrade_db(&mut conn) - }).await? + let _write_guard = self.write_in_progress.lock().await; + let mut conn = self.write_pool.get()?; + task::spawn_blocking(move || { + upgrade_db(&mut conn) + }).await? } /// Persist event to database async fn write_event(&self, e: &Event) -> Result { - let _write_guard = self.write_in_progress.lock().await; - // spawn a blocking thread - let mut conn = self.write_pool.get()?; - let e = e.clone(); - task::spawn_blocking(move || { - SqliteRepo::persist_event(&mut conn, &e) - }).await? + let _write_guard = self.write_in_progress.lock().await; + // spawn a blocking thread + let mut conn = self.write_pool.get()?; + let e = e.clone(); + task::spawn_blocking(move || { + SqliteRepo::persist_event(&mut conn, &e) + }).await? } /// Perform a database query using a subscription. @@ -257,28 +257,28 @@ impl NostrRepo for SqliteRepo { query_tx: tokio::sync::mpsc::Sender, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<()> { - let pre_spawn_start = Instant::now(); - let self=self.clone(); - task::spawn_blocking(move || { - { - // if we are waiting on a checkpoint, stop until it is complete - let _x = self.checkpoint_in_progress.blocking_lock(); - } + let pre_spawn_start = Instant::now(); + let self=self.clone(); + task::spawn_blocking(move || { + { + // if we are waiting on a checkpoint, stop until it is complete + let _x = self.checkpoint_in_progress.blocking_lock(); + } let db_queue_time = pre_spawn_start.elapsed(); // if the queue time was very long (>5 seconds), spare the DB and abort. if db_queue_time > Duration::from_secs(5) { - info!( + info!( "shedding DB query load queued for {:?} (cid: {}, sub: {:?})", db_queue_time, client_id, sub.id - ); - return Ok(()); + ); + return Ok(()); } // otherwise, report queuing time if it is slow else if db_queue_time > Duration::from_secs(1) { - debug!( + debug!( "(slow) DB query queued for {:?} (cid: {}, sub: {:?})", db_queue_time, client_id, sub.id - ); + ); } let start = Instant::now(); let mut row_count: usize = 0; @@ -287,7 +287,7 @@ impl NostrRepo for SqliteRepo { let sql_gen_elapsed = start.elapsed(); if sql_gen_elapsed > Duration::from_millis(10) { - debug!("SQL (slow) generated in {:?}", start.elapsed()); + debug!("SQL (slow) generated in {:?}", start.elapsed()); } // cutoff for displaying slow queries let slow_cutoff = Duration::from_millis(2000); @@ -298,69 +298,69 @@ impl NostrRepo for SqliteRepo { let mut slow_first_event; let mut last_successful_send = Instant::now(); if let Ok(mut conn) = self.read_pool.get() { - // execute the query. - // make the actual SQL query (with parameters inserted) available - conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)})); - let mut stmt = conn.prepare_cached(&q)?; - let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; + // execute the query. + // make the actual SQL query (with parameters inserted) available + conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)})); + let mut stmt = conn.prepare_cached(&q)?; + let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; - let mut first_result = true; - while let Some(row) = event_rows.next()? { + let mut first_result = true; + while let Some(row) = event_rows.next()? { let first_event_elapsed = start.elapsed(); slow_first_event = first_event_elapsed >= slow_cutoff; if first_result { - debug!( + debug!( "first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}]", first_event_elapsed, client_id, sub.id, idxs - ); - first_result = false; + ); + first_result = false; } // logging for slow queries; show sub and SQL. // to reduce logging; only show 1/16th of clients (leading 0) if row_count == 0 && slow_first_event && client_id.starts_with('0') { - debug!( + debug!( "query req (slow): {:?} (cid: {}, sub: {:?})", sub, client_id, sub.id - ); - } - // check if a checkpoint is trying to run, and abort - if row_count % 100 == 0 { - { - if self.checkpoint_in_progress.try_lock().is_err() { - // lock was held, abort this query - debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); - return Ok(()); - } - } - } + ); + } + // check if a checkpoint is trying to run, and abort + if row_count % 100 == 0 { + { + if self.checkpoint_in_progress.try_lock().is_err() { + // lock was held, abort this query + debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); + return Ok(()); + } + } + } // check if this is still active; every 100 rows if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { - debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); - return Ok(()); + debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); + return Ok(()); } row_count += 1; let event_json = row.get(0)?; loop { - if query_tx.capacity() != 0 { + if query_tx.capacity() != 0 { // we have capacity to add another item break; - } + } // the queue is full trace!("db reader thread is stalled"); if last_successful_send + abort_cutoff < Instant::now() { - // the queue has been full for too long, abort - info!("aborting database query due to slow client (cid: {}, sub: {:?})", - client_id, sub.id); - let ok: Result<()> = Ok(()); - return ok; + // the queue has been full for too long, abort + info!("aborting database query due to slow client (cid: {}, sub: {:?})", + client_id, sub.id); + let ok: Result<()> = Ok(()); + return ok; + } + // check if a checkpoint is trying to run, and abort + if self.checkpoint_in_progress.try_lock().is_err() { + // lock was held, abort this query + debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); + return Ok(()); } - // check if a checkpoint is trying to run, and abort - if self.checkpoint_in_progress.try_lock().is_err() { - // lock was held, abort this query - debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id); - return Ok(()); - } // give the queue a chance to clear before trying again thread::sleep(Duration::from_millis(100)); } @@ -368,79 +368,79 @@ impl NostrRepo for SqliteRepo { // getting the query result back as part of the error // result. query_tx - .blocking_send(QueryResult { + .blocking_send(QueryResult { sub_id: sub.get_id(), event: event_json, - }) - .ok(); + }) + .ok(); last_successful_send = Instant::now(); - } - query_tx + } + query_tx .blocking_send(QueryResult { - sub_id: sub.get_id(), - event: "EOSE".to_string(), + sub_id: sub.get_id(), + event: "EOSE".to_string(), }) .ok(); - debug!( + debug!( "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})", pre_spawn_start.elapsed(), client_id, sub.id, start.elapsed(), row_count - ); + ); } else { - warn!("Could not get a database connection for querying"); + warn!("Could not get a database connection for querying"); } let ok: Result<()> = Ok(()); ok - }); - Ok(()) + }); + Ok(()) } /// Perform normal maintenance async fn optimize_db(&self) -> Result<()> { - let conn = self.write_pool.get()?; - task::spawn_blocking(move || { - let start = Instant::now(); - conn.execute_batch("PRAGMA optimize;").ok(); - info!("optimize ran in {:?}", start.elapsed()); - }).await?; - Ok(()) + let conn = self.write_pool.get()?; + task::spawn_blocking(move || { + let start = Instant::now(); + conn.execute_batch("PRAGMA optimize;").ok(); + info!("optimize ran in {:?}", start.elapsed()); + }).await?; + Ok(()) } /// Create a new verification record connected to a specific event async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> { - let e = hex::decode(event_id).ok(); - let n = name.to_owned(); - let mut conn = self.write_pool.get()?; - tokio::task::spawn_blocking(move || { + let e = hex::decode(event_id).ok(); + let n = name.to_owned(); + let mut conn = self.write_pool.get()?; + tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; { - // if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest. - let query = "INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now'));"; - let mut stmt = tx.prepare(query)?; - stmt.execute(params![e, n])?; - // get the row ID - let v_id = tx.last_insert_rowid(); - // delete everything else by this name - let del_query = "DELETE FROM user_verification WHERE name = ? AND id != ?;"; - let mut del_stmt = tx.prepare(del_query)?; - let count = del_stmt.execute(params![n,v_id])?; - if count > 0 { + // if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest. + let query = "INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now'));"; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![e, n])?; + // get the row ID + let v_id = tx.last_insert_rowid(); + // delete everything else by this name + let del_query = "DELETE FROM user_verification WHERE name = ? AND id != ?;"; + let mut del_stmt = tx.prepare(del_query)?; + let count = del_stmt.execute(params![n,v_id])?; + if count > 0 { info!("removed {} old verification records for ({:?})", count, n); - } + } } tx.commit()?; info!("saved new verification record for ({:?})", n); let ok: Result<()> = Ok(()); ok - }).await? + }).await? } /// Update verification timestamp async fn update_verification_timestamp(&self, id: u64) -> Result<()> { - let mut conn = self.write_pool.get()?; + let mut conn = self.write_pool.get()?; tokio::task::spawn_blocking(move || { // add some jitter to the verification to prevent everything from stacking up together. let verif_time = now_jitter(600); @@ -462,8 +462,8 @@ impl NostrRepo for SqliteRepo { /// Update verification record as failed async fn fail_verification(&self, id: u64) -> Result<()> { - let mut conn = self.write_pool.get()?; - tokio::task::spawn_blocking(move || { + let mut conn = self.write_pool.get()?; + tokio::task::spawn_blocking(move || { // add some jitter to the verification to prevent everything from stacking up together. let fail_time = now_jitter(600); let tx = conn.transaction()?; @@ -481,8 +481,8 @@ impl NostrRepo for SqliteRepo { /// Delete verification record async fn delete_verification(&self, id: u64) -> Result<()> { - let mut conn = self.write_pool.get()?; - tokio::task::spawn_blocking(move || { + let mut conn = self.write_pool.get()?; + tokio::task::spawn_blocking(move || { let tx = conn.transaction()?; { let query = "DELETE FROM user_verification WHERE id=?;"; @@ -498,78 +498,78 @@ impl NostrRepo for SqliteRepo { /// Get the latest verification record for a given pubkey. async fn get_latest_user_verification(&self, pub_key: &str) -> Result { - let mut conn = self.read_pool.get()?; - let pub_key = pub_key.to_owned(); - tokio::task::spawn_blocking(move || { - let tx = conn.transaction()?; - let query = "SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1;"; - let mut stmt = tx.prepare_cached(query)?; - let fields = stmt.query_row(params![hex::decode(&pub_key).ok()], |r| { - let rowid: u64 = r.get(0)?; - let rowname: String = r.get(1)?; - let eventid: Vec = r.get(2)?; - let created_at: u64 = r.get(3)?; - // create a tuple since we can't throw non-rusqlite errors in this closure - Ok(( - rowid, - rowname, - eventid, - created_at, - r.get(4).ok(), - r.get(5).ok(), - r.get(6)?, - )) - })?; - Ok(VerificationRecord { - rowid: fields.0, - name: Nip05Name::try_from(&fields.1[..])?, - address: pub_key, - event: hex::encode(fields.2), - event_created: fields.3, - last_success: fields.4, - last_failure: fields.5, - failure_count: fields.6, - }) - }).await? + let mut conn = self.read_pool.get()?; + let pub_key = pub_key.to_owned(); + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + let query = "SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1;"; + let mut stmt = tx.prepare_cached(query)?; + let fields = stmt.query_row(params![hex::decode(&pub_key).ok()], |r| { + let rowid: u64 = r.get(0)?; + let rowname: String = r.get(1)?; + let eventid: Vec = r.get(2)?; + let created_at: u64 = r.get(3)?; + // create a tuple since we can't throw non-rusqlite errors in this closure + Ok(( + rowid, + rowname, + eventid, + created_at, + r.get(4).ok(), + r.get(5).ok(), + r.get(6)?, + )) + })?; + Ok(VerificationRecord { + rowid: fields.0, + name: Nip05Name::try_from(&fields.1[..])?, + address: pub_key, + event: hex::encode(fields.2), + event_created: fields.3, + last_success: fields.4, + last_failure: fields.5, + failure_count: fields.6, + }) + }).await? } /// Get oldest verification before timestamp async fn get_oldest_user_verification(&self, before: u64) -> Result { - let mut conn = self.read_pool.get()?; - tokio::task::spawn_blocking(move || { - let tx = conn.transaction()?; - let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;"; - let mut stmt = tx.prepare_cached(query)?; - let fields = stmt.query_row(params![before, before], |r| { - let rowid: u64 = r.get(0)?; - let rowname: String = r.get(1)?; - let eventid: Vec = r.get(2)?; - let pubkey: Vec = r.get(3)?; - let created_at: u64 = r.get(4)?; - // create a tuple since we can't throw non-rusqlite errors in this closure - Ok(( - rowid, - rowname, - eventid, - pubkey, - created_at, - r.get(5).ok(), - r.get(6).ok(), - r.get(7)?, - )) - })?; - let vr = VerificationRecord { - rowid: fields.0, - name: Nip05Name::try_from(&fields.1[..])?, - address: hex::encode(fields.3), - event: hex::encode(fields.2), - event_created: fields.4, - last_success: fields.5, - last_failure: fields.6, - failure_count: fields.7, - }; - Ok(vr) - }).await? + let mut conn = self.read_pool.get()?; + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;"; + let mut stmt = tx.prepare_cached(query)?; + let fields = stmt.query_row(params![before, before], |r| { + let rowid: u64 = r.get(0)?; + let rowname: String = r.get(1)?; + let eventid: Vec = r.get(2)?; + let pubkey: Vec = r.get(3)?; + let created_at: u64 = r.get(4)?; + // create a tuple since we can't throw non-rusqlite errors in this closure + Ok(( + rowid, + rowname, + eventid, + pubkey, + created_at, + r.get(5).ok(), + r.get(6).ok(), + r.get(7)?, + )) + })?; + let vr = VerificationRecord { + rowid: fields.0, + name: Nip05Name::try_from(&fields.1[..])?, + address: hex::encode(fields.3), + event: hex::encode(fields.2), + event_created: fields.4, + last_success: fields.5, + last_failure: fields.6, + failure_count: fields.7, + }; + Ok(vr) + }).await? } } @@ -578,27 +578,27 @@ fn override_index(f: &ReqFilter) -> Option { // queries for multiple kinds default to kind_index, which is // significantly slower than kind_created_at_index. if let Some(ks) = &f.kinds { - if f.ids.is_none() && - ks.len() > 1 && - f.since.is_none() && - f.until.is_none() && - f.tags.is_none() && - f.authors.is_none() { - return Some("kind_created_at_index".into()); - } + if f.ids.is_none() && + ks.len() > 1 && + f.since.is_none() && + f.until.is_none() && + f.tags.is_none() && + f.authors.is_none() { + return Some("kind_created_at_index".into()); + } } // if there is an author, it is much better to force the authors index. if f.authors.is_some() { - if f.since.is_none() && f.until.is_none() { - if f.kinds.is_none() { - // with no use of kinds/created_at, just author - return Some("author_index".into()); - } - // prefer author_kind if there are kinds - return Some("author_kind_index".into()); - } - // finally, prefer author_created_at if time is provided - return Some("author_created_at_index".into()); + if f.since.is_none() && f.until.is_none() { + if f.kinds.is_none() { + // with no use of kinds/created_at, just author + return Some("author_index".into()); + } + // prefer author_kind if there are kinds + return Some("author_kind_index".into()); + } + // finally, prefer author_created_at if time is provided + return Some("author_created_at_index".into()); } None } @@ -654,11 +654,11 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option (String, Vec>, Option (String, Vec>, Vec>) -> Result<()> { tokio::task::spawn(async move { - // WAL size in pages. - let mut current_wal_size = 0; - // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB) - let wal_threshold = 1000*10; - // default threshold for the busy timer - let busy_wait_default = Duration::from_secs(1); - // if the WAL file is getting too big, switch to this - let busy_wait_default_long = Duration::from_secs(10); + // WAL size in pages. + let mut current_wal_size = 0; + // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB) + let wal_threshold = 1000*10; + // default threshold for the busy timer + let busy_wait_default = Duration::from_secs(1); + // if the WAL file is getting too big, switch to this + let busy_wait_default_long = Duration::from_secs(10); loop { tokio::select! { _ = tokio::time::sleep(frequency) => { if let Ok(mut conn) = pool.get() { let mut _guard:Option> = None; - // the busy timer will block writers, so don't set - // this any higher than you want max latency for event - // writes. - if current_wal_size <= wal_threshold { - conn.busy_timeout(busy_wait_default).ok(); - } else { - // if the wal size has exceeded a threshold, increase the busy timeout. - conn.busy_timeout(busy_wait_default_long).ok(); - // take a lock that will prevent new readers. - info!("blocking new readers to perform wal_checkpoint"); - _guard = Some(checkpoint_in_progress.lock().await); - } + // the busy timer will block writers, so don't set + // this any higher than you want max latency for event + // writes. + if current_wal_size <= wal_threshold { + conn.busy_timeout(busy_wait_default).ok(); + } else { + // if the wal size has exceeded a threshold, increase the busy timeout. + conn.busy_timeout(busy_wait_default_long).ok(); + // take a lock that will prevent new readers. + info!("blocking new readers to perform wal_checkpoint"); + _guard = Some(checkpoint_in_progress.lock().await); + } debug!("running wal_checkpoint(TRUNCATE)"); if let Ok(new_size) = checkpoint_db(&mut conn) { - current_wal_size = new_size; - } + current_wal_size = new_size; + } } - } + } }; } }); @@ -922,8 +922,8 @@ fn repeat_vars(count: usize) -> String { pub async fn monitor_pool(name: &str, pool: SqlitePool) { let sleep_dur = Duration::from_secs(60); loop { - log_pool_stats(name, &pool); - tokio::time::sleep(sleep_dur).await; + log_pool_stats(name, &pool); + tokio::time::sleep(sleep_dur).await; } } @@ -936,7 +936,7 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) { name, in_use_cxns, state.connections, - pool.max_size() + pool.max_size() ); } diff --git a/src/repo/sqlite_migration.rs b/src/repo/sqlite_migration.rs index d1722b1..3b6f608 100644 --- a/src/repo/sqlite_migration.rs +++ b/src/repo/sqlite_migration.rs @@ -240,10 +240,10 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> { let mut stmt = tx.prepare("select id, content from event order by id;")?; let mut tag_rows = stmt.query([])?; while let Some(row) = tag_rows.next()? { - if (events_processed as f32)/(count as f32) > percent_done { - info!("Tag update {}% complete...", (100.0*percent_done).round()); - percent_done += update_each_percent; - } + if (events_processed as f32)/(count as f32) > percent_done { + info!("Tag update {}% complete...", (100.0*percent_done).round()); + percent_done += update_each_percent; + } // we want to capture the event_id that had the tag, the tag name, and the tag hex value. let event_id: u64 = row.get(0)?; let event_json: String = row.get(1)?; @@ -272,7 +272,7 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> { )?; } } - events_processed += 1; + events_processed += 1; } } tx.commit()?; @@ -560,7 +560,7 @@ fn mig_11_to_12(conn: &mut PooledConnection) -> Result { // Lookup every replaceable event let mut stmt = tx.prepare("select kind,author from event where kind in (0,3,41) or (kind>=10000 and kind<20000) order by id;")?; let mut replaceable_rows = stmt.query([])?; - info!("updating replaceable events; this could take awhile..."); + info!("updating replaceable events; this could take awhile..."); while let Some(row) = replaceable_rows.next()? { // we want to capture the event_id that had the tag, the tag name, and the tag hex value. let event_kind: u64 = row.get(0)?; @@ -641,10 +641,10 @@ PRAGMA user_version = 15; let clear_hidden_sql = r##"DELETE FROM event WHERE HIDDEN=true;"##; info!("removing hidden events; this may take awhile..."); match conn.execute_batch(clear_hidden_sql) { - Ok(()) => { - info!("all hidden events removed"); - }, - Err(err) => { + Ok(()) => { + info!("all hidden events removed"); + }, + Err(err) => { error!("delete failed: {}", err); panic!("could not remove hidden events"); } diff --git a/src/server.rs b/src/server.rs index 3053db0..36da758 100644 --- a/src/server.rs +++ b/src/server.rs @@ -90,7 +90,7 @@ async fn handle_web_request( tokio_tungstenite::tungstenite::protocol::Role::Server, Some(config), ) - .await; + .await; let origin = get_header_string("origin", request.headers()); let user_agent = get_header_string("user-agent", request.headers()); // determine the remote IP from headers if the exist @@ -109,7 +109,7 @@ async fn handle_web_request( }; // spawn a nostr server with our websocket tokio::spawn(nostr_server( - repo, + repo, client_info, settings, ws_stream, @@ -154,26 +154,26 @@ async fn handle_web_request( let rinfo = RelayInfo::from(settings.info); let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap()); return Ok(Response::builder() - .status(200) - .header("Content-Type", "application/nostr+json") - .header("Access-Control-Allow-Origin", "*") - .body(b) - .unwrap()); + .status(200) + .header("Content-Type", "application/nostr+json") + .header("Access-Control-Allow-Origin", "*") + .body(b) + .unwrap()); } } } Ok(Response::builder() - .status(200) - .header("Content-Type", "text/plain") - .body(Body::from("Please use a Nostr client to connect.")) - .unwrap()) + .status(200) + .header("Content-Type", "text/plain") + .body(Body::from("Please use a Nostr client to connect.")) + .unwrap()) } (_, _) => { //handle any other url Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("Nothing here.")) - .unwrap()) + .status(StatusCode::NOT_FOUND) + .body(Body::from("Nothing here.")) + .unwrap()) } } } @@ -191,18 +191,18 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) { loop { tokio::select! { _ = shutdown_signal.recv() => { - info!("Shutting down webserver as requested"); - // server shutting down, exit loop - break; - }, - _ = tokio::signal::ctrl_c() => { - info!("Shutting down webserver due to SIGINT"); - break; + info!("Shutting down webserver as requested"); + // server shutting down, exit loop + break; + }, + _ = tokio::signal::ctrl_c() => { + info!("Shutting down webserver due to SIGINT"); + break; + }, + _ = term_signal.recv() => { + info!("Shutting down webserver due to SIGTERM"); + break; }, - _ = term_signal.recv() => { - info!("Shutting down webserver due to SIGTERM"); - break; - }, } } } @@ -251,18 +251,18 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul let rt = Builder::new_multi_thread() .enable_all() .thread_name_fn(|| { - // give each thread a unique numeric name - static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst); - format!("tokio-ws-{}", id) - }) - // limit concurrent SQLite blocking threads + // give each thread a unique numeric name + static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst); + format!("tokio-ws-{}", id) + }) + // limit concurrent SQLite blocking threads .max_blocking_threads(settings.limits.max_blocking_threads) .on_thread_start(|| { trace!("started new thread: {:?}", std::thread::current().name()); }) .on_thread_stop(|| { - trace!("stopped thread: {:?}", std::thread::current().name()); + trace!("stopped thread: {:?}", std::thread::current().name()); }) .build() .unwrap(); @@ -293,19 +293,19 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul // overwhelming this will drop events and won't register // metadata events. let (metadata_tx, metadata_rx) = broadcast::channel::(4096); - // build a repository for events - let repo = db::build_repo(&settings).await; + // build a repository for events + let repo = db::build_repo(&settings).await; // start the database writer task. Give it a channel for // writing events, and for publishing events that have been // written (to all connected clients). - tokio::task::spawn( + tokio::task::spawn( db::db_writer( - repo.clone(), - settings.clone(), - event_rx, - bcast_tx.clone(), - metadata_tx.clone(), - shutdown_listen, + repo.clone(), + settings.clone(), + event_rx, + bcast_tx.clone(), + metadata_tx.clone(), + shutdown_listen, )); info!("db writer created"); @@ -327,7 +327,6 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul let controlled_shutdown = invoke_shutdown.clone(); tokio::spawn(async move { info!("control message listener started"); - // we only have good "shutdown" messages propagation from this-> controlled shutdown. Not from controlled_shutdown-> this. Which means we have a task that is stuck waiting on a sync receive. recv is blocking, and this is async. match shutdown_rx.recv() { Ok(()) => { info!("control message requesting shutdown"); @@ -348,14 +347,14 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul info!("shutting down due to SIGINT (main)"); ctrl_c_shutdown.send(()).ok(); }); - // spawn a task to check the pool size. - //let pool_monitor = pool.clone(); - //tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;}); + // spawn a task to check the pool size. + //let pool_monitor = pool.clone(); + //tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;}); // A `Service` is needed for every connection, so this // creates one from our `handle_request` function. let make_svc = make_service_fn(|conn: &AddrStream| { - let repo = repo.clone(); + let repo = repo.clone(); let remote_addr = conn.remote_addr(); let bcast = bcast_tx.clone(); let event = event_tx.clone(); @@ -366,7 +365,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul Ok::<_, Infallible>(service_fn(move |request: Request| { handle_web_request( request, - repo.clone(), + repo.clone(), settings.clone(), remote_addr, bcast.clone(), @@ -404,10 +403,10 @@ fn convert_to_msg(msg: &str, max_bytes: Option) -> Result { let parsed_res: Result = serde_json::from_str(msg).map_err(std::convert::Into::into); match parsed_res { Ok(m) => { - if let NostrMessage::SubMsg(_) = m { - // note; this only prints the first 16k of a REQ and then truncates. - trace!("REQ: {:?}",msg); - }; + if let NostrMessage::SubMsg(_) = m { + // note; this only prints the first 16k of a REQ and then truncates. + trace!("REQ: {:?}",msg); + }; if let NostrMessage::EventMsg(_) = m { if let Some(max_size) = max_bytes { // check length, ensure that some max size is set. @@ -514,7 +513,7 @@ async fn nostr_server( loop { tokio::select! { _ = shutdown.recv() => { - info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed()); + info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed()); // server shutting down, exit loop break; }, @@ -552,7 +551,6 @@ async fn nostr_server( if !sub.interested_in_event(&global_event) { continue; } - // TODO: serialize at broadcast time, instead of // once for each consumer. if let Ok(event_str) = serde_json::to_string(&global_event) { @@ -575,9 +573,9 @@ async fn nostr_server( Some(Ok(Message::Text(m))) => { convert_to_msg(&m,settings.limits.max_event_bytes) }, - Some(Ok(Message::Binary(_))) => { - ws_stream.send( - make_notice_message(&Notice::message("binary messages are not accepted".into()))).await.ok(); + Some(Ok(Message::Binary(_))) => { + ws_stream.send( + make_notice_message(&Notice::message("binary messages are not accepted".into()))).await.ok(); continue; }, Some(Ok(Message::Ping(_) | Message::Pong(_))) => { @@ -585,19 +583,19 @@ async fn nostr_server( // send responses automatically. continue; }, - Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { - ws_stream.send( - make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok(); + Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { + ws_stream.send( + make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok(); continue; - }, + }, None | - Some(Ok(Message::Close(_)) | - Err(WsError::AlreadyClosed | WsError::ConnectionClosed | - WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) + Some(Ok(Message::Close(_)) | + Err(WsError::AlreadyClosed | WsError::ConnectionClosed | + WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) => { debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip()); - break; - }, + break; + }, Some(Err(WsError::Io(e))) => { // IO errors are considered fatal warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e); @@ -627,14 +625,14 @@ async fn nostr_server( let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() }; event_tx.send(submit_event).await.ok(); client_published_event_count += 1; - } else { - info!("client: {} sent a far future-dated event", cid); - if let Some(fut_sec) = settings.options.reject_future_seconds { - let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec); - let notice = Notice::invalid(e.id, &msg); - ws_stream.send(make_notice_message(¬ice)).await.ok(); - } - } + } else { + info!("client: {} sent a far future-dated event", cid); + if let Some(fut_sec) = settings.options.reject_future_seconds { + let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec); + let notice = Notice::invalid(e.id, &msg); + ws_stream.send(make_notice_message(¬ice)).await.ok(); + } + } }, Err(e) => { info!("client sent an invalid event (cid: {})", cid); @@ -649,49 +647,49 @@ async fn nostr_server( // * registering the subscription so future events can be matched // * making a channel to cancel to request later // * sending a request for a SQL query - // Do nothing if the sub already exists. - if conn.has_subscription(&s) { - info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id); - } else { - if let Some(ref lim) = sub_lim_opt { - lim.until_ready_with_jitter(jitter).await; - } + // Do nothing if the sub already exists. + if conn.has_subscription(&s) { + info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id); + } else { + if let Some(ref lim) = sub_lim_opt { + lim.until_ready_with_jitter(jitter).await; + } let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); match conn.subscribe(s.clone()) { - Ok(()) => { + Ok(()) => { // when we insert, if there was a previous query running with the same name, cancel it. if let Some(previous_query) = running_queries.insert(s.id.clone(), abandon_query_tx) { - previous_query.send(()).ok(); + previous_query.send(()).ok(); } - if s.needs_historical_events() { - // start a database query. this spawns a blocking database query on a worker thread. - repo.query_subscription(s, cid.clone(), query_tx.clone(), abandon_query_rx).await.ok(); - } - }, - Err(e) => { - info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id); + if s.needs_historical_events() { + // start a database query. this spawns a blocking database query on a worker thread. + repo.query_subscription(s, cid.clone(), query_tx.clone(), abandon_query_rx).await.ok(); + } + }, + Err(e) => { + info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id); ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok(); - } + } } - } + } }, Ok(NostrMessage::CloseMsg(cc)) => { // closing a request simply removes the subscription. let parsed : Result = Result::::from(cc); - if let Ok(c) = parsed { - // check if a query is currently - // running, and remove it if so. - let stop_tx = running_queries.remove(&c.id); - if let Some(tx) = stop_tx { - tx.send(()).ok(); - } - // stop checking new events against - // the subscription - conn.unsubscribe(&c); - } else { - info!("invalid command ignored"); - ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok(); + if let Ok(c) = parsed { + // check if a query is currently + // running, and remove it if so. + let stop_tx = running_queries.remove(&c.id); + if let Some(tx) = stop_tx { + tx.send(()).ok(); } + // stop checking new events against + // the subscription + conn.unsubscribe(&c); + } else { + info!("invalid command ignored"); + ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok(); + } }, Err(Error::ConnError) => { debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip()); @@ -722,6 +720,6 @@ async fn nostr_server( conn.ip(), client_published_event_count, client_received_event_count, - orig_start.elapsed() + orig_start.elapsed() ); } diff --git a/src/subscription.rs b/src/subscription.rs index db83ae3..e055fe2 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -65,21 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter { tags: None, force_no_match: false, }; - let empty_string = "".into(); + let empty_string = "".into(); let mut ts = None; // iterate through each key, and assign values that exist for (key, val) in filter { // ids if key == "ids" { - let raw_ids: Option>= Deserialize::deserialize(val).ok(); - if let Some(a) = raw_ids.as_ref() { - if a.contains(&empty_string) { - return Err(serde::de::Error::invalid_type( - Unexpected::Other("prefix matches must not be empty strings"), - &"a json object")); - } - } - rf.ids =raw_ids; + let raw_ids: Option>= Deserialize::deserialize(val).ok(); + if let Some(a) = raw_ids.as_ref() { + if a.contains(&empty_string) { + return Err(serde::de::Error::invalid_type( + Unexpected::Other("prefix matches must not be empty strings"), + &"a json object")); + } + } + rf.ids =raw_ids; } else if key == "kinds" { rf.kinds = Deserialize::deserialize(val).ok(); } else if key == "since" { @@ -90,14 +90,14 @@ impl<'de> Deserialize<'de> for ReqFilter { rf.limit = Deserialize::deserialize(val).ok(); } else if key == "authors" { let raw_authors: Option>= Deserialize::deserialize(val).ok(); - if let Some(a) = raw_authors.as_ref() { - if a.contains(&empty_string) { - return Err(serde::de::Error::invalid_type( - Unexpected::Other("prefix matches must not be empty strings"), - &"a json object")); - } - } - rf.authors = raw_authors; + if let Some(a) = raw_authors.as_ref() { + if a.contains(&empty_string) { + return Err(serde::de::Error::invalid_type( + Unexpected::Other("prefix matches must not be empty strings"), + &"a json object")); + } + } + rf.authors = raw_authors; } else if key.starts_with('#') && key.len() > 1 && val.is_array() { if let Some(tag_search) = tag_search_char_from_filter(key) { if ts.is_none() { @@ -107,7 +107,7 @@ impl<'de> Deserialize<'de> for ReqFilter { if let Some(m) = ts.as_mut() { let tag_vals: Option> = Deserialize::deserialize(val).ok(); if let Some(v) = tag_vals { - let hs = v.into_iter().collect::>(); + let hs = v.into_iter().collect::>(); m.insert(tag_search.to_owned(), hs); } }; @@ -204,7 +204,7 @@ impl Subscription { /// Determine if any filter is requesting historical (database) /// queries. If every filter has limit:0, we do not need to query the DB. #[must_use] pub fn needs_historical_events(&self) -> bool { - self.filters.iter().any(|f| f.limit!=Some(0)) + self.filters.iter().any(|f| f.limit!=Some(0)) } /// Determine if this subscription matches a given [`Event`]. Any @@ -316,19 +316,19 @@ mod tests { #[test] fn req_empty_authors_prefix() { - let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]"; + let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]"; assert!(serde_json::from_str::(raw_json).is_err()); } #[test] fn req_empty_ids_prefix() { - let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]"; + let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]"; assert!(serde_json::from_str::(raw_json).is_err()); } #[test] fn req_empty_ids_prefix_mixed() { - let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]"; + let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]"; assert!(serde_json::from_str::(raw_json).is_err()); }