Compare commits

...

16 Commits
0.1.4 ... 0.2.0

Author SHA1 Message Date
Greg Heartsfield
f7f12a7984 build: bump version to 0.2.0 2021-12-30 21:27:48 -06:00
Greg Heartsfield
20ee5a054c feat: rate limit event creation
A configuration option, `messages_per_sec`, imposes a global limit on
the rate for which new events can be stored.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/6
2021-12-30 21:07:21 -06:00
Greg Heartsfield
c60519de23 feat: hide older contact update events
Type 3 (NIP-02) contact lists are hidden when newer ones are submitted
for the same author.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/4
2021-12-30 15:45:03 -06:00
Greg Heartsfield
d72e7a57b6 feat: hide older metadata update events
This updates the database schema to support hiding events.  In this
case, we are hiding older metadata updates when an author provides an
updated event.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/11
2021-12-30 13:55:05 -06:00
Greg Heartsfield
6447ddd974 fix: compile error with missing import 2021-12-30 10:00:34 -06:00
Greg Heartsfield
079722ddd9 improvement: reduce logging level for rejected events 2021-12-30 06:35:36 -06:00
Greg Heartsfield
3302fb2e81 refactor: clippy suggestions 2021-12-29 22:49:46 -06:00
Greg Heartsfield
f415295184 feat: reject future-dated events
If configured, reject events than are more than N seconds in the
future.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/5
2021-12-29 22:47:31 -06:00
Greg Heartsfield
d730bf0c59 feat: add configuration through file
A file named `config.toml` can now be used to load the address, port,
and some websocket configuration settings.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/3
2021-12-29 22:13:02 -06:00
Greg Heartsfield
2e2e01203b build: bump version to 0.1.6 2021-12-23 21:44:12 -06:00
Greg Heartsfield
100f890284 feat: add until for request filters
This implements an additional filter criteria for selecting events
prior to some timestamp.

See https://github.com/fiatjaf/nostr/issues/39x
2021-12-23 21:38:32 -06:00
Greg Heartsfield
0e288fe678 feat: send messages in order of oldest to newest 2021-12-23 21:36:46 -06:00
Greg Heartsfield
bfc804e18c feat: debug protocol messages 2021-12-23 21:30:04 -06:00
Greg Heartsfield
8a8ee5c425 build: bump version to 0.1.5 2021-12-19 16:45:17 -06:00
Greg Heartsfield
55bb6bd440 feat: add resource limits for websocket messages 2021-12-19 16:26:32 -06:00
Greg Heartsfield
7933abaa48 fix: allow unknown fields, like author 2021-12-19 16:18:03 -06:00
12 changed files with 639 additions and 33 deletions

307
Cargo.lock generated
View File

