diff --git a/config.toml b/config.toml index cbe91b2..2491761 100644 --- a/config.toml +++ b/config.toml @@ -80,6 +80,7 @@ reject_future_seconds = 1800 # metadata for event authors, "passive" to perform validation but # never block publishing, or "disabled" to do nothing. #mode = "disabled" +mode = "passive" # Domain names that will be prevented from publishing events. #domain_blacklist = ["wellorder.net"] diff --git a/src/db.rs b/src/db.rs index 3643575..53be636 100644 --- a/src/db.rs +++ b/src/db.rs @@ -27,6 +27,13 @@ use tokio::task; pub type SqlitePool = r2d2::Pool; pub type PooledConnection = r2d2::PooledConnection; + +/// Events submitted from a client, with a return channel for notices +pub struct SubmittedEvent { + pub event: Event, + pub notice_tx: tokio::sync::mpsc::Sender, +} + /// Database file pub const DB_FILE: &str = "nostr.db"; @@ -76,7 +83,7 @@ pub fn build_conn(flags: OpenFlags) -> Result { /// Spawn a database writer that persists events to the SQLite store. pub async fn db_writer( - mut event_rx: tokio::sync::mpsc::Receiver, + mut event_rx: tokio::sync::mpsc::Receiver, bcast_tx: tokio::sync::broadcast::Sender, metadata_tx: tokio::sync::broadcast::Sender, mut shutdown: tokio::sync::broadcast::Receiver<()>, @@ -131,18 +138,20 @@ pub async fn db_writer( break; } let mut event_write = false; - let event = next_event.unwrap(); - + let subm_event = next_event.unwrap(); + let event = subm_event.event; + let notice_tx = subm_event.notice_tx; // check if this event is authorized. if let Some(allowed_addrs) = whitelist { - debug!("Checking against pubkey whitelist"); // if the event address is not in allowed_addrs. if !allowed_addrs.contains(&event.pubkey) { info!( "Rejecting event {}, unauthorized author", event.get_event_id_prefix() ); - // TODO: define a channel that can send NOTICEs back to the client. + notice_tx + .try_send("pubkey is not allowed to publish to this relay".to_owned()) + .ok(); continue; } } @@ -171,6 +180,12 @@ pub async fn db_writer( uv.name.to_string(), event.get_author_prefix() ); + notice_tx + .try_send( + "NIP-05 verification is no longer valid (expired/wrong domain)" + .to_owned(), + ) + .ok(); continue; } } @@ -179,6 +194,9 @@ pub async fn db_writer( "no verification records found for pubkey: {:?}", event.get_author_prefix() ); + notice_tx + .try_send("NIP-05 verification needed to publish events".to_owned()) + .ok(); continue; } Err(e) => { @@ -207,6 +225,12 @@ pub async fn db_writer( } Err(err) => { warn!("event insert failed: {:?}", err); + notice_tx + .try_send( + "relay experienced an error trying to publish the latest event" + .to_owned(), + ) + .ok(); } } diff --git a/src/main.rs b/src/main.rs index 065fa95..62613c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use nostr_rs_relay::close::CloseCmd; use nostr_rs_relay::config; use nostr_rs_relay::conn; use nostr_rs_relay::db; +use nostr_rs_relay::db::SubmittedEvent; use nostr_rs_relay::error::{Error, Result}; use nostr_rs_relay::event::Event; use nostr_rs_relay::event::EventCmd; @@ -51,7 +52,7 @@ async fn handle_web_request( pool: db::SqlitePool, remote_addr: SocketAddr, broadcast: Sender, - event_tx: tokio::sync::mpsc::Sender, + event_tx: tokio::sync::mpsc::Sender, shutdown: Receiver<()>, ) -> Result, Infallible> { match ( @@ -232,7 +233,8 @@ fn main() -> Result<(), Error> { let (bcast_tx, _) = broadcast::channel::(settings.limits.broadcast_buffer); // validated events that need to be persisted are sent to the // database on via this channel. - let (event_tx, event_rx) = mpsc::channel::(settings.limits.event_persist_buffer); + let (event_tx, event_rx) = + mpsc::channel::(settings.limits.event_persist_buffer); // establish a channel for letting all threads now about a // requested server shutdown. let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1); @@ -359,7 +361,7 @@ async fn nostr_server( pool: db::SqlitePool, mut ws_stream: WebSocketStream, broadcast: Sender, - event_tx: tokio::sync::mpsc::Sender, + event_tx: mpsc::Sender, mut shutdown: Receiver<()>, ) { // get a broadcast channel for clients to communicate on @@ -370,6 +372,9 @@ async fn nostr_server( // Create a channel for receiving query results from the database. // we will send out the tx handle to any query we generate. let (query_tx, mut query_rx) = mpsc::channel::(256); + // Create channel for receiving NOTICEs + let (notice_tx, mut notice_rx) = mpsc::channel::(32); + // maintain a hashmap of a oneshot channel for active subscriptions. // when these subscriptions are cancelled, make a message // available to the executing query so it knows to stop. @@ -408,9 +413,12 @@ async fn nostr_server( // Send a ping ws_stream.send(Message::Ping(Vec::new())).await.ok(); }, + Some(notice_msg) = notice_rx.recv() => { + let n = notice_msg.to_string().replace("\"", ""); + ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", n))).await.ok(); + }, Some(query_result) = query_rx.recv() => { // database informed us of a query result we asked for - //let res = EventRes(query_result.sub_id,query_result.event); client_received_event_count += 1; // send a result let subesc = query_result.sub_id.replace("\"", ""); @@ -474,14 +482,9 @@ async fn nostr_server( Ok(e) => { let id_prefix:String = e.id.chars().take(8).collect(); debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid); - // TODO: consider moving some/all - // authorization checks here, instead - // of the DB module, so we can send a - // proper NOTICE back to the client if - // they are unable to write. - - // Write this to the database - event_tx.send(e.clone()).await.ok(); + // Write this to the database. + let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() }; + event_tx.send(submit_event).await.ok(); client_published_event_count += 1; }, Err(_) => {