feat: rate limit event creation

A configuration option, `messages_per_sec`, imposes a global limit on
the rate for which new events can be stored.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/6
This commit is contained in:
Greg Heartsfield 2021-12-30 21:07:21 -06:00
parent c60519de23
commit 20ee5a054c
7 changed files with 267 additions and 22 deletions

171
Cargo.lock generated
View File

@ -81,6 +81,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -139,6 +145,27 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "dashmap"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760"
dependencies = [
"cfg-if",
"num_cpus",
"parking_lot",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -266,6 +293,12 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.18"
@ -305,6 +338,23 @@ dependencies = [
"wasi",
]
[[package]]
name = "governor"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3de427a64787873c3b196285e6684cddbf0ae7d1d8d56eaafbb4120c4cb641"
dependencies = [
"dashmap",
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"quanta",
"rand 0.8.4",
"smallvec",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
@ -387,6 +437,15 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "js-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -446,6 +505,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -480,6 +548,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "5.1.2"
@ -491,6 +565,12 @@ dependencies = [
"version_check",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.1.6"
@ -500,9 +580,11 @@ dependencies = [
"env_logger",
"futures",
"futures-util",
"governor",
"hex",
"lazy_static",
"log",
"nonzero_ext",
"rusqlite",
"secp256k1",
"serde 1.0.131",
@ -627,6 +709,22 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quanta"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]]
name = "quote"
version = "1.0.10"
@ -782,6 +880,15 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "raw-cpuid"
version = "10.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "929f54e29691d4e6a9cc558479de70db7aa3d98cd6fe7ab86d7507aa2886b9d2"
dependencies = [
"bitflags",
]
[[package]]
name = "rdrand"
version = "0.4.0"
@ -1158,6 +1265,70 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "web-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -21,3 +21,5 @@ serde_json = "^1.0"
hex = "^0.4"
rusqlite = "^0.26"
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"

32
config.toml Normal file
View File

@ -0,0 +1,32 @@
# Nostr-rs-relay configuration
[network]
# Bind to this network address
address = "0.0.0.0"
# Listen on this port
port = 8080
[options]
# Reject events that have timestamps greater than this many seconds in
# the future. Defaults to rejecting anything greater than 30 minutes
# from the current time.
#reject_future_seconds = 1800
[limits]
# Limit events created per second, averaged over one minute. Must be
# an integer. If not set (or set to 0), defaults to unlimited.
messages_per_sec = 0
# Maximum WebSocket message in bytes. Defaults to 128k.
#max_ws_message_bytes = 131072
# Maximum WebSocket frame size in bytes. Defaults to 128k.
#max_ws_frame_bytes = 131072
# Broadcast buffer size, in number of events. This prevents slow
# readers from consuming memory. Defaults to 4096.
#broadcast_buffer = 4096
# Event persistence buffer size, in number of events. This provides
# backpressure to senders if writes are slow. Defaults to 16.
#event_persist_buffer = 16

View File

@ -1,4 +1,5 @@
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
@ -28,13 +29,13 @@ pub struct Retention {
pub max_events: Option<usize>, // max events
pub max_bytes: Option<usize>, // max size
pub persist_days: Option<usize>, // oldest message
pub whitelist_addresses: Vec<String>, // whitelisted addresses (never delete)
pub whitelist_addresses: Option<Vec<String>>, // whitelisted addresses (never delete)
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Limits {
pub messages_per_sec: Option<usize>, // Artificially slow down event writing to limit disk consumption
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub max_event_bytes: Option<usize>,
pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>,
@ -55,7 +56,15 @@ impl Settings {
pub fn new() -> Self {
let d = Self::default();
// attempt to construct settings with file
Self::new_from_default(&d).unwrap_or(d)
// Self::new_from_default(&d).unwrap_or(d)
let from_file = Self::new_from_default(&d);
match from_file {
Ok(f) => f,
Err(e) => {
warn!("Error reading config file ({:?})", e);
d
}
}
}
fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
@ -89,7 +98,7 @@ impl Default for Settings {
max_events: None, // max events
max_bytes: None, // max size
persist_days: None, // oldest message
whitelist_addresses: vec![], // whitelisted addresses (never delete)
whitelist_addresses: None, // whitelisted addresses (never delete)
},
options: Options {
reject_future_seconds: Some(30 * 60), // Reject events 30min in the future or greater

View File

@ -2,12 +2,17 @@
use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use hex;
use log::*;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OpenFlags;
//use std::num::NonZeroU32;
use crate::config::SETTINGS;
use std::path::Path;
use std::thread;
use tokio::task;
/// Database file
@ -114,6 +119,7 @@ PRAGMA user_version = 2;
pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
task::spawn_blocking(move || {
let mut conn = Connection::open_with_flags(
@ -122,26 +128,38 @@ pub async fn db_writer(
)?;
info!("opened database for writing");
upgrade_db(&mut conn)?;
// if version is zero, then we need to initialize from scratch.
// if version is one, we need to upgrade.
// if version is two, we are at the latest!
// TODO: determine if we need to execute the init script.
// TODO: check database app id / version before proceeding.
// get rate limit settings
let config = SETTINGS.read().unwrap();
let rps_setting = config.limits.messages_per_sec;
let mut lim_opt = None;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
info!("Enabling rate limits for event creation ({}/sec)", rps);
let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
}
}
loop {
if let Ok(_) = shutdown.try_recv() {
info!("shutting down database writer");
break;
}
// call blocking read on channel
let next_event = event_rx.blocking_recv();
// if the channel has closed, we will never get work
if next_event.is_none() {
break;
}
let mut event_write = false;
let event = next_event.unwrap();
match write_event(&mut conn, &event) {
Ok(updated) => {
if updated == 0 {
info!("nothing inserted (dupe?)");
debug!("ignoring duplicate event");
} else {
info!("persisted event: {}", event.get_event_id_prefix());
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
}
@ -150,6 +168,16 @@ pub async fn db_writer(
warn!("event insert failed: {}", err);
}
}
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some(ref lim) = lim_opt {
if let Err(n) = lim.check() {
info!("Rate limiting event creation");
thread::sleep(n.wait_time_from(clock.now()));
continue;
}
}
}
}
conn.close().ok();
info!("database connection closed");

View File

@ -28,10 +28,10 @@ fn main() -> Result<(), Error> {
let mut settings = config::SETTINGS.write().unwrap();
// replace default settings with those read from config.toml
let c = config::Settings::new();
debug!("using settings: {:?}", c);
*settings = c;
}
let config = config::SETTINGS.read().unwrap();
debug!("config: {:?}", config);
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
// configure tokio runtime
let rt = Builder::new_multi_thread()
@ -52,10 +52,6 @@ fn main() -> Result<(), Error> {
// validated events that need to be persisted are sent to the
// database on via this channel.
let (event_tx, event_rx) = mpsc::channel::<Event>(settings.limits.event_persist_buffer);
// start the database writer thread. Give it a channel for
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone()).await;
// establish a channel for letting all threads now about a
// requested server shutdown.
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
@ -66,6 +62,11 @@ fn main() -> Result<(), Error> {
info!("shutting down due to SIGINT");
ctrl_c_shutdown.send(()).ok();
});
// start the database writer thread. Give it a channel for
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
// track unique client connection count
let mut client_accept_count: usize = 0;
let mut stop_listening = invoke_shutdown.subscribe();

View File

@ -54,6 +54,8 @@ impl Stream for NostrStream {
/// Convert Message to NostrMessage
fn convert(msg: String) -> Result<NostrMessage> {
debug!("raw msg: {}", msg);
let event_size = msg.len();
debug!("event size is {} bytes", event_size);
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
match parsed_res {
Ok(m) => Ok(m),