From 20ee5a054cb3b8662a91f6bf17ca58735a86ae55 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Thu, 30 Dec 2021 21:07:21 -0600 Subject: [PATCH] feat: rate limit event creation A configuration option, `messages_per_sec`, imposes a global limit on the rate for which new events can be stored. Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/6 --- Cargo.lock | 171 +++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + config.toml | 32 +++++++++ src/config.rs | 29 +++++--- src/db.rs | 42 +++++++++-- src/main.rs | 11 +-- src/protostream.rs | 2 + 7 files changed, 267 insertions(+), 22 deletions(-) create mode 100644 config.toml diff --git a/Cargo.lock b/Cargo.lock index 62c423f..f671c48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,6 +81,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" + [[package]] name = "byteorder" version = "1.4.3" @@ -139,6 +145,27 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + +[[package]] +name = "dashmap" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "digest" version = "0.9.0" @@ -266,6 +293,12 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.18" @@ -305,6 +338,23 @@ dependencies = [ "wasi", ] +[[package]] +name = "governor" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3de427a64787873c3b196285e6684cddbf0ae7d1d8d56eaafbb4120c4cb641" +dependencies = [ + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta", + "rand 0.8.4", + "smallvec", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -387,6 +437,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "js-sys" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -446,6 +505,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matches" version = "0.1.9" @@ -480,6 +548,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "5.1.2" @@ -491,6 +565,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nostr-rs-relay" version = "0.1.6" @@ -500,9 +580,11 @@ dependencies = [ "env_logger", "futures", "futures-util", + "governor", "hex", "lazy_static", "log", + "nonzero_ext", "rusqlite", "secp256k1", "serde 1.0.131", @@ -627,6 +709,22 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quanta" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.10" @@ -782,6 +880,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "raw-cpuid" +version = "10.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "929f54e29691d4e6a9cc558479de70db7aa3d98cd6fe7ab86d7507aa2886b9d2" +dependencies = [ + "bitflags", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -1158,6 +1265,70 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasm-bindgen" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" + +[[package]] +name = "web-sys" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index fc6b4ef..f23543a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,5 @@ serde_json = "^1.0" hex = "^0.4" rusqlite = "^0.26" lazy_static = "^1.4" +governor = "^0.4" +nonzero_ext = "^0.3" diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..2ea365d --- /dev/null +++ b/config.toml @@ -0,0 +1,32 @@ +# Nostr-rs-relay configuration + +[network] +# Bind to this network address +address = "0.0.0.0" +# Listen on this port +port = 8080 + +[options] +# Reject events that have timestamps greater than this many seconds in +# the future. Defaults to rejecting anything greater than 30 minutes +# from the current time. +#reject_future_seconds = 1800 + +[limits] +# Limit events created per second, averaged over one minute. Must be +# an integer. If not set (or set to 0), defaults to unlimited. +messages_per_sec = 0 + +# Maximum WebSocket message in bytes. Defaults to 128k. +#max_ws_message_bytes = 131072 + +# Maximum WebSocket frame size in bytes. Defaults to 128k. +#max_ws_frame_bytes = 131072 + +# Broadcast buffer size, in number of events. This prevents slow +# readers from consuming memory. Defaults to 4096. +#broadcast_buffer = 4096 + +# Event persistence buffer size, in number of events. This provides +# backpressure to senders if writes are slow. Defaults to 16. +#event_persist_buffer = 16 diff --git a/src/config.rs b/src/config.rs index 359bdef..72708bf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,5 @@ use lazy_static::lazy_static; +use log::*; use serde::{Deserialize, Serialize}; use std::sync::RwLock; @@ -25,16 +26,16 @@ pub struct Options { #[allow(unused)] pub struct Retention { // TODO: implement - pub max_events: Option, // max events - pub max_bytes: Option, // max size - pub persist_days: Option, // oldest message - pub whitelist_addresses: Vec, // whitelisted addresses (never delete) + pub max_events: Option, // max events + pub max_bytes: Option, // max size + pub persist_days: Option, // oldest message + pub whitelist_addresses: Option>, // whitelisted addresses (never delete) } #[derive(Debug, Serialize, Deserialize)] #[allow(unused)] pub struct Limits { - pub messages_per_sec: Option, // Artificially slow down event writing to limit disk consumption + pub messages_per_sec: Option, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute) pub max_event_bytes: Option, pub max_ws_message_bytes: Option, pub max_ws_frame_bytes: Option, @@ -55,7 +56,15 @@ impl Settings { pub fn new() -> Self { let d = Self::default(); // attempt to construct settings with file - Self::new_from_default(&d).unwrap_or(d) + // Self::new_from_default(&d).unwrap_or(d) + let from_file = Self::new_from_default(&d); + match from_file { + Ok(f) => f, + Err(e) => { + warn!("Error reading config file ({:?})", e); + d + } + } } fn new_from_default(default: &Settings) -> Result { @@ -86,10 +95,10 @@ impl Default for Settings { event_persist_buffer: 16, }, retention: Retention { - max_events: None, // max events - max_bytes: None, // max size - persist_days: None, // oldest message - whitelist_addresses: vec![], // whitelisted addresses (never delete) + max_events: None, // max events + max_bytes: None, // max size + persist_days: None, // oldest message + whitelist_addresses: None, // whitelisted addresses (never delete) }, options: Options { reject_future_seconds: Some(30 * 60), // Reject events 30min in the future or greater diff --git a/src/db.rs b/src/db.rs index aca05cd..7b459ce 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,12 +2,17 @@ use crate::error::Result; use crate::event::Event; use crate::subscription::Subscription; +use governor::clock::Clock; +use governor::{Quota, RateLimiter}; use hex; use log::*; use rusqlite::params; use rusqlite::Connection; use rusqlite::OpenFlags; +//use std::num::NonZeroU32; +use crate::config::SETTINGS; use std::path::Path; +use std::thread; use tokio::task; /// Database file @@ -114,6 +119,7 @@ PRAGMA user_version = 2; pub async fn db_writer( mut event_rx: tokio::sync::mpsc::Receiver, bcast_tx: tokio::sync::broadcast::Sender, + mut shutdown: tokio::sync::broadcast::Receiver<()>, ) -> tokio::task::JoinHandle> { task::spawn_blocking(move || { let mut conn = Connection::open_with_flags( @@ -122,26 +128,38 @@ pub async fn db_writer( )?; info!("opened database for writing"); upgrade_db(&mut conn)?; - // if version is zero, then we need to initialize from scratch. - // if version is one, we need to upgrade. - // if version is two, we are at the latest! - - // TODO: determine if we need to execute the init script. - // TODO: check database app id / version before proceeding. + // get rate limit settings + let config = SETTINGS.read().unwrap(); + let rps_setting = config.limits.messages_per_sec; + let mut lim_opt = None; + let clock = governor::clock::QuantaClock::default(); + if let Some(rps) = rps_setting { + if rps > 0 { + info!("Enabling rate limits for event creation ({}/sec)", rps); + let quota = core::num::NonZeroU32::new(rps * 60).unwrap(); + lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota))); + } + } loop { + if let Ok(_) = shutdown.try_recv() { + info!("shutting down database writer"); + break; + } // call blocking read on channel let next_event = event_rx.blocking_recv(); // if the channel has closed, we will never get work if next_event.is_none() { break; } + let mut event_write = false; let event = next_event.unwrap(); match write_event(&mut conn, &event) { Ok(updated) => { if updated == 0 { - info!("nothing inserted (dupe?)"); + debug!("ignoring duplicate event"); } else { info!("persisted event: {}", event.get_event_id_prefix()); + event_write = true; // send this out to all clients bcast_tx.send(event.clone()).ok(); } @@ -150,6 +168,16 @@ pub async fn db_writer( warn!("event insert failed: {}", err); } } + // use rate limit, if defined, and if an event was actually written. + if event_write { + if let Some(ref lim) = lim_opt { + if let Err(n) = lim.check() { + info!("Rate limiting event creation"); + thread::sleep(n.wait_time_from(clock.now())); + continue; + } + } + } } conn.close().ok(); info!("database connection closed"); diff --git a/src/main.rs b/src/main.rs index 5e6eb40..b705093 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,10 +28,10 @@ fn main() -> Result<(), Error> { let mut settings = config::SETTINGS.write().unwrap(); // replace default settings with those read from config.toml let c = config::Settings::new(); - debug!("using settings: {:?}", c); *settings = c; } let config = config::SETTINGS.read().unwrap(); + debug!("config: {:?}", config); let addr = format!("{}:{}", config.network.address.trim(), config.network.port); // configure tokio runtime let rt = Builder::new_multi_thread() @@ -52,10 +52,6 @@ fn main() -> Result<(), Error> { // validated events that need to be persisted are sent to the // database on via this channel. let (event_tx, event_rx) = mpsc::channel::(settings.limits.event_persist_buffer); - // start the database writer thread. Give it a channel for - // writing events, and for publishing events that have been - // written (to all connected clients). - db::db_writer(event_rx, bcast_tx.clone()).await; // establish a channel for letting all threads now about a // requested server shutdown. let (invoke_shutdown, _) = broadcast::channel::<()>(1); @@ -66,6 +62,11 @@ fn main() -> Result<(), Error> { info!("shutting down due to SIGINT"); ctrl_c_shutdown.send(()).ok(); }); + // start the database writer thread. Give it a channel for + // writing events, and for publishing events that have been + // written (to all connected clients). + db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await; + // track unique client connection count let mut client_accept_count: usize = 0; let mut stop_listening = invoke_shutdown.subscribe(); diff --git a/src/protostream.rs b/src/protostream.rs index 3e8bfd3..410edb7 100644 --- a/src/protostream.rs +++ b/src/protostream.rs @@ -54,6 +54,8 @@ impl Stream for NostrStream { /// Convert Message to NostrMessage fn convert(msg: String) -> Result { debug!("raw msg: {}", msg); + let event_size = msg.len(); + debug!("event size is {} bytes", event_size); let parsed_res: Result = serde_json::from_str(&msg).map_err(|e| e.into()); match parsed_res { Ok(m) => Ok(m),