From 1a0f66a518d4dd57ad0bda63b0a2d17710c7da1d Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Wed, 24 Nov 2021 16:07:35 -0600 Subject: [PATCH] Perform subscribe/unsubscribe from websocket --- src/main.rs | 4 ++-- src/proto.rs | 24 +++++++++++++++++++----- src/subscription.rs | 3 +++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3b63be8..909d426 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,7 +72,7 @@ async fn nostr_server(stream: TcpStream) { // Handles valid clients who have upgraded to WebSockets async fn process_client(stream: WebSocketStream) { // get a protocol helper; - let proto = Proto::new(); + let mut proto = Proto::new(); let (mut write, mut read) = stream.split(); // TODO: select on a timeout to kill non-responsive clients @@ -91,7 +91,7 @@ async fn process_client(stream: WebSocketStream) { .await .expect("send failed"); // Handle this request. Everything else below is basically websocket error handling. - proto.process_message(cmd); + proto.process_message(cmd).ok(); } Ok(Message::Binary(_)) => { info!("Ignoring Binary message"); diff --git a/src/proto.rs b/src/proto.rs index 1ac75ce..e6dc070 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -27,13 +27,20 @@ impl Proto { p } - pub fn process_message(self: &Self, cmd: String) { + // TODO: figure out NOTICE handling for errors here + pub fn process_message(&mut self, cmd: String) -> Result<()> { info!( "Processing message in proto for client: {:?}", self.client_id ); - // check what kind of message - info!("Parse result: {:?}", parse_type(cmd)); + let message = parse_cmd(cmd)?; + info!("Parsed message: {:?}", message); + match message { + NostrRequest::EvReq(_) => {} + NostrRequest::SubReq(sub) => self.subscribe(sub), + NostrRequest::CloseReq(close) => self.unsubscribe(close), + }; + Ok(()) } pub fn subscribe(&mut self, s: Subscription) { @@ -61,11 +68,18 @@ impl Proto { } // add subscription self.subscriptions.insert(k, s); + info!( + "Registered new subscription, currently have {} active subs", + self.subscriptions.len() + ); } pub fn unsubscribe(&mut self, c: Close) { self.subscriptions.remove(&c.get_id()); - unimplemented!(); + info!( + "Removed subscription, currently have {} active subs", + self.subscriptions.len() + ); } } @@ -99,7 +113,7 @@ fn msg_type_wrapper(msg: String) -> Result { } } -pub fn parse_type(msg: String) -> Result { +pub fn parse_cmd(msg: String) -> Result { // turn this raw string into a parsed request let typ = msg_type_wrapper(msg)?; match typ { diff --git a/src/subscription.rs b/src/subscription.rs index ad9fdf5..c0f6e38 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -86,6 +86,9 @@ impl Subscription { pub fn get_id(&self) -> String { self.id.clone() } + pub fn get_filter_count(&self) -> usize { + self.filters.len() + } } #[cfg(test)]