@@ -22,6 +22,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "atty"
version = "0.2.14"
@@ -57,7 +63,7 @@ version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ce18265ec2324ad075345d5814fbeed4f41f0a660055dc78840b74d19b874b1"
dependencies = [
"serde",
"serde 1.0.131",
]
[[package]]
@@ -75,6 +81,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c"
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -108,6 +120,22 @@ dependencies = [
"bitflags",
]
[[package]]
name = "config"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1b9d958c2b1368a663f05538fc1b5975adce1e19f435acceae987aceeeb369"
dependencies = [
"lazy_static",
"nom",
"rust-ini",
"serde 1.0.131",
"serde-hjson",
"serde_json",
"toml",
"yaml-rust",
]
[[package]]
name = "cpufeatures"
version = "0.2.1"
@@ -117,6 +145,27 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "dashmap"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b799062aaf67eb976af3bdca031ee6f846d2f0a5710ddbb0d2efee33f3cc4760"
dependencies = [
"cfg-if",
"num_cpus",
"parking_lot",
]
[[package]]
name = "digest"
version = "0.9.0"
@@ -244,6 +293,12 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.18"
@@ -283,6 +338,23 @@ dependencies = [
"wasi",
]
[[package]]
name = "governor"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3de427a64787873c3b196285e6684cddbf0ae7d1d8d56eaafbb4120c4cb641"
dependencies = [
"dashmap",
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"quanta",
"rand 0.8.4",
"smallvec",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
@@ -365,6 +437,34 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "js-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]]
name = "libc"
version = "0.2.111"
@@ -381,6 +481,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linked-hash-map"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.5"
@@ -399,6 +505,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matches"
version = "0.1.9"
@@ -433,19 +548,46 @@ dependencies = [
"winapi",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "5.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af"
dependencies = [
"lexical-core",
"memchr",
"version_check",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.1.4"
version = "0.2.0"
dependencies = [
"bitcoin_hashes",
"config",
"env_logger",
"futures",
"futures-util",
"governor",
"hex",
"lazy_static",
"log",
"nonzero_ext",
"rusqlite",
"secp256k1",
"serde",
"serde 1.0.131",
"serde_json",
"thiserror",
"tokio",
@@ -463,6 +605,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-traits"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31"
dependencies = [
"num-traits 0.2.14",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg 1.0.1",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@@ -549,6 +709,22 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quanta"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]]
name = "quote"
version = "1.0.10"
@@ -704,6 +880,15 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "raw-cpuid"
version = "10.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "929f54e29691d4e6a9cc558479de70db7aa3d98cd6fe7ab86d7507aa2886b9d2"
dependencies = [
"bitflags",
]
[[package]]
name = "rdrand"
version = "0.4.0"
@@ -754,6 +939,12 @@ dependencies = [
"smallvec",
]
[[package]]
name = "rust-ini"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e52c148ef37f8c375d49d5a73aa70713125b7f19095948a923f80afdeb22ec2"
[[package]]
name = "ryu"
version = "1.0.7"
@@ -775,7 +966,7 @@ dependencies = [
"bitcoin_hashes",
"rand 0.6.5",
"secp256k1-sys",
"serde",
"serde 1.0.131",
]
[[package]]
@@ -787,6 +978,12 @@ dependencies = [
"cc",
]
[[package]]
name = "serde"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dad3f759919b92c3068c696c15c3d17238234498bbdcc80f2c469606f948ac8"
[[package]]
name = "serde"
version = "1.0.131"
@@ -796,6 +993,18 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-hjson"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3a4e0ea8a88553209f6cc6cfe8724ecad22e1acf372793c27d995290fe74f8"
dependencies = [
"lazy_static",
"num-traits 0.1.43",
"regex",
"serde 0.8.23",
]
[[package]]
name = "serde_derive"
version = "1.0.131"
@@ -815,7 +1024,7 @@ checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
dependencies = [
"itoa",
"ryu",
"serde",
"serde 1.0.131",
]
[[package]]
@@ -852,6 +1061,12 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "1.0.82"
@@ -950,6 +1165,15 @@ dependencies = [
"tungstenite",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde 1.0.131",
]
[[package]]
name = "tungstenite"
version = "0.16.0"
@@ -1041,6 +1265,70 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "web-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -1071,3 +1359,12 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "yaml-rust"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.1.4"
version = "0.2.0"
edition = "2021"
[dependencies]
@@ -13,10 +13,13 @@ tokio-tungstenite = "^0.16"
tungstenite = "^0.16"
thiserror = "^1"
uuid = { version = "^0.8", features = ["v4"] }
config = { version = "0.11", features = ["toml"] }
bitcoin_hashes = { version = "^0.9", features = ["serde"] }
secp256k1 = { version = "^0.20", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
hex = "^0.4"
rusqlite = "^0.26"
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"

32
config.toml Normal file
View File

@@ -0,0 +1,32 @@
# Nostr-rs-relay configuration
[network]
# Bind to this network address
address = "0.0.0.0"
# Listen on this port
port = 8080
[options]
# Reject events that have timestamps greater than this many seconds in
# the future. Defaults to rejecting anything greater than 30 minutes
# from the current time.
#reject_future_seconds = 1800
[limits]
# Limit events created per second, averaged over one minute. Must be
# an integer. If not set (or set to 0), defaults to unlimited.
messages_per_sec = 0
# Maximum WebSocket message in bytes. Defaults to 128k.
#max_ws_message_bytes = 131072
# Maximum WebSocket frame size in bytes. Defaults to 128k.
#max_ws_frame_bytes = 131072
# Broadcast buffer size, in number of events. This prevents slow
# readers from consuming memory. Defaults to 4096.
#broadcast_buffer = 4096
# Event persistence buffer size, in number of events. This provides
# backpressure to senders if writes are slow. Defaults to 16.
#event_persist_buffer = 16

108
src/config.rs Normal file
View File

@@ -0,0 +1,108 @@
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
// initialize a singleton default configuration
lazy_static! {
pub static ref SETTINGS: RwLock<Settings> = RwLock::new(Settings::default());
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Network {
pub port: u16,
pub address: String,
}
//
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Options {
pub reject_future_seconds: Option<usize>, // if defined, reject any events with a timestamp more than X seconds in the future
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Retention {
// TODO: implement
pub max_events: Option<usize>, // max events
pub max_bytes: Option<usize>, // max size
pub persist_days: Option<usize>, // oldest message
pub whitelist_addresses: Option<Vec<String>>, // whitelisted addresses (never delete)
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Limits {
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub max_event_bytes: Option<usize>,
pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>,
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
pub event_persist_buffer: usize, // events to buffer for database commits (block senders if database writes are too slow)
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Settings {
pub network: Network,
pub limits: Limits,
pub retention: Retention,
pub options: Options,
}
impl Settings {
pub fn new() -> Self {
let d = Self::default();
// attempt to construct settings with file
// Self::new_from_default(&d).unwrap_or(d)
let from_file = Self::new_from_default(&d);
match from_file {
Ok(f) => f,
Err(e) => {
warn!("Error reading config file ({:?})", e);
d
}
}
}
fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
let config: config::Config = config::Config::new();
let settings: Settings = config
// use defaults
.with_merged(config::Config::try_from(default).unwrap())?
// override with file contents
.with_merged(config::File::with_name("config"))?
.try_into()?;
Ok(settings)
}
}
impl Default for Settings {
fn default() -> Self {
Settings {
network: Network {
port: 8080,
address: "0.0.0.0".to_owned(),
},
limits: Limits {
messages_per_sec: None,
max_event_bytes: Some(2 << 17), // 128K
max_ws_message_bytes: Some(2 << 17), // 128K
max_ws_frame_bytes: Some(2 << 17), // 128K
broadcast_buffer: 4096,
event_persist_buffer: 16,
},
retention: Retention {
max_events: None, // max events
max_bytes: None, // max size
persist_days: None, // oldest message
whitelist_addresses: None, // whitelisted addresses (never delete)
},
options: Options {
reject_future_seconds: Some(30 * 60), // Reject events 30min in the future or greater
},
}
}
}

View File

@@ -53,7 +53,7 @@ impl ClientConn {
v.push(id);
}
}
return v;
v
}
/// Add a new subscription for this connection.

133
src/db.rs
View File

@@ -2,17 +2,29 @@
use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use hex;
use log::*;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OpenFlags;
//use std::num::NonZeroU32;
use crate::config::SETTINGS;
use std::path::Path;
use std::thread;
use tokio::task;
/// Database file
const DB_FILE: &str = "nostr.db";
/// Startup DB Pragmas
const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
pragma mmap_size = 536870912; -- 512MB of mmap
"##;
/// Schema definition
const INIT_SQL: &str = r##"
-- Database settings
@@ -21,8 +33,7 @@ PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 1;
pragma mmap_size = 536870912; -- 512MB of mmap
PRAGMA user_version = 2;
-- Event Table
CREATE TABLE IF NOT EXISTS event (
@@ -32,6 +43,7 @@ first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (s
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
);
@@ -64,10 +76,50 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
"##;
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
// check the version.
let curr_version = db_version(conn)?;
info!("DB version = {:?}", curr_version);
// initialize from scratch
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
Ok(()) => info!("database pragma/schema initialized to v2, and ready"),
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be initialized");
}
}
} else if curr_version == 1 {
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD hidden INTEGER;
UPDATE event SET hidden=FALSE;
PRAGMA user_version = 2;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => info!("database schema upgraded v1 -> v2"),
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
} else if curr_version == 2 {
debug!("Database version was already current");
} else if curr_version > 2 {
panic!("Database version is newer than supported by this executable");
}
// Setup PRAGMA
conn.execute_batch(STARTUP_SQL)?;
Ok(())
}
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
task::spawn_blocking(move || {
let mut conn = Connection::open_with_flags(
@@ -75,26 +127,39 @@ pub async fn db_writer(
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
)?;
info!("opened database for writing");
// TODO: determine if we need to execute the init script.
// TODO: check database app id / version before proceeding.
match conn.execute_batch(INIT_SQL) {
Ok(()) => info!("database pragma/schema initialized and ready"),
Err(err) => error!("update failed: {}", err),
upgrade_db(&mut conn)?;
// get rate limit settings
let config = SETTINGS.read().unwrap();
let rps_setting = config.limits.messages_per_sec;
let mut lim_opt = None;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
info!("Enabling rate limits for event creation ({}/sec)", rps);
let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
}
}
loop {
if let Ok(_) = shutdown.try_recv() {
info!("shutting down database writer");
break;
}
// call blocking read on channel
let next_event = event_rx.blocking_recv();
// if the channel has closed, we will never get work
if next_event.is_none() {
break;
}
let mut event_write = false;
let event = next_event.unwrap();
match write_event(&mut conn, &event) {
Ok(updated) => {
if updated == 0 {
info!("nothing inserted (dupe?)");
debug!("ignoring duplicate event");
} else {
info!("persisted event: {}", event.get_event_id_prefix());
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
}
@@ -103,6 +168,16 @@ pub async fn db_writer(
warn!("event insert failed: {}", err);
}
}
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some(ref lim) = lim_opt {
if let Err(n) = lim.check() {
info!("Rate limiting event creation");
thread::sleep(n.wait_time_from(clock.now()));
continue;
}
}
}
}
conn.close().ok();
info!("database connection closed");
@@ -110,6 +185,12 @@ pub async fn db_writer(
})
}
pub fn db_version(conn: &mut Connection) -> Result<usize> {
let query = "PRAGMA user_version;";
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
Ok(curr_version)
}
/// Persist an event to the database.
pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
// start transaction
@@ -120,7 +201,7 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
let event_str = serde_json::to_string(&e).ok();
// ignore if the event hash is a duplicate.
let ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'));",
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
)?;
if ins_count == 0 {
@@ -150,6 +231,28 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
)?;
}
}
// if this event is for a metadata update, hide every other kind=0
// event from the same author that was issued earlier than this.
if e.kind == 0 {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!("hid {} older metadata events", update_count);
}
}
// if this event is for a contact update, hide every other kind=3
// event from the same author that was issued earlier than this.
if e.kind == 3 {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!("hid {} older contact events", update_count);
}
}
tx.commit()?;
Ok(ins_count)
}
@@ -229,6 +332,12 @@ fn query_from_sub(sub: &Subscription) -> String {
let created_clause = format!("created_at > {}", f.since.unwrap());
filter_components.push(created_clause);
}
// Query for timestamp
if f.until.is_some() {
let until_clause = format!("created_at < {}", f.until.unwrap());
filter_components.push(until_clause);
}
// combine all clauses, and add to filter_clauses
if !filter_components.is_empty() {
let mut fc = "( ".to_owned();
@@ -236,8 +345,8 @@ fn query_from_sub(sub: &Subscription) -> String {
fc.push_str(" )");
filter_clauses.push(fc);
} else {
// if there are no filter clauses, we should return everything
filter_clauses.push(" 1=1 ".to_owned());
// never display hidden events
filter_clauses.push("hidden!=FALSE".to_owned());
}
}
@@ -246,6 +355,8 @@ fn query_from_sub(sub: &Subscription) -> String {
query.push_str(" WHERE ");
query.push_str(&filter_clauses.join(" OR "));
}
// add order clause
query.push_str(" ORDER BY created_at ASC");
debug!("query string: {}", query);
query
}

