diff --git a/Cargo.toml b/Cargo.toml index 19ae3b4..7b8e6c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ secp256k1 = {git = "https://github.com/rust-bitcoin/rust-secp256k1.git", rev = " serde = { version = "^1.0", features = ["derive"] } serde_json = {version = "^1.0", features = ["preserve_order"]} hex = "^0.4" -rusqlite = "^0.26" +rusqlite = { version = "^0.26", features = ["limits"]} lazy_static = "^1.4" governor = "^0.4" nonzero_ext = "^0.3" diff --git a/src/db.rs b/src/db.rs index 5bc4e88..aca3bef 100644 --- a/src/db.rs +++ b/src/db.rs @@ -11,6 +11,8 @@ use rusqlite::Connection; use rusqlite::OpenFlags; //use std::num::NonZeroU32; use crate::config::SETTINGS; +use rusqlite::limits::Limit; +use rusqlite::types::ToSql; use std::path::Path; use std::thread; use std::time::Instant; @@ -34,7 +36,7 @@ PRAGMA journal_mode=WAL; PRAGMA main.synchronous=NORMAL; PRAGMA foreign_keys = ON; PRAGMA application_id = 1654008667; -PRAGMA user_version = 2; +PRAGMA user_version = 3; -- Event Table CREATE TABLE IF NOT EXISTS event ( @@ -54,6 +56,21 @@ CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at); CREATE INDEX IF NOT EXISTS author_index ON event(author); CREATE INDEX IF NOT EXISTS kind_index ON event(kind); +-- Tag Table +-- Tag values are stored as either a BLOB (if they come in as a +-- hex-string), or TEXT otherwise. +-- This means that searches need to select the appropriate column. +CREATE TABLE IF NOT EXISTS tag ( +id INTEGER PRIMARY KEY, +event_id INTEGER NOT NULL, -- an event ID that contains a tag. +name TEXT, -- the tag name ("p", "e", whatever) +value TEXT, -- the tag value, if not hex. +value_hex BLOB, -- the tag value, if it can be interpreted as a hex string. +FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value); +CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex); + -- Event References Table CREATE TABLE IF NOT EXISTS event_ref ( id INTEGER PRIMARY KEY, @@ -80,19 +97,36 @@ CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey); /// Upgrade DB to latest version, and execute pragma settings pub fn upgrade_db(conn: &mut Connection) -> Result<()> { // check the version. - let curr_version = db_version(conn)?; + let mut curr_version = db_version(conn)?; info!("DB version = {:?}", curr_version); + debug!( + "SQLite max query parameters: {}", + conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) + ); + debug!( + "SQLite max table/blob/text length: {} MB", + (conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor() + ); + debug!( + "SQLite max SQL length: {} MB", + (conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor() + ); + // initialize from scratch if curr_version == 0 { match conn.execute_batch(INIT_SQL) { - Ok(()) => info!("database pragma/schema initialized to v2, and ready"), + Ok(()) => { + info!("database pragma/schema initialized to v3, and ready"); + //curr_version = 3; + } Err(err) => { error!("update failed: {}", err); panic!("database could not be initialized"); } } - } else if curr_version == 1 { + } + if curr_version == 1 { // only change is adding a hidden column to events. let upgrade_sql = r##" ALTER TABLE event ADD hidden INTEGER; @@ -100,19 +134,73 @@ UPDATE event SET hidden=FALSE; PRAGMA user_version = 2; "##; match conn.execute_batch(upgrade_sql) { - Ok(()) => info!("database schema upgraded v1 -> v2"), + Ok(()) => { + info!("database schema upgraded v1 -> v2"); + curr_version = 2; + } Err(err) => { error!("update failed: {}", err); panic!("database could not be upgraded"); } } - } else if curr_version == 2 { + } + if curr_version == 2 { + // this version lacks the tag column + debug!("database schema needs update from 2->3"); + let upgrade_sql = r##" +CREATE TABLE IF NOT EXISTS tag ( +id INTEGER PRIMARY KEY, +event_id INTEGER NOT NULL, -- an event ID that contains a tag. +name TEXT, -- the tag name ("p", "e", whatever) +value TEXT, -- the tag value, if not hex. +value_hex BLOB, -- the tag value, if it can be interpreted as a hex string. +FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value); +CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex); +PRAGMA user_version = 3; +"##; + // TODO: load existing refs into tag table + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v2 -> v3"); + //curr_version = 3; + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } + info!("Starting transaction"); + // iterate over every event/pubkey tag + let tx = conn.transaction()?; + { + let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?; + let mut tag_rows = stmt.query([])?; + while let Some(row) = tag_rows.next()? { + // we want to capture the event_id that had the tag, the tag name, and the tag hex value. + let event_id: u64 = row.get(0)?; + let tag_name: String = row.get(1)?; + let tag_value: String = row.get(2)?; + // this will leave behind p/e tags that were non-hex, but they are invalid anyways. + if is_hex(&tag_value) { + tx.execute( + "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", + params![event_id, tag_name, hex::decode(&tag_value).ok()], + )?; + } + } + } + tx.commit()?; + info!("Upgrade complete"); + } else if curr_version == 3 { debug!("Database version was already current"); - } else if curr_version > 2 { + } else if curr_version > 3 { panic!("Database version is newer than supported by this executable"); } // Setup PRAGMA conn.execute_batch(STARTUP_SQL)?; + info!("Finished pragma"); Ok(()) } @@ -134,6 +222,7 @@ pub async fn db_writer( )?; info!("opened database {:?} for writing", full_path); upgrade_db(&mut conn)?; + // get rate limit settings let rps_setting = config.limits.messages_per_sec; let mut most_recent_rate_limit = Instant::now(); @@ -234,24 +323,24 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result { } // remember primary key of the event most recently inserted. let ev_id = tx.last_insert_rowid(); - // add all event tags into the event_ref table - let etags = e.get_event_tags(); - if !etags.is_empty() { - for etag in etags.iter() { - tx.execute( - "INSERT OR IGNORE INTO event_ref (event_id, referenced_event) VALUES (?1, ?2)", - params![ev_id, hex::decode(&etag).ok()], - )?; - } - } - // add all event tags into the pubkey_ref table - let ptags = e.get_pubkey_tags(); - if !ptags.is_empty() { - for ptag in ptags.iter() { - tx.execute( - "INSERT OR IGNORE INTO pubkey_ref (event_id, referenced_pubkey) VALUES (?1, ?2)", - params![ev_id, hex::decode(&ptag).ok()], - )?; + // add all tags to the tag table + for tag in e.tags.iter() { + // ensure we have 2 values. + if tag.len() >= 2 { + let tagname = &tag[0]; + let tagval = &tag[1]; + // if tagvalue is hex; + if is_hex(tagval) { + tx.execute( + "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, hex::decode(&tagval).ok()], + )?; + } else { + tx.execute( + "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)", + params![ev_id, &tagname, &tagval], + )?; + } } } // if this event is for a metadata update, hide every other kind=0 @@ -294,14 +383,27 @@ fn is_hex(s: &str) -> bool { s.chars().all(|x| char::is_ascii_hexdigit(&x)) } -/// Create a dynamic SQL query string from a subscription. -fn query_from_sub(sub: &Subscription) -> String { +fn repeat_vars(count: usize) -> String { + if count == 0 { + return "".to_owned(); + } + let mut s = "?,".repeat(count); + // Remove trailing comma + s.pop(); + s +} + +/// Create a dynamic SQL query string and params from a subscription. +fn query_from_sub(sub: &Subscription) -> (String, Vec>) { // build a dynamic SQL query. all user-input is either an integer // (sqli-safe), or a string that is filtered to only contain - // hexadecimal characters. + // hexadecimal characters. Strings that require escaping (tag + // names/values) use parameters. let mut query = - "SELECT DISTINCT(e.content) FROM event e LEFT JOIN event_ref er ON e.id=er.event_id LEFT JOIN pubkey_ref pr ON e.id=pr.event_id " - .to_owned(); + "SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned(); + // parameters + let mut params: Vec> = vec![]; + // for every filter in the subscription, generate a where clause let mut filter_clauses: Vec = Vec::new(); for f in sub.filters.iter() { @@ -340,33 +442,33 @@ fn query_from_sub(sub: &Subscription) -> String { let id_clause = format!("event_hash IN ({})", ids_escaped.join(", ")); filter_components.push(id_clause); } - // Query for referenced event - if f.events.is_some() { - let events_escaped: Vec = f - .events - .as_ref() - .unwrap() - .iter() - .filter(|&x| is_hex(x)) - .map(|x| format!("x'{}'", x)) - .collect(); - let events_clause = format!("referenced_event IN ({})", events_escaped.join(", ")); - filter_components.push(events_clause); + // Query for tags + if let Some(map) = &f.tags { + for (key, val) in map.iter() { + let mut str_vals: Vec> = vec![]; + let mut blob_vals: Vec> = vec![]; + for v in val { + if is_hex(v) { + if let Ok(h) = hex::decode(&v) { + blob_vals.push(Box::new(h)); + } + } else { + str_vals.push(Box::new(v.to_owned())); + } + } + // create clauses with "?" params for each tag value being searched + let str_clause = format!("value IN ({})", repeat_vars(str_vals.len())); + let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len())); + let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause); + // add the tag name as the first parameter + params.push(Box::new(key.to_owned())); + // add all tag values that are plain strings as params + params.append(&mut str_vals); + // add all tag values that are blobs as params + params.append(&mut blob_vals); + filter_components.push(tag_clause); + } } - // Query for referenced pubkey - if f.pubkeys.is_some() { - let pubkeys_escaped: Vec = f - .pubkeys - .as_ref() - .unwrap() - .iter() - .filter(|&x| is_hex(x)) - .map(|x| format!("x'{}'", x)) - .collect(); - let pubkeys_clause = format!("referenced_pubkey IN ({})", pubkeys_escaped.join(", ")); - filter_components.push(pubkeys_clause); - } - // Query for timestamp if f.since.is_some() { let created_clause = format!("created_at > {}", f.since.unwrap()); @@ -398,7 +500,7 @@ fn query_from_sub(sub: &Subscription) -> String { // add order clause query.push_str(" ORDER BY created_at ASC"); debug!("query string: {}", query); - query + (query, params) } /// Perform a database query using a subscription. @@ -423,10 +525,10 @@ pub async fn db_query( let mut row_count: usize = 0; let start = Instant::now(); // generate SQL query - let q = query_from_sub(&sub); + let (q, p) = query_from_sub(&sub); // execute the query let mut stmt = conn.prepare(&q)?; - let mut event_rows = stmt.query([])?; + let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; while let Some(row) = event_rows.next()? { // check if this is still active (we could do this every N rows) if abandon_query_rx.try_recv().is_ok() { @@ -448,6 +550,6 @@ pub async fn db_query( start.elapsed() ); let ok: Result<()> = Ok(()); - return ok; + ok }); } diff --git a/src/event.rs b/src/event.rs index fb527e9..99b2453 100644 --- a/src/event.rs +++ b/src/event.rs @@ -193,39 +193,8 @@ impl Event { serde_json::Value::Array(tags) } - /// Get a list of event tags. - pub fn get_event_tags(&self) -> Vec<&str> { - let mut etags = vec![]; - for t in self.tags.iter() { - if t.len() >= 2 && t.get(0).unwrap() == "e" { - etags.push(&t.get(1).unwrap()[..]); - } - } - etags - } - - /// Get a list of pubkey/petname tags. - pub fn get_pubkey_tags(&self) -> Vec<&str> { - let mut ptags = vec![]; - for t in self.tags.iter() { - if t.len() >= 2 && t.get(0).unwrap() == "p" { - ptags.push(&t.get(1).unwrap()[..]); - } - } - ptags - } - - /// Check if a given event is referenced in an event tag. - pub fn event_tag_match(&self, eventid: &str) -> bool { - self.get_event_tags().contains(&eventid) - } - - /// Check if a given event is referenced in an event tag. - pub fn pubkey_tag_match(&self, pubkey: &str) -> bool { - self.get_pubkey_tags().contains(&pubkey) - } - /// Generic tag match + // TODO: is this used anywhere? pub fn generic_tag_match(&self, tagname: &str, tagvalue: &str) -> bool { match &self.tagidx { Some(idx) => { diff --git a/src/protostream.rs b/src/protostream.rs index 830902a..205878e 100644 --- a/src/protostream.rs +++ b/src/protostream.rs @@ -81,8 +81,14 @@ impl Stream for NostrStream { Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => match v { Ok(Message::Text(vs)) => Poll::Ready(Some(convert(vs))), + Ok(Message::Ping(_x)) => { + debug!("client ping"); + //Pin::new(&mut self.ws_stream).start_send(Message::Pong(x)); + //info!("sent pong"); + Poll::Pending + } Ok(Message::Binary(_)) => Poll::Ready(Some(Err(Error::ProtoParseError))), - Ok(Message::Pong(_)) | Ok(Message::Ping(_)) => Poll::Pending, + Ok(Message::Pong(_)) => Poll::Pending, Ok(Message::Close(_)) => Poll::Ready(None), Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None), Err(_) => Poll::Ready(Some(Err(Error::ConnError))), diff --git a/src/subscription.rs b/src/subscription.rs index 7dd53ac..75337e1 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,7 +1,10 @@ //! Subscription and filter parsing use crate::error::Result; use crate::event::Event; +use serde::de::Unexpected; use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; +use std::collections::HashMap; use std::collections::HashSet; /// Subscription identifier and set of request filters @@ -16,30 +19,76 @@ pub struct Subscription { /// Corresponds to client-provided subscription request elements. Any /// element can be present if it should be used in filtering, or /// absent ([`None`]) if it should be ignored. -#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] +#[derive(Serialize, PartialEq, Debug, Clone)] pub struct ReqFilter { /// Event hashes pub ids: Option>, /// Event kinds pub kinds: Option>, - /// Referenced event hash - #[serde(rename = "#e")] - pub events: Option>, - /// Referenced public key for a petname - #[serde(rename = "#p")] - pub pubkeys: Option>, /// Events published after this time pub since: Option, /// Events published before this time pub until: Option, /// List of author public keys pub authors: Option>, - /// Set of event tags, for quick indexing + /// Set of tags #[serde(skip)] - event_tag_set: Option>, - /// Set of pubkey tags, for quick indexing - #[serde(skip)] - pubkey_tag_set: Option>, + pub tags: Option>>, +} + +impl<'de> Deserialize<'de> for ReqFilter { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let received: Value = Deserialize::deserialize(deserializer)?; + let filter = received.as_object().ok_or_else(|| { + serde::de::Error::invalid_type( + Unexpected::Other("reqfilter is not an object"), + &"a json object", + ) + })?; + let mut rf = ReqFilter { + ids: None, + kinds: None, + since: None, + until: None, + authors: None, + tags: None, + }; + let mut ts = None; + // iterate through each key, and assign values that exist + for (key, val) in filter.into_iter() { + // ids + if key == "ids" { + rf.ids = Deserialize::deserialize(val).ok(); + } else if key == "kinds" { + rf.kinds = Deserialize::deserialize(val).ok(); + } else if key == "since" { + rf.since = Deserialize::deserialize(val).ok(); + } else if key == "until" { + rf.until = Deserialize::deserialize(val).ok(); + } else if key == "authors" { + rf.authors = Deserialize::deserialize(val).ok(); + } else if key.starts_with('#') && key.len() > 1 && val.is_array() { + // remove the prefix + let tagname = &key[1..]; + if ts.is_none() { + // Initialize the tag if necessary + ts = Some(HashMap::new()); + } + if let Some(m) = ts.as_mut() { + let tag_vals: Option> = Deserialize::deserialize(val).ok(); + if let Some(v) = tag_vals { + let hs = HashSet::from_iter(v.into_iter()); + m.insert(tagname.to_owned(), hs); + } + }; + } + } + rf.tags = ts; + Ok(rf) + } } impl<'de> Deserialize<'de> for Subscription { @@ -49,7 +98,7 @@ impl<'de> Deserialize<'de> for Subscription { where D: Deserializer<'de>, { - let mut v: serde_json::Value = Deserialize::deserialize(deserializer)?; + let mut v: Value = Deserialize::deserialize(deserializer)?; // this shoud be a 3-or-more element array. // verify the first element is a String, REQ // get the subscription from the second element. @@ -82,10 +131,9 @@ impl<'de> Deserialize<'de> for Subscription { let mut filters = vec![]; for fv in i { - let mut f: ReqFilter = serde_json::from_value(fv.take()) + let f: ReqFilter = serde_json::from_value(fv.take()) .map_err(|_| serde::de::Error::custom("could not parse filter"))?; // create indexes - f.update_tag_indexes(); filters.push(f); } Ok(Subscription { @@ -113,15 +161,6 @@ impl Subscription { } impl ReqFilter { - /// Update pubkey and event indexes - fn update_tag_indexes(&mut self) { - if let Some(event_vec) = &self.events { - self.event_tag_set = Some(HashSet::from_iter(event_vec.iter().cloned())); - } - if let Some(pubkey_vec) = &self.pubkeys { - self.pubkey_tag_set = Some(HashSet::from_iter(pubkey_vec.iter().cloned())); - } - } /// Check for a match within the authors list. fn ids_match(&self, event: &Event) -> bool { self.ids @@ -137,28 +176,20 @@ impl ReqFilter { .unwrap_or(true) } - /// Check if this filter either matches, or does not care about the event tags. - fn event_match(&self, event: &Event) -> bool { - // an event match is performed by looking at the ReqFilter events field, and sending a hashset to the event to intersect with. - if let Some(es) = &self.event_tag_set { - // if there exists event tags in this filter, find if any intersect. - event.generic_tag_val_intersect("e", es) - } else { - // if no event tags were requested in a filter, we do match - true - } - } - - /// Check if this filter either matches, or does not care about the event tags. - fn pubkey_match(&self, event: &Event) -> bool { - // an event match is performed by looking at the ReqFilter events field, and sending a hashset to the event to intersect with. - if let Some(ps) = &self.pubkey_tag_set { - // if there exists event tags in this filter, find if any intersect. - event.generic_tag_val_intersect("p", ps) - } else { - // if no event tags were requested in a filter, we do match - true + fn tag_match(&self, event: &Event) -> bool { + // get the hashset from the filter. + if let Some(map) = &self.tags { + for (key, val) in map.iter() { + let tag_match = event.generic_tag_val_intersect(key, val); + // if there is no match for this tag, the match fails. + if !tag_match { + return false; + } + // if there was a match, we move on to the next one. + } } + // if the tag map is empty, the match succeeds (there was no filter) + true } /// Check if this filter either matches, or does not care about the kind. @@ -176,8 +207,7 @@ impl ReqFilter { && self.since.map(|t| event.created_at > t).unwrap_or(true) && self.kind_match(event.kind) && self.authors_match(event) - && self.pubkey_match(event) - && self.event_match(event) + && self.tag_match(event) } }