mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
9be04120c7 | ||
|
cc06167e06 | ||
|
b6e33f044f | ||
|
1b2c6f9fca | ||
|
0d8d39ad22 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.6"
|
||||
version = "0.7.8"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitcoin_hashes",
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.6"
|
||||
version = "0.7.8"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
|
@@ -65,6 +65,11 @@ reject_future_seconds = 1800
|
||||
# an integer. If not set (or set to 0), defaults to unlimited.
|
||||
#messages_per_sec = 0
|
||||
|
||||
# Limit client subscriptions created per second, averaged over one
|
||||
# minute. Must be an integer. If not set (or set to 0), defaults to
|
||||
# unlimited.
|
||||
#subscriptions_per_min = 0
|
||||
|
||||
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
|
||||
# Set to 0 for unlimited.
|
||||
#max_event_bytes = 131072
|
||||
|
@@ -52,7 +52,8 @@ pub struct Retention {
|
||||
#[allow(unused)]
|
||||
pub struct Limits {
|
||||
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
|
||||
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
|
||||
pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
|
||||
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
|
||||
pub max_ws_message_bytes: Option<usize>,
|
||||
pub max_ws_frame_bytes: Option<usize>,
|
||||
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
|
||||
@@ -214,6 +215,7 @@ impl Default for Settings {
|
||||
},
|
||||
limits: Limits {
|
||||
messages_per_sec: None,
|
||||
subscriptions_per_min: None,
|
||||
max_event_bytes: Some(2 << 17), // 128K
|
||||
max_ws_message_bytes: Some(2 << 17), // 128K
|
||||
max_ws_frame_bytes: Some(2 << 17), // 128K
|
||||
|
@@ -76,6 +76,7 @@ pub fn build_pool(
|
||||
.test_on_check_out(true) // no noticeable performance hit
|
||||
.min_idle(Some(min_size))
|
||||
.max_size(max_size)
|
||||
.max_lifetime(Some(Duration::from_secs(60)))
|
||||
.build(manager)
|
||||
.unwrap();
|
||||
info!(
|
||||
|
@@ -20,7 +20,7 @@ pragma mmap_size = 536870912; -- 512MB of mmap
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
pub const DB_VERSION: usize = 9;
|
||||
pub const DB_VERSION: usize = 10;
|
||||
|
||||
/// Schema definition
|
||||
const INIT_SQL: &str = formatcp!(
|
||||
@@ -67,6 +67,7 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
|
||||
|
||||
-- NIP-05 User Validation
|
||||
CREATE TABLE IF NOT EXISTS user_verification (
|
||||
@@ -163,7 +164,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
if curr_version == 8 {
|
||||
curr_version = mig_8_to_9(conn)?;
|
||||
}
|
||||
|
||||
if curr_version == 9 {
|
||||
curr_version = mig_9_to_10(conn)?;
|
||||
}
|
||||
if curr_version == DB_VERSION {
|
||||
info!(
|
||||
"All migration scripts completed successfully. Welcome to v{}.",
|
||||
@@ -419,5 +422,24 @@ PRAGMA user_version = 9;
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(8)
|
||||
Ok(9)
|
||||
}
|
||||
|
||||
fn mig_9_to_10(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 9->10");
|
||||
// Those old indexes were actually helpful...
|
||||
let upgrade_sql = r##"
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
|
||||
PRAGMA user_version = 10;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v9 -> v10");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(10)
|
||||
}
|
||||
|
@@ -14,6 +14,7 @@ use crate::notice::Notice;
|
||||
use crate::subscription::Subscription;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use governor::{Jitter, Quota, RateLimiter};
|
||||
use http::header::HeaderMap;
|
||||
use hyper::header::ACCEPT;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
@@ -438,6 +439,19 @@ async fn nostr_server(
|
||||
let mut bcast_rx = broadcast.subscribe();
|
||||
// Track internal client state
|
||||
let mut conn = conn::ClientConn::new(client_info.remote_ip);
|
||||
// subscription creation rate limiting
|
||||
let mut sub_lim_opt = None;
|
||||
// 100ms jitter when the rate limiter returns
|
||||
let jitter = Jitter::up_to(Duration::from_millis(100));
|
||||
let sub_per_min_setting = settings.limits.subscriptions_per_min;
|
||||
if let Some(sub_per_min) = sub_per_min_setting {
|
||||
if sub_per_min > 0 {
|
||||
trace!("Rate limits for sub creation ({}/min)", sub_per_min);
|
||||
let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
|
||||
let quota = Quota::per_minute(quota_time);
|
||||
sub_lim_opt = Some(RateLimiter::direct(quota));
|
||||
}
|
||||
}
|
||||
// Use the remote IP as the client identifier
|
||||
let cid = conn.get_client_prefix();
|
||||
// Create a channel for receiving query results from the database.
|
||||
@@ -606,11 +620,15 @@ async fn nostr_server(
|
||||
Ok(NostrMessage::SubMsg(s)) => {
|
||||
debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
|
||||
// subscription handling consists of:
|
||||
// * check for rate limits
|
||||
// * registering the subscription so future events can be matched
|
||||
// * making a channel to cancel to request later
|
||||
// * sending a request for a SQL query
|
||||
// Do nothing if the sub already exists.
|
||||
if !current_subs.contains(&s) {
|
||||
if let Some(ref lim) = sub_lim_opt {
|
||||
lim.until_ready_with_jitter(jitter).await;
|
||||
}
|
||||
current_subs.push(s.clone());
|
||||
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
|
||||
match conn.subscribe(s.clone()) {
|
||||
@@ -619,7 +637,7 @@ async fn nostr_server(
|
||||
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
// start a database query
|
||||
// start a database query. this spawns a blocking database query on a worker thread.
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
},
|
||||
Err(e) => {
|
||||
|
Reference in New Issue
Block a user