From 1804bee912f1b01009501e31c715fbe7c072f41e Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Sat, 28 Jan 2023 17:09:21 -0600 Subject: [PATCH] feat(NIP-33): parameterized replaceable events for postgres --- src/repo/postgres.rs | 30 +++++++++++++++++++++++++++++- src/repo/sqlite.rs | 1 + src/server.rs | 11 +++++++++-- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 1991be2..8391b89 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -133,7 +133,34 @@ ON CONFLICT (id) DO NOTHING"#, ); } } - + // parameterized replaceable events + // check for parameterized replaceable events that would be hidden; don't insert these either. + if let Some(d_tag) = e.distinct_param() { + let update_count; + if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) { + update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC LIMIT 1);") + .bind(e.kind as i64) + .bind(hex::decode(&e.pubkey).ok()) + .bind(hex::decode(d_tag).ok()) + .execute(&mut tx) + .await?.rows_affected(); + } else { + update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC LIMIT 1);") + .bind(e.kind as i64) + .bind(hex::decode(&e.pubkey).ok()) + .bind(d_tag.as_bytes()) + .execute(&mut tx) + .await?.rows_affected(); + } + if update_count > 0 { + info!( + "removed {} older parameterized replaceable kind {} events for author: {:?}", + update_count, + e.kind, + e.get_author_prefix() + ); + } + } // if this event is a deletion, hide the referenced events from the same author. if e.kind == 5 { let event_candidates = e.tag_values_by_name("e"); @@ -300,6 +327,7 @@ ON CONFLICT (id) DO NOTHING"#, // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error // result. + metrics.sent_events.inc(); query_tx .send(QueryResult { sub_id: sub.get_id(), diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index bdd21ac..299f047 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -420,6 +420,7 @@ impl NostrRepo for SqliteRepo { // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error // result. + metrics.sent_events.inc(); query_tx .blocking_send(QueryResult { sub_id: sub.get_id(), diff --git a/src/server.rs b/src/server.rs index 9fffd07..e64b946 100644 --- a/src/server.rs +++ b/src/server.rs @@ -321,21 +321,27 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul "write_event", "Event writing response times", )).unwrap(); + let sent_events = IntCounter::with_opts(Opts::new( + "sent_event", + "Events sent", + )).unwrap(); let connections = IntCounter::with_opts(Opts::new( "connections", - "New connections" + "New connections", )).unwrap(); let query_aborts = IntCounter::with_opts(Opts::new( "query_abort", - "Aborted queries" + "Aborted queries", )).unwrap(); registry.register(Box::new(query_sub.clone())).unwrap(); registry.register(Box::new(write_events.clone())).unwrap(); + registry.register(Box::new(sent_events.clone())).unwrap(); registry.register(Box::new(connections.clone())).unwrap(); registry.register(Box::new(query_aborts.clone())).unwrap(); let metrics = NostrMetrics { query_sub, write_events, + sent_events, connections, query_aborts, }; @@ -783,6 +789,7 @@ async fn nostr_server( pub struct NostrMetrics { pub query_sub: Histogram, pub write_events: Histogram, + pub sent_events: IntCounter, pub connections: IntCounter, pub query_aborts: IntCounter, }