diff --git a/src/db.rs b/src/db.rs index cfd2324..b3bbb31 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05; +use crate::notice::Notice; use crate::schema::{upgrade_db, STARTUP_SQL}; use crate::subscription::ReqFilter; use crate::subscription::Subscription; @@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection, + pub notice_tx: tokio::sync::mpsc::Sender, } /// Database file @@ -158,7 +159,10 @@ pub async fn db_writer( event.get_event_id_prefix() ); notice_tx - .try_send("pubkey is not allowed to publish to this relay".to_owned()) + .try_send(Notice::blocked( + event.id, + "pubkey is not allowed to publish to this relay", + )) .ok(); continue; } @@ -189,10 +193,10 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send( - "NIP-05 verification is no longer valid (expired/wrong domain)" - .to_owned(), - ) + .try_send(Notice::blocked( + event.id, + "NIP-05 verification is no longer valid (expired/wrong domain)", + )) .ok(); continue; } @@ -203,7 +207,10 @@ pub async fn db_writer( event.get_author_prefix() ); notice_tx - .try_send("NIP-05 verification needed to publish events".to_owned()) + .try_send(Notice::blocked( + event.id, + "NIP-05 verification needed to publish events", + )) .ok(); continue; } @@ -229,6 +236,7 @@ pub async fn db_writer( Ok(updated) => { if updated == 0 { trace!("ignoring duplicate or deleted event"); + notice_tx.try_send(Notice::duplicate(event.id)).ok(); } else { info!( "persisted event: {:?} from: {:?} in: {:?}", @@ -239,16 +247,13 @@ pub async fn db_writer( event_write = true; // send this out to all clients bcast_tx.send(event.clone()).ok(); + notice_tx.try_send(Notice::saved(event.id)).ok(); } } Err(err) => { warn!("event insert failed: {:?}", err); - notice_tx - .try_send( - "relay experienced an error trying to publish the latest event" - .to_owned(), - ) - .ok(); + let msg = "relay experienced an error trying to publish the latest event"; + notice_tx.try_send(Notice::error(event.id, msg)).ok(); } } } diff --git a/src/lib.rs b/src/lib.rs index dc00507..5341061 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod event; pub mod hexrange; pub mod info; pub mod nip05; +pub mod notice; pub mod schema; pub mod subscription; pub mod utils; diff --git a/src/notice.rs b/src/notice.rs new file mode 100644 index 0000000..f229eb5 --- /dev/null +++ b/src/notice.rs @@ -0,0 +1,86 @@ +pub enum EventResultStatus { + Saved, + Duplicate, + Invalid, + Blocked, + RateLimited, + Error, +} + +pub struct EventResult { + pub id: String, + pub msg: String, + pub status: EventResultStatus, +} + +pub enum Notice { + Message(String), + EventResult(EventResult), +} + +impl EventResultStatus { + pub fn to_bool(&self) -> bool { + match self { + Self::Saved => true, + Self::Duplicate => true, + Self::Invalid => false, + Self::Blocked => false, + Self::RateLimited => false, + Self::Error => false, + } + } + + pub fn prefix(&self) -> &'static str { + match self { + Self::Saved => "saved", + Self::Duplicate => "duplicate", + Self::Invalid => "invalid", + Self::Blocked => "blocked", + Self::RateLimited => "rate-limited", + Self::Error => "error", + } + } +} + +impl Notice { + //pub fn err(err: error::Error, id: String) -> Notice { + // Notice::err_msg(format!("{}", err), id) + //} + + pub fn message(msg: String) -> Notice { + Notice::Message(msg) + } + + fn prefixed(id: String, msg: &str, status: EventResultStatus) -> Notice { + let msg = format!("{}: {}", status.prefix(), msg); + Notice::EventResult(EventResult { id, msg, status }) + } + + pub fn invalid(id: String, msg: &str) -> Notice { + Notice::prefixed(id, msg, EventResultStatus::Invalid) + } + + pub fn blocked(id: String, msg: &str) -> Notice { + Notice::prefixed(id, msg, EventResultStatus::Blocked) + } + + pub fn rate_limited(id: String, msg: &str) -> Notice { + Notice::prefixed(id, msg, EventResultStatus::RateLimited) + } + + pub fn duplicate(id: String) -> Notice { + Notice::prefixed(id, "", EventResultStatus::Duplicate) + } + + pub fn error(id: String, msg: &str) -> Notice { + Notice::prefixed(id, msg, EventResultStatus::Error) + } + + pub fn saved(id: String) -> Notice { + Notice::EventResult(EventResult { + id, + msg: "".into(), + status: EventResultStatus::Saved, + }) + } +} diff --git a/src/server.rs b/src/server.rs index c50b00a..29337a2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,6 +10,7 @@ use crate::event::Event; use crate::event::EventCmd; use crate::info::RelayInfo; use crate::nip05; +use crate::notice::Notice; use crate::subscription::Subscription; use futures::SinkExt; use futures::StreamExt; @@ -405,8 +406,13 @@ fn convert_to_msg(msg: String, max_bytes: Option) -> Result } /// Turn a string into a NOTICE message ready to send over a WebSocket -fn make_notice_message(msg: &str) -> Message { - Message::text(json!(["NOTICE", msg]).to_string()) +fn make_notice_message(notice: Notice) -> Message { + let json = match notice { + Notice::Message(ref msg) => json!(["NOTICE", msg]), + Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]), + }; + + Message::text(json.to_string()) } struct ClientInfo { @@ -435,7 +441,7 @@ async fn nostr_server( // we will send out the tx handle to any query we generate. let (query_tx, mut query_rx) = mpsc::channel::(256); // Create channel for receiving NOTICEs - let (notice_tx, mut notice_rx) = mpsc::channel::(32); + let (notice_tx, mut notice_rx) = mpsc::channel::(32); // last time this client sent data (message, ping, etc.) let mut last_message_time = Instant::now(); @@ -480,7 +486,7 @@ async fn nostr_server( ws_stream.send(Message::Ping(Vec::new())).await.ok(); }, Some(notice_msg) = notice_rx.recv() => { - ws_stream.send(make_notice_message(¬ice_msg)).await.ok(); + ws_stream.send(make_notice_message(notice_msg)).await.ok(); }, Some(query_result) = query_rx.recv() => { // database informed us of a query result we asked for @@ -528,7 +534,7 @@ async fn nostr_server( }, Some(Ok(Message::Binary(_))) => { ws_stream.send( - make_notice_message("binary messages are not accepted")).await.ok(); + make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok(); continue; }, Some(Ok(Message::Ping(_) | Message::Pong(_))) => { @@ -538,8 +544,7 @@ async fn nostr_server( }, Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => { ws_stream.send( - make_notice_message( - &format!("message too large ({} > {})",size, max_size))).await.ok(); + make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok(); continue; }, None | @@ -581,13 +586,15 @@ async fn nostr_server( } else { info!("client: {} sent a far future-dated event", cid); if let Some(fut_sec) = settings.options.reject_future_seconds { - ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok(); + let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec); + let notice = Notice::invalid(e.id, &msg); + ws_stream.send(make_notice_message(notice)).await.ok(); } } }, Err(_) => { info!("client: {} sent an invalid event", cid); - ws_stream.send(make_notice_message("event was invalid")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event was invalid".into()))).await.ok(); } } }, @@ -609,7 +616,7 @@ async fn nostr_server( }, Err(e) => { info!("Subscription error: {}", e); - ws_stream.send(make_notice_message(&e.to_string())).await.ok(); + ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok(); } } }, @@ -628,7 +635,7 @@ async fn nostr_server( conn.unsubscribe(&c); } else { info!("invalid command ignored"); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); } }, Err(Error::ConnError) => { @@ -637,11 +644,11 @@ async fn nostr_server( } Err(Error::EventMaxLengthError(s)) => { info!("client: {} sent event larger ({} bytes) than max size", cid, s); - ws_stream.send(make_notice_message("event exceeded max size")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok(); }, Err(Error::ProtoParseError) => { info!("client {} sent event that could not be parsed", cid); - ws_stream.send(make_notice_message("could not parse command")).await.ok(); + ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok(); }, Err(e) => { info!("got non-fatal error from client: {}, error: {:?}", cid, e);