Compare commits

...

30 Commits

Author SHA1 Message Date
Greg Heartsfield
95748647f0 build: bump version to 0.7.13 2022-12-22 16:27:34 -06:00
Greg Heartsfield
25480e837f fix: do not block writers for more than 1 second during checkpoints 2022-12-22 16:10:49 -06:00
Greg Heartsfield
b80b54cd9d improvement: reduce logging, especially for database pool size 2022-12-22 15:47:33 -06:00
Greg Heartsfield
8ea732cbe5 feat: perform regular database maintenance (60sec), without blocking main writer thread 2022-12-22 15:16:21 -06:00
Greg Heartsfield
0f68c4e5c2 refactor: formatting 2022-12-22 15:15:45 -06:00
Greg Heartsfield
dab2cd5792 wip: future changes to rustfmt 2022-12-22 15:13:54 -06:00
Greg Heartsfield
f411aa6fc2 fix: do not re-verify NIP-05 entries where metadata was deleted 2022-12-22 13:01:48 -06:00
Greg Heartsfield
d31bbda087 improvement: reduce lifetime of database connections 2022-12-22 13:01:12 -06:00
Greg Heartsfield
5917bc53b2 improvement: run maintenance every 60 seconds instead of by event count 2022-12-22 11:40:17 -06:00
Greg Heartsfield
91177c61a1 improvement: log reason for new event creation from nip05 2022-12-22 10:48:30 -06:00
Greg Heartsfield
53c2a8051c improvement: reduce logging 2022-12-22 10:29:27 -06:00
Greg Heartsfield
168cf513ac feat: perform full checkpoints and truncate WAL every 2k events 2022-12-22 10:11:05 -06:00
Greg Heartsfield
ea204761c9 fix: do not show slow queries more than once per sub 2022-12-20 15:41:50 -06:00
Greg Heartsfield
c270ae1434 improvement: reduce event count for db writer pauses 2022-12-20 15:25:24 -06:00
Greg Heartsfield
64bd983cb6 perf: every 5000 persisted events, pause for 500ms for backups
I have observed backups running for a very long time under heavy load,
this introduces some artificial delay to give the online backup enough
time to make progress.
2022-12-20 15:05:04 -06:00
Greg Heartsfield
1c153bc784 perf: shed DB query load when queue gets large 2022-12-20 13:23:21 -06:00
Greg Heartsfield
dc11d9a619 improvement: explicitly rollback transaction on duplicate event 2022-12-20 13:23:04 -06:00
Greg Heartsfield
cd1557787b improvement: log write pool 2022-12-20 13:21:57 -06:00
Greg Heartsfield
86bb7aeb9a improvement: function to check pool capacity 2022-12-20 10:07:01 -06:00
Greg Heartsfield
ce37fc1a2d build: bump version to 0.7.12 2022-12-19 14:50:42 -06:00
Greg Heartsfield
2cfd384339 perf: drop db handles that are not quickly read 2022-12-19 00:18:39 -06:00
Greg Heartsfield
8c013107f9 perf: increase upper bound for sqlite mmap 2022-12-18 23:19:43 -06:00
Greg Heartsfield
64a4466d30 perf: backing down on max_blocking_threads 2022-12-18 23:14:41 -06:00
Greg Heartsfield
1596c23eb4 perf: increase blocking threads now that contention is reduced 2022-12-18 22:46:32 -06:00
Greg Heartsfield
129badd4e1 perf: reduce per thread mmap allocation for DB 2022-12-18 22:45:32 -06:00
Greg Heartsfield
6f7c080180 improvement: reduce number of writer blocking threads from 4->2 2022-12-18 22:32:31 -06:00
Greg Heartsfield
af92561ef6 perf: remove shared cache mode (experiment) 2022-12-18 22:15:50 -06:00
Greg Heartsfield
d833a3e40d perf: reduce logging 2022-12-18 22:11:46 -06:00
Greg Heartsfield
462eb46642 build: bump version to 0.7.11 2022-12-18 20:52:01 -06:00
Greg Heartsfield
cf144d503d perf: reduce logging for slow queries 2022-12-18 20:47:11 -06:00
8 changed files with 192 additions and 54 deletions

2
Cargo.lock generated
View File

