diff --git a/README.md b/README.md index d569640..a4026c6 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay). - [x] NIP-33: [Parameterized Replaceable Events](https://github.com/nostr-protocol/nips/blob/master/33.md) - [x] NIP-40: [Expiration Timestamp](https://github.com/nostr-protocol/nips/blob/master/40.md) - [x] NIP-42: [Authentication of clients to relays](https://github.com/nostr-protocol/nips/blob/master/42.md) +- [x] NIP-114 filter.ids_only ## Quick Start diff --git a/src/bin/bulkloader.rs b/src/bin/bulkloader.rs index 77d9e74..fa0819f 100644 --- a/src/bin/bulkloader.rs +++ b/src/bin/bulkloader.rs @@ -22,7 +22,7 @@ pub fn main() -> Result<()> { // check for a database file, or create one. let settings = config::Settings::new(&None)?; if !Path::new(&settings.database.data_directory).is_dir() { - info!("Database directory does not exist"); + info!("Database directory {:?} does not exist", settings.database.data_directory); return Err(Error::DatabaseDirError); } // Get a database pool diff --git a/src/info.rs b/src/info.rs index b8ac975..a4a155e 100644 --- a/src/info.rs +++ b/src/info.rs @@ -66,7 +66,7 @@ pub struct RelayInfo { /// Convert an Info configuration into public Relay Info impl From for RelayInfo { fn from(c: Settings) -> Self { - let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40]; + let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40, 114]; if c.authorization.nip42_auth { supported_nips.push(42); diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index dd120ab..50274f5 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -415,13 +415,18 @@ ON CONFLICT (id) DO NOTHING"#, } } + let event_json_str = if filter.ids_only { // id: hex encoded string + format!("\"{}\"", hex::encode(event_json)) + } else { // event content: utf8 encode + String::from_utf8(event_json).unwrap() + }; + // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error - // result. query_tx .send(QueryResult { sub_id: sub.get_id(), - event: String::from_utf8(event_json).unwrap(), + event: event_json_str, }) .await .ok(); @@ -723,7 +728,14 @@ fn query_from_filter(f: &ReqFilter) -> Option> { return None; } - let mut query = QueryBuilder::new("SELECT e.\"content\", e.created_at FROM \"event\" e WHERE "); + let mut query = QueryBuilder::new("SELECT "); + + if f.ids_only { + query.push("e.id"); + } else { + query.push("e.\"content\", e.created_at"); + } + query.push(" FROM \"event\" e WHERE "); // This tracks whether we need to push a prefix AND before adding another clause let mut push_and = false; @@ -929,6 +941,7 @@ mod tests { ]), )])), force_no_match: false, + ids_only: false, }; let q = query_from_filter(&filter).unwrap(); @@ -948,6 +961,7 @@ mod tests { limit: None, tags: Some(HashMap::from([('d', HashSet::from(["test".to_owned()]))])), force_no_match: false, + ids_only: false, }; let q = query_from_filter(&filter).unwrap(); @@ -973,6 +987,7 @@ mod tests { ]), )])), force_no_match: false, + ids_only: false, }; let q = query_from_filter(&filter).unwrap(); @@ -993,6 +1008,7 @@ mod tests { ('t', HashSet::from(["siamstr".to_owned()])), ])), force_no_match: false, + ids_only: false, }; let q = query_from_filter(&filter).unwrap(); assert_eq!(q.sql(), "SELECT e.\"content\", e.created_at FROM \"event\" e WHERE e.kind in ($1) AND e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = $2 AND (value in ($3))) OR (t.\"name\" = $4 AND (value in ($5)))) AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000") @@ -1009,7 +1025,25 @@ mod tests { limit: None, tags: Some(HashMap::from([('a', HashSet::new())])), force_no_match: false, + ids_only: false, }; assert!(query_from_filter(&filter).is_none()); } + + #[test] + fn test_ids_only_filter() { + let filter = ReqFilter { + ids: None, + kinds: Some(vec![1, 6, 16, 30023, 1063, 6969]), + since: Some(1700697846), + until: None, + authors: None, + limit: None, + tags: None, + force_no_match: false, + ids_only: true, + }; + let q = query_from_filter(&filter).unwrap(); + assert_eq!(q.sql(), "SELECT e.id FROM \"event\" e WHERE e.kind in ($1, $2, $3, $4, $5, $6) AND e.created_at >= $7 AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000") + } } diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index e12ae8e..188df8b 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -454,7 +454,11 @@ impl NostrRepo for SqliteRepo { return Ok(()); } row_count += 1; - let event_json = row.get(0)?; + let mut event_json = row.get(0)?; + if filter.ids_only { // hex event id + event_json = format!("\"{}\"", event_json); + } + info!("event_json: {:?}", event_json); loop { if query_tx.capacity() != 0 { // we have capacity to add another item @@ -976,12 +980,17 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option> = vec![]; diff --git a/src/server.rs b/src/server.rs index eef1e7b..358937b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -763,7 +763,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul trace!("Config: {:?}", settings); // do some config validation. if !Path::new(&settings.database.data_directory).is_dir() { - error!("Database directory does not exist"); + error!("Database directory {:?} does not exist", settings.database.data_directory); return Err(Error::DatabaseDirError); } let addr = format!( @@ -1177,7 +1177,12 @@ async fn nostr_server( metrics.sent_events.with_label_values(&["db"]).inc(); client_received_event_count += 1; // send a result - let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event); + let method = if query_result.event.len() == (64 + 2) { // 64 hex chars + 2 quotes + "HAVE" + } else { + "EVENT" + }; + let send_str = format!("[\"{}\",\"{}\",{}]", method, subesc, &query_result.event); ws_stream.send(Message::Text(send_str)).await.ok(); } }, @@ -1189,17 +1194,25 @@ async fn nostr_server( if !sub.interested_in_event(&global_event) { continue; } + // TODO: serialize at broadcast time, instead of // once for each consumer. if let Ok(event_str) = serde_json::to_string(&global_event) { + // TODO allowed_to_send unnecessarily deserializes this again if allowed_to_send(&event_str, &conn, &settings) { + let subesc = s.replace('"', ""); + let send_str = if sub.ids_only() { + format!("[\"HAVE\",\"{}\",\"{}\"]", subesc, &global_event.id) + } else { + format!("[\"EVENT\",\"{subesc}\",{event_str}]") + }; + // create an event response and send it trace!("sub match for client: {}, sub: {:?}, event: {:?}", cid, s, global_event.get_event_id_prefix()); - let subesc = s.replace('"', ""); metrics.sent_events.with_label_values(&["realtime"]).inc(); - ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok(); + ws_stream.send(Message::Text(send_str)).await.ok(); } } else { warn!("could not serialize event: {:?}", global_event.get_event_id_prefix()); diff --git a/src/subscription.rs b/src/subscription.rs index 4ceca64..af370fb 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -41,6 +41,8 @@ pub struct ReqFilter { // erroneously match. This basically indicates the req tried to // do something invalid. pub force_no_match: bool, + // NIP-114: If the request was submitted via IDS message, return matching event IDs only + pub ids_only: bool, } impl Serialize for ReqFilter { @@ -67,6 +69,9 @@ impl Serialize for ReqFilter { if let Some(authors) = &self.authors { map.serialize_entry("authors", &authors)?; } + if self.ids_only { + map.serialize_entry("ids_only", &self.ids_only)?; + } // serialize tags if let Some(tags) = &self.tags { for (k, v) in tags { @@ -99,6 +104,7 @@ impl<'de> Deserialize<'de> for ReqFilter { limit: None, tags: None, force_no_match: false, + ids_only: false, }; let empty_string = "".into(); let mut ts = None; @@ -135,6 +141,8 @@ impl<'de> Deserialize<'de> for ReqFilter { } } rf.authors = raw_authors; + } else if key == "ids_only" { + rf.ids_only = Deserialize::deserialize(val).ok().unwrap_or(false); } else if key.starts_with('#') && key.len() > 1 && val.is_array() { if let Some(tag_search) = tag_search_char_from_filter(key) { if ts.is_none() { @@ -281,6 +289,15 @@ impl Subscription { } false } + + pub fn ids_only(&self) -> bool { + for f in &self.filters { + if f.ids_only { + return true; + } + } + false + } } fn prefix_match(prefixes: &[String], target: &str) -> bool { diff --git a/tests/integration_test.rs b/tests/integration_test.rs index f65f4e8..f2158eb 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -77,3 +77,79 @@ async fn publish_test() -> Result<()> { let _res = relay.shutdown_tx.send(()); Ok(()) } + +#[tokio::test] +async fn nip_114_flow_test() -> Result<()> { + // Start a relay and wait for startup + let relay = common::start_relay()?; + common::wait_for_healthy_relay(&relay).await?; + + // Open a WebSocket connection to the relay + let (mut ws, _res) = connect_async(format!("ws://localhost:{}", relay.port)).await?; + + let event_id = "f3ce6798d70e358213ebbeba4886bbdfacf1ecfd4f65ee5323ef5f404de32b86"; + + // send a simple pre-made message + let simple_event = r#"["EVENT", {"content": "hello world","created_at": 1691239763, + "id":"f3ce6798d70e358213ebbeba4886bbdfacf1ecfd4f65ee5323ef5f404de32b86", + "kind": 1, + "pubkey": "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798", + "sig": "30ca29e8581eeee75bf838171dec818af5e6de2b74f5337de940f5cc91186534c0b20d6cf7ad1043a2c51dbd60b979447720a471d346322103c83f6cb66e4e98", + "tags": []}]"#; + ws.send(simple_event.into()).await?; + + // wait a sec so the event is saved + // but with this the event is persisted first, and response never sent? + // thread::sleep(Duration::from_millis(1000)); + + // Send a subscription request with ids_only set to true + let ids_request = r#"["REQ", "sub1", {"kinds": [1], "ids_only": true}]"#; // Example filter + ws.send(ids_request.into()).await?; + + + let mut message_count = 0; + // loop until we receive a message string that contains the event_id. no parsing yet, just quick check + let mut have_msg; + loop { + have_msg = ws.next().await + .ok_or_else(|| anyhow::Error::msg("Did not receive a response message"))??; + info!("Received {:?}", have_msg); + if have_msg.to_text()?.contains("HAVE") { + break; + } + message_count += 1; + if message_count > 5 { + panic!("Did not receive a HAVE message"); + } + } + + // Assuming the "HAVE" message is in the format ["HAVE", "sub1", ["event_id1", ...]] + // Parse the "HAVE" message + let have_msg_json: serde_json::Value = serde_json::from_str(&have_msg.to_text()?)?; + assert!(have_msg_json.is_array(), "Response is not an array"); + let have_msg_array = have_msg_json.as_array().unwrap(); + + // Check if "HAVE" message content is correct + assert_eq!(have_msg_array[0].as_str().unwrap(), "HAVE", "Response does not start with 'HAVE'"); + assert_eq!(have_msg_array[1].as_str().unwrap(), "sub1", "HAVE message does not contain the correct subscription ID"); + let received_event_id = have_msg_array[2].as_str().unwrap(); + assert_eq!(received_event_id, event_id, "HAVE message does not contain the correct event ID"); + assert_eq!(have_msg_array.len(), 3, "HAVE message does not have 3 elements"); + + // Request full event data for specific IDs + let req_message = format!(r#"["REQ", "new_sub", {{"ids": ["{}"]}}]"#, received_event_id); + ws.send(req_message.into()).await?; + + // Listen for full event data + let event_data_msg = ws.next().await + .ok_or_else(|| anyhow::Error::msg("Did not receive full event data"))??; + info!("Received full event data: {:?}", event_data_msg); + + // Shutdown the relay + let _res = relay.shutdown_tx.send(()); + let _join_handle = relay.handle.join(); + + Ok(()) +} + +