Compare commits

...

51 Commits

Author SHA1 Message Date
Greg Heartsfield
95748647f0 build: bump version to 0.7.13 2022-12-22 16:27:34 -06:00
Greg Heartsfield
25480e837f fix: do not block writers for more than 1 second during checkpoints 2022-12-22 16:10:49 -06:00
Greg Heartsfield
b80b54cd9d improvement: reduce logging, especially for database pool size 2022-12-22 15:47:33 -06:00
Greg Heartsfield
8ea732cbe5 feat: perform regular database maintenance (60sec), without blocking main writer thread 2022-12-22 15:16:21 -06:00
Greg Heartsfield
0f68c4e5c2 refactor: formatting 2022-12-22 15:15:45 -06:00
Greg Heartsfield
dab2cd5792 wip: future changes to rustfmt 2022-12-22 15:13:54 -06:00
Greg Heartsfield
f411aa6fc2 fix: do not re-verify NIP-05 entries where metadata was deleted 2022-12-22 13:01:48 -06:00
Greg Heartsfield
d31bbda087 improvement: reduce lifetime of database connections 2022-12-22 13:01:12 -06:00
Greg Heartsfield
5917bc53b2 improvement: run maintenance every 60 seconds instead of by event count 2022-12-22 11:40:17 -06:00
Greg Heartsfield
91177c61a1 improvement: log reason for new event creation from nip05 2022-12-22 10:48:30 -06:00
Greg Heartsfield
53c2a8051c improvement: reduce logging 2022-12-22 10:29:27 -06:00
Greg Heartsfield
168cf513ac feat: perform full checkpoints and truncate WAL every 2k events 2022-12-22 10:11:05 -06:00
Greg Heartsfield
ea204761c9 fix: do not show slow queries more than once per sub 2022-12-20 15:41:50 -06:00
Greg Heartsfield
c270ae1434 improvement: reduce event count for db writer pauses 2022-12-20 15:25:24 -06:00
Greg Heartsfield
64bd983cb6 perf: every 5000 persisted events, pause for 500ms for backups
I have observed backups running for a very long time under heavy load,
this introduces some artificial delay to give the online backup enough
time to make progress.
2022-12-20 15:05:04 -06:00
Greg Heartsfield
1c153bc784 perf: shed DB query load when queue gets large 2022-12-20 13:23:21 -06:00
Greg Heartsfield
dc11d9a619 improvement: explicitly rollback transaction on duplicate event 2022-12-20 13:23:04 -06:00
Greg Heartsfield
cd1557787b improvement: log write pool 2022-12-20 13:21:57 -06:00
Greg Heartsfield
86bb7aeb9a improvement: function to check pool capacity 2022-12-20 10:07:01 -06:00
Greg Heartsfield
ce37fc1a2d build: bump version to 0.7.12 2022-12-19 14:50:42 -06:00
Greg Heartsfield
2cfd384339 perf: drop db handles that are not quickly read 2022-12-19 00:18:39 -06:00
Greg Heartsfield
8c013107f9 perf: increase upper bound for sqlite mmap 2022-12-18 23:19:43 -06:00
Greg Heartsfield
64a4466d30 perf: backing down on max_blocking_threads 2022-12-18 23:14:41 -06:00
Greg Heartsfield
1596c23eb4 perf: increase blocking threads now that contention is reduced 2022-12-18 22:46:32 -06:00
Greg Heartsfield
129badd4e1 perf: reduce per thread mmap allocation for DB 2022-12-18 22:45:32 -06:00
Greg Heartsfield
6f7c080180 improvement: reduce number of writer blocking threads from 4->2 2022-12-18 22:32:31 -06:00
Greg Heartsfield
af92561ef6 perf: remove shared cache mode (experiment) 2022-12-18 22:15:50 -06:00
Greg Heartsfield
d833a3e40d perf: reduce logging 2022-12-18 22:11:46 -06:00
Greg Heartsfield
462eb46642 build: bump version to 0.7.11 2022-12-18 20:52:01 -06:00
Greg Heartsfield
cf144d503d perf: reduce logging for slow queries 2022-12-18 20:47:11 -06:00
Greg Heartsfield
fb8375aef2 build: bump version to 0.7.10 2022-12-18 13:46:18 -06:00
Greg Heartsfield
88ac31b549 perf: increase channel size for DB communication 2022-12-18 13:44:28 -06:00
Greg Heartsfield
677b7d39e9 improvement: log slow requests that return zero results 2022-12-18 13:42:31 -06:00
Greg Heartsfield
b24d2f9aaa perf: set default blocking threads to lower value 2022-12-18 12:20:57 -06:00
Greg Heartsfield
7a3899d852 build: bump version to 0.7.9 2022-12-18 09:21:07 -06:00
Greg Heartsfield
818108b793 improvement: upgrade multiple dependencies
Updating anyhow v1.0.66 -> v1.0.67
Updating async-trait v0.1.59 -> v0.1.60
Updating cxx v1.0.83 -> v1.0.84
Updating cxx-build v1.0.83 -> v1.0.84
Updating cxxbridge-flags v1.0.83 -> v1.0.84
Updating cxxbridge-macro v1.0.83 -> v1.0.84
Updating itoa v1.0.4 -> v1.0.5
Updating link-cplusplus v1.0.7 -> v1.0.8
Updating proc-macro2 v1.0.47 -> v1.0.48
Updating quote v1.0.21 -> v1.0.22
Updating rustversion v1.0.9 -> v1.0.11
Updating ryu v1.0.11 -> v1.0.12
Updating scratch v1.0.2 -> v1.0.3
Updating serde v1.0.150 -> v1.0.151
Updating serde_derive v1.0.150 -> v1.0.151
Updating serde_json v1.0.89 -> v1.0.90
Updating syn v1.0.105 -> v1.0.106
Updating thiserror v1.0.37 -> v1.0.38
Updating thiserror-impl v1.0.37 -> v1.0.38
Updating unicode-ident v1.0.5 -> v1.0.6
2022-12-18 09:16:09 -06:00
Greg Heartsfield
d10348f7e1 feat: configurable blocking threads 2022-12-18 09:14:04 -06:00
Greg Heartsfield
8598e443d8 wip: add configuration for future feature (client concurrent db limits) 2022-12-17 23:19:48 -06:00
Greg Heartsfield
43222d44e5 feat: perform optimization after seeing many events 2022-12-17 23:18:54 -06:00
Greg Heartsfield
7c1516c4fb perf: add index for tags 2022-12-17 23:17:53 -06:00
Greg Heartsfield
0c72053a49 perf: increase mmap size to 1GB 2022-12-17 23:17:16 -06:00
Greg Heartsfield
3f32ff67ab improvement: minor logging 2022-12-17 23:11:14 -06:00
Greg Heartsfield
0b9778d6ca refactor: simplify tracking of subscriptions 2022-12-17 20:46:58 -06:00
Greg Heartsfield
9be04120c7 build: bump version to 0.7.8 2022-12-17 12:01:43 -06:00
Greg Heartsfield
cc06167e06 perf: add composite index for tag table 2022-12-17 12:01:20 -06:00
Greg Heartsfield
b6e33f044f improvement: limit db connection max lifetime 2022-12-17 10:47:35 -06:00
Greg Heartsfield
1b2c6f9fca build: bump version to 0.7.7 2022-12-17 10:09:44 -06:00
Greg Heartsfield
0d8d39ad22 feat: add rate limiting setting for subscription creation 2022-12-17 09:27:29 -06:00
Greg Heartsfield
0e851d4f71 build: bump version to 0.7.6 2022-12-17 07:51:57 -06:00
Greg Heartsfield
3c880b2f49 perf: pull distinct to outermost SQL 2022-12-17 07:49:28 -06:00
Greg Heartsfield
7a4c9266ec improvement: make hexsearch structs sortable 2022-12-17 07:49:05 -06:00
11 changed files with 379 additions and 119 deletions

