refactor: simplify tracking of subscriptions

This commit is contained in:
Greg Heartsfield 2022-12-17 20:46:58 -06:00
parent 9be04120c7
commit 0b9778d6ca
2 changed files with 7 additions and 10 deletions

View File

@ -46,6 +46,11 @@ impl ClientConn {
&self.subscriptions &self.subscriptions
} }
/// Check if the given subscription already exists
pub fn has_subscription(&self, sub: &Subscription) -> bool {
self.subscriptions.values().any(|x| x == sub)
}
/// Get a short prefix of the client's unique identifier, suitable /// Get a short prefix of the client's unique identifier, suitable
/// for logging. /// for logging.
#[must_use] #[must_use]

View File

@ -476,8 +476,6 @@ async fn nostr_server(
// when these subscriptions are cancelled, make a message // when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop. // available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new(); let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// keep track of the subscriptions we have
let mut current_subs: Vec<Subscription> = Vec::new();
// for stats, keep track of how many events the client published, // for stats, keep track of how many events the client published,
// and how many it received from queries. // and how many it received from queries.
let mut client_published_event_count: usize = 0; let mut client_published_event_count: usize = 0;
@ -625,11 +623,10 @@ async fn nostr_server(
// * making a channel to cancel to request later // * making a channel to cancel to request later
// * sending a request for a SQL query // * sending a request for a SQL query
// Do nothing if the sub already exists. // Do nothing if the sub already exists.
if !current_subs.contains(&s) { if !conn.has_subscription(&s) {
if let Some(ref lim) = sub_lim_opt { if let Some(ref lim) = sub_lim_opt {
lim.until_ready_with_jitter(jitter).await; lim.until_ready_with_jitter(jitter).await;
} }
current_subs.push(s.clone());
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) { match conn.subscribe(s.clone()) {
Ok(()) => { Ok(()) => {
@ -653,11 +650,6 @@ async fn nostr_server(
// closing a request simply removes the subscription. // closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc); let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed { if let Ok(c) = parsed {
// remove from the list of known subs
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
current_subs.remove(pos);
}
// check if a query is currently // check if a query is currently
// running, and remove it if so. // running, and remove it if so.
let stop_tx = running_queries.remove(&c.id); let stop_tx = running_queries.remove(&c.id);