diff --git a/src/conn.rs b/src/conn.rs index 42de4f3..67c3276 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,5 +1,6 @@ use crate::close::Close; use crate::error::Result; +use crate::event::Event; use crate::subscription::Subscription; use log::*; use std::collections::HashMap; @@ -10,7 +11,7 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256; // state for a client connection pub struct ClientConn { - _client_id: Uuid, + client_id: Uuid, // current set of subscriptions subscriptions: HashMap, // websocket @@ -22,12 +23,26 @@ impl ClientConn { pub fn new() -> Self { let client_id = Uuid::new_v4(); ClientConn { - _client_id: client_id, + client_id: client_id, subscriptions: HashMap::new(), max_subs: 128, } } + pub fn get_client_prefix(&self) -> String { + self.client_id.to_string().chars().take(8).collect() + } + + // return the first subscription that matches the event. + pub fn get_matching_subscription(&self, e: &Event) -> Option<&str> { + for (id, sub) in self.subscriptions.iter() { + if sub.interested_in_event(e) { + return Some(id); + } + } + None + } + pub fn subscribe(&mut self, s: Subscription) -> Result<()> { let k = s.get_id(); let sub_id_len = k.len(); diff --git a/src/event.rs b/src/event.rs index 971c1f4..a7d029c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -52,6 +52,11 @@ impl From for Result { } impl Event { + // get short event identifer + pub fn get_event_id_prefix(&self) -> String { + self.id.chars().take(8).collect() + } + // check if this event is valid (should be propagated, stored) based on signature. fn is_valid(&self) -> bool { // validation is performed by: diff --git a/src/main.rs b/src/main.rs index 1c35aaf..e8c8d44 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use futures::SinkExt; use futures::StreamExt; use log::*; use nostr_rs_relay::close::Close; @@ -6,6 +7,7 @@ use nostr_rs_relay::error::{Error, Result}; use nostr_rs_relay::event::Event; use nostr_rs_relay::protostream; use nostr_rs_relay::protostream::NostrMessage::*; +use nostr_rs_relay::protostream::NostrResponse::*; use rusqlite::Result as SQLResult; use std::env; use tokio::net::{TcpListener, TcpStream}; @@ -59,7 +61,7 @@ async fn nostr_server( ) { // get a broadcast channel for clients to communicate on // wrap the TCP stream in a websocket. - let mut _bcast_rx = broadcast.subscribe(); + let mut bcast_rx = broadcast.subscribe(); let conn = tokio_tungstenite::accept_async(stream).await; let ws_stream = conn.expect("websocket handshake error"); // a stream & sink of Nostr protocol messages @@ -71,6 +73,26 @@ async fn nostr_server( let mut conn_good = true; loop { tokio::select! { + Ok(global_event) = bcast_rx.recv() => { + // ignoring closed broadcast errors, there will always be one sender available. + // Is there a subscription for this event? + let sub_name_opt = conn.get_matching_subscription(&global_event); + if sub_name_opt.is_none() { + return; + } else { + let sub_name = sub_name_opt.unwrap(); + let event_str = serde_json::to_string(&global_event); + if event_str.is_ok() { + info!("sub match: client: {}, sub: {}, event: {}", + conn.get_client_prefix(), sub_name, + global_event.get_event_id_prefix()); + // create an event response and send it + let res = EventRes(sub_name.to_owned(),event_str.unwrap()); + nostr_stream.send(res).await.ok(); + } + } + }, + // check if this client has a subscription proto_next = nostr_stream.next() => { match proto_next { Some(Ok(EventMsg(ec))) => { @@ -80,7 +102,13 @@ async fn nostr_server( match parsed { Ok(e) => { let id_prefix:String = e.id.chars().take(8).collect(); - info!("Successfully parsed/validated event: {}", id_prefix)}, + info!("Successfully parsed/validated event: {}", id_prefix); + // send this event to everyone listening. + let bcast_res = broadcast.send(e); + if bcast_res.is_err() { + warn!("Could not send broadcast message: {:?}", bcast_res); + } + }, Err(_) => {info!("Invalid event ignored")} } }, diff --git a/src/protostream.rs b/src/protostream.rs index 2d5ec33..722eee2 100644 --- a/src/protostream.rs +++ b/src/protostream.rs @@ -25,8 +25,10 @@ pub enum NostrMessage { // Either an event w/ subscription, or a notice #[derive(Deserialize, Serialize, Clone, PartialEq, Debug)] -enum NostrResponse { - Notice(String), +pub enum NostrResponse { + NoticeRes(String), + // A subscription identifier and serialized response + EventRes(String, String), } // A Nostr protocol stream is layered on top of a Websocket stream. @@ -84,8 +86,20 @@ impl Sink for NostrStream { } fn start_send(mut self: Pin<&mut Self>, item: NostrResponse) -> Result<(), Self::Error> { - let res_message = serde_json::to_string(&item).expect("Could convert message to string"); - match Pin::new(&mut self.ws_stream).start_send(Message::Text(res_message)) { + //let res_message = serde_json::to_string(&item).expect("Could convert message to string"); + // create the string to send. + // TODO: do real escaping for both of these. Currently output isn't correctly escaped. + let send_str = match item { + NostrResponse::NoticeRes(msg) => { + let s = msg.replace("\"", ""); + format!("[\"NOTICE\",\"{}\"]", s) + } + NostrResponse::EventRes(sub, eventstr) => { + let subesc = sub.replace("\"", ""); + format!("[\"EVENT\",\"{}\",{}]", subesc, eventstr) + } + }; + match Pin::new(&mut self.ws_stream).start_send(Message::Text(send_str)) { Ok(()) => Ok(()), Err(_) => Err(Error::ConnWriteError), }