diff --git a/Cargo.lock b/Cargo.lock index ceb6816..cfb382a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,6 +1207,7 @@ dependencies = [ "lazy_static", "nonzero_ext", "parse_duration", + "prometheus", "r2d2", "r2d2_sqlite", "rand 0.8.5", @@ -1545,6 +1546,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.11.6" @@ -1578,6 +1594,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quanta" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 3fc859c..3b2164b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ rand = "0.8" const_format = "0.2.28" regex = "1" async-trait = "0.1.60" +prometheus = "0.13.3" [dev-dependencies] anyhow = "1" diff --git a/src/db.rs b/src/db.rs index 8fc4ae6..1bdd03c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,6 +3,7 @@ use crate::config::Settings; use crate::error::{Error, Result}; use crate::event::Event; use crate::notice::Notice; +use crate::server::NostrMetrics; use governor::clock::Clock; use governor::{Quota, RateLimiter}; use r2d2; @@ -29,15 +30,15 @@ pub const DB_FILE: &str = "nostr.db"; /// # Panics /// /// Will panic if the pool could not be created. -pub async fn build_repo(settings: &Settings) -> Arc { +pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc { match settings.database.engine.as_str() { - "sqlite" => {Arc::new(build_sqlite_pool(settings).await)}, + "sqlite" => {Arc::new(build_sqlite_pool(settings, metrics).await)}, _ => panic!("Unknown database engine"), } } -async fn build_sqlite_pool(settings: &Settings) -> SqliteRepo { - let repo = SqliteRepo::new(settings); +async fn build_sqlite_pool(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo { + let repo = SqliteRepo::new(settings, metrics); repo.start().await.ok(); repo.migrate_up().await.ok(); repo diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index 2a3be02..f69225a 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -9,6 +9,7 @@ use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db}; use crate::utils::{is_hex, is_lower_hex}; use crate::nip05::{Nip05Name, VerificationRecord}; use crate::subscription::{ReqFilter, Subscription}; +use crate::server::NostrMetrics; use hex; use r2d2; use r2d2_sqlite::SqliteConnectionManager; @@ -35,6 +36,8 @@ pub const DB_FILE: &str = "nostr.db"; #[derive(Clone)] pub struct SqliteRepo { + /// Metrics + metrics: NostrMetrics, /// Pool for reading events and NIP-05 status read_pool: SqlitePool, /// Pool for writing events and NIP-05 verification @@ -49,7 +52,7 @@ pub struct SqliteRepo { impl SqliteRepo { // build all the pools needed - #[must_use] pub fn new(settings: &Settings) -> SqliteRepo { + #[must_use] pub fn new(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo { let maint_pool = build_pool( "maintenance", settings, @@ -82,6 +85,7 @@ impl SqliteRepo { let write_in_progress = Arc::new(Mutex::new(0)); SqliteRepo { + metrics, read_pool, write_pool, maint_pool, @@ -235,13 +239,18 @@ impl NostrRepo for SqliteRepo { } /// Persist event to database async fn write_event(&self, e: &Event) -> Result { + let start = Instant::now(); let _write_guard = self.write_in_progress.lock().await; // spawn a blocking thread let mut conn = self.write_pool.get()?; let e = e.clone(); - task::spawn_blocking(move || { + let event_count = task::spawn_blocking(move || { SqliteRepo::persist_event(&mut conn, &e) - }).await? + }).await?; + self.metrics + .write_events + .observe(start.elapsed().as_secs_f64()); + event_count } /// Perform a database query using a subscription. @@ -259,6 +268,7 @@ impl NostrRepo for SqliteRepo { ) -> Result<()> { let pre_spawn_start = Instant::now(); let self=self.clone(); + let metrics=self.metrics.clone(); task::spawn_blocking(move || { { // if we are waiting on a checkpoint, stop until it is complete @@ -392,10 +402,13 @@ impl NostrRepo for SqliteRepo { } else { warn!("Could not get a database connection for querying"); } + metrics + .query_sub + .observe(pre_spawn_start.elapsed().as_secs_f64()); let ok: Result<()> = Ok(()); ok }); - Ok(()) + Ok(()) } /// Perform normal maintenance diff --git a/src/server.rs b/src/server.rs index 36da758..febdcbe 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ use crate::info::RelayInfo; use crate::nip05; use crate::notice::Notice; use crate::subscription::Subscription; +use prometheus::{CounterVec, Encoder, Histogram, HistogramOpts, Opts, Registry, TextEncoder}; use futures::SinkExt; use futures::StreamExt; use governor::{Jitter, Quota, RateLimiter}; @@ -55,6 +56,8 @@ async fn handle_web_request( broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, shutdown: Receiver<()>, + registry: Registry, + metrics: NostrMetrics, ) -> Result, Infallible> { match ( request.uri().path(), @@ -116,6 +119,7 @@ async fn handle_web_request( broadcast, event_tx, shutdown, + metrics, )); } // todo: trace, don't print... @@ -168,6 +172,18 @@ async fn handle_web_request( .body(Body::from("Please use a Nostr client to connect.")) .unwrap()) } + ("/metrics", false) => { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "text/plain") + .body(Body::from(buffer)) + .unwrap()) + } (_, _) => { //handle any other url Ok(Response::builder() @@ -293,8 +309,35 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul // overwhelming this will drop events and won't register // metadata events. let (metadata_tx, metadata_rx) = broadcast::channel::(4096); + + // setup prometheus registry + let registry = Registry::new(); + + let query_sub = Histogram::with_opts(HistogramOpts::new( + "query_sub", + "Subscription response times", + )) + .unwrap(); + let write_events = Histogram::with_opts(HistogramOpts::new( + "write_event", + "Event writing response times", + )) + .unwrap(); + let connections = CounterVec::new( + Opts::new("connections", "New connections"), + vec!["origin"].as_slice(), + ) + .unwrap(); + registry.register(Box::new(query_sub.clone())).unwrap(); + registry.register(Box::new(write_events.clone())).unwrap(); + registry.register(Box::new(connections.clone())).unwrap(); + let metrics = NostrMetrics { + query_sub, + write_events, + connections, + }; // build a repository for events - let repo = db::build_repo(&settings).await; + let repo = db::build_repo(&settings, metrics.clone()).await; // start the database writer task. Give it a channel for // writing events, and for publishing events that have been // written (to all connected clients). @@ -360,6 +403,8 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul let event = event_tx.clone(); let stop = invoke_shutdown.clone(); let settings = settings.clone(); + let registry = registry.clone(); + let metrics = metrics.clone(); async move { // service_fn converts our function into a `Service` Ok::<_, Infallible>(service_fn(move |request: Request| { @@ -371,6 +416,8 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul bcast.clone(), event.clone(), stop.subscribe(), + registry.clone(), + metrics.clone(), ) })) } @@ -451,6 +498,7 @@ async fn nostr_server( broadcast: Sender, event_tx: mpsc::Sender, mut shutdown: Receiver<()>, + metrics: NostrMetrics, ) { // the time this websocket nostr server started let orig_start = Instant::now(); @@ -510,6 +558,17 @@ async fn nostr_server( "cid: {}, origin: {:?}, user-agent: {:?}", cid, origin, user_agent ); + + // Measure connections per origin + let mut metric_map: HashMap<&str, &str> = HashMap::new(); + metric_map.insert("origin", origin.as_str()); + + metrics + .connections + .get_metric_with(&metric_map) + .unwrap() + .inc(); + loop { tokio::select! { _ = shutdown.recv() => { @@ -723,3 +782,10 @@ async fn nostr_server( orig_start.elapsed() ); } + +#[derive(Clone)] +pub struct NostrMetrics { + pub query_sub: Histogram, + pub write_events: Histogram, + pub connections: CounterVec, +}