diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 8391b89..d5e4b79 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -327,7 +327,6 @@ ON CONFLICT (id) DO NOTHING"#, // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error // result. - metrics.sent_events.inc(); query_tx .send(QueryResult { sub_id: sub.get_id(), diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index 299f047..bdd21ac 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -420,7 +420,6 @@ impl NostrRepo for SqliteRepo { // TODO: we could use try_send, but we'd have to juggle // getting the query result back as part of the error // result. - metrics.sent_events.inc(); query_tx .blocking_send(QueryResult { sub_id: sub.get_id(), diff --git a/src/server.rs b/src/server.rs index e64b946..31e5fc6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ use crate::info::RelayInfo; use crate::nip05; use crate::notice::Notice; use crate::subscription::Subscription; +use prometheus::IntCounterVec; use prometheus::{Encoder, Histogram, IntCounter, HistogramOpts, Opts, Registry, TextEncoder}; use futures::SinkExt; use futures::StreamExt; @@ -223,6 +224,69 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) { } } +fn create_metrics() -> (Registry, NostrMetrics) { + // setup prometheus registry + let registry = Registry::new(); + + let query_sub = Histogram::with_opts(HistogramOpts::new( + "nostr_query_seconds", + "Subscription response times", + )).unwrap(); + let write_events = Histogram::with_opts(HistogramOpts::new( + "nostr_events_write_seconds", + "Event writing response times", + )).unwrap(); + let sent_events = IntCounterVec::new( + Opts::new("nostr_events_sent_total", "Events sent to clients"), + vec!["source"].as_slice(), + ).unwrap(); + let connections = IntCounter::with_opts(Opts::new( + "nostr_connections_total", + "New connections", + )).unwrap(); + let query_aborts = IntCounter::with_opts(Opts::new( + "nostr_query_abort_total", + "Aborted queries", + )).unwrap(); + let cmd_req = IntCounter::with_opts(Opts::new( + "nostr_cmd_req_total", + "REQ commands", + )).unwrap(); + let cmd_event = IntCounter::with_opts(Opts::new( + "nostr_cmd_event_total", + "EVENT commands", + )).unwrap(); + let cmd_close = IntCounter::with_opts(Opts::new( + "nostr_cmd_close_total", + "CLOSE commands", + )).unwrap(); + let disconnects = IntCounterVec::new( + Opts::new("nostr_disconnects_total", "Client disconnects"), + vec!["reason"].as_slice(), + ).unwrap(); + registry.register(Box::new(query_sub.clone())).unwrap(); + registry.register(Box::new(write_events.clone())).unwrap(); + registry.register(Box::new(sent_events.clone())).unwrap(); + registry.register(Box::new(connections.clone())).unwrap(); + registry.register(Box::new(query_aborts.clone())).unwrap(); + registry.register(Box::new(cmd_req.clone())).unwrap(); + registry.register(Box::new(cmd_event.clone())).unwrap(); + registry.register(Box::new(cmd_close.clone())).unwrap(); + registry.register(Box::new(disconnects.clone())).unwrap(); + let metrics = NostrMetrics { + query_sub, + write_events, + sent_events, + connections, + disconnects, + query_aborts, + cmd_req, + cmd_event, + cmd_close, + }; + (registry,metrics) +} + /// Start running a Nostr relay server. pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> { trace!("Config: {:?}", settings); @@ -310,41 +374,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul // metadata events. let (metadata_tx, metadata_rx) = broadcast::channel::(4096); - // setup prometheus registry - let registry = Registry::new(); - - let query_sub = Histogram::with_opts(HistogramOpts::new( - "query_sub", - "Subscription response times", - )).unwrap(); - let write_events = Histogram::with_opts(HistogramOpts::new( - "write_event", - "Event writing response times", - )).unwrap(); - let sent_events = IntCounter::with_opts(Opts::new( - "sent_event", - "Events sent", - )).unwrap(); - let connections = IntCounter::with_opts(Opts::new( - "connections", - "New connections", - )).unwrap(); - let query_aborts = IntCounter::with_opts(Opts::new( - "query_abort", - "Aborted queries", - )).unwrap(); - registry.register(Box::new(query_sub.clone())).unwrap(); - registry.register(Box::new(write_events.clone())).unwrap(); - registry.register(Box::new(sent_events.clone())).unwrap(); - registry.register(Box::new(connections.clone())).unwrap(); - registry.register(Box::new(query_aborts.clone())).unwrap(); - let metrics = NostrMetrics { - query_sub, - write_events, - sent_events, - connections, - query_aborts, - }; + let (registry, metrics) = create_metrics(); // build a repository for events let repo = db::build_repo(&settings, metrics.clone()).await; // start the database writer task. Give it a channel for @@ -574,6 +604,7 @@ async fn nostr_server( loop { tokio::select! { _ = shutdown.recv() => { + metrics.disconnects.with_label_values(&["shutdown"]).inc(); info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed()); // server shutting down, exit loop break; @@ -583,6 +614,7 @@ async fn nostr_server( // if it has been too long, disconnect if last_message_time.elapsed() > max_quiet_time { debug!("ending connection due to lack of client ping response"); + metrics.disconnects.with_label_values(&["timeout"]).inc(); break; } // Send a ping @@ -599,6 +631,7 @@ async fn nostr_server( ws_stream.send(Message::Text(send_str)).await.ok(); } else { client_received_event_count += 1; + metrics.sent_events.with_label_values(&["db"]).inc(); // send a result let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event); ws_stream.send(Message::Text(send_str)).await.ok(); @@ -620,6 +653,7 @@ async fn nostr_server( global_event.get_event_id_prefix()); // create an event response and send it let subesc = s.replace('"', ""); + metrics.sent_events.with_label_values(&["realtime"]).inc(); ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok(); } else { warn!("could not serialize event: {:?}", global_event.get_event_id_prefix()); @@ -655,16 +689,21 @@ async fn nostr_server( WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) => { debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip()); + metrics.disconnects.with_label_values(&["normal"]).inc(); break; }, Some(Err(WsError::Io(e))) => { // IO errors are considered fatal warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e); + metrics.disconnects.with_label_values(&["error"]).inc(); + break; } x => { // default condition on error is to close the client connection info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x); + metrics.disconnects.with_label_values(&["error"]).inc(); + break; } }; @@ -676,6 +715,7 @@ async fn nostr_server( // handle each type of message let evid = ec.event_id().to_owned(); let parsed : Result = Result::::from(ec); + metrics.cmd_event.inc(); match parsed { Ok(e) => { let id_prefix:String = e.id.chars().take(8).collect(); @@ -712,6 +752,7 @@ async fn nostr_server( if conn.has_subscription(&s) { info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id); } else { + metrics.cmd_req.inc(); if let Some(ref lim) = sub_lim_opt { lim.until_ready_with_jitter(jitter).await; } @@ -738,6 +779,7 @@ async fn nostr_server( // closing a request simply removes the subscription. let parsed : Result = Result::::from(cc); if let Ok(c) = parsed { + metrics.cmd_close.inc(); // check if a query is currently // running, and remove it if so. let stop_tx = running_queries.remove(&c.id); @@ -787,9 +829,14 @@ async fn nostr_server( #[derive(Clone)] pub struct NostrMetrics { - pub query_sub: Histogram, - pub write_events: Histogram, - pub sent_events: IntCounter, - pub connections: IntCounter, - pub query_aborts: IntCounter, + pub query_sub: Histogram, // response time of successful subscriptions + pub write_events: Histogram, // response time of event writes + pub sent_events: IntCounterVec, // count of events sent to clients + pub connections: IntCounter, // count of websocket connections + pub disconnects: IntCounterVec, // client disconnects + pub query_aborts: IntCounter, // count of queries aborted by server + pub cmd_req: IntCounter, // count of REQ commands received + pub cmd_event: IntCounter, // count of EVENT commands received + pub cmd_close: IntCounter, // count of CLOSE commands received + }