feat: add rate limiting setting for subscription creation

This commit is contained in:
Greg Heartsfield 2022-12-17 09:27:29 -06:00
parent 0e851d4f71
commit 0d8d39ad22
3 changed files with 26 additions and 1 deletions

View File

@ -65,6 +65,11 @@ reject_future_seconds = 1800
# an integer. If not set (or set to 0), defaults to unlimited. # an integer. If not set (or set to 0), defaults to unlimited.
#messages_per_sec = 0 #messages_per_sec = 0
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to
# unlimited.
#subscriptions_per_min = 0
# Limit the maximum size of an EVENT message. Defaults to 128 KB. # Limit the maximum size of an EVENT message. Defaults to 128 KB.
# Set to 0 for unlimited. # Set to 0 for unlimited.
#max_event_bytes = 131072 #max_event_bytes = 131072

View File

@ -52,7 +52,8 @@ pub struct Retention {
#[allow(unused)] #[allow(unused)]
pub struct Limits { pub struct Limits {
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute) pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
pub max_ws_message_bytes: Option<usize>, pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>, pub max_ws_frame_bytes: Option<usize>,
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory) pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
@ -214,6 +215,7 @@ impl Default for Settings {
}, },
limits: Limits { limits: Limits {
messages_per_sec: None, messages_per_sec: None,
subscriptions_per_min: None,
max_event_bytes: Some(2 << 17), // 128K max_event_bytes: Some(2 << 17), // 128K
max_ws_message_bytes: Some(2 << 17), // 128K max_ws_message_bytes: Some(2 << 17), // 128K
max_ws_frame_bytes: Some(2 << 17), // 128K max_ws_frame_bytes: Some(2 << 17), // 128K

View File

@ -14,6 +14,7 @@ use crate::notice::Notice;
use crate::subscription::Subscription; use crate::subscription::Subscription;
use futures::SinkExt; use futures::SinkExt;
use futures::StreamExt; use futures::StreamExt;
use governor::{Jitter, Quota, RateLimiter};
use http::header::HeaderMap; use http::header::HeaderMap;
use hyper::header::ACCEPT; use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
@ -438,6 +439,19 @@ async fn nostr_server(
let mut bcast_rx = broadcast.subscribe(); let mut bcast_rx = broadcast.subscribe();
// Track internal client state // Track internal client state
let mut conn = conn::ClientConn::new(client_info.remote_ip); let mut conn = conn::ClientConn::new(client_info.remote_ip);
// subscription creation rate limiting
let mut sub_lim_opt = None;
// 100ms jitter when the rate limiter returns
let jitter = Jitter::up_to(Duration::from_millis(100));
let sub_per_min_setting = settings.limits.subscriptions_per_min;
if let Some(sub_per_min) = sub_per_min_setting {
if sub_per_min > 0 {
trace!("Rate limits for sub creation ({}/min)", sub_per_min);
let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
let quota = Quota::per_minute(quota_time);
sub_lim_opt = Some(RateLimiter::direct(quota));
}
}
// Use the remote IP as the client identifier // Use the remote IP as the client identifier
let cid = conn.get_client_prefix(); let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database. // Create a channel for receiving query results from the database.
@ -606,11 +620,15 @@ async fn nostr_server(
Ok(NostrMessage::SubMsg(s)) => { Ok(NostrMessage::SubMsg(s)) => {
debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id); debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
// subscription handling consists of: // subscription handling consists of:
// * check for rate limits
// * registering the subscription so future events can be matched // * registering the subscription so future events can be matched
// * making a channel to cancel to request later // * making a channel to cancel to request later
// * sending a request for a SQL query // * sending a request for a SQL query
// Do nothing if the sub already exists. // Do nothing if the sub already exists.
if !current_subs.contains(&s) { if !current_subs.contains(&s) {
if let Some(ref lim) = sub_lim_opt {
lim.until_ready_with_jitter(jitter).await;
}
current_subs.push(s.clone()); current_subs.push(s.clone());
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>(); let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) { match conn.subscribe(s.clone()) {