diff --git a/build.rs b/build.rs index 7845f76..2dc88f9 100644 --- a/build.rs +++ b/build.rs @@ -2,9 +2,6 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(false) .protoc_arg("--experimental_allow_proto3_optional") - .compile( - &["proto/nauthz.proto"], - &["proto"], - )?; + .compile(&["proto/nauthz.proto"], &["proto"])?; Ok(()) } diff --git a/src/config.rs b/src/config.rs index 442c6bb..ff31f2a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -77,7 +77,7 @@ pub struct Limits { #[allow(unused)] pub struct Authorization { pub pubkey_whitelist: Option>, // If present, only allow these pubkeys to publish events - pub nip42_auth: bool, // if true enables NIP-42 authentication + pub nip42_auth: bool, // if true enables NIP-42 authentication } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -257,7 +257,7 @@ impl Default for Settings { }, authorization: Authorization { pubkey_whitelist: None, // Allow any address to publish - nip42_auth: false, // Disable NIP-42 authentication + nip42_auth: false, // Disable NIP-42 authentication }, verified_users: VerifiedUsers { mode: VerifiedUsersMode::Disabled, diff --git a/src/conn.rs b/src/conn.rs index 30ebeee..a0cd485 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -161,12 +161,12 @@ impl ClientConn { Challenge(_) => (), AuthPubkey(_) => { // already authenticated - return Ok(()) - }, + return Ok(()); + } NoAuth => { // unexpected AUTH request return Err(Error::AuthFailure); - }, + } } match event.validate() { Ok(_) => { diff --git a/src/db.rs b/src/db.rs index 9d478aa..a52199a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,12 +2,12 @@ use crate::config::Settings; use crate::error::{Error, Result}; use crate::event::Event; +use crate::nauthz; use crate::notice::Notice; use crate::repo::postgres::{PostgresPool, PostgresRepo}; use crate::repo::sqlite::SqliteRepo; use crate::repo::NostrRepo; use crate::server::NostrMetrics; -use crate::nauthz; use governor::clock::Clock; use governor::{Quota, RateLimiter}; use r2d2; @@ -116,8 +116,8 @@ pub async fn db_writer( }; //let gprc_client = settings.grpc.event_admission_server.map(|s| { -// event_admitter_connect(&s); -// }); + // event_admitter_connect(&s); + // }); loop { if shutdown.try_recv().is_ok() { @@ -181,10 +181,7 @@ pub async fn db_writer( &event.kind ); notice_tx - .try_send(Notice::blocked( - event.id, - "event kind is blocked by relay" - )) + .try_send(Notice::blocked(event.id, "event kind is blocked by relay")) .ok(); continue; } @@ -216,7 +213,6 @@ pub async fn db_writer( uv.name.to_string(), event.get_author_prefix() ); - } else { info!( "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)", @@ -253,27 +249,44 @@ pub async fn db_writer( } // nip05 address - let nip05_address : Option = validation.and_then(|x| x.ok().map(|y| y.name)); + let nip05_address: Option = + validation.and_then(|x| x.ok().map(|y| y.name)); // GRPC check if let Some(ref mut c) = grpc_client { trace!("checking if grpc permits"); let grpc_start = Instant::now(); - let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address, subm_event.auth_pubkey).await; + let decision_res = c + .admit_event( + &event, + &subm_event.source_ip, + subm_event.origin, + subm_event.user_agent, + nip05_address, + subm_event.auth_pubkey, + ) + .await; match decision_res { Ok(decision) => { if !decision.permitted() { // GPRC returned a decision to reject this event - info!("GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})", - event.get_event_id_prefix(), - event.kind, - event.get_author_prefix(), - grpc_start.elapsed(), - subm_event.source_ip); - notice_tx.try_send(Notice::blocked(event.id, &decision.message().unwrap_or_else(|| "".to_string()))).ok(); + info!( + "GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})", + event.get_event_id_prefix(), + event.kind, + event.get_author_prefix(), + grpc_start.elapsed(), + subm_event.source_ip + ); + notice_tx + .try_send(Notice::blocked( + event.id, + &decision.message().unwrap_or_else(|| "".to_string()), + )) + .ok(); continue; } - }, + } Err(e) => { warn!("GRPC server error: {:?}", e); } diff --git a/src/event.rs b/src/event.rs index 5af9c3b..6f5fde3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -5,6 +5,8 @@ use crate::error::Error::{ EventMalformedPubkey, }; use crate::error::Result; +use crate::event::EventWrapper::WrappedAuth; +use crate::event::EventWrapper::WrappedEvent; use crate::nip05; use crate::utils::unix_time; use bitcoin_hashes::{sha256, Hash}; @@ -17,8 +19,6 @@ use std::collections::HashMap; use std::collections::HashSet; use std::str::FromStr; use tracing::{debug, info}; -use crate::event::EventWrapper::WrappedEvent; -use crate::event::EventWrapper::WrappedAuth; lazy_static! { /// Secp256k1 verification instance. @@ -90,10 +90,9 @@ pub fn single_char_tagname(tagname: &str) -> Option { } } - pub enum EventWrapper { WrappedEvent(Event), - WrappedAuth(Event) + WrappedAuth(Event), } /// Convert network event to parsed/validated event. @@ -157,11 +156,13 @@ impl Event { /// Determine the time at which this event should expire pub fn expiration(&self) -> Option { let default = "".to_string(); - let dvals:Vec<&String> = self.tags + let dvals: Vec<&String> = self + .tags .iter() .filter(|x| !x.is_empty()) .filter(|x| x.get(0).unwrap() == "expiration") - .map(|x| x.get(1).unwrap_or(&default)).take(1) + .map(|x| x.get(1).unwrap_or(&default)) + .take(1) .collect(); let val_first = dvals.get(0); val_first.and_then(|t| t.parse::().ok()) @@ -711,9 +712,7 @@ mod tests { // regular events do not expire let mut event = Event::simple_event(); event.kind = 7; - event.tags = vec![ - vec!["test".to_string(), "foo".to_string()], - ]; + event.tags = vec![vec!["test".to_string(), "foo".to_string()]]; assert_eq!(event.expiration(), None); } @@ -722,57 +721,47 @@ mod tests { // regular events do not expire let mut event = Event::simple_event(); event.kind = 7; - event.tags = vec![ - vec!["expiration".to_string()], - ]; + event.tags = vec![vec!["expiration".to_string()]]; assert_eq!(event.expiration(), None); } #[test] fn expiring_event_future() { // a normal expiring event - let exp:u64 = 1676264138; + let exp: u64 = 1676264138; let mut event = Event::simple_event(); event.kind = 1; - event.tags = vec![ - vec!["expiration".to_string(), exp.to_string()], - ]; + event.tags = vec![vec!["expiration".to_string(), exp.to_string()]]; assert_eq!(event.expiration(), Some(exp)); } #[test] fn expiring_event_negative() { // expiration set to a negative value (invalid) - let exp:i64 = -90; + let exp: i64 = -90; let mut event = Event::simple_event(); event.kind = 1; - event.tags = vec![ - vec!["expiration".to_string(), exp.to_string()], - ]; + event.tags = vec![vec!["expiration".to_string(), exp.to_string()]]; assert_eq!(event.expiration(), None); } #[test] fn expiring_event_zero() { // a normal expiring event set to zero - let exp:i64 = 0; + let exp: i64 = 0; let mut event = Event::simple_event(); event.kind = 1; - event.tags = vec![ - vec!["expiration".to_string(), exp.to_string()], - ]; + event.tags = vec![vec!["expiration".to_string(), exp.to_string()]]; assert_eq!(event.expiration(), Some(0)); } #[test] fn expiring_event_fraction() { // expiration is fractional (invalid) - let exp:f64 = 23.334; + let exp: f64 = 23.334; let mut event = Event::simple_event(); event.kind = 1; - event.tags = vec![ - vec!["expiration".to_string(), exp.to_string()], - ]; + event.tags = vec![vec!["expiration".to_string(), exp.to_string()]]; assert_eq!(event.expiration(), None); } diff --git a/src/lib.rs b/src/lib.rs index 4df90e6..ba496b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,8 +8,8 @@ pub mod error; pub mod event; pub mod hexrange; pub mod info; -pub mod nip05; pub mod nauthz; +pub mod nip05; pub mod notice; pub mod repo; pub mod subscription; diff --git a/src/nauthz.rs b/src/nauthz.rs index da57175..1d8caad 100644 --- a/src/nauthz.rs +++ b/src/nauthz.rs @@ -76,7 +76,7 @@ impl EventAuthzService { origin: Option, user_agent: Option, nip05: Option, - auth_pubkey: Option> + auth_pubkey: Option>, ) -> Result> { self.ready_connection().await; let id_blob = hex::decode(&event.id)?; diff --git a/src/nip05.rs b/src/nip05.rs index 6849b36..5e1f35b 100644 --- a/src/nip05.rs +++ b/src/nip05.rs @@ -265,10 +265,10 @@ impl Verifier { Err(Error::ChannelClosed) => { // channel was closed, we are shutting down return; - }, + } Err(e) => { - info!("error in verifier: {:?}", e); - }, + info!("error in verifier: {:?}", e); + } _ => {} } } diff --git a/src/notice.rs b/src/notice.rs index 39c9602..d243acc 100644 --- a/src/notice.rs +++ b/src/notice.rs @@ -16,7 +16,7 @@ pub struct EventResult { pub enum Notice { Message(String), EventResult(EventResult), - AuthChallenge(String) + AuthChallenge(String), } impl EventResultStatus { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 3ade449..56a2696 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -16,11 +16,11 @@ use crate::error; use crate::hexrange::{hex_range, HexSearch}; use crate::repo::postgres_migration::run_migrations; use crate::server::NostrMetrics; -use crate::utils::{is_hex, is_lower_hex, self}; +use crate::utils::{self, is_hex, is_lower_hex}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Receiver; use tracing::log::trace; -use tracing::{debug, info, warn, error}; +use tracing::{debug, error, info, warn}; pub type PostgresPool = sqlx::pool::Pool; @@ -36,10 +36,8 @@ impl PostgresRepo { metrics: m, } } - } - /// Cleanup expired events on a regular basis async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> { tokio::task::spawn(async move { @@ -66,12 +64,13 @@ async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> } /// One-time deletion of all expired events -async fn delete_expired(conn:PostgresPool) -> Result { +async fn delete_expired(conn: PostgresPool) -> Result { let mut tx = conn.begin().await?; let update_count = sqlx::query("DELETE FROM \"event\" WHERE expires_at <= $1;") .bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()) .execute(&mut tx) - .await?.rows_affected(); + .await? + .rows_affected(); tx.commit().await?; Ok(update_count) } @@ -81,7 +80,7 @@ impl NostrRepo for PostgresRepo { async fn start(&self) -> Result<()> { // begin a cleanup task for expired events. cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?; - Ok(()) + Ok(()) } async fn migrate_up(&self) -> Result { @@ -148,16 +147,19 @@ impl NostrRepo for PostgresRepo { VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING"#, ) - .bind(&id_blob) - .bind(&pubkey_blob) - .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) - .bind(e.expiration().and_then(|x| Utc.timestamp_opt(x as i64, 0).latest())) - .bind(e.kind as i64) - .bind(event_str.into_bytes()) - .bind(delegator_blob) - .execute(&mut tx) - .await? - .rows_affected(); + .bind(&id_blob) + .bind(&pubkey_blob) + .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) + .bind( + e.expiration() + .and_then(|x| Utc.timestamp_opt(x as i64, 0).latest()), + ) + .bind(e.kind as i64) + .bind(event_str.into_bytes()) + .bind(delegator_blob) + .execute(&mut tx) + .await? + .rows_affected(); if ins_count == 0 { // if the event was a duplicate, no need to insert event or // pubkey references. This will abort the txn. @@ -753,7 +755,8 @@ fn query_from_filter(f: &ReqFilter) -> Option> { // never display expired events query .push(" AND (e.expires_at IS NULL OR e.expires_at > ") - .push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")"); + .push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()) + .push(")"); // Apply per-filter limit to this query. // The use of a LIMIT implies a DESC order, to capture only the most recent events. diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index a892e38..db933b4 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -2,8 +2,8 @@ //use crate::config::SETTINGS; use crate::config::Settings; use crate::db::QueryResult; -use crate::error::Result; use crate::error::Error::SqlError; +use crate::error::Result; use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; @@ -257,13 +257,15 @@ impl NostrRepo for SqliteRepo { self.maint_pool.clone(), Duration::from_secs(60), self.write_in_progress.clone(), - self.checkpoint_in_progress.clone() - ).await?; + self.checkpoint_in_progress.clone(), + ) + .await?; cleanup_expired( self.maint_pool.clone(), Duration::from_secs(600), - self.write_in_progress.clone() - ).await + self.write_in_progress.clone(), + ) + .await } async fn migrate_up(&self) -> Result { @@ -286,24 +288,29 @@ impl NostrRepo for SqliteRepo { // this could fail because the database was busy; try // multiple times before giving up. loop { - attempts+=1; + attempts += 1; let wr = SqliteRepo::persist_event(&mut conn, &e); match wr { - Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => { + Err(SqlError(rusqlite::Error::SqliteFailure(e, _))) => { // this basically means that NIP-05 or another // writer was using the database between us // reading and promoting the connection to a // write lock. - info!("event write failed, DB locked (attempt: {}); sqlite err: {}", - attempts, e.extended_code); - }, - _ => {return wr;}, + info!( + "event write failed, DB locked (attempt: {}); sqlite err: {}", + attempts, e.extended_code + ); + } + _ => { + return wr; + } } if attempts >= max_write_attempts { return wr; } } - }).await?; + }) + .await?; self.metrics .write_events .observe(start.elapsed().as_secs_f64()); @@ -865,7 +872,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option = ks.iter().map(std::string::ToString::to_string).collect(); + let str_kinds: Vec = + ks.iter().map(std::string::ToString::to_string).collect(); kind_clause = format!("AND kind IN ({})", str_kinds.join(", ")); } else { kind_clause = format!(""); @@ -998,7 +1006,11 @@ pub fn build_pool( } /// Cleanup expired events on a regular basis -async fn cleanup_expired(pool: SqlitePool, frequency: Duration, write_in_progress: Arc>) -> Result<()> { +async fn cleanup_expired( + pool: SqlitePool, + frequency: Duration, + write_in_progress: Arc>, +) -> Result<()> { tokio::task::spawn(async move { loop { tokio::select! { @@ -1050,7 +1062,8 @@ pub async fn db_checkpoint_task( pool: SqlitePool, frequency: Duration, write_in_progress: Arc>, - checkpoint_in_progress: Arc>) -> Result<()> { + checkpoint_in_progress: Arc>, +) -> Result<()> { // TODO; use acquire_many on the reader semaphore to stop them from interrupting this. tokio::task::spawn(async move { // WAL size in pages. diff --git a/src/repo/sqlite_migration.rs b/src/repo/sqlite_migration.rs index c0c4f1c..b058e30 100644 --- a/src/repo/sqlite_migration.rs +++ b/src/repo/sqlite_migration.rs @@ -4,13 +4,13 @@ use crate::error::Result; use crate::event::{single_char_tagname, Event}; use crate::utils::is_lower_hex; use const_format::formatcp; +use indicatif::{ProgressBar, ProgressStyle}; use rusqlite::limits::Limit; use rusqlite::params; use rusqlite::Connection; use std::cmp::Ordering; use std::time::Instant; use tracing::{debug, error, info}; -use indicatif::{ProgressBar, ProgressStyle}; /// Startup DB Pragmas pub const STARTUP_SQL: &str = r##" @@ -692,22 +692,22 @@ CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at, let start = Instant::now(); let tx = conn.transaction()?; - let bar = ProgressBar::new(count.try_into().unwrap()) - .with_message("rebuilding tags table"); + let bar = ProgressBar::new(count.try_into().unwrap()).with_message("rebuilding tags table"); bar.set_style( ProgressStyle::with_template( "[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}", ) - .unwrap(), + .unwrap(), ); { tx.execute_batch(upgrade_sql)?; - let mut stmt = tx.prepare("select id, kind, created_at, content from event order by id;")?; + let mut stmt = + tx.prepare("select id, kind, created_at, content from event order by id;")?; let mut tag_rows = stmt.query([])?; let mut count = 0; while let Some(row) = tag_rows.next()? { count += 1; - if count%10==0 { + if count % 10 == 0 { bar.inc(10); } let event_id: u64 = row.get(0)?; @@ -735,7 +735,10 @@ CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at, } bar.finish(); tx.commit()?; - info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed()); + info!( + "database schema upgraded v15 -> v16 in {:?}", + start.elapsed() + ); Ok(16) } diff --git a/src/server.rs b/src/server.rs index 7a8dffa..e8d40f4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,14 +6,15 @@ use crate::conn; use crate::db; use crate::db::SubmittedEvent; use crate::error::{Error, Result}; -use crate::event::EventWrapper; -use crate::server::EventWrapper::{WrappedAuth, WrappedEvent}; use crate::event::Event; use crate::event::EventCmd; +use crate::event::EventWrapper; use crate::info::RelayInfo; use crate::nip05; use crate::notice::Notice; use crate::repo::NostrRepo; +use crate::server::Error::CommandUnknownError; +use crate::server::EventWrapper::{WrappedAuth, WrappedEvent}; use crate::subscription::Subscription; use futures::SinkExt; use futures::StreamExt; @@ -53,7 +54,6 @@ use tungstenite::error::Error as WsError; use tungstenite::handshake; use tungstenite::protocol::Message; use tungstenite::protocol::WebSocketConfig; -use crate::server::Error::CommandUnknownError; /// Handle arbitrary HTTP requests, including for `WebSocket` upgrades. #[allow(clippy::too_many_arguments)] @@ -197,17 +197,17 @@ async fn handle_web_request( if let Some(favicon_bytes) = favicon { info!("returning favicon"); Ok(Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "image/x-icon") - // 1 month cache - .header("Cache-Control", "public, max-age=2419200") - .body(Body::from(favicon_bytes)) - .unwrap()) + .status(StatusCode::OK) + .header("Content-Type", "image/x-icon") + // 1 month cache + .header("Cache-Control", "public, max-age=2419200") + .body(Body::from(favicon_bytes)) + .unwrap()) } else { Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("")) - .unwrap()) + .status(StatusCode::NOT_FOUND) + .body(Body::from("")) + .unwrap()) } } (_, _) => { @@ -283,15 +283,20 @@ fn create_metrics() -> (Registry, NostrMetrics) { let query_aborts = IntCounterVec::new( Opts::new("nostr_query_abort_total", "Aborted queries"), vec!["reason"].as_slice(), - ).unwrap(); + ) + .unwrap(); let cmd_req = IntCounter::with_opts(Opts::new("nostr_cmd_req_total", "REQ commands")).unwrap(); let cmd_event = IntCounter::with_opts(Opts::new("nostr_cmd_event_total", "EVENT commands")).unwrap(); let cmd_close = IntCounter::with_opts(Opts::new("nostr_cmd_close_total", "CLOSE commands")).unwrap(); - let cmd_auth = IntCounter::with_opts(Opts::new("nostr_cmd_auth_total", "AUTH commands")).unwrap(); - let disconnects = IntCounterVec::new(Opts::new("nostr_disconnects_total", "Client disconnects"), - vec!["reason"].as_slice()).unwrap(); + let cmd_auth = + IntCounter::with_opts(Opts::new("nostr_cmd_auth_total", "AUTH commands")).unwrap(); + let disconnects = IntCounterVec::new( + Opts::new("nostr_disconnects_total", "Client disconnects"), + vec!["reason"].as_slice(), + ) + .unwrap(); registry.register(Box::new(query_sub.clone())).unwrap(); registry.register(Box::new(query_db.clone())).unwrap(); registry.register(Box::new(write_events.clone())).unwrap(); @@ -313,9 +318,9 @@ fn create_metrics() -> (Registry, NostrMetrics) { db_connections, disconnects, query_aborts, - cmd_req, - cmd_event, - cmd_close, + cmd_req, + cmd_event, + cmd_close, cmd_auth, }; (registry, metrics) @@ -650,9 +655,7 @@ async fn nostr_server( let unspec = "".to_string(); info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip()); let origin = client_info.origin.as_ref().unwrap_or_else(|| &unspec); - let user_agent = client_info - .user_agent.as_ref() - .unwrap_or_else(|| &unspec); + let user_agent = client_info.user_agent.as_ref().unwrap_or_else(|| &unspec); info!( "cid: {}, origin: {:?}, user-agent: {:?}", cid, origin, user_agent @@ -664,8 +667,12 @@ async fn nostr_server( if settings.authorization.nip42_auth { conn.generate_auth_challenge(); if let Some(challenge) = conn.auth_challenge() { - ws_stream.send( - make_notice_message(&Notice::AuthChallenge(challenge.to_string()))).await.ok(); + ws_stream + .send(make_notice_message(&Notice::AuthChallenge( + challenge.to_string(), + ))) + .await + .ok(); } } diff --git a/src/utils.rs b/src/utils.rs index c080aea..b037c97 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -38,7 +38,9 @@ pub fn is_lower_hex(s: &str) -> bool { } pub fn host_str(url: &String) -> Option { - Url::parse(url).ok().and_then(|u| u.host_str().map(|s| s.to_string())) + Url::parse(url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) } #[cfg(test)]