Perform subscribe/unsubscribe from websocket

This commit is contained in:
Greg Heartsfield 2021-11-24 16:07:35 -06:00
parent 2da46501ca
commit 1a0f66a518
3 changed files with 24 additions and 7 deletions

View File

@ -72,7 +72,7 @@ async fn nostr_server(stream: TcpStream) {
// Handles valid clients who have upgraded to WebSockets // Handles valid clients who have upgraded to WebSockets
async fn process_client(stream: WebSocketStream<TcpStream>) { async fn process_client(stream: WebSocketStream<TcpStream>) {
// get a protocol helper; // get a protocol helper;
let proto = Proto::new(); let mut proto = Proto::new();
let (mut write, mut read) = stream.split(); let (mut write, mut read) = stream.split();
// TODO: select on a timeout to kill non-responsive clients // TODO: select on a timeout to kill non-responsive clients
@ -91,7 +91,7 @@ async fn process_client(stream: WebSocketStream<TcpStream>) {
.await .await
.expect("send failed"); .expect("send failed");
// Handle this request. Everything else below is basically websocket error handling. // Handle this request. Everything else below is basically websocket error handling.
proto.process_message(cmd); proto.process_message(cmd).ok();
} }
Ok(Message::Binary(_)) => { Ok(Message::Binary(_)) => {
info!("Ignoring Binary message"); info!("Ignoring Binary message");

View File

@ -27,13 +27,20 @@ impl Proto {
p p
} }
pub fn process_message(self: &Self, cmd: String) { // TODO: figure out NOTICE handling for errors here
pub fn process_message(&mut self, cmd: String) -> Result<()> {
info!( info!(
"Processing message in proto for client: {:?}", "Processing message in proto for client: {:?}",
self.client_id self.client_id
); );
// check what kind of message let message = parse_cmd(cmd)?;
info!("Parse result: {:?}", parse_type(cmd)); info!("Parsed message: {:?}", message);
match message {
NostrRequest::EvReq(_) => {}
NostrRequest::SubReq(sub) => self.subscribe(sub),
NostrRequest::CloseReq(close) => self.unsubscribe(close),
};
Ok(())
} }
pub fn subscribe(&mut self, s: Subscription) { pub fn subscribe(&mut self, s: Subscription) {
@ -61,11 +68,18 @@ impl Proto {
} }
// add subscription // add subscription
self.subscriptions.insert(k, s); self.subscriptions.insert(k, s);
info!(
"Registered new subscription, currently have {} active subs",
self.subscriptions.len()
);
} }
pub fn unsubscribe(&mut self, c: Close) { pub fn unsubscribe(&mut self, c: Close) {
self.subscriptions.remove(&c.get_id()); self.subscriptions.remove(&c.get_id());
unimplemented!(); info!(
"Removed subscription, currently have {} active subs",
self.subscriptions.len()
);
} }
} }
@ -99,7 +113,7 @@ fn msg_type_wrapper(msg: String) -> Result<NostrRawMessage> {
} }
} }
pub fn parse_type(msg: String) -> Result<NostrRequest> { pub fn parse_cmd(msg: String) -> Result<NostrRequest> {
// turn this raw string into a parsed request // turn this raw string into a parsed request
let typ = msg_type_wrapper(msg)?; let typ = msg_type_wrapper(msg)?;
match typ { match typ {

View File

@ -86,6 +86,9 @@ impl Subscription {
pub fn get_id(&self) -> String { pub fn get_id(&self) -> String {
self.id.clone() self.id.clone()
} }
pub fn get_filter_count(&self) -> usize {
self.filters.len()
}
} }
#[cfg(test)] #[cfg(test)]