diff --git a/src/close.rs b/src/close.rs index 783c650..3f41f40 100644 --- a/src/close.rs +++ b/src/close.rs @@ -58,4 +58,7 @@ impl Close { pub fn parse(json: &str) -> Result { serde_json::from_str(json).map_err(|e| Error::JsonParseFailed(e)) } + pub fn get_id(&self) -> String { + self.id.clone() + } } diff --git a/src/proto.rs b/src/proto.rs index 0931a35..1ac75ce 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -14,6 +14,8 @@ pub struct Proto { max_subs: usize, } +const MAX_SUBSCRIPTION_ID_LEN: usize = 256; + impl Proto { pub fn new() -> Self { let p = Proto { @@ -35,10 +37,34 @@ impl Proto { } pub fn subscribe(&mut self, s: Subscription) { - unimplemented!(); + // TODO: add NOTICE responses for error conditions. At the + // moment, we are silently dropping subscription requests that + // aren't perfect. + + // check if the subscription key is reasonable. + let k = s.get_id(); + let sub_id_len = k.len(); + if sub_id_len > MAX_SUBSCRIPTION_ID_LEN { + info!("Dropping subscription with huge ({}) length", sub_id_len); + return; + } + // check if an existing subscription exists. + if self.subscriptions.contains_key(&k) { + info!("Client requested a subscription with an already-existing key"); + return; + } + + // check if there is room for another subscription. + if self.subscriptions.len() >= self.max_subs { + info!("Client has reached the maximum number of unique subscriptions"); + return; + } + // add subscription + self.subscriptions.insert(k, s); } pub fn unsubscribe(&mut self, c: Close) { + self.subscriptions.remove(&c.get_id()); unimplemented!(); } } diff --git a/src/subscription.rs b/src/subscription.rs index 367ebcf..ad9fdf5 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -83,6 +83,9 @@ impl Subscription { pub fn parse(json: &str) -> Result { serde_json::from_str(json).map_err(|e| Error::JsonParseFailed(e)) } + pub fn get_id(&self) -> String { + self.id.clone() + } } #[cfg(test)]