feat: broadcast events that match active client subscriptions

A broadcast channel sends messages to all connections.  Any connection
with a subscription that matches then sends it via websocket.
This commit is contained in:
Greg Heartsfield 2021-12-05 20:28:02 -06:00
parent 8b4c43ae71
commit 23f47899cd
4 changed files with 70 additions and 8 deletions

View File

@ -1,5 +1,6 @@
use crate::close::Close; use crate::close::Close;
use crate::error::Result; use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription; use crate::subscription::Subscription;
use log::*; use log::*;
use std::collections::HashMap; use std::collections::HashMap;
@ -10,7 +11,7 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
// state for a client connection // state for a client connection
pub struct ClientConn { pub struct ClientConn {
_client_id: Uuid, client_id: Uuid,
// current set of subscriptions // current set of subscriptions
subscriptions: HashMap<String, Subscription>, subscriptions: HashMap<String, Subscription>,
// websocket // websocket
@ -22,12 +23,26 @@ impl ClientConn {
pub fn new() -> Self { pub fn new() -> Self {
let client_id = Uuid::new_v4(); let client_id = Uuid::new_v4();
ClientConn { ClientConn {
_client_id: client_id, client_id: client_id,
subscriptions: HashMap::new(), subscriptions: HashMap::new(),
max_subs: 128, max_subs: 128,
} }
} }
pub fn get_client_prefix(&self) -> String {
self.client_id.to_string().chars().take(8).collect()
}
// return the first subscription that matches the event.
pub fn get_matching_subscription(&self, e: &Event) -> Option<&str> {
for (id, sub) in self.subscriptions.iter() {
if sub.interested_in_event(e) {
return Some(id);
}
}
None
}
pub fn subscribe(&mut self, s: Subscription) -> Result<()> { pub fn subscribe(&mut self, s: Subscription) -> Result<()> {
let k = s.get_id(); let k = s.get_id();
let sub_id_len = k.len(); let sub_id_len = k.len();

View File

@ -52,6 +52,11 @@ impl From<EventCmd> for Result<Event> {
} }
impl Event { impl Event {
// get short event identifer
pub fn get_event_id_prefix(&self) -> String {
self.id.chars().take(8).collect()
}
// check if this event is valid (should be propagated, stored) based on signature. // check if this event is valid (should be propagated, stored) based on signature.
fn is_valid(&self) -> bool { fn is_valid(&self) -> bool {
// validation is performed by: // validation is performed by:

View File

@ -1,3 +1,4 @@
use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
use log::*; use log::*;
use nostr_rs_relay::close::Close; use nostr_rs_relay::close::Close;
@ -6,6 +7,7 @@ use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::event::Event; use nostr_rs_relay::event::Event;
use nostr_rs_relay::protostream; use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*; use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
use rusqlite::Result as SQLResult; use rusqlite::Result as SQLResult;
use std::env; use std::env;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -59,7 +61,7 @@ async fn nostr_server(
) { ) {
// get a broadcast channel for clients to communicate on // get a broadcast channel for clients to communicate on
// wrap the TCP stream in a websocket. // wrap the TCP stream in a websocket.
let mut _bcast_rx = broadcast.subscribe(); let mut bcast_rx = broadcast.subscribe();
let conn = tokio_tungstenite::accept_async(stream).await; let conn = tokio_tungstenite::accept_async(stream).await;
let ws_stream = conn.expect("websocket handshake error"); let ws_stream = conn.expect("websocket handshake error");
// a stream & sink of Nostr protocol messages // a stream & sink of Nostr protocol messages
@ -71,6 +73,26 @@ async fn nostr_server(
let mut conn_good = true; let mut conn_good = true;
loop { loop {
tokio::select! { tokio::select! {
Ok(global_event) = bcast_rx.recv() => {
// ignoring closed broadcast errors, there will always be one sender available.
// Is there a subscription for this event?
let sub_name_opt = conn.get_matching_subscription(&global_event);
if sub_name_opt.is_none() {
return;
} else {
let sub_name = sub_name_opt.unwrap();
let event_str = serde_json::to_string(&global_event);
if event_str.is_ok() {
info!("sub match: client: {}, sub: {}, event: {}",
conn.get_client_prefix(), sub_name,
global_event.get_event_id_prefix());
// create an event response and send it
let res = EventRes(sub_name.to_owned(),event_str.unwrap());
nostr_stream.send(res).await.ok();
}
}
},
// check if this client has a subscription
proto_next = nostr_stream.next() => { proto_next = nostr_stream.next() => {
match proto_next { match proto_next {
Some(Ok(EventMsg(ec))) => { Some(Ok(EventMsg(ec))) => {
@ -80,7 +102,13 @@ async fn nostr_server(
match parsed { match parsed {
Ok(e) => { Ok(e) => {
let id_prefix:String = e.id.chars().take(8).collect(); let id_prefix:String = e.id.chars().take(8).collect();
info!("Successfully parsed/validated event: {}", id_prefix)}, info!("Successfully parsed/validated event: {}", id_prefix);
// send this event to everyone listening.
let bcast_res = broadcast.send(e);
if bcast_res.is_err() {
warn!("Could not send broadcast message: {:?}", bcast_res);
}
},
Err(_) => {info!("Invalid event ignored")} Err(_) => {info!("Invalid event ignored")}
} }
}, },

View File

@ -25,8 +25,10 @@ pub enum NostrMessage {
// Either an event w/ subscription, or a notice // Either an event w/ subscription, or a notice
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)] #[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
enum NostrResponse { pub enum NostrResponse {
Notice(String), NoticeRes(String),
// A subscription identifier and serialized response
EventRes(String, String),
} }
// A Nostr protocol stream is layered on top of a Websocket stream. // A Nostr protocol stream is layered on top of a Websocket stream.
@ -84,8 +86,20 @@ impl Sink<NostrResponse> for NostrStream {
} }
fn start_send(mut self: Pin<&mut Self>, item: NostrResponse) -> Result<(), Self::Error> { fn start_send(mut self: Pin<&mut Self>, item: NostrResponse) -> Result<(), Self::Error> {
let res_message = serde_json::to_string(&item).expect("Could convert message to string"); //let res_message = serde_json::to_string(&item).expect("Could convert message to string");
match Pin::new(&mut self.ws_stream).start_send(Message::Text(res_message)) { // create the string to send.
// TODO: do real escaping for both of these. Currently output isn't correctly escaped.
let send_str = match item {
NostrResponse::NoticeRes(msg) => {
let s = msg.replace("\"", "");
format!("[\"NOTICE\",\"{}\"]", s)
}
NostrResponse::EventRes(sub, eventstr) => {
let subesc = sub.replace("\"", "");
format!("[\"EVENT\",\"{}\",{}]", subesc, eventstr)
}
};
match Pin::new(&mut self.ws_stream).start_send(Message::Text(send_str)) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(_) => Err(Error::ConnWriteError), Err(_) => Err(Error::ConnWriteError),
} }