improvement: ignore duplicate REQ messages

This commit is contained in:
Greg Heartsfield 2022-12-16 14:37:02 -06:00
parent 2e9b1b6ba7
commit b0bfaa48fc

View File

@ -459,6 +459,8 @@ async fn nostr_server(
// when these subscriptions are cancelled, make a message // when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop. // available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new(); let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// keep track of the subscriptions we have
let mut current_subs: Vec<Subscription> = Vec::new();
// for stats, keep track of how many events the client published, // for stats, keep track of how many events the client published,
// and how many it received from queries. // and how many it received from queries.
@ -605,26 +607,37 @@ async fn nostr_server(
// * registering the subscription so future events can be matched // * registering the subscription so future events can be matched
// * making a channel to cancel to request later // * making a channel to cancel to request later
// * sending a request for a SQL query // * sending a request for a SQL query
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); // Do nothing if the sub already exists.
match conn.subscribe(s.clone()) { if !current_subs.contains(&s) {
Ok(()) => { current_subs.push(s.clone());
// when we insert, if there was a previous query running with the same name, cancel it. let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) { match conn.subscribe(s.clone()) {
previous_query.send(()).ok(); Ok(()) => {
} // when we insert, if there was a previous query running with the same name, cancel it.
// start a database query if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await; previous_query.send(()).ok();
}, }
Err(e) => { // start a database query
info!("Subscription error: {}", e); db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok(); },
Err(e) => {
info!("Subscription error: {}", e);
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
}
} }
} } else {
info!("client send duplicate subscription, ignoring");
}
}, },
Ok(NostrMessage::CloseMsg(cc)) => { Ok(NostrMessage::CloseMsg(cc)) => {
// closing a request simply removes the subscription. // closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc); let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed { if let Ok(c) = parsed {
// remove from the list of known subs
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
current_subs.remove(pos);
}
// check if a query is currently // check if a query is currently
// running, and remove it if so. // running, and remove it if so.
let stop_tx = running_queries.remove(&c.id); let stop_tx = running_queries.remove(&c.id);