improvement: add prometheus metrics, renaming others

This commit is contained in:
Greg Heartsfield 2023-01-30 18:02:28 -06:00
parent c1152ce430
commit b6798f96b6
3 changed files with 87 additions and 42 deletions

View File

@ -327,7 +327,6 @@ ON CONFLICT (id) DO NOTHING"#,
// TODO: we could use try_send, but we'd have to juggle // TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error // getting the query result back as part of the error
// result. // result.
metrics.sent_events.inc();
query_tx query_tx
.send(QueryResult { .send(QueryResult {
sub_id: sub.get_id(), sub_id: sub.get_id(),

View File

@ -420,7 +420,6 @@ impl NostrRepo for SqliteRepo {
// TODO: we could use try_send, but we'd have to juggle // TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error // getting the query result back as part of the error
// result. // result.
metrics.sent_events.inc();
query_tx query_tx
.blocking_send(QueryResult { .blocking_send(QueryResult {
sub_id: sub.get_id(), sub_id: sub.get_id(),

View File

@ -13,6 +13,7 @@ use crate::info::RelayInfo;
use crate::nip05; use crate::nip05;
use crate::notice::Notice; use crate::notice::Notice;
use crate::subscription::Subscription; use crate::subscription::Subscription;
use prometheus::IntCounterVec;
use prometheus::{Encoder, Histogram, IntCounter, HistogramOpts, Opts, Registry, TextEncoder}; use prometheus::{Encoder, Histogram, IntCounter, HistogramOpts, Opts, Registry, TextEncoder};
use futures::SinkExt; use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
@ -223,6 +224,69 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
} }
} }
fn create_metrics() -> (Registry, NostrMetrics) {
// setup prometheus registry
let registry = Registry::new();
let query_sub = Histogram::with_opts(HistogramOpts::new(
"nostr_query_seconds",
"Subscription response times",
)).unwrap();
let write_events = Histogram::with_opts(HistogramOpts::new(
"nostr_events_write_seconds",
"Event writing response times",
)).unwrap();
let sent_events = IntCounterVec::new(
Opts::new("nostr_events_sent_total", "Events sent to clients"),
vec!["source"].as_slice(),
).unwrap();
let connections = IntCounter::with_opts(Opts::new(
"nostr_connections_total",
"New connections",
)).unwrap();
let query_aborts = IntCounter::with_opts(Opts::new(
"nostr_query_abort_total",
"Aborted queries",
)).unwrap();
let cmd_req = IntCounter::with_opts(Opts::new(
"nostr_cmd_req_total",
"REQ commands",
)).unwrap();
let cmd_event = IntCounter::with_opts(Opts::new(
"nostr_cmd_event_total",
"EVENT commands",
)).unwrap();
let cmd_close = IntCounter::with_opts(Opts::new(
"nostr_cmd_close_total",
"CLOSE commands",
)).unwrap();
let disconnects = IntCounterVec::new(
Opts::new("nostr_disconnects_total", "Client disconnects"),
vec!["reason"].as_slice(),
).unwrap();
registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(sent_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap();
registry.register(Box::new(query_aborts.clone())).unwrap();
registry.register(Box::new(cmd_req.clone())).unwrap();
registry.register(Box::new(cmd_event.clone())).unwrap();
registry.register(Box::new(cmd_close.clone())).unwrap();
registry.register(Box::new(disconnects.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
write_events,
sent_events,
connections,
disconnects,
query_aborts,
cmd_req,
cmd_event,
cmd_close,
};
(registry,metrics)
}
/// Start running a Nostr relay server. /// Start running a Nostr relay server.
pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> { pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
trace!("Config: {:?}", settings); trace!("Config: {:?}", settings);
@ -310,41 +374,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
// metadata events. // metadata events.
let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096); let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
// setup prometheus registry let (registry, metrics) = create_metrics();
let registry = Registry::new();
let query_sub = Histogram::with_opts(HistogramOpts::new(
"query_sub",
"Subscription response times",
)).unwrap();
let write_events = Histogram::with_opts(HistogramOpts::new(
"write_event",
"Event writing response times",
)).unwrap();
let sent_events = IntCounter::with_opts(Opts::new(
"sent_event",
"Events sent",
)).unwrap();
let connections = IntCounter::with_opts(Opts::new(
"connections",
"New connections",
)).unwrap();
let query_aborts = IntCounter::with_opts(Opts::new(
"query_abort",
"Aborted queries",
)).unwrap();
registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(sent_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap();
registry.register(Box::new(query_aborts.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
write_events,
sent_events,
connections,
query_aborts,
};
// build a repository for events // build a repository for events
let repo = db::build_repo(&settings, metrics.clone()).await; let repo = db::build_repo(&settings, metrics.clone()).await;
// start the database writer task. Give it a channel for // start the database writer task. Give it a channel for
@ -574,6 +604,7 @@ async fn nostr_server(
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.recv() => { _ = shutdown.recv() => {
metrics.disconnects.with_label_values(&["shutdown"]).inc();
info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed()); info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
// server shutting down, exit loop // server shutting down, exit loop
break; break;
@ -583,6 +614,7 @@ async fn nostr_server(
// if it has been too long, disconnect // if it has been too long, disconnect
if last_message_time.elapsed() > max_quiet_time { if last_message_time.elapsed() > max_quiet_time {
debug!("ending connection due to lack of client ping response"); debug!("ending connection due to lack of client ping response");
metrics.disconnects.with_label_values(&["timeout"]).inc();
break; break;
} }
// Send a ping // Send a ping
@ -599,6 +631,7 @@ async fn nostr_server(
ws_stream.send(Message::Text(send_str)).await.ok(); ws_stream.send(Message::Text(send_str)).await.ok();
} else { } else {
client_received_event_count += 1; client_received_event_count += 1;
metrics.sent_events.with_label_values(&["db"]).inc();
// send a result // send a result
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event); let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
ws_stream.send(Message::Text(send_str)).await.ok(); ws_stream.send(Message::Text(send_str)).await.ok();
@ -620,6 +653,7 @@ async fn nostr_server(
global_event.get_event_id_prefix()); global_event.get_event_id_prefix());
// create an event response and send it // create an event response and send it
let subesc = s.replace('"', ""); let subesc = s.replace('"', "");
metrics.sent_events.with_label_values(&["realtime"]).inc();
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok(); ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
} else { } else {
warn!("could not serialize event: {:?}", global_event.get_event_id_prefix()); warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
@ -655,16 +689,21 @@ async fn nostr_server(
WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake))) WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
=> { => {
debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip()); debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
metrics.disconnects.with_label_values(&["normal"]).inc();
break; break;
}, },
Some(Err(WsError::Io(e))) => { Some(Err(WsError::Io(e))) => {
// IO errors are considered fatal // IO errors are considered fatal
warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e); warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
metrics.disconnects.with_label_values(&["error"]).inc();
break; break;
} }
x => { x => {
// default condition on error is to close the client connection // default condition on error is to close the client connection
info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x); info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
metrics.disconnects.with_label_values(&["error"]).inc();
break; break;
} }
}; };
@ -676,6 +715,7 @@ async fn nostr_server(
// handle each type of message // handle each type of message
let evid = ec.event_id().to_owned(); let evid = ec.event_id().to_owned();
let parsed : Result<Event> = Result::<Event>::from(ec); let parsed : Result<Event> = Result::<Event>::from(ec);
metrics.cmd_event.inc();
match parsed { match parsed {
Ok(e) => { Ok(e) => {
let id_prefix:String = e.id.chars().take(8).collect(); let id_prefix:String = e.id.chars().take(8).collect();
@ -712,6 +752,7 @@ async fn nostr_server(
if conn.has_subscription(&s) { if conn.has_subscription(&s) {
info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id); info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
} else { } else {
metrics.cmd_req.inc();
if let Some(ref lim) = sub_lim_opt { if let Some(ref lim) = sub_lim_opt {
lim.until_ready_with_jitter(jitter).await; lim.until_ready_with_jitter(jitter).await;
} }
@ -738,6 +779,7 @@ async fn nostr_server(
// closing a request simply removes the subscription. // closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc); let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed { if let Ok(c) = parsed {
metrics.cmd_close.inc();
// check if a query is currently // check if a query is currently
// running, and remove it if so. // running, and remove it if so.
let stop_tx = running_queries.remove(&c.id); let stop_tx = running_queries.remove(&c.id);
@ -787,9 +829,14 @@ async fn nostr_server(
#[derive(Clone)] #[derive(Clone)]
pub struct NostrMetrics { pub struct NostrMetrics {
pub query_sub: Histogram, pub query_sub: Histogram, // response time of successful subscriptions
pub write_events: Histogram, pub write_events: Histogram, // response time of event writes
pub sent_events: IntCounter, pub sent_events: IntCounterVec, // count of events sent to clients
pub connections: IntCounter, pub connections: IntCounter, // count of websocket connections
pub query_aborts: IntCounter, pub disconnects: IntCounterVec, // client disconnects
pub query_aborts: IntCounter, // count of queries aborted by server
pub cmd_req: IntCounter, // count of REQ commands received
pub cmd_event: IntCounter, // count of EVENT commands received
pub cmd_close: IntCounter, // count of CLOSE commands received
} }