feat: server-side pings and disconnects

This commit is contained in:
Greg Heartsfield 2022-02-12 16:57:26 -06:00
parent 9e06cc9482
commit 77f35f9f43

View File

@ -25,6 +25,8 @@ use std::convert::Infallible;
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path; use std::path::Path;
use std::time::Duration;
use std::time::Instant;
use tokio::runtime::Builder; use tokio::runtime::Builder;
use tokio::sync::broadcast::{self, Receiver, Sender}; use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -371,6 +373,19 @@ async fn nostr_server(
// maintain a hashmap of a oneshot channel for active subscriptions. // maintain a hashmap of a oneshot channel for active subscriptions.
// when these subscriptions are cancelled, make a message // when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop. // available to the executing query so it knows to stop.
// last time this client sent data
let mut last_message_time = Instant::now();
// ping interval (every 5 minutes)
let default_ping_dur = Duration::from_secs(300);
// disconnect after 20 minutes without a ping response or event.
let max_quiet_time = Duration::from_secs(60 * 20);
let start = tokio::time::Instant::now() + default_ping_dur;
let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new(); let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// for stats, keep track of how many events the client published, // for stats, keep track of how many events the client published,
// and how many it received from queries. // and how many it received from queries.
@ -383,6 +398,16 @@ async fn nostr_server(
// server shutting down, exit loop // server shutting down, exit loop
break; break;
}, },
_ = ping_interval.tick() => {
// check how long since we talked to client
// if it has been too long, disconnect
if last_message_time.elapsed() > max_quiet_time {
debug!("ending connection due to lack of client ping response");
break;
}
// Send a ping
ws_stream.send(Message::Ping(Vec::new())).await.ok();
},
Some(query_result) = query_rx.recv() => { Some(query_result) = query_rx.recv() => {
// database informed us of a query result we asked for // database informed us of a query result we asked for
//let res = EventRes(query_result.sub_id,query_result.event); //let res = EventRes(query_result.sub_id,query_result.event);
@ -414,16 +439,21 @@ async fn nostr_server(
} }
}, },
ws_next = ws_stream.next() => { ws_next = ws_stream.next() => {
// update most recent message time for client
last_message_time = Instant::now();
// Consume text messages from the client, parse into Nostr messages. // Consume text messages from the client, parse into Nostr messages.
let nostr_msg = match ws_next { let nostr_msg = match ws_next {
Some(Ok(Message::Text(m))) => { Some(Ok(Message::Text(m))) => {
let msg_parse = convert_to_msg(m); convert_to_msg(m)
msg_parse
}, },
Some(Ok(Message::Binary(_))) => { Some(Ok(Message::Binary(_))) => {
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "binary messages are not accepted"))).await.ok(); ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "binary messages are not accepted"))).await.ok();
continue; continue;
}, },
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {
// get a ping/pong, ignore
continue;
},
None | Some(Ok(Message::Close(_))) | Some(Err(WsError::AlreadyClosed)) | Some(Err(WsError::ConnectionClosed)) => { None | Some(Ok(Message::Close(_))) | Some(Err(WsError::AlreadyClosed)) | Some(Err(WsError::ConnectionClosed)) => {
debug!("normal websocket close from client: {:?}",cid); debug!("normal websocket close from client: {:?}",cid);
break; break;