From e7d0ab1aca14c4e69508bf89852e5c4e9f1b74c8 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Sun, 5 Dec 2021 17:15:50 -0600 Subject: [PATCH] feat: parse subscriptions from websockets Parses subscription requests (REQ, but not CLOSE). Performs no subscription state management yet. --- src/event.rs | 5 +- src/lib.rs | 1 + src/main.rs | 13 +- src/protostream.rs | 3 +- src/subscription.rs | 321 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 335 insertions(+), 8 deletions(-) create mode 100644 src/subscription.rs diff --git a/src/event.rs b/src/event.rs index 46cbb3d..9975898 100644 --- a/src/event.rs +++ b/src/event.rs @@ -79,10 +79,7 @@ impl Event { let pubkey = schnorrsig::PublicKey::from_str(&self.pubkey).unwrap(); let verify = secp.schnorrsig_verify(&sig, &message, &pubkey); match verify { - Ok(()) => { - info!("verified event"); - true - } + Ok(()) => true, _ => false, } } diff --git a/src/lib.rs b/src/lib.rs index eb7e101..8b94275 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,3 +2,4 @@ pub mod conn; pub mod error; pub mod event; pub mod protostream; +pub mod subscription; diff --git a/src/main.rs b/src/main.rs index 127d7f9..0df575e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,11 +72,18 @@ async fn nostr_server( tokio::select! { proto_next = nostr_stream.next() => { match proto_next { - Some(Ok(EventMsg(e))) => { + Some(Ok(EventMsg(ec))) => { + // An EventCmd needs to be validated to be converted into an Event // handle each type of message - let _x : Result = Result::::from(e); + let parsed : Result = Result::::from(ec); + match parsed { + Ok(_) => {info!("Successfully parsed/validated event")}, + Err(_) => {info!("Invalid event ignored")} + } + }, + Some(Ok(SubMsg(s))) => { + info!("Sub request from client: {:?}", s); }, - Some(Ok(SubMsg)) => {}, Some(Ok(CloseMsg)) => {}, None => { info!("stream ended"); diff --git a/src/protostream.rs b/src/protostream.rs index 58404e9..1bef291 100644 --- a/src/protostream.rs +++ b/src/protostream.rs @@ -1,5 +1,6 @@ use crate::error::{Error, Result}; use crate::event::EventCmd; +use crate::subscription::Subscription; use core::pin::Pin; use futures::sink::Sink; use futures::stream::Stream; @@ -17,7 +18,7 @@ use tungstenite::protocol::Message; #[serde(untagged)] pub enum NostrMessage { EventMsg(EventCmd), - SubMsg, + SubMsg(Subscription), CloseMsg, } diff --git a/src/subscription.rs b/src/subscription.rs new file mode 100644 index 0000000..f086ffa --- /dev/null +++ b/src/subscription.rs @@ -0,0 +1,321 @@ +use crate::error::{Error, Result}; +use crate::event::Event; +use serde::{Deserialize, Deserializer, Serialize}; +//use serde_json::json; +//use serde_json::Result; + +#[derive(Serialize, PartialEq, Debug, Clone)] +pub struct Subscription { + pub id: String, + pub filters: Vec, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct ReqFilter { + pub id: Option, + pub author: Option, + pub kind: Option, + #[serde(rename = "e#")] + pub event: Option, + #[serde(rename = "p#")] + pub pubkey: Option, + pub since: Option, + pub authors: Option>, +} + +impl<'de> Deserialize<'de> for Subscription { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let mut v: serde_json::Value = Deserialize::deserialize(deserializer)?; + // this shoud be a 3-or-more element array. + // verify the first element is a String, REQ + // get the subscription from the second element. + // convert each of the remaining objects into filters + + // check for array + let va = v + .as_array_mut() + .ok_or(serde::de::Error::custom("not array"))?; + + // check length + if va.len() < 3 { + return Err(serde::de::Error::custom("not enough fields")); + } + let mut i = va.into_iter(); + // get command ("REQ") and ensure it is a string + let req_cmd_str: serde_json::Value = i.next().unwrap().take(); + let req = req_cmd_str.as_str().ok_or(serde::de::Error::custom( + "first element of request was not a string", + ))?; + if req != "REQ" { + return Err(serde::de::Error::custom("missing REQ command")); + } + + // ensure sub id is a string + let sub_id_str: serde_json::Value = i.next().unwrap().take(); + let sub_id = sub_id_str + .as_str() + .ok_or(serde::de::Error::custom("missing subscription id"))?; + + let mut filters = vec![]; + for fv in i { + let f: ReqFilter = serde_json::from_value(fv.take()) + .map_err(|_| serde::de::Error::custom("could not parse filter"))?; + filters.push(f); + } + Ok(Subscription { + id: sub_id.to_owned(), + filters, + }) + } +} + +impl Subscription { + pub fn parse(json: &str) -> Result { + serde_json::from_str(json).map_err(|e| Error::JsonParseFailed(e)) + } + pub fn get_id(&self) -> String { + self.id.clone() + } + pub fn get_filter_count(&self) -> usize { + self.filters.len() + } + pub fn interested_in_event(&self, event: &Event) -> bool { + // loop through every filter, and return true if any match this event. + for f in self.filters.iter() { + if f.interested_in_event(event) { + return true; + } + } + return false; + } +} + +impl ReqFilter { + // attempt to match against author/authors fields + fn author_match(&self, event: &Event) -> bool { + self.authors + .as_ref() + .map(|vs| vs.contains(&event.pubkey.to_owned())) + .unwrap_or(true) + && self + .author + .as_ref() + .map(|v| v == &event.pubkey) + .unwrap_or(true) + } + fn event_match(&self, event: &Event) -> bool { + self.event + .as_ref() + .map(|t| event.event_tag_match(t)) + .unwrap_or(true) + } + + fn kind_match(&self, kind: u64) -> bool { + self.kind.map(|v| v == kind).unwrap_or(true) + } + + pub fn interested_in_event(&self, event: &Event) -> bool { + // determine if all populated fields in this filter match the provided event. + // a filter matches an event if all the populated fields match. + self.id.as_ref().map(|v| v == &event.id).unwrap_or(true) + && self.since.map(|t| event.created_at > t).unwrap_or(true) + && self.kind_match(event.kind) + && self.author_match(&event) + && self.event_match(&event) + && true // match if all other fields are absent + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_request_parse() -> Result<()> { + let raw_json = "[\"REQ\",\"some-id\",{}]"; + let s: Subscription = serde_json::from_str(raw_json)?; + assert_eq!(s.id, "some-id"); + assert_eq!(s.filters.len(), 1); + assert_eq!(s.filters.get(0).unwrap().author, None); + Ok(()) + } + + #[test] + fn multi_empty_request_parse() -> Result<()> { + let raw_json = r#"["REQ","some-id",{}]"#; + let s: Subscription = serde_json::from_str(raw_json)?; + assert_eq!(s.id, "some-id"); + assert_eq!(s.filters.len(), 1); + assert_eq!(s.filters.get(0).unwrap().author, None); + Ok(()) + } + + #[test] + fn incorrect_header() { + let raw_json = "[\"REQUEST\",\"some-id\",\"{}\"]"; + assert!(serde_json::from_str::(raw_json).is_err()); + } + + #[test] + fn req_missing_filters() { + let raw_json = "[\"REQ\",\"some-id\"]"; + assert!(serde_json::from_str::(raw_json).is_err()); + } + + #[test] + fn invalid_filter() { + // unrecognized field in filter + let raw_json = "[\"REQ\",\"some-id\",{\"foo\": 3}]"; + assert!(serde_json::from_str::(raw_json).is_err()); + } + + #[test] + fn author_filter() -> Result<()> { + let raw_json = "[\"REQ\",\"some-id\",{\"author\": \"test-author-id\"}]"; + let s: Subscription = serde_json::from_str(raw_json)?; + assert_eq!(s.id, "some-id"); + assert_eq!(s.filters.len(), 1); + let first_filter = s.filters.get(0).unwrap(); + assert_eq!(first_filter.author, Some("test-author-id".to_owned())); + Ok(()) + } + + #[test] + fn interest_id_nomatch() -> Result<()> { + // subscription with a filter for ID + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"id":"abc"}]"#)?; + let e = Event { + id: "abcde".to_owned(), + pubkey: "".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), false); + Ok(()) + } + + #[test] + fn interest_time_and_id() -> Result<()> { + // subscription with a filter for ID and time + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"id":"abc", "since": 1000}]"#)?; + let e = Event { + id: "abc".to_owned(), + pubkey: "".to_owned(), + created_at: 50, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), false); + Ok(()) + } + + #[test] + fn interest_time_and_id2() -> Result<()> { + // subscription with a filter for ID and time + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"id":"abc", "since": 1000}]"#)?; + let e = Event { + id: "abc".to_owned(), + pubkey: "".to_owned(), + created_at: 1001, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), true); + Ok(()) + } + + #[test] + fn interest_id() -> Result<()> { + // subscription with a filter for ID + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"id":"abc"}]"#)?; + let e = Event { + id: "abc".to_owned(), + pubkey: "".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), true); + Ok(()) + } + + #[test] + fn author_single() -> Result<()> { + // subscription with a filter for ID + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"author":"abc"}]"#)?; + let e = Event { + id: "123".to_owned(), + pubkey: "abc".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), true); + Ok(()) + } + + #[test] + fn authors_single() -> Result<()> { + // subscription with a filter for ID + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"authors":["abc"]}]"#)?; + let e = Event { + id: "123".to_owned(), + pubkey: "abc".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), true); + Ok(()) + } + #[test] + fn authors_multi_pubkey() -> Result<()> { + // check for any of a set of authors, against the pubkey + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"authors":["abc", "bcd"]}]"#)?; + let e = Event { + id: "123".to_owned(), + pubkey: "bcd".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), true); + Ok(()) + } + + #[test] + fn authors_multi_no_match() -> Result<()> { + // check for any of a set of authors, against the pubkey + let s: Subscription = serde_json::from_str(r#"["REQ","xyz",{"authors":["abc", "bcd"]}]"#)?; + let e = Event { + id: "123".to_owned(), + pubkey: "xyz".to_owned(), + created_at: 0, + kind: 0, + tags: Vec::new(), + content: "".to_owned(), + sig: "".to_owned(), + }; + assert_eq!(s.interested_in_event(&e), false); + Ok(()) + } +}