feat: prometheus metrics

Prometheus metrics exposed at /metrics
This commit is contained in:
Kieran 2023-01-22 11:08:12 -06:00 committed by Greg Heartsfield
parent 6489e685ab
commit 4121c872bc
5 changed files with 112 additions and 9 deletions

22
Cargo.lock generated
View File

@ -1207,6 +1207,7 @@ dependencies = [
"lazy_static", "lazy_static",
"nonzero_ext", "nonzero_ext",
"parse_duration", "parse_duration",
"prometheus",
"r2d2", "r2d2",
"r2d2_sqlite", "r2d2_sqlite",
"rand 0.8.5", "rand 0.8.5",
@ -1545,6 +1546,21 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "prometheus"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror",
]
[[package]] [[package]]
name = "prost" name = "prost"
version = "0.11.6" version = "0.11.6"
@ -1578,6 +1594,12 @@ dependencies = [
"prost", "prost",
] ]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]] [[package]]
name = "quanta" name = "quanta"
version = "0.9.3" version = "0.9.3"

View File

@ -43,6 +43,7 @@ rand = "0.8"
const_format = "0.2.28" const_format = "0.2.28"
regex = "1" regex = "1"
async-trait = "0.1.60" async-trait = "0.1.60"
prometheus = "0.13.3"
[dev-dependencies] [dev-dependencies]
anyhow = "1" anyhow = "1"

View File

