From b0bfaa48fcc050dc3d291b32ed069271e09c31d1 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Fri, 16 Dec 2022 14:37:02 -0600 Subject: [PATCH] improvement: ignore duplicate REQ messages --- src/server.rs | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/server.rs b/src/server.rs index abd1973..c6d569d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -459,6 +459,8 @@ async fn nostr_server( // when these subscriptions are cancelled, make a message // available to the executing query so it knows to stop. let mut running_queries: HashMap> = HashMap::new(); + // keep track of the subscriptions we have + let mut current_subs: Vec = Vec::new(); // for stats, keep track of how many events the client published, // and how many it received from queries. @@ -605,26 +607,37 @@ async fn nostr_server( // * registering the subscription so future events can be matched // * making a channel to cancel to request later // * sending a request for a SQL query - let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); - match conn.subscribe(s.clone()) { - Ok(()) => { - // when we insert, if there was a previous query running with the same name, cancel it. - if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) { - previous_query.send(()).ok(); - } - // start a database query - db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; - }, - Err(e) => { - info!("Subscription error: {}", e); - ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok(); + // Do nothing if the sub already exists. + if !current_subs.contains(&s) { + current_subs.push(s.clone()); + let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); + match conn.subscribe(s.clone()) { + Ok(()) => { + // when we insert, if there was a previous query running with the same name, cancel it. + if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) { + previous_query.send(()).ok(); + } + // start a database query + db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; + }, + Err(e) => { + info!("Subscription error: {}", e); + ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok(); + } } - } + } else { + info!("client send duplicate subscription, ignoring"); + } }, Ok(NostrMessage::CloseMsg(cc)) => { // closing a request simply removes the subscription. let parsed : Result = Result::::from(cc); if let Ok(c) = parsed { + // remove from the list of known subs + if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) { + current_subs.remove(pos); + } + // check if a query is currently // running, and remove it if so. let stop_tx = running_queries.remove(&c.id);