82
Cargo.lock generated
View File

@@ -54,9 +54,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.66"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
checksum = "7724808837b77f4b4de9d283820f9d98bcf496d5692934b857a2399d31ff22e6"
[[package]]
name = "async-stream"
@@ -81,9 +81,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.59"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
dependencies = [
"proc-macro2",
"quote",
@@ -382,9 +382,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf"
checksum = "27874566aca772cb515af4c6e997b5fe2119820bca447689145e39bb734d19a0"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -394,9 +394,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39"
checksum = "e7bb951f2523a49533003656a72121306b225ec16a49a09dc6b0ba0d6f3ec3c0"
dependencies = [
"cc",
"codespan-reporting",
@@ -409,15 +409,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12"
checksum = "be778b6327031c1c7b61dd2e48124eee5361e6aa76b8de93692f011b08870ab4"
[[package]]
name = "cxxbridge-macro"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6"
checksum = "7b8a2b87662fe5a0a0b38507756ab66aff32638876a0866e5a5fc82ceb07ee49"
dependencies = [
"proc-macro2",
"quote",
@@ -901,9 +901,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "js-sys"
@@ -950,9 +950,9 @@ dependencies = [
[[package]]
name = "link-cplusplus"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.7.5"
version = "0.7.13"
dependencies = [
"anyhow",
"bitcoin_hashes",
@@ -1414,9 +1414,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
checksum = "e9d89e5dba24725ae5678020bf8f1357a9aa7ff10736b551adbcd3f8d17d766f"
dependencies = [
"unicode-ident",
]
@@ -1472,9 +1472,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.21"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
checksum = "556d0f47a940e895261e77dc200d5eadfc6ef644c179c6f5edfc105e3a2292c8"
dependencies = [
"proc-macro2",
]
@@ -1736,15 +1736,15 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.9"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
[[package]]
name = "ryu"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
[[package]]
name = "schannel"
@@ -1773,9 +1773,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "secp256k1"
@@ -1823,18 +1823,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.150"
version = "1.0.151"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.150"
version = "1.0.151"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
dependencies = [
"proc-macro2",
"quote",
@@ -1843,9 +1843,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
checksum = "8778cc0b528968fe72abec38b5db5a20a70d148116cd9325d2bc5f5180ca3faf"
dependencies = [
"indexmap",
"itoa",
@@ -1920,9 +1920,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.105"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908"
checksum = "09ee3a69cd2c7e06684677e5629b3878b253af05e4714964204279c6bc02cf0b"
dependencies = [
"proc-macro2",
"quote",
@@ -1960,18 +1960,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.37"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.37"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
dependencies = [
"proc-macro2",
"quote",
@@ -2330,9 +2330,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.5"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]]
name = "unicode-normalization"

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.7.5"
version = "0.7.13"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"

View File

@@ -62,8 +62,24 @@ reject_future_seconds = 1800
[limits]
# Limit events created per second, averaged over one minute. Must be
# an integer. If not set (or set to 0), defaults to unlimited.
#messages_per_sec = 0
# an integer. If not set (or set to 0), defaults to unlimited. Note:
# this is for the server as a whole, not per-connection.
# messages_per_sec = 0
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to
# unlimited.
#subscriptions_per_min = 0
# UNIMPLEMENTED...
# Limit how many concurrent database connections a client can have.
# This prevents a single client from starting too many expensive
# database queries. Must be an integer. If not set (or set to 0),
# defaults to unlimited (subject to subscription limits).
#db_conns_per_client = 0
# Limit blocking threads used for database connections. Defaults to 16.
#max_blocking_threads = 16
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
# Set to 0 for unlimited.

View File

@@ -1 +1,4 @@
edition = "2021"
#max_width = 140
#chain_width = 100
#fn_call_width = 100

View File

@@ -52,6 +52,9 @@ pub struct Retention {
#[allow(unused)]
pub struct Limits {
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
pub db_conns_per_client: Option<u32>, // How many concurrent database queries (not subscriptions) may a client have?
pub max_blocking_threads: usize,
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>,
@@ -214,6 +217,9 @@ impl Default for Settings {
},
limits: Limits {
messages_per_sec: None,
subscriptions_per_min: None,
db_conns_per_client: None,
max_blocking_threads: 16,
max_event_bytes: Some(2 << 17), // 128K
max_ws_message_bytes: Some(2 << 17), // 128K
max_ws_frame_bytes: Some(2 << 17), // 128K

View File

@@ -5,7 +5,7 @@ use crate::error::Result;
use crate::subscription::Subscription;
use std::collections::HashMap;
use tracing::{debug, info};
use tracing::{debug, trace};
use uuid::Uuid;
/// A subscription identifier has a maximum length
@@ -46,6 +46,11 @@ impl ClientConn {
&self.subscriptions
}
/// Check if the given subscription already exists
pub fn has_subscription(&self, sub: &Subscription) -> bool {
self.subscriptions.values().any(|x| x == sub)
}
/// Get a short prefix of the client's unique identifier, suitable
/// for logging.
#[must_use]
@@ -69,7 +74,7 @@ impl ClientConn {
// prevent arbitrarily long subscription identifiers from
// being used.
if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
info!(
debug!(
"ignoring sub request with excessive length: ({})",
sub_id_len
);
@@ -79,7 +84,7 @@ impl ClientConn {
if self.subscriptions.contains_key(&k) {
self.subscriptions.remove(&k);
self.subscriptions.insert(k, s.clone());
debug!(
trace!(
"replaced existing subscription (cid: {}, sub: {:?})",
self.get_client_prefix(),
s.get_id()
@@ -93,7 +98,7 @@ impl ClientConn {
}
// add subscription
self.subscriptions.insert(k, s);
debug!(
trace!(
"registered new subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),
@@ -105,7 +110,7 @@ impl ClientConn {
pub fn unsubscribe(&mut self, c: &Close) {
// TODO: return notice if subscription did not exist.
self.subscriptions.remove(&c.id);
debug!(
trace!(
"removed subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),

245
src/db.rs
View File

@@ -39,6 +39,14 @@ pub struct SubmittedEvent {
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How frequently to run maintenance
/// How many persisted events before DB maintenannce is triggered.
pub const EVENT_MAINTENANCE_FREQ_SEC: u64 = 60;
/// How many persisted events before we pause for backups.
/// It isn't clear this is enough to make the online backup API work yet.
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
/// Build a database connection pool.
/// # Panics
///
@@ -76,6 +84,7 @@ pub fn build_pool(
.test_on_check_out(true) // no noticeable performance hit
.min_idle(Some(min_size))
.max_size(max_size)
.max_lifetime(Some(Duration::from_secs(30)))
.build(manager)
.unwrap();
info!(
@@ -85,6 +94,46 @@ pub fn build_pool(
pool
}
/// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
let start = Instant::now();
conn.execute_batch("PRAGMA optimize;")?;
info!("optimize ran in {:?}", start.elapsed());
Ok(())
}
#[derive(Debug)]
enum SqliteReturnStatus {
SqliteOk,
SqliteBusy,
SqliteError,
SqliteOther(u64),
}
/// Checkpoint/Truncate WAL
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
let start = Instant::now();
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
let checkpoint_result: u64 = row.get(0)?;
let wal_size: u64 = row.get(1)?;
let frames_checkpointed: u64 = row.get(2)?;
Ok((checkpoint_result, wal_size, frames_checkpointed))
})?;
let result = match cp_result {
0 => SqliteReturnStatus::SqliteOk,
1 => SqliteReturnStatus::SqliteBusy,
2 => SqliteReturnStatus::SqliteError,
x => SqliteReturnStatus::SqliteOther(x),
};
info!(
"checkpoint ran in {:?} (result: {:?}, WAL size: {})",
start.elapsed(),
result,
wal_size
);
Ok(())
}
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
settings: Settings,
@@ -107,7 +156,7 @@ pub async fn db_writer(
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
4,
2,
false,
);
if settings.database.in_memory {
@@ -124,6 +173,10 @@ pub async fn db_writer(
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
// Constant writing has interfered with online backups. Keep
// track of how long since we've given the backups a chance to
// run.
let mut backup_pause_counter: usize = 0;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
@@ -188,9 +241,10 @@ pub async fn db_writer(
event.get_author_prefix()
);
} else {
info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
);
notice_tx
.try_send(Notice::blocked(
@@ -232,6 +286,7 @@ pub async fn db_writer(
);
event_write = true
} else {
log_pool_stats("writer", &pool);
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
@@ -256,6 +311,12 @@ pub async fn db_writer(
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
backup_pause_counter += 1;
if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
info!("pausing db write thread for a moment...");
thread::sleep(Duration::from_millis(500));
backup_pause_counter = 0
}
}
// use rate limit, if defined, and if an event was actually written.
@@ -302,7 +363,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
)?;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
// pubkey references. This will abort the txn.
// pubkey references.
tx.rollback().ok();
return Ok(ins_count);
}
// remember primary key of the event most recently inserted.
@@ -320,9 +382,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// if tagvalue is lowercase hex;
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
@@ -428,14 +490,13 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
// if the filter is malformed, don't return anything.
if f.force_no_match {
let empty_query =
"SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned();
let empty_query = "SELECT e.content, e.created_at FROM event e WHERE 1=0".to_owned();
// query parameters for SQLite
let empty_params: Vec<Box<dyn ToSql>> = vec![];
return (empty_query, empty_params);
}
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e".to_owned();
let mut query = "SELECT e.content, e.created_at FROM event e".to_owned();
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -538,7 +599,10 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
str_clause, blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
@@ -591,19 +655,53 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
// encapsulate subqueries into select statements
let subqueries_selects: Vec<String> = subqueries
.iter()
.map(|s| format!("SELECT content, created_at FROM ({})", s))
.map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
.collect();
let query: String = subqueries_selects.join(" UNION ");
(query, params)
}
fn log_pool_stats(pool: &SqlitePool) {
/// Check if the pool is fully utilized
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
let state: r2d2::State = pool.state();
state.idle_connections == 0
}
/// Log pool stats
fn log_pool_stats(name: &str, pool: &SqlitePool) {
let state: r2d2::State = pool.state();
let in_use_cxns = state.connections - state.idle_connections;
debug!(
"DB pool usage (in_use: {}, available: {})",
in_use_cxns, state.connections
trace!(
"DB pool {:?} usage (in_use: {}, available: {})",
name,
in_use_cxns,
state.connections
);
if state.connections == in_use_cxns {
debug!("DB pool {:?} is empty (in_use: {})", name, in_use_cxns);
}
}
/// Perform database maintenance on a regular basis
pub async fn db_maintenance(pool: SqlitePool) {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
if let Ok(mut conn) = pool.get() {
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event
// writes.
conn.busy_timeout(Duration::from_secs(1)).ok();
debug!("running database optimizer");
optimize_db(&mut conn).ok();
debug!("running wal_checkpoint(TRUNCATE)");
checkpoint_db(&mut conn).ok();
}
}
};
}
});
}
/// Perform a database query using a subscription.
@@ -619,64 +717,116 @@ pub async fn db_query(
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
let pre_spawn_start = Instant::now();
task::spawn_blocking(move || {
let mut row_count: usize = 0;
let db_queue_time = pre_spawn_start.elapsed();
// if the queue time was very long (>5 seconds), spare the DB and abort.
if db_queue_time > Duration::from_secs(5) {
info!(
"shedding DB query load from {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
return Ok(());
}
// otherwise, report queuing time if it is slow
else if db_queue_time > Duration::from_secs(1) {
debug!(
"(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
db_queue_time, client_id, sub.id
);
}
let start = Instant::now();
let mut row_count: usize = 0;
// generate SQL query
let (q, p) = query_from_sub(&sub);
trace!("SQL generated in {:?}", start.elapsed());
let sql_gen_elapsed = start.elapsed();
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", start.elapsed());
}
// show pool stats
log_pool_stats(&pool);
log_pool_stats("reader", &pool);
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(1000);
let slow_cutoff = Duration::from_millis(2000);
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let start = Instant::now();
let mut slow_first_event;
let mut last_successful_send = Instant::now();
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
let mut first_result = true;
while let Some(row) = event_rows.next()? {
let first_event_elapsed = start.elapsed();
slow_first_event = first_event_elapsed >= slow_cutoff;
if first_result {
let first_result_elapsed = start.elapsed();
// logging for slow queries; show sub and SQL
if first_result_elapsed >= slow_cutoff {
info!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
"final query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!("final query string: {}", q);
}
debug!(
"first result in {:?} (cid: {}, sub: {:?})",
first_result_elapsed, client_id, sub.id
first_event_elapsed, client_id, sub.id
);
first_result = false;
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
// logging for slow queries; show sub and SQL.
// to reduce logging; only show 1/16th of clients (leading 0)
if row_count == 0 && slow_first_event && client_id.starts_with("0") {
debug!(
"query req (slow): {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
debug!(
"query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"query req: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!(
"query string: {} (cid: {}, sub: {:?})",
q,
client_id,
sub.id
);
}
// check if this is still active; every 100 rows
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(());
}
row_count += 1;
let event_json = row.get(0)?;
loop {
if query_tx.capacity() != 0 {
// we have capacity to add another item
break;
} else {
// the queue is full
trace!("db reader thread is stalled");
if last_successful_send + abort_cutoff < Instant::now() {
// the queue has been full for too long, abort
info!("aborting database query due to slow client");
let ok: Result<()> = Ok(());
return ok;
}
// give the queue a chance to clear before trying again
thread::sleep(Duration::from_millis(100));
}
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
// result.
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
last_successful_send = Instant::now();
}
query_tx
.blocking_send(QueryResult {
@@ -685,10 +835,11 @@ pub async fn db_query(
})
.ok();
debug!(
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
start.elapsed(),
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
pre_spawn_start.elapsed(),
client_id,
sub.id,
start.elapsed(),
row_count
);
} else {

View File

@@ -3,7 +3,7 @@ use crate::utils::is_hex;
use hex;
/// Types of hexadecimal queries.
#[derive(PartialEq, Eq, Debug, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub enum HexSearch {
// when no range is needed, exact 32-byte
Exact(Vec<u8>),

View File

@@ -517,7 +517,7 @@ impl Verifier {
Ok(updated) => {
if updated != 0 {
info!(
"persisted event: {:?} in {:?}",
"persisted event (new verified pubkey): {:?} in {:?}",
event.get_event_id_prefix(),
start.elapsed()
);
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
earliest: u64,
) -> Result<VerificationRecord> {
let tx = conn.transaction()?;
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![earliest, earliest], |r| {
let rowid: u64 = r.get(0)?;

View File

@@ -16,11 +16,11 @@ pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
pragma mmap_size = 536870912; -- 512MB of mmap
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version
pub const DB_VERSION: usize = 9;
pub const DB_VERSION: usize = 11;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -67,6 +67,8 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
-- NIP-05 User Validation
CREATE TABLE IF NOT EXISTS user_verification (
@@ -163,6 +165,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
if curr_version == 8 {
curr_version = mig_8_to_9(conn)?;
}
if curr_version == 9 {
curr_version = mig_9_to_10(conn)?;
}
if curr_version == 10 {
curr_version = mig_10_to_11(conn)?;
}
if curr_version == DB_VERSION {
info!(
@@ -419,5 +427,45 @@ PRAGMA user_version = 9;
panic!("database could not be upgraded");
}
}
Ok(8)
Ok(9)
}
fn mig_9_to_10(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 9->10");
// Those old indexes were actually helpful...
let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
PRAGMA user_version = 10;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v9 -> v10");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(10)
}
fn mig_10_to_11(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 10->11");
// Those old indexes were actually helpful...
let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
reindex;
pragma optimize;
PRAGMA user_version = 11;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v10 -> v11");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(11)
}

View File

@@ -14,6 +14,7 @@ use crate::notice::Notice;
use crate::subscription::Subscription;
use futures::SinkExt;
use futures::StreamExt;
use governor::{Jitter, Quota, RateLimiter};
use http::header::HeaderMap;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
@@ -21,6 +22,7 @@ use hyper::upgrade::Upgraded;
use hyper::{
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use rusqlite::OpenFlags;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
@@ -244,6 +246,14 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
let rt = Builder::new_multi_thread()
.enable_all()
.thread_name("tokio-ws")
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
.on_thread_start(|| {
trace!("started new thread");
})
.on_thread_stop(|| {
trace!("stopping thread");
})
.build()
.unwrap();
// start tokio
@@ -301,6 +311,17 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
}
}
}
// build a connection pool for DB maintenance
let maintenance_pool = db::build_pool(
"maintenance writer",
&settings,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
1,
1,
false,
);
db::db_maintenance(maintenance_pool).await;
// listen for (external to tokio) shutdown request
let controlled_shutdown = invoke_shutdown.clone();
tokio::spawn(async move {
@@ -330,8 +351,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
let pool = db::build_pool(
"client query",
&settings,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
db_min_conn,
db_max_conn,
true,
@@ -438,13 +458,28 @@ async fn nostr_server(
let mut bcast_rx = broadcast.subscribe();
// Track internal client state
let mut conn = conn::ClientConn::new(client_info.remote_ip);
// subscription creation rate limiting
let mut sub_lim_opt = None;
// 100ms jitter when the rate limiter returns
let jitter = Jitter::up_to(Duration::from_millis(100));
let sub_per_min_setting = settings.limits.subscriptions_per_min;
if let Some(sub_per_min) = sub_per_min_setting {
if sub_per_min > 0 {
trace!("Rate limits for sub creation ({}/min)", sub_per_min);
let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
let quota = Quota::per_minute(quota_time);
sub_lim_opt = Some(RateLimiter::direct(quota));
}
}
// Use the remote IP as the client identifier
let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database.
// we will send out the tx handle to any query we generate.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
// this has capacity for some of the larger requests we see, which
// should allow the DB thread to release the handle earlier.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20000);
// Create channel for receiving NOTICEs
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
// last time this client sent data (message, ping, etc.)
let mut last_message_time = Instant::now();
@@ -462,8 +497,6 @@ async fn nostr_server(
// when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// keep track of the subscriptions we have
let mut current_subs: Vec<Subscription> = Vec::new();
// for stats, keep track of how many events the client published,
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
@@ -517,7 +550,7 @@ async fn nostr_server(
// TODO: serialize at broadcast time, instead of
// once for each consumer.
if let Ok(event_str) = serde_json::to_string(&global_event) {
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
trace!("sub match for client: {}, sub: {:?}, event: {:?}",
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it
@@ -606,12 +639,15 @@ async fn nostr_server(
Ok(NostrMessage::SubMsg(s)) => {
debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
// subscription handling consists of:
// * check for rate limits
// * registering the subscription so future events can be matched
// * making a channel to cancel to request later
// * sending a request for a SQL query
// Do nothing if the sub already exists.
if !current_subs.contains(&s) {
current_subs.push(s.clone());
if !conn.has_subscription(&s) {
if let Some(ref lim) = sub_lim_opt {
lim.until_ready_with_jitter(jitter).await;
}
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) {
Ok(()) => {
@@ -619,27 +655,22 @@ async fn nostr_server(
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
previous_query.send(()).ok();
}
// start a database query
// start a database query. this spawns a blocking database query on a worker thread.
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
}
}
} else {
info!("client send duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
}
},
Ok(NostrMessage::CloseMsg(cc)) => {
// closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed {
// remove from the list of known subs
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
current_subs.remove(pos);
}
// check if a query is currently
// running, and remove it if so.
let stop_tx = running_queries.remove(&c.id);