View File

@@ -34,6 +34,8 @@ pub enum Error {
CommandUnknownError,
#[error("SQL error")]
SqlError(rusqlite::Error),
#[error("Config error")]
ConfigError(config::ConfigError),
}
impl From<rusqlite::Error> for Error {
@@ -56,3 +58,10 @@ impl From<WsError> for Error {
Error::WebsocketError(r)
}
}
impl From<config::ConfigError> for Error {
/// Wrap Config error
fn from(r: config::ConfigError) -> Self {
Error::ConfigError(r)
}
}

View File

@@ -1,13 +1,15 @@
//! Event parsing and validation
use crate::config;
use crate::error::Error::*;
use crate::error::Result;
use bitcoin_hashes::{sha256, Hash};
use log::info;
use log::*;
use secp256k1::{schnorrsig, Secp256k1};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::value::Value;
use serde_json::Number;
use std::str::FromStr;
use std::time::SystemTime;
/// Event command in network format
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
@@ -56,6 +58,14 @@ impl From<EventCmd> for Result<Event> {
}
}
/// Seconds since 1970
fn unix_time() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
.unwrap_or(0)
}
impl Event {
/// Create a short event identifier, suitable for logging.
pub fn get_event_id_prefix(&self) -> String {
@@ -64,6 +74,22 @@ impl Event {
/// Check if this event has a valid signature.
fn is_valid(&self) -> bool {
// TODO: return a Result with a reason for invalid events
// don't bother to validate an event with a timestamp in the distant future.
let config = config::SETTINGS.read().unwrap();
let max_future_sec = config.options.reject_future_seconds;
if let Some(allowable_future) = max_future_sec {
let curr_time = unix_time();
// calculate difference, plus how far future we allow
if curr_time + (allowable_future as u64) < self.created_at {
let delta = self.created_at - curr_time;
debug!(
"Event is too far in the future ({} seconds), rejecting",
delta
);
return false;
}
}
// validation is performed by:
// * parsing JSON string into event fields
// * create an array:

View File

@@ -1,4 +1,5 @@
pub mod close;
pub mod config;
pub mod conn;
pub mod db;
pub mod error;

View File

@@ -3,6 +3,7 @@ use futures::SinkExt;
use futures::StreamExt;
use log::*;
use nostr_rs_relay::close::Close;
use nostr_rs_relay::config;
use nostr_rs_relay::conn;
use nostr_rs_relay::db;
use nostr_rs_relay::error::{Error, Result};
@@ -11,21 +12,27 @@ use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
use std::collections::HashMap;
use std::env;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tungstenite::protocol::WebSocketConfig;
/// Start running a Nostr relay server.
fn main() -> Result<(), Error> {
// setup logger and environment
// setup logger
let _ = env_logger::try_init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "0.0.0.0:8080".to_string());
{
let mut settings = config::SETTINGS.write().unwrap();
// replace default settings with those read from config.toml
let c = config::Settings::new();
*settings = c;
}
let config = config::SETTINGS.read().unwrap();
debug!("config: {:?}", config);
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
// configure tokio runtime
let rt = Builder::new_multi_thread()
.enable_all()
@@ -34,20 +41,17 @@ fn main() -> Result<(), Error> {
.unwrap();
// start tokio
rt.block_on(async {
let settings = config::SETTINGS.read().unwrap();
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("listening on: {}", addr);
// all client-submitted valid events are broadcast to every
// other client on this channel. This should be large enough
// to accomodate slower readers (messages are dropped if
// clients can not keep up).
let (bcast_tx, _) = broadcast::channel::<Event>(4096);
let (bcast_tx, _) = broadcast::channel::<Event>(settings.limits.broadcast_buffer);
// validated events that need to be persisted are sent to the
// database on via this channel.
let (event_tx, event_rx) = mpsc::channel::<Event>(16);
// start the database writer thread. Give it a channel for
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone()).await;
let (event_tx, event_rx) = mpsc::channel::<Event>(settings.limits.event_persist_buffer);
// establish a channel for letting all threads now about a
// requested server shutdown.
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
@@ -58,6 +62,11 @@ fn main() -> Result<(), Error> {
info!("shutting down due to SIGINT");
ctrl_c_shutdown.send(()).ok();
});
// start the database writer thread. Give it a channel for
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
// track unique client connection count
let mut client_accept_count: usize = 0;
let mut stop_listening = invoke_shutdown.subscribe();
@@ -93,8 +102,14 @@ async fn nostr_server(
) {
// get a broadcast channel for clients to communicate on
let mut bcast_rx = broadcast.subscribe();
let mut config = WebSocketConfig::default();
{
let settings = config::SETTINGS.read().unwrap();
config.max_message_size = settings.limits.max_ws_message_bytes;
config.max_frame_size = settings.limits.max_ws_frame_bytes;
}
// upgrade the TCP connection to WebSocket
let conn = tokio_tungstenite::accept_async(stream).await;
let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
let ws_stream = conn.expect("websocket handshake error");
// wrap websocket into a stream & sink of Nostr protocol messages
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);

View File

@@ -53,6 +53,9 @@ impl Stream for NostrStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// Convert Message to NostrMessage
fn convert(msg: String) -> Result<NostrMessage> {
debug!("raw msg: {}", msg);
let event_size = msg.len();
debug!("event size is {} bytes", event_size);
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
match parsed_res {
Ok(m) => Ok(m),

View File

@@ -16,7 +16,6 @@ pub struct Subscription {
/// element can be present if it should be used in filtering, or
/// absent ([`None`]) if it should be ignored.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct ReqFilter {
/// Event hash
pub id: Option<String>,
@@ -30,6 +29,8 @@ pub struct ReqFilter {
pub pubkey: Option<String>,
/// Events published after this time
pub since: Option<u64>,
/// Events published before this time
pub until: Option<u64>,
/// List of author public keys
pub authors: Option<Vec<String>>,
}