diff --git a/src/conn.rs b/src/conn.rs index 68bf5d2..053b777 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,7 +1,9 @@ //! Client connection state use crate::close::Close; +use crate::error::Error; use crate::error::Result; use crate::event::Event; + use crate::subscription::Subscription; use log::*; use std::collections::HashMap; @@ -33,7 +35,7 @@ impl ClientConn { ClientConn { client_id, subscriptions: HashMap::new(), - max_subs: 128, + max_subs: 32, } } @@ -65,7 +67,7 @@ impl ClientConn { "ignoring sub request with excessive length: ({})", sub_id_len ); - return Ok(()); + return Err(Error::SubIdMaxLengthError); } // check if an existing subscription exists, and replace if so if self.subscriptions.contains_key(&k) { @@ -77,9 +79,7 @@ impl ClientConn { // check if there is room for another subscription. if self.subscriptions.len() >= self.max_subs { - // TODO: return error/notice for this - info!("client has reached the maximum number of unique subscriptions"); - return Ok(()); + return Err(Error::SubMaxExceededError); } // add subscription self.subscriptions.insert(k, s); diff --git a/src/error.rs b/src/error.rs index 2b8a778..dac58ac 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,6 +21,10 @@ pub enum Error { CloseParseFailed, #[error("Event validation failed")] EventInvalid, + #[error("Subscription identifier max length exceeded")] + SubIdMaxLengthError, + #[error("Maximum concurrent subscription count reached")] + SubMaxExceededError, // this should be used if the JSON is invalid #[error("JSON parsing failed")] JsonParseFailed(serde_json::Error), diff --git a/src/main.rs b/src/main.rs index 1563c70..452c76c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -156,7 +156,10 @@ async fn nostr_server( event_tx.send(e.clone()).await.ok(); client_published_event_count += 1; }, - Err(_) => {info!("client {} sent an invalid event", cid)} + Err(_) => { + info!("client {} sent an invalid event", cid); + nostr_stream.send(NoticeRes("event was invalid".to_owned())).await.ok(); + } } }, Some(Ok(SubMsg(s))) => { @@ -166,11 +169,18 @@ async fn nostr_server( // * making a channel to cancel to request later // * sending a request for a SQL query let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); - running_queries.insert(s.id.to_owned(), abandon_query_tx); - // register this connection - conn.subscribe(s.clone()).ok(); - // start a database query - db::db_query(s, query_tx.clone(), abandon_query_rx).await; + match conn.subscribe(s.clone()) { + Ok(()) => { + running_queries.insert(s.id.to_owned(), abandon_query_tx); + // start a database query + db::db_query(s, query_tx.clone(), abandon_query_rx).await; + }, + Err(e) => { + info!("Subscription error: {}", e); + nostr_stream.send(NoticeRes(format!("{}",e))).await.ok(); + + } + } }, Some(Ok(CloseMsg(cc))) => { // closing a request simply removes the subscription. @@ -187,7 +197,10 @@ async fn nostr_server( // the subscription conn.unsubscribe(c); }, - Err(_) => {info!("invalid command ignored");} + Err(_) => { + info!("invalid command ignored"); + + } } }, None => {