diff --git a/src/db.rs b/src/db.rs index cfd2324..4a93d62 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05; +use crate::notice::Notice; use crate::schema::{upgrade_db, STARTUP_SQL}; use crate::subscription::ReqFilter; use crate::subscription::Subscription; @@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection, + pub notice_tx: tokio::sync::mpsc::Sender, } /// Database file @@ -158,7 +159,9 @@ pub async fn db_writer( event.get_event_id_prefix() ); notice_tx - .try_send("pubkey is not allowed to publish to this relay".to_owned()) + .try_send(Notice::message( + "pubkey is not allowed to publish to this relay".to_owned(), + )) .ok(); continue; } @@ -189,10 +192,10 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send( + .try_send(Notice::message( "NIP-05 verification is no longer valid (expired/wrong domain)" .to_owned(), - ) + )) .ok(); continue; } @@ -203,7 +206,9 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send("NIP-05 verification needed to publish events".to_owned()) + .try_send(Notice::message( + "NIP-05 verification needed to publish events".to_owned(), + )) .ok(); continue; } @@ -229,6 +234,7 @@ pub async fn db_writer( Ok(updated) => { if updated == 0 { trace!("ignoring duplicate or deleted event"); + notice_tx.try_send(Notice::duplicate(event.id)).ok(); } else { info!( "persisted event: {:?} from: {:?} in: {:?}", @@ -239,16 +245,14 @@ pub async fn db_writer( event_write = true; // send this out to all clients bcast_tx.send(event.clone()).ok(); + notice_tx.try_send(Notice::saved(event.id)).ok(); } } Err(err) => { warn!("event insert failed: {:?}", err); - notice_tx - .try_send( - "relay experienced an error trying to publish the latest event" - .to_owned(), - ) - .ok(); + let msg = + "relay experienced an error trying to publish the latest event".into(); + notice_tx.try_send(Notice::err_msg(msg, event.id)).ok(); } } } diff --git a/src/lib.rs b/src/lib.rs index dc00507..5341061 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod event; pub mod hexrange; pub mod info; pub mod nip05; +pub mod notice; pub mod schema; pub mod subscription; pub mod utils; diff --git a/src/notice.rs b/src/notice.rs new file mode 100644 index 0000000..acfb794 --- /dev/null +++ b/src/notice.rs @@ -0,0 +1,48 @@ +use crate::error; + +pub enum EventResultStatus { + Saved, + Duplicate, + Error(String), +} + +pub struct EventResult { + pub id: String, + pub status: EventResultStatus, +} + +pub enum Notice { + Message(String), + EventResult(EventResult), +} + +impl Notice { + pub fn err(err: error::Error, id: String) -> Notice { + Notice::err_msg(format!("{}", err), id) + } + + pub fn message(msg: String) -> Notice { + Notice::Message(msg) + } + + pub fn saved(id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Saved, + }) + } + + pub fn duplicate(id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Duplicate, + }) + } + + pub fn err_msg(msg: String, id: String) -> Notice { + Notice::EventResult(EventResult { + id, + status: EventResultStatus::Error(msg), + }) + } +} diff --git a/src/server.rs b/src/server.rs index c50b00a..eecd0a1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,6 +10,7 @@ use crate::event::Event; use crate::event::EventCmd; use crate::info::RelayInfo; use crate::nip05; +use crate::notice::{EventResultStatus, Notice}; use crate::subscription::Subscription; use futures::SinkExt; use futures::StreamExt; @@ -405,8 +406,17 @@ fn convert_to_msg(msg: String, max_bytes: Option) -> Result } /// Turn a string into a NOTICE message ready to send over a WebSocket -fn make_notice_message(msg: &str) -> Message { - Message::text(json!(["NOTICE", msg]).to_string()) +fn make_notice_message(notice: Notice) -> Message { + let json = match notice { + Notice::Message(ref msg) => json!(["NOTICE", msg]), + Notice::EventResult(ref res) => match &res.status { + EventResultStatus::Saved => json!(["OK", res.id, "true"]), + EventResultStatus::Duplicate => json!(["OK", res.id, "true", "duplicate"]), + EventResultStatus::Error(msg) => json!(["OK", res.id, "false", msg]), + }, + }; + + Message::text(json.to_string()) } struct ClientInfo { @@ -435,7 +445,7 @@ async fn nostr_server( // we will send out the tx handle to any query we generate. let (query_tx, mut query_rx) = mpsc::channel::(256); // Create channel for receiving NOTICEs - let (notice_tx, mut notice_rx) = mpsc::channel::(32); + let (notice_tx, mut notice_rx) = mpsc::channel::(32); // last time this client sent data (message, ping, etc.) let mut last_message_time = Instant::now(); @@ -480,7 +490,7 @@ async fn nostr_server( ws_stream.send(Message::Ping(Vec::new())).await.ok(); }, Some(notice_msg) = notice_rx.recv() => { - ws_stream.send(make_notice_message(¬ice_msg)).await.ok(); + ws_stream.send(make_notice_message(notice_msg)).await.ok(); }, Some(query_result) = query_rx.recv() => { // database informed us of a query result we asked for @@ -528,7 +538,7 @@ async fn nostr_server( }, Some(Ok(Message::Binary(_))) => { ws_stream.send( - make_notice_message("binary messages are not accepted")).await.ok(); + make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok(); continue; }, Some(Ok(Message::Ping(_) | Message::Pong(_))) => { @@ -538,8 +548,7 @@ async fn nostr_server( }, Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { ws_stream.send( - make_notice_message( - &format!("message too large ({} > {})",size, max_size))).await.ok(); + make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok(); continue; }, None | @@ -581,13 +590,15 @@ async fn nostr_server( } else { info!("client: {} sent a far future-dated event", cid); if let Some(fut_sec) = settings.options.reject_future_seconds { - ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok(); + let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec); + let notice = Notice::err_msg(msg, e.id); + ws_stream.send(make_notice_message(notice)).await.ok(); } } }, Err(_) => { info!("client: {} sent an invalid event", cid); - ws_stream.send(make_notice_message("event was invalid")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event was invalid".into()))).await.ok(); } } }, @@ -609,7 +620,7 @@ async fn nostr_server( }, Err(e) => { info!("Subscription error: {}", e); - ws_stream.send(make_notice_message(&e.to_string())).await.ok(); + ws_stream.send(make_notice_message(Notice::err(e, s.id))).await.ok(); } } }, @@ -628,7 +639,7 @@ async fn nostr_server( conn.unsubscribe(&c); } else { info!("invalid command ignored"); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); } }, Err(Error::ConnError) => { @@ -637,11 +648,11 @@ async fn nostr_server( } Err(Error::EventMaxLengthError(s)) => { info!("client: {} sent event larger ({} bytes) than max size", cid, s); - ws_stream.send(make_notice_message("event exceeded max size")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok(); }, Err(Error::ProtoParseError) => { info!("client {} sent event that could not be parsed", cid); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); }, Err(e) => { info!("got non-fatal error from client: {}, error: {:?}", cid, e);