Compare commits

...

46 Commits

Author SHA1 Message Date
Greg Heartsfield
6a8c4ed1b5 build: bump version to 0.7.14 2022-12-26 11:26:48 -06:00
Greg Heartsfield
966c853700 docs: non-docker quick start 2022-12-26 10:34:09 -06:00
Greg Heartsfield
65fd0ed08b feat: increase wal_checkpoint time when WAL is large 2022-12-26 10:03:51 -06:00
Greg Heartsfield
0b51675b38 improvement: change suggestion and default for max sqlite DB readers 2022-12-25 11:17:08 -06:00
Greg Heartsfield
2e22334631 refactor: formatting 2022-12-25 11:06:30 -06:00
Greg Heartsfield
cb2ac4bf0f improvement: give threads unique names 2022-12-25 10:47:32 -06:00
Greg Heartsfield
38dc7789dc improvement: cleaner slow query logs 2022-12-25 10:47:32 -06:00
Greg Heartsfield
ce0e00ffb3 feat: log reader DB pool stats every minute 2022-12-25 10:47:32 -06:00
Greg Heartsfield
3e4ae4aeec feat: cache prepared statements and trace expanded SQL queries 2022-12-25 10:47:32 -06:00
Greg Heartsfield
c6a8807485 improvement: send error on empty-string prefix author/id searches 2022-12-25 10:47:32 -06:00
Greg Heartsfield
8137b6211c refactor: clippy suggestions 2022-12-24 10:29:47 -06:00
Greg Heartsfield
29effaae23 build: remove pre-commit rustfmt check 2022-12-24 10:29:30 -06:00
Greg Heartsfield
e5074f2e46 feat(NIP-28): replaceable kind 41 channel metadata events 2022-12-24 10:14:43 -06:00
Blake Jakopovic
4fd7643907 feat: change pub(crate) to pub for use as a library 2022-12-23 07:14:58 -06:00
Greg Heartsfield
1e1ec69175 build: remove unnecessary dockerfile mod script 2022-12-23 06:52:09 -06:00
benthecarman
e08647867c refactor: remove code duplication for simple_event 2022-12-23 06:39:50 -06:00
Greg Heartsfield
ae0f7171ed build: remove digest-locked docker base images 2022-12-23 06:30:59 -06:00
Greg Heartsfield
4f1a912f36 feat: log origin header from websocket requests
fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/33
2022-12-22 16:55:53 -06:00
Greg Heartsfield
95748647f0 build: bump version to 0.7.13 2022-12-22 16:27:34 -06:00
Greg Heartsfield
25480e837f fix: do not block writers for more than 1 second during checkpoints 2022-12-22 16:10:49 -06:00
Greg Heartsfield
b80b54cd9d improvement: reduce logging, especially for database pool size 2022-12-22 15:47:33 -06:00
Greg Heartsfield
8ea732cbe5 feat: perform regular database maintenance (60sec), without blocking main writer thread 2022-12-22 15:16:21 -06:00
Greg Heartsfield
0f68c4e5c2 refactor: formatting 2022-12-22 15:15:45 -06:00
Greg Heartsfield
dab2cd5792 wip: future changes to rustfmt 2022-12-22 15:13:54 -06:00
Greg Heartsfield
f411aa6fc2 fix: do not re-verify NIP-05 entries where metadata was deleted 2022-12-22 13:01:48 -06:00
Greg Heartsfield
d31bbda087 improvement: reduce lifetime of database connections 2022-12-22 13:01:12 -06:00
Greg Heartsfield
5917bc53b2 improvement: run maintenance every 60 seconds instead of by event count 2022-12-22 11:40:17 -06:00
Greg Heartsfield
91177c61a1 improvement: log reason for new event creation from nip05 2022-12-22 10:48:30 -06:00
Greg Heartsfield
53c2a8051c improvement: reduce logging 2022-12-22 10:29:27 -06:00
Greg Heartsfield
168cf513ac feat: perform full checkpoints and truncate WAL every 2k events 2022-12-22 10:11:05 -06:00
Greg Heartsfield
ea204761c9 fix: do not show slow queries more than once per sub 2022-12-20 15:41:50 -06:00
Greg Heartsfield
c270ae1434 improvement: reduce event count for db writer pauses 2022-12-20 15:25:24 -06:00
Greg Heartsfield
64bd983cb6 perf: every 5000 persisted events, pause for 500ms for backups
I have observed backups running for a very long time under heavy load,
this introduces some artificial delay to give the online backup enough
time to make progress.
2022-12-20 15:05:04 -06:00
Greg Heartsfield
1c153bc784 perf: shed DB query load when queue gets large 2022-12-20 13:23:21 -06:00
Greg Heartsfield
dc11d9a619 improvement: explicitly rollback transaction on duplicate event 2022-12-20 13:23:04 -06:00
Greg Heartsfield
cd1557787b improvement: log write pool 2022-12-20 13:21:57 -06:00
Greg Heartsfield
86bb7aeb9a improvement: function to check pool capacity 2022-12-20 10:07:01 -06:00
Greg Heartsfield
ce37fc1a2d build: bump version to 0.7.12 2022-12-19 14:50:42 -06:00
Greg Heartsfield
2cfd384339 perf: drop db handles that are not quickly read 2022-12-19 00:18:39 -06:00
Greg Heartsfield
8c013107f9 perf: increase upper bound for sqlite mmap 2022-12-18 23:19:43 -06:00
Greg Heartsfield
64a4466d30 perf: backing down on max_blocking_threads 2022-12-18 23:14:41 -06:00
Greg Heartsfield
1596c23eb4 perf: increase blocking threads now that contention is reduced 2022-12-18 22:46:32 -06:00
Greg Heartsfield
129badd4e1 perf: reduce per thread mmap allocation for DB 2022-12-18 22:45:32 -06:00
Greg Heartsfield
6f7c080180 improvement: reduce number of writer blocking threads from 4->2 2022-12-18 22:32:31 -06:00
Greg Heartsfield
af92561ef6 perf: remove shared cache mode (experiment) 2022-12-18 22:15:50 -06:00
Greg Heartsfield
d833a3e40d perf: reduce logging 2022-12-18 22:11:46 -06:00
17 changed files with 370 additions and 132 deletions