@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.7.10"
version = "0.7.13"
dependencies = [
"anyhow",
"bitcoin_hashes",

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.7.10"
version = "0.7.13"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"

View File

@@ -1 +1,4 @@
edition = "2021"
#max_width = 140
#chain_width = 100
#fn_call_width = 100

View File

@@ -5,7 +5,7 @@ use crate::error::Result;
use crate::subscription::Subscription;
use std::collections::HashMap;
use tracing::{debug, info};
use tracing::{debug, trace};
use uuid::Uuid;
/// A subscription identifier has a maximum length
@@ -74,7 +74,7 @@ impl ClientConn {
// prevent arbitrarily long subscription identifiers from
// being used.
if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
info!(
debug!(
"ignoring sub request with excessive length: ({})",
sub_id_len
);
@@ -84,7 +84,7 @@ impl ClientConn {
if self.subscriptions.contains_key(&k) {
self.subscriptions.remove(&k);
self.subscriptions.insert(k, s.clone());
debug!(
trace!(
"replaced existing subscription (cid: {}, sub: {:?})",
self.get_client_prefix(),
s.get_id()
@@ -98,7 +98,7 @@ impl ClientConn {
}
// add subscription
self.subscriptions.insert(k, s);
debug!(
trace!(
"registered new subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),
@@ -110,7 +110,7 @@ impl ClientConn {
pub fn unsubscribe(&mut self, c: &Close) {
// TODO: return notice if subscription did not exist.
self.subscriptions.remove(&c.id);
debug!(
trace!(
"removed subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),

202
src/db.rs
View File

@@ -38,8 +38,14 @@ pub struct SubmittedEvent {
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How many persisted events before optimization is triggered
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
/// How frequently to run maintenance
/// How many persisted events before DB maintenannce is triggered.
pub const EVENT_MAINTENANCE_FREQ_SEC: u64 = 60;
/// How many persisted events before we pause for backups.
/// It isn't clear this is enough to make the online backup API work yet.
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
/// Build a database connection pool.
/// # Panics
@@ -78,7 +84,7 @@ pub fn build_pool(
.test_on_check_out(true) // no noticeable performance hit
.min_idle(Some(min_size))
.max_size(max_size)
.max_lifetime(Some(Duration::from_secs(60)))
.max_lifetime(Some(Duration::from_secs(30)))
.build(manager)
.unwrap();
info!(
@@ -90,7 +96,41 @@ pub fn build_pool(
/// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
let start = Instant::now();
conn.execute_batch("PRAGMA optimize;")?;
info!("optimize ran in {:?}", start.elapsed());
Ok(())
}
#[derive(Debug)]
enum SqliteReturnStatus {
SqliteOk,
SqliteBusy,
SqliteError,
SqliteOther(u64),
}
/// Checkpoint/Truncate WAL
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
let start = Instant::now();
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
let checkpoint_result: u64 = row.get(0)?;
let wal_size: u64 = row.get(1)?;
let frames_checkpointed: u64 = row.get(2)?;
Ok((checkpoint_result, wal_size, frames_checkpointed))
})?;
let result = match cp_result {
0 => SqliteReturnStatus::SqliteOk,
1 => SqliteReturnStatus::SqliteBusy,
2 => SqliteReturnStatus::SqliteError,
x => SqliteReturnStatus::SqliteOther(x),
};
info!(
"checkpoint ran in {:?} (result: {:?}, WAL size: {})",
start.elapsed(),
result,
wal_size
);
Ok(())
}
@@ -116,7 +156,7 @@ pub async fn db_writer(
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
4,
2,
false,
);
if settings.database.in_memory {
@@ -133,8 +173,10 @@ pub async fn db_writer(
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
// Keep rough track of events so we can run optimize eventually.
let mut optimize_counter: usize = 0;
// Constant writing has interfered with online backups. Keep
// track of how long since we've given the backups a chance to
// run.
let mut backup_pause_counter: usize = 0;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
@@ -199,9 +241,10 @@ pub async fn db_writer(
event.get_author_prefix()
);
} else {
info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
);
notice_tx
.try_send(Notice::blocked(
@@ -243,6 +286,7 @@ pub async fn db_writer(
);
event_write = true
} else {
log_pool_stats("writer", &pool);
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
@@ -267,12 +311,11 @@ pub async fn db_writer(
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
// Use this as a trigger to do optimization
optimize_counter += 1;
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
info!("running database optimizer");
optimize_counter = 0;
optimize_db(&mut pool.get()?).ok();
backup_pause_counter += 1;
if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
info!("pausing db write thread for a moment...");
thread::sleep(Duration::from_millis(500));
backup_pause_counter = 0
}
}
@@ -320,7 +363,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
)?;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
// pubkey references. This will abort the txn.
// pubkey references.
tx.rollback().ok();
return Ok(ins_count);
}
// remember primary key of the event most recently inserted.
@@ -338,9 +382,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// if tagvalue is lowercase hex;
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
@@ -555,7 +599,10 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
str_clause, blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
@@ -614,13 +661,47 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
(query, params)
}
fn log_pool_stats(pool: &SqlitePool) {
/// Check if the pool is fully utilized
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
let state: r2d2::State = pool.state();
state.idle_connections == 0
}
/// Log pool stats
fn log_pool_stats(name: &str, pool: &SqlitePool) {
let state: r2d2::State = pool.state();
let in_use_cxns = state.connections - state.idle_connections;
debug!(
"DB pool usage (in_use: {}, available: {})",
in_use_cxns, state.connections
trace!(
"DB pool {:?} usage (in_use: {}, available: {})",
name,
in_use_cxns,
state.connections
);
if state.connections == in_use_cxns {
debug!("DB pool {:?} is empty (in_use: {})", name, in_use_cxns);
}
}
/// Perform database maintenance on a regular basis
pub async fn db_maintenance(pool: SqlitePool) {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
if let Ok(mut conn) = pool.get() {
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
conn.busy_timeout(Duration::from_secs(1)).ok();
debug!("running database optimizer");
optimize_db(&mut conn).ok();
debug!("running wal_checkpoint(TRUNCATE)");
checkpoint_db(&mut conn).ok();
}
}
};
}
});
}
/// Perform a database query using a subscription.
@@ -636,20 +717,42 @@ pub async fn db_query(
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
let start = Instant::now();
let pre_spawn_start = Instant::now();
task::spawn_blocking(move || {
debug!("moved DB query to thread in {:?}", start.elapsed());
let db_queue_time = pre_spawn_start.elapsed();
// if the queue time was very long (>5 seconds), spare the DB and abort.
if db_queue_time > Duration::from_secs(5) {
info!(
"shedding DB query load from {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
return Ok(());
}
// otherwise, report queuing time if it is slow
else if db_queue_time > Duration::from_secs(1) {
debug!(
"(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
}
let start = Instant::now();
let mut row_count: usize = 0;
// generate SQL query
let (q, p) = query_from_sub(&sub);
debug!("SQL generated in {:?}", start.elapsed());
let sql_gen_elapsed = start.elapsed();
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", start.elapsed());
}
// show pool stats
log_pool_stats(&pool);
log_pool_stats("reader", &pool);
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(1000);
let slow_cutoff = Duration::from_millis(2000);
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let start = Instant::now();
let mut slow_first_event;
let mut last_successful_send = Instant::now();
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
@@ -665,14 +768,14 @@ pub async fn db_query(
);
first_result = false;
}
// logging for slow queries; show sub and SQL
//
if slow_first_event {
info!(
// logging for slow queries; show sub and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
if row_count == 0 && slow_first_event && client_id.starts_with("0") {
debug!(
"query req (slow): {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
debug!(
"query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
@@ -690,20 +793,40 @@ pub async fn db_query(
sub.id
);
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
// check if this is still active; every 100 rows
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(());
}
row_count += 1;
let event_json = row.get(0)?;
loop {
if query_tx.capacity() != 0 {
// we have capacity to add another item
break;
} else {
// the queue is full
trace!("db reader thread is stalled");
if last_successful_send + abort_cutoff < Instant::now() {
// the queue has been full for too long, abort
info!("aborting database query due to slow client");
let ok: Result<()> = Ok(());
return ok;
}
// give the queue a chance to clear before trying again
thread::sleep(Duration::from_millis(100));
}
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
last_successful_send = Instant::now();
}
query_tx
.blocking_send(QueryResult {
@@ -712,10 +835,11 @@ pub async fn db_query(
})
.ok();
debug!(
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
start.elapsed(),
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
pre_spawn_start.elapsed(),
client_id,
sub.id,
start.elapsed(),
row_count
);
} else {

View File

@@ -517,7 +517,7 @@ impl Verifier {
Ok(updated) => {
if updated != 0 {
info!(
"persisted event: {:?} in {:?}",
"persisted event (new verified pubkey): {:?} in {:?}",
event.get_event_id_prefix(),
start.elapsed()
);
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
earliest: u64,
) -> Result<VerificationRecord> {
let tx = conn.transaction()?;
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![earliest, earliest], |r| {
let rowid: u64 = r.get(0)?;

View File

@@ -16,7 +16,7 @@ pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
pragma mmap_size = 1073741824; -- 1024MB of mmap
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version

View File

@@ -22,6 +22,7 @@ use hyper::upgrade::Upgraded;
use hyper::{
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use rusqlite::OpenFlags;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
@@ -248,10 +249,10 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
.on_thread_start(|| {
debug!("started new thread");
trace!("started new thread");
})
.on_thread_stop(|| {
debug!("stopping thread");
trace!("stopping thread");
})
.build()
.unwrap();
@@ -310,6 +311,17 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
}
}
}
// build a connection pool for DB maintenance
let maintenance_pool = db::build_pool(
"maintenance writer",
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
1,
false,
);
db::db_maintenance(maintenance_pool).await;
// listen for (external to tokio) shutdown request
let controlled_shutdown = invoke_shutdown.clone();
tokio::spawn(async move {
@@ -339,8 +351,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
let pool = db::build_pool(
"client query",
&settings,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
db_min_conn,
db_max_conn,
true,
@@ -539,7 +550,7 @@ async fn nostr_server(
// TODO: serialize at broadcast time, instead of
// once for each consumer.
if let Ok(event_str) = serde_json::to_string(&global_event) {
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
trace!("sub match for client: {}, sub: {:?}, event: {:?}",
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it