@ -3,6 +3,7 @@ use crate::config::Settings;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::event::Event; use crate::event::Event;
use crate::notice::Notice; use crate::notice::Notice;
use crate::server::NostrMetrics;
use governor::clock::Clock; use governor::clock::Clock;
use governor::{Quota, RateLimiter}; use governor::{Quota, RateLimiter};
use r2d2; use r2d2;
@ -29,15 +30,15 @@ pub const DB_FILE: &str = "nostr.db";
/// # Panics /// # Panics
/// ///
/// Will panic if the pool could not be created. /// Will panic if the pool could not be created.
pub async fn build_repo(settings: &Settings) -> Arc<dyn NostrRepo> { pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc<dyn NostrRepo> {
match settings.database.engine.as_str() { match settings.database.engine.as_str() {
"sqlite" => {Arc::new(build_sqlite_pool(settings).await)}, "sqlite" => {Arc::new(build_sqlite_pool(settings, metrics).await)},
_ => panic!("Unknown database engine"), _ => panic!("Unknown database engine"),
} }
} }
async fn build_sqlite_pool(settings: &Settings) -> SqliteRepo { async fn build_sqlite_pool(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo {
let repo = SqliteRepo::new(settings); let repo = SqliteRepo::new(settings, metrics);
repo.start().await.ok(); repo.start().await.ok();
repo.migrate_up().await.ok(); repo.migrate_up().await.ok();
repo repo

View File

@ -9,6 +9,7 @@ use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
use crate::utils::{is_hex, is_lower_hex}; use crate::utils::{is_hex, is_lower_hex};
use crate::nip05::{Nip05Name, VerificationRecord}; use crate::nip05::{Nip05Name, VerificationRecord};
use crate::subscription::{ReqFilter, Subscription}; use crate::subscription::{ReqFilter, Subscription};
use crate::server::NostrMetrics;
use hex; use hex;
use r2d2; use r2d2;
use r2d2_sqlite::SqliteConnectionManager; use r2d2_sqlite::SqliteConnectionManager;
@ -35,6 +36,8 @@ pub const DB_FILE: &str = "nostr.db";
#[derive(Clone)] #[derive(Clone)]
pub struct SqliteRepo { pub struct SqliteRepo {
/// Metrics
metrics: NostrMetrics,
/// Pool for reading events and NIP-05 status /// Pool for reading events and NIP-05 status
read_pool: SqlitePool, read_pool: SqlitePool,
/// Pool for writing events and NIP-05 verification /// Pool for writing events and NIP-05 verification
@ -49,7 +52,7 @@ pub struct SqliteRepo {
impl SqliteRepo { impl SqliteRepo {
// build all the pools needed // build all the pools needed
#[must_use] pub fn new(settings: &Settings) -> SqliteRepo { #[must_use] pub fn new(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo {
let maint_pool = build_pool( let maint_pool = build_pool(
"maintenance", "maintenance",
settings, settings,
@ -82,6 +85,7 @@ impl SqliteRepo {
let write_in_progress = Arc::new(Mutex::new(0)); let write_in_progress = Arc::new(Mutex::new(0));
SqliteRepo { SqliteRepo {
metrics,
read_pool, read_pool,
write_pool, write_pool,
maint_pool, maint_pool,
@ -235,13 +239,18 @@ impl NostrRepo for SqliteRepo {
} }
/// Persist event to database /// Persist event to database
async fn write_event(&self, e: &Event) -> Result<u64> { async fn write_event(&self, e: &Event) -> Result<u64> {
let start = Instant::now();
let _write_guard = self.write_in_progress.lock().await; let _write_guard = self.write_in_progress.lock().await;
// spawn a blocking thread // spawn a blocking thread
let mut conn = self.write_pool.get()?; let mut conn = self.write_pool.get()?;
let e = e.clone(); let e = e.clone();
task::spawn_blocking(move || { let event_count = task::spawn_blocking(move || {
SqliteRepo::persist_event(&mut conn, &e) SqliteRepo::persist_event(&mut conn, &e)
}).await? }).await?;
self.metrics
.write_events
.observe(start.elapsed().as_secs_f64());
event_count
} }
/// Perform a database query using a subscription. /// Perform a database query using a subscription.
@ -259,6 +268,7 @@ impl NostrRepo for SqliteRepo {
) -> Result<()> { ) -> Result<()> {
let pre_spawn_start = Instant::now(); let pre_spawn_start = Instant::now();
let self=self.clone(); let self=self.clone();
let metrics=self.metrics.clone();
task::spawn_blocking(move || { task::spawn_blocking(move || {
{ {
// if we are waiting on a checkpoint, stop until it is complete // if we are waiting on a checkpoint, stop until it is complete
@ -392,10 +402,13 @@ impl NostrRepo for SqliteRepo {
} else { } else {
warn!("Could not get a database connection for querying"); warn!("Could not get a database connection for querying");
} }
metrics
.query_sub
.observe(pre_spawn_start.elapsed().as_secs_f64());
let ok: Result<()> = Ok(()); let ok: Result<()> = Ok(());
ok ok
}); });
Ok(()) Ok(())
} }
/// Perform normal maintenance /// Perform normal maintenance

View File

@ -13,6 +13,7 @@ use crate::info::RelayInfo;
use crate::nip05; use crate::nip05;
use crate::notice::Notice; use crate::notice::Notice;
use crate::subscription::Subscription; use crate::subscription::Subscription;
use prometheus::{CounterVec, Encoder, Histogram, HistogramOpts, Opts, Registry, TextEncoder};
use futures::SinkExt; use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
use governor::{Jitter, Quota, RateLimiter}; use governor::{Jitter, Quota, RateLimiter};
@ -55,6 +56,8 @@ async fn handle_web_request(
broadcast: Sender<Event>, broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>, event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
shutdown: Receiver<()>, shutdown: Receiver<()>,
registry: Registry,
metrics: NostrMetrics,
) -> Result<Response<Body>, Infallible> { ) -> Result<Response<Body>, Infallible> {
match ( match (
request.uri().path(), request.uri().path(),
@ -116,6 +119,7 @@ async fn handle_web_request(
broadcast, broadcast,
event_tx, event_tx,
shutdown, shutdown,
metrics,
)); ));
} }
// todo: trace, don't print... // todo: trace, don't print...
@ -168,6 +172,18 @@ async fn handle_web_request(
.body(Body::from("Please use a Nostr client to connect.")) .body(Body::from("Please use a Nostr client to connect."))
.unwrap()) .unwrap())
} }
("/metrics", false) => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain")
.body(Body::from(buffer))
.unwrap())
}
(_, _) => { (_, _) => {
//handle any other url //handle any other url
Ok(Response::builder() Ok(Response::builder()
@ -293,8 +309,35 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
// overwhelming this will drop events and won't register // overwhelming this will drop events and won't register
// metadata events. // metadata events.
let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096); let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
// setup prometheus registry
let registry = Registry::new();
let query_sub = Histogram::with_opts(HistogramOpts::new(
"query_sub",
"Subscription response times",
))
.unwrap();
let write_events = Histogram::with_opts(HistogramOpts::new(
"write_event",
"Event writing response times",
))
.unwrap();
let connections = CounterVec::new(
Opts::new("connections", "New connections"),
vec!["origin"].as_slice(),
)
.unwrap();
registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
write_events,
connections,
};
// build a repository for events // build a repository for events
let repo = db::build_repo(&settings).await; let repo = db::build_repo(&settings, metrics.clone()).await;
// start the database writer task. Give it a channel for // start the database writer task. Give it a channel for
// writing events, and for publishing events that have been // writing events, and for publishing events that have been
// written (to all connected clients). // written (to all connected clients).
@ -360,6 +403,8 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
let event = event_tx.clone(); let event = event_tx.clone();
let stop = invoke_shutdown.clone(); let stop = invoke_shutdown.clone();
let settings = settings.clone(); let settings = settings.clone();
let registry = registry.clone();
let metrics = metrics.clone();
async move { async move {
// service_fn converts our function into a `Service` // service_fn converts our function into a `Service`
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
@ -371,6 +416,8 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
bcast.clone(), bcast.clone(),
event.clone(), event.clone(),
stop.subscribe(), stop.subscribe(),
registry.clone(),
metrics.clone(),
) )
})) }))
} }
@ -451,6 +498,7 @@ async fn nostr_server(
broadcast: Sender<Event>, broadcast: Sender<Event>,
event_tx: mpsc::Sender<SubmittedEvent>, event_tx: mpsc::Sender<SubmittedEvent>,
mut shutdown: Receiver<()>, mut shutdown: Receiver<()>,
metrics: NostrMetrics,
) { ) {
// the time this websocket nostr server started // the time this websocket nostr server started
let orig_start = Instant::now(); let orig_start = Instant::now();
@ -510,6 +558,17 @@ async fn nostr_server(
"cid: {}, origin: {:?}, user-agent: {:?}", "cid: {}, origin: {:?}, user-agent: {:?}",
cid, origin, user_agent cid, origin, user_agent
); );
// Measure connections per origin
let mut metric_map: HashMap<&str, &str> = HashMap::new();
metric_map.insert("origin", origin.as_str());
metrics
.connections
.get_metric_with(&metric_map)
.unwrap()
.inc();
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.recv() => { _ = shutdown.recv() => {
@ -723,3 +782,10 @@ async fn nostr_server(
orig_start.elapsed() orig_start.elapsed()
); );
} }
/// Handles to the Prometheus collectors used by the relay.
///
/// The server builds one instance at startup, registers a clone of each
/// collector with the registry that backs the `/metrics` endpoint, and then
/// passes clones of this struct to the repository and to every request handler.
// NOTE(review): this relies on clones of prometheus collectors referring to the
// same underlying metric, so observations made through any clone show up in the
// registry output -- confirm against the prometheus crate documentation.
#[derive(Clone)]
pub struct NostrMetrics {
/// Histogram registered as `query_sub` ("Subscription response times").
/// Observed in seconds, measured from just before the blocking query task
/// is spawned until that task finishes its work.
pub query_sub: Histogram,
/// Histogram registered as `write_event` ("Event writing response times").
/// Observed in seconds; the timer starts before the write lock is acquired,
/// so time spent waiting for the lock is included.
pub write_events: Histogram,
/// Counter family registered as `connections` ("New connections"), with a
/// single `origin` label. Incremented once per websocket connection.
pub connections: CounterVec,
}