View File

@@ -11,6 +11,6 @@ repos:
- repo: https://github.com/doublify/pre-commit-rust
rev: v1.0
hooks:
- id: fmt
# - id: fmt
- id: cargo-check
- id: clippy

2
Cargo.lock generated
View File

@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.7.11"
version = "0.7.14"
dependencies = [
"anyhow",
"bitcoin_hashes",

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.7.11"
version = "0.7.14"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
@@ -28,7 +28,7 @@ secp256k1 = {version = "0.21", features = ["rand", "rand-std", "serde", "bitcoin
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0", features = ["preserve_order"]}
hex = "0.4"
rusqlite = { version = "0.26", features = ["limits","bundled"]}
rusqlite = { version = "0.26", features = ["limits","bundled","modern_sqlite", "trace"]}
r2d2 = "0.8"
r2d2_sqlite = "0.19"
lazy_static = "1.4"

View File

@@ -1,4 +1,4 @@
FROM docker.io/library/rust:1.66.0@sha256:359949280cebefe93ccb33089fe25111a3aadfe99eac4b6cbe8ec3e1b571dacb as builder
FROM docker.io/library/rust:1.66.0 as builder
RUN USER=root cargo install cargo-auditable
RUN USER=root cargo new --bin nostr-rs-relay
@@ -17,7 +17,7 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo auditable build --release --locked
FROM docker.io/library/debian:bullseye-20221205-slim@sha256:25f10b4f1ded5341a3ca0a30290ff3cd5639415f0c5a2222d5e7d5dd72952aa1
FROM docker.io/library/debian:bullseye-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db

View File

@@ -29,6 +29,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
- [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)
## Quick Start
@@ -81,6 +82,38 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
A pre-built container is also available on DockerHub:
https://hub.docker.com/r/scsibug/nostr-rs-relay
## Build and Run (without Docker)
Building `nostr-rs-relay` requires an installation of Cargo & Rust: https://www.rust-lang.org/tools/install
Clone this repository, and then build a release version of the relay:
```console
$ git clone -q https://git.sr.ht/\~gheartsfield/nostr-rs-relay
$ cd nostr-rs-relay
$ cargo build -q -r
```
The relay executable is now located in
`target/release/nostr-rs-relay`. In order to run it with logging
enabled, execute it with the `RUST_LOG` variable set:
```console
$ RUST_LOG=warn,nostr_rs_relay=info ./target/release/nostr-rs-relay
Dec 26 10:31:56.455 INFO nostr_rs_relay: Starting up from main
Dec 26 10:31:56.464 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
Dec 26 10:31:56.466 INFO nostr_rs_relay::server: db writer created
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=2)
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: opened database "./nostr.db" for writing
Dec 26 10:31:56.466 INFO nostr_rs_relay::schema: DB version = 11
Dec 26 10:31:56.467 INFO nostr_rs_relay::db: Built a connection pool "maintenance writer" (min=1, max=2)
Dec 26 10:31:56.467 INFO nostr_rs_relay::server: control message listener started
Dec 26 10:31:56.468 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=8)
```
You now have a running relay, on port `8080`. Use a `nostr` client or
`websocat` to connect and send/query for events.
## Configuration
The sample [`config.toml`](config.toml) file demonstrates the

View File

@@ -36,8 +36,9 @@ data_directory = "."
# Minimum number of SQLite reader connections
#min_conn = 4
# Maximum number of SQLite reader connections
#max_conn = 128
# Maximum number of SQLite reader connections. Recommend setting this
# to approx the number of cores.
#max_conn = 8
[network]
# Bind to this network address

View File

@@ -1,3 +0,0 @@
#!/usr/bin/env bash
sed -E 's/@sha256:[[:alnum:]]+//g' Dockerfile > Dockerfile.any-platform
echo "Created platform-agnostic Dockerfile in 'Dockerfile.any-platform'"

View File

@@ -1 +1,4 @@
edition = "2021"
#max_width = 140
#chain_width = 100
#fn_call_width = 100

View File

@@ -207,7 +207,7 @@ impl Default for Settings {
data_directory: ".".to_owned(),
in_memory: false,
min_conn: 4,
max_conn: 128,
max_conn: 8,
},
network: Network {
port: 8080,

View File

@@ -5,7 +5,7 @@ use crate::error::Result;
use crate::subscription::Subscription;
use std::collections::HashMap;
use tracing::{debug, info};
use tracing::{debug, trace};
use uuid::Uuid;
/// A subscription identifier has a maximum length
@@ -74,7 +74,7 @@ impl ClientConn {
// prevent arbitrarily long subscription identifiers from
// being used.
if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
info!(
debug!(
"ignoring sub request with excessive length: ({})",
sub_id_len
);
@@ -84,7 +84,7 @@ impl ClientConn {
if self.subscriptions.contains_key(&k) {
self.subscriptions.remove(&k);
self.subscriptions.insert(k, s.clone());
debug!(
trace!(
"replaced existing subscription (cid: {}, sub: {:?})",
self.get_client_prefix(),
s.get_id()
@@ -98,7 +98,7 @@ impl ClientConn {
}
// add subscription
self.subscriptions.insert(k, s);
debug!(
trace!(
"registered new subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),
@@ -110,7 +110,7 @@ impl ClientConn {
pub fn unsubscribe(&mut self, c: &Close) {
// TODO: return notice if subscription did not exist.
self.subscriptions.remove(&c.id);
debug!(
trace!(
"removed subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),

258
src/db.rs
View File

@@ -38,8 +38,13 @@ pub struct SubmittedEvent {
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How many persisted events before optimization is triggered
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
/// How frequently to attempt checkpointing
pub const CHECKPOINT_FREQ_SEC: u64 = 60;
/// How many persisted events before we pause for backups.
/// It isn't clear this is enough to make the online backup API work yet.
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
/// Build a database connection pool.
/// # Panics
@@ -78,7 +83,7 @@ pub fn build_pool(
.test_on_check_out(true) // no noticeable performance hit
.min_idle(Some(min_size))
.max_size(max_size)
.max_lifetime(Some(Duration::from_secs(60)))
.max_lifetime(Some(Duration::from_secs(30)))
.build(manager)
.unwrap();
info!(
@@ -88,11 +93,55 @@ pub fn build_pool(
pool
}
/// Display database pool stats every 1 minute
pub async fn monitor_pool(name: &str, pool: SqlitePool) {
let sleep_dur = Duration::from_secs(60);
loop {
log_pool_stats(name, &pool);
tokio::time::sleep(sleep_dur).await;
}
}
/// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
let start = Instant::now();
conn.execute_batch("PRAGMA optimize;")?;
info!("optimize ran in {:?}", start.elapsed());
Ok(())
}
#[derive(Debug)]
enum SqliteStatus {
Ok,
Busy,
Error,
Other(u64),
}
/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
let start = Instant::now();
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
let checkpoint_result: u64 = row.get(0)?;
let wal_size: u64 = row.get(1)?;
let frames_checkpointed: u64 = row.get(2)?;
Ok((checkpoint_result, wal_size, frames_checkpointed))
})?;
let result = match cp_result {
0 => SqliteStatus::Ok,
1 => SqliteStatus::Busy,
2 => SqliteStatus::Error,
x => SqliteStatus::Other(x),
};
info!(
"checkpoint ran in {:?} (result: {:?}, WAL size: {})",
start.elapsed(),
result,
wal_size
);
Ok(wal_size as usize)
}
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
@@ -116,7 +165,7 @@ pub async fn db_writer(
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
4,
2,
false,
);
if settings.database.in_memory {
@@ -133,8 +182,10 @@ pub async fn db_writer(
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
// Keep rough track of events so we can run optimize eventually.
let mut optimize_counter: usize = 0;
// Constant writing has interfered with online backups. Keep
// track of how long since we've given the backups a chance to
// run.
let mut backup_pause_counter: usize = 0;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
@@ -199,9 +250,10 @@ pub async fn db_writer(
event.get_author_prefix()
);
} else {
info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
);
notice_tx
.try_send(Notice::blocked(
@@ -243,6 +295,7 @@ pub async fn db_writer(
);
event_write = true
} else {
log_pool_stats("writer", &pool);
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
@@ -267,12 +320,11 @@ pub async fn db_writer(
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
// Use this as a trigger to do optimization
optimize_counter += 1;
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
info!("running database optimizer");
optimize_counter = 0;
optimize_db(&mut pool.get()?).ok();
backup_pause_counter += 1;
if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
info!("pausing db write thread for a moment...");
thread::sleep(Duration::from_millis(500));
backup_pause_counter = 0
}
}
@@ -320,7 +372,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
)?;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
// pubkey references. This will abort the txn.
// pubkey references.
tx.rollback().ok();
return Ok(ins_count);
}
// remember primary key of the event most recently inserted.
@@ -338,9 +391,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// if tagvalue is lowercase hex;
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
@@ -355,7 +408,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// if this event is replaceable update, hide every other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
if e.kind == 0 || e.kind == 3 || e.kind == 41 || (e.kind >= 10000 && e.kind < 20000) {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
@@ -555,7 +608,10 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
str_clause, blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
@@ -614,15 +670,80 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
(query, params)
}
fn log_pool_stats(pool: &SqlitePool) {
/// Check if the pool is fully utilized
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
let state: r2d2::State = pool.state();
state.idle_connections == 0
}
/// Log pool stats
fn log_pool_stats(name: &str, pool: &SqlitePool) {
let state: r2d2::State = pool.state();
let in_use_cxns = state.connections - state.idle_connections;
debug!(
"DB pool usage (in_use: {}, available: {})",
in_use_cxns, state.connections
"DB pool {:?} usage (in_use: {}, available: {}, max: {})",
name,
in_use_cxns,
state.connections,
pool.max_size()
);
}
/// Perform database maintenance on a regular basis
pub async fn db_optimize(pool: SqlitePool) {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60*60)) => {
if let Ok(mut conn) = pool.get() {
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
info!("running database optimizer");
optimize_db(&mut conn).ok();
}
}
};
}
});
}
/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint(pool: SqlitePool) {
tokio::task::spawn(async move {
// WAL size in pages.
let mut current_wal_size = 0;
// WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
let wal_threshold = 1000*10;
// default threshold for the busy timer
let busy_wait_default = Duration::from_secs(1);
// if the WAL file is getting too big, switch to this
let busy_wait_default_long = Duration::from_secs(5);
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => {
if let Ok(mut conn) = pool.get() {
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
if current_wal_size <= wal_threshold {
conn.busy_timeout(busy_wait_default).ok();
} else {
// if the wal size has exceeded a threshold, increase the busy timeout.
conn.busy_timeout(busy_wait_default_long).ok();
}
debug!("running wal_checkpoint(TRUNCATE)");
if let Ok(new_size) = checkpoint_db(&mut conn) {
current_wal_size = new_size;
}
}
}
};
}
});
}
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
@@ -636,24 +757,47 @@ pub async fn db_query(
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
let start = Instant::now();
let pre_spawn_start = Instant::now();
task::spawn_blocking(move || {
debug!("moved DB query to thread in {:?}", start.elapsed());
let db_queue_time = pre_spawn_start.elapsed();
// if the queue time was very long (>5 seconds), spare the DB and abort.
if db_queue_time > Duration::from_secs(5) {
info!(
"shedding DB query load from {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
return Ok(());
}
// otherwise, report queuing time if it is slow
else if db_queue_time > Duration::from_secs(1) {
debug!(
"(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
}
let start = Instant::now();
let mut row_count: usize = 0;
// generate SQL query
let (q, p) = query_from_sub(&sub);
debug!("SQL generated in {:?}", start.elapsed());
// show pool stats
log_pool_stats(&pool);
let sql_gen_elapsed = start.elapsed();
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", start.elapsed());
}
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(2000);
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let start = Instant::now();
let mut slow_first_event;
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
let mut last_successful_send = Instant::now();
if let Ok(mut conn) = pool.get() {
// execute the query.
// make the actual SQL query (with parameters inserted) available
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
let mut stmt = conn.prepare_cached(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
let mut first_result = true;
while let Some(row) = event_rows.next()? {
let first_event_elapsed = start.elapsed();
@@ -667,29 +811,12 @@ pub async fn db_query(
}
// logging for slow queries; show sub and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
if slow_first_event && client_id.starts_with('0') {
info!(
if row_count == 0 && slow_first_event && client_id.starts_with('0') {
debug!(
"query req (slow): {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
"query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"query req: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!(
"query string: {} (cid: {}, sub: {:?})",
q,
client_id,
sub.id
);
}
}
// check if this is still active; every 100 rows
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
@@ -697,12 +824,34 @@ pub async fn db_query(
}
row_count += 1;
let event_json = row.get(0)?;
loop {
if query_tx.capacity() != 0 {
// we have capacity to add another item
break;
} else {
// the queue is full
trace!("db reader thread is stalled");
if last_successful_send + abort_cutoff < Instant::now() {
// the queue has been full for too long, abort
info!("aborting database query due to slow client (cid: {}, sub: {:?})",
client_id, sub.id);
let ok: Result<()> = Ok(());
return ok;
}
// give the queue a chance to clear before trying again
thread::sleep(Duration::from_millis(100));
}
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
last_successful_send = Instant::now();
}
query_tx
.blocking_send(QueryResult {
@@ -711,10 +860,11 @@ pub async fn db_query(
})
.ok();
debug!(
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
start.elapsed(),
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
pre_spawn_start.elapsed(),
client_id,
sub.id,
start.elapsed(),
row_count
);
} else {

View File

@@ -80,7 +80,7 @@ impl FromStr for Operator {
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ConditionQuery {
pub(crate) conditions: Vec<Condition>,
pub conditions: Vec<Condition>,
}
impl ConditionQuery {
@@ -137,9 +137,9 @@ pub fn validate_delegation(
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Condition {
pub(crate) field: Field,
pub(crate) operator: Operator,
pub(crate) values: Vec<u64>,
pub field: Field,
pub operator: Operator,
pub values: Vec<u64>,
}
impl Condition {
@@ -332,19 +332,6 @@ mod tests {
assert_eq!(parsed, cq);
Ok(())
}
fn simple_event() -> Event {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
}
}
// Check for condition logic on event w/ empty values
#[test]
fn condition_with_empty_values() {
@@ -353,7 +340,7 @@ mod tests {
operator: Operator::GreaterThan,
values: vec![],
};
let e = simple_event();
let e = Event::simple_event();
assert!(!c.allows_event(&e));
c.operator = Operator::LessThan;
assert!(!c.allows_event(&e));
@@ -373,7 +360,7 @@ mod tests {
operator: Operator::GreaterThan,
values: vec![10],
};
let mut e = simple_event();
let mut e = Event::simple_event();
// kind is not greater than 10, not allowed
e.kind = 1;
assert!(!c.allows_event(&e));
@@ -392,7 +379,7 @@ mod tests {
operator: Operator::Equals,
values: vec![0, 10, 20],
};
let mut e = simple_event();
let mut e = Event::simple_event();
// Allow if event kind is in list for Equals
e.kind = 10;
assert!(c.allows_event(&e));

View File

@@ -37,19 +37,19 @@ impl EventCmd {
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Event {
pub id: String,
pub(crate) pubkey: String,
pub pubkey: String,
#[serde(skip)]
pub(crate) delegated_by: Option<String>,
pub(crate) created_at: u64,
pub(crate) kind: u64,
pub delegated_by: Option<String>,
pub created_at: u64,
pub kind: u64,
#[serde(deserialize_with = "tag_from_string")]
// NOTE: array-of-arrays may need to be more general than a string container
pub(crate) tags: Vec<Vec<String>>,
pub(crate) content: String,
pub(crate) sig: String,
pub tags: Vec<Vec<String>>,
pub content: String,
pub sig: String,
// Optimization for tag search, built on demand.
#[serde(skip)]
pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
pub tagidx: Option<HashMap<char, HashSet<String>>>,
}
/// Simple tag type for array of array of strings.
@@ -101,6 +101,21 @@ impl From<EventCmd> for Result<Event> {
}
impl Event {
#[cfg(test)]
pub fn simple_event() -> Event {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
}
}
pub fn is_kind_metadata(&self) -> bool {
self.kind == 0
}
@@ -226,7 +241,7 @@ impl Event {
}
/// Check if this event has a valid signature.
fn validate(&self) -> Result<()> {
pub fn validate(&self) -> Result<()> {
// TODO: return a Result with a reason for invalid events
// validation is performed by:
// * parsing JSON string into event fields
@@ -319,31 +334,18 @@ impl Event {
#[cfg(test)]
mod tests {
use super::*;
fn simple_event() -> Event {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
}
}
#[test]
fn event_creation() {
// create an event
let event = simple_event();
let event = Event::simple_event();
assert_eq!(event.id, "0");
}
#[test]
fn event_serialize() -> Result<()> {
// serialize an event to JSON string
let event = simple_event();
let event = Event::simple_event();
let j = serde_json::to_string(&event)?;
assert_eq!(j, "{\"id\":\"0\",\"pubkey\":\"0\",\"created_at\":0,\"kind\":0,\"tags\":[],\"content\":\"\",\"sig\":\"0\"}");
Ok(())
@@ -351,14 +353,14 @@ mod tests {
#[test]
fn empty_event_tag_match() {
let event = simple_event();
let event = Event::simple_event();
assert!(!event
.generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
}
#[test]
fn single_event_tag_match() {
let mut event = simple_event();
let mut event = Event::simple_event();
event.tags = vec![vec!["e".to_owned(), "foo".to_owned()]];
event.build_index();
assert_eq!(
@@ -373,7 +375,7 @@ mod tests {
#[test]
fn event_tags_serialize() -> Result<()> {
// serialize an event with tags to JSON string
let mut event = simple_event();
let mut event = Event::simple_event();
event.tags = vec![
vec![
"e".to_owned(),

View File

@@ -517,7 +517,7 @@ impl Verifier {
Ok(updated) => {
if updated != 0 {
info!(
"persisted event: {:?} in {:?}",
"persisted event (new verified pubkey): {:?} in {:?}",
event.get_event_id_prefix(),
start.elapsed()
);
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
earliest: u64,
) -> Result<VerificationRecord> {
let tx = conn.transaction()?;
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![earliest, earliest], |r| {
let rowid: u64 = r.get(0)?;

View File

@@ -16,7 +16,7 @@ pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
pragma mmap_size = 1073741824; -- 1024MB of mmap
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version

View File

@@ -22,12 +22,14 @@ use hyper::upgrade::Upgraded;
use hyper::{
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use rusqlite::OpenFlags;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver as MpscReceiver;
use std::time::Duration;
use std::time::Instant;
@@ -87,6 +89,7 @@ async fn handle_web_request(
Some(config),
)
.await;
let origin = get_header_string("origin", request.headers());
let user_agent = get_header_string("user-agent", request.headers());
// determine the remote IP from headers if the exist
let header_ip = settings
@@ -100,6 +103,7 @@ async fn handle_web_request(
let client_info = ClientInfo {
remote_ip,
user_agent,
origin,
};
// spawn a nostr server with our websocket
tokio::spawn(nostr_server(
@@ -244,14 +248,19 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
// configure tokio runtime
let rt = Builder::new_multi_thread()
.enable_all()
.thread_name("tokio-ws")
.thread_name_fn(|| {
// give each thread a unique numeric name
static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
format!("tokio-ws-{}", id)
})
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
.on_thread_start(|| {
debug!("started new thread");
trace!("started new thread: {:?}", std::thread::current().name());
})
.on_thread_stop(|| {
debug!("stopping thread");
trace!("stopped thread: {:?}", std::thread::current().name());
})
.build()
.unwrap();
@@ -310,6 +319,18 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
}
}
}
// build a connection pool for DB maintenance
let maintenance_pool = db::build_pool(
"maintenance writer",
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
2,
false,
);
db::db_optimize(maintenance_pool.clone()).await;
db::db_checkpoint(maintenance_pool).await;
// listen for (external to tokio) shutdown request
let controlled_shutdown = invoke_shutdown.clone();
tokio::spawn(async move {
@@ -339,12 +360,15 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
let pool = db::build_pool(
"client query",
&settings,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
db_min_conn,
db_max_conn,
true,
);
// spawn a task to check the pool size.
let pool_monitor = pool.clone();
tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});
// A `Service` is needed for every connection, so this
// creates one from our `handle_request` function.
let make_svc = make_service_fn(|conn: &AddrStream| {
@@ -428,6 +452,7 @@ fn make_notice_message(notice: Notice) -> Message {
struct ClientInfo {
remote_ip: String,
user_agent: Option<String>,
origin: Option<String>,
}
/// Handle new client connections. This runs through an event loop
@@ -491,9 +516,14 @@ async fn nostr_server(
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
if let Some(ua) = client_info.user_agent {
debug!("cid: {}, user-agent: {:?}", cid, ua);
}
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
let user_agent = client_info
.user_agent
.unwrap_or_else(|| "<unspecified>".into());
debug!(
"cid: {}, origin: {:?}, user-agent: {:?}",
cid, origin, user_agent
);
loop {
tokio::select! {
_ = shutdown.recv() => {
@@ -539,7 +569,7 @@ async fn nostr_server(
// TODO: serialize at broadcast time, instead of
// once for each consumer.
if let Ok(event_str) = serde_json::to_string(&global_event) {
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
trace!("sub match for client: {}, sub: {:?}, event: {:?}",
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it

View File

@@ -65,12 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter {
tags: None,
force_no_match: false,
};
let empty_string = "".into();
let mut ts = None;
// iterate through each key, and assign values that exist
for (key, val) in filter.into_iter() {
// ids
if key == "ids" {
rf.ids = Deserialize::deserialize(val).ok();
let raw_ids: Option<Vec<String>>= Deserialize::deserialize(val).ok();
if let Some(a) = raw_ids.as_ref() {
if a.contains(&empty_string) {
return Err(serde::de::Error::invalid_type(
Unexpected::Other("prefix matches must not be empty strings"),
&"a json object"));
}
}
rf.ids =raw_ids;
} else if key == "kinds" {
rf.kinds = Deserialize::deserialize(val).ok();
} else if key == "since" {
@@ -80,7 +89,15 @@ impl<'de> Deserialize<'de> for ReqFilter {
} else if key == "limit" {
rf.limit = Deserialize::deserialize(val).ok();
} else if key == "authors" {
rf.authors = Deserialize::deserialize(val).ok();
let raw_authors: Option<Vec<String>>= Deserialize::deserialize(val).ok();
if let Some(a) = raw_authors.as_ref() {
if a.contains(&empty_string) {
return Err(serde::de::Error::invalid_type(
Unexpected::Other("prefix matches must not be empty strings"),
&"a json object"));
}
}
rf.authors = raw_authors;
} else if key.starts_with('#') && key.len() > 1 && val.is_array() {
if let Some(tag_search) = tag_search_char_from_filter(key) {
if ts.is_none() {
@@ -294,6 +311,24 @@ mod tests {
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
}
#[test]
fn req_empty_authors_prefix() {
let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]";
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
}
#[test]
fn req_empty_ids_prefix() {
let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]";
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
}
#[test]
fn req_empty_ids_prefix_mixed() {
let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]";
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
}
#[test]
fn legacy_filter() {
// legacy field in filter