mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
d78bbfc290 | ||
|
2924da88bc | ||
|
3024e9fba4 | ||
|
d3da4eb009 | ||
|
19637d612e | ||
|
afc9a0096a | ||
|
3d56262386 | ||
|
6673fcfd11 | ||
|
b5da3fa2b0 | ||
|
850957213e | ||
|
1aa5a5458d | ||
|
620e227699 | ||
|
14e59ed278 | ||
|
5ad383f257 | ||
|
9710ea27aa | ||
|
783a6e1042 | ||
|
4171a8870e | ||
|
8f3891c781 | ||
|
415d32299b | ||
|
5a19a8876f |
156
Cargo.lock
generated
156
Cargo.lock
generated
@@ -66,6 +66,12 @@ dependencies = [
|
||||
"serde 1.0.131",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitcoin_hashes"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "006cc91e1a1d99819bc5b8214be3555c1f0611b169f527a1fdc54ed1f2b745b0"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
@@ -355,6 +361,25 @@ dependencies = [
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http",
|
||||
"indexmap",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.11.2"
|
||||
@@ -399,18 +424,59 @@ dependencies = [
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "httparse"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
|
||||
|
||||
[[package]]
|
||||
name = "httpdate"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"httparse",
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "idna"
|
||||
version = "0.2.3"
|
||||
@@ -422,6 +488,16 @@ dependencies = [
|
||||
"unicode-normalization",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
|
||||
dependencies = [
|
||||
"autocfg 1.0.1",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
@@ -573,15 +649,16 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.2.0"
|
||||
version = "0.3.3"
|
||||
dependencies = [
|
||||
"bitcoin_hashes",
|
||||
"bitcoin_hashes 0.9.7",
|
||||
"config",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"governor",
|
||||
"hex",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"nonzero_ext",
|
||||
@@ -960,10 +1037,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
[[package]]
|
||||
name = "secp256k1"
|
||||
version = "0.20.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97d03ceae636d0fed5bae6a7f4f664354c5f4fcedf6eef053fef17e49f837d0a"
|
||||
source = "git+https://github.com/rust-bitcoin/rust-secp256k1.git?rev=50034ccb18fdd84904ab3aa6c84a12fcced33209#50034ccb18fdd84904ab3aa6c84a12fcced33209"
|
||||
dependencies = [
|
||||
"bitcoin_hashes",
|
||||
"bitcoin_hashes 0.10.0",
|
||||
"rand 0.6.5",
|
||||
"secp256k1-sys",
|
||||
"serde 1.0.131",
|
||||
@@ -972,8 +1048,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "secp256k1-sys"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "827cb7cce42533829c792fc51b82fbf18b125b45a702ef2c8be77fce65463a7b"
|
||||
source = "git+https://github.com/rust-bitcoin/rust-secp256k1.git?rev=50034ccb18fdd84904ab3aa6c84a12fcced33209#50034ccb18fdd84904ab3aa6c84a12fcced33209"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
@@ -1022,6 +1097,7 @@ version = "1.0.72"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde 1.0.131",
|
||||
@@ -1061,6 +1137,16 @@ version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -1165,6 +1251,20 @@ dependencies = [
|
||||
"tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.6.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.5.8"
|
||||
@@ -1174,6 +1274,38 @@ dependencies = [
|
||||
"serde 1.0.131",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"pin-project-lite",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.16.0"
|
||||
@@ -1259,6 +1391,16 @@ version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"try-lock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.10.2+wasi-snapshot-preview1"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.2.0"
|
||||
version = "0.3.3"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
@@ -15,11 +15,12 @@ thiserror = "^1"
|
||||
uuid = { version = "^0.8", features = ["v4"] }
|
||||
config = { version = "0.11", features = ["toml"] }
|
||||
bitcoin_hashes = { version = "^0.9", features = ["serde"] }
|
||||
secp256k1 = { version = "^0.20", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
|
||||
secp256k1 = {git = "https://github.com/rust-bitcoin/rust-secp256k1.git", rev = "50034ccb18fdd84904ab3aa6c84a12fcced33209", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = "^1.0"
|
||||
serde_json = {version = "^1.0", features = ["preserve_order"]}
|
||||
hex = "^0.4"
|
||||
rusqlite = "^0.26"
|
||||
lazy_static = "^1.4"
|
||||
governor = "^0.4"
|
||||
nonzero_ext = "^0.3"
|
||||
hyper={ version="0.14", features=["server","http1","http2","tcp"] }
|
||||
|
@@ -15,7 +15,6 @@ RUN cargo build --release
|
||||
FROM debian:buster-slim
|
||||
ARG APP=/usr/src/app
|
||||
ARG APP_DATA=/usr/src/app/db
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y ca-certificates tzdata sqlite3 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
@@ -35,9 +34,9 @@ COPY --from=builder /nostr-rs-relay/target/release/nostr-rs-relay ${APP}/nostr-r
|
||||
RUN chown -R $APP_USER:$APP_USER ${APP}
|
||||
|
||||
USER $APP_USER
|
||||
WORKDIR ${APP_DATA}
|
||||
WORKDIR ${APP}
|
||||
|
||||
ENV RUST_LOG=info
|
||||
ENV APP_DATA=${APP_DATA}
|
||||
|
||||
|
||||
CMD ["../nostr-rs-relay"]
|
||||
CMD ./nostr-rs-relay --db ${APP_DATA}
|
||||
|
39
README.md
39
README.md
@@ -10,17 +10,25 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
|
||||
|
||||
## Quick Start
|
||||
|
||||
The provided `Dockerfile` will compile and build the server application. Use a bind mount to store the SQLite database outside of the container image, and map the container's 8080 port to a host port (8090 in the example below).
|
||||
The provided `Dockerfile` will compile and build the server
|
||||
application. Use a bind mount to store the SQLite database outside of
|
||||
the container image, and map the container's 8080 port to a host port
|
||||
(7000 in the example below).
|
||||
|
||||
```console
|
||||
$ docker build -t nostr-rs-relay .
|
||||
$ docker run -p 8090:8080 --mount src=$(pwd)/nostr_data,target=/usr/src/app/db,type=bind nostr-rs-relay
|
||||
[2021-12-12T04:20:47Z INFO nostr_rs_relay] Listening on: 0.0.0.0:8080
|
||||
[2021-12-12T04:20:47Z INFO nostr_rs_relay::db] Opened database for writing
|
||||
[2021-12-12T04:20:47Z INFO nostr_rs_relay::db] init completed
|
||||
|
||||
$ docker run -it -p 7000:8080 \
|
||||
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind nostr-rs-relay
|
||||
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay] listening on: 0.0.0.0:8080
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] opened database "/usr/src/app/db/nostr.db" for writing
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] DB version = 2
|
||||
```
|
||||
|
||||
Use a `nostr` client such as [`noscl`](https://github.com/fiatjaf/noscl) to publish and query events.
|
||||
Use a `nostr` client such as
|
||||
[`noscl`](https://github.com/fiatjaf/noscl) to publish and query
|
||||
events.
|
||||
|
||||
```console
|
||||
$ noscl publish "hello world"
|
||||
@@ -31,6 +39,25 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
|
||||
hello world
|
||||
```
|
||||
|
||||
A pre-built container is also available on DockerHub:
|
||||
https://hub.docker.com/repository/docker/scsibug/nostr-rs-relay
|
||||
|
||||
## Configuration
|
||||
|
||||
The sample `[config.toml](config.toml)` file demonstrates the
|
||||
configuration available to the relay. This file is optional, but may
|
||||
be mounted into a docker container like so:
|
||||
|
||||
```console
|
||||
$ docker run -it -p 7000:8080 \
|
||||
--mount src=$(pwd)/config.toml,target=/usr/src/app/config.toml,type=bind \
|
||||
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind \
|
||||
nostr-rs-relay
|
||||
```
|
||||
|
||||
Options include rate-limiting, event size limits, and network address
|
||||
settings.
|
||||
|
||||
License
|
||||
---
|
||||
This project is MIT licensed.
|
||||
|
40
config.toml
40
config.toml
@@ -1,5 +1,27 @@
|
||||
# Nostr-rs-relay configuration
|
||||
|
||||
[info]
|
||||
# The advertised URL for the Nostr websocket.
|
||||
relay_url = "wss://nostr.example.com/"
|
||||
|
||||
# Relay information for clients. Put your unique server name here.
|
||||
name = "nostr-rs-relay"
|
||||
|
||||
# Description
|
||||
description = "A newly created nostr-rs-relay.\n\nCustomize this with your own info."
|
||||
|
||||
# Administrative contact pubkey
|
||||
#pubkey = "0c2d168a4ae8ca58c9f1ab237b5df682599c6c7ab74307ea8b05684b60405d41"
|
||||
|
||||
# Administrative contact email
|
||||
#email = "contact@example.com"
|
||||
|
||||
[database]
|
||||
# Directory for SQLite files. Defaults to the current directory. Can
|
||||
# also be specified (and overriden) with the "--db dirname" command
|
||||
# line option.
|
||||
data_directory = "."
|
||||
|
||||
[network]
|
||||
# Bind to this network address
|
||||
address = "0.0.0.0"
|
||||
@@ -10,23 +32,27 @@ port = 8080
|
||||
# Reject events that have timestamps greater than this many seconds in
|
||||
# the future. Defaults to rejecting anything greater than 30 minutes
|
||||
# from the current time.
|
||||
#reject_future_seconds = 1800
|
||||
reject_future_seconds = 1800
|
||||
|
||||
[limits]
|
||||
# Limit events created per second, averaged over one minute. Must be
|
||||
# an integer. If not set (or set to 0), defaults to unlimited.
|
||||
messages_per_sec = 0
|
||||
|
||||
# Maximum WebSocket message in bytes. Defaults to 128k.
|
||||
#max_ws_message_bytes = 131072
|
||||
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
|
||||
# Set to 0 for unlimited.
|
||||
max_event_bytes = 131072
|
||||
|
||||
# Maximum WebSocket frame size in bytes. Defaults to 128k.
|
||||
#max_ws_frame_bytes = 131072
|
||||
# Maximum WebSocket message in bytes. Defaults to 128 KB.
|
||||
max_ws_message_bytes = 131072
|
||||
|
||||
# Maximum WebSocket frame size in bytes. Defaults to 128 KB.
|
||||
max_ws_frame_bytes = 131072
|
||||
|
||||
# Broadcast buffer size, in number of events. This prevents slow
|
||||
# readers from consuming memory. Defaults to 4096.
|
||||
#broadcast_buffer = 4096
|
||||
broadcast_buffer = 4096
|
||||
|
||||
# Event persistence buffer size, in number of events. This provides
|
||||
# backpressure to senders if writes are slow. Defaults to 16.
|
||||
#event_persist_buffer = 16
|
||||
event_persist_buffer = 16
|
||||
|
@@ -8,6 +8,22 @@ lazy_static! {
|
||||
pub static ref SETTINGS: RwLock<Settings> = RwLock::new(Settings::default());
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Info {
|
||||
pub relay_url: Option<String>,
|
||||
pub name: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub pubkey: Option<String>,
|
||||
pub email: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Database {
|
||||
pub data_directory: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Network {
|
||||
@@ -36,7 +52,7 @@ pub struct Retention {
|
||||
#[allow(unused)]
|
||||
pub struct Limits {
|
||||
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
|
||||
pub max_event_bytes: Option<usize>,
|
||||
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
|
||||
pub max_ws_message_bytes: Option<usize>,
|
||||
pub max_ws_frame_bytes: Option<usize>,
|
||||
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
|
||||
@@ -46,6 +62,8 @@ pub struct Limits {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Settings {
|
||||
pub info: Info,
|
||||
pub database: Database,
|
||||
pub network: Network,
|
||||
pub limits: Limits,
|
||||
pub retention: Retention,
|
||||
@@ -82,6 +100,16 @@ impl Settings {
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Settings {
|
||||
info: Info {
|
||||
relay_url: None,
|
||||
name: Some("Unnamed nostr-rs-relay".to_owned()),
|
||||
description: None,
|
||||
pubkey: None,
|
||||
email: None,
|
||||
},
|
||||
database: Database {
|
||||
data_directory: ".".to_owned(),
|
||||
},
|
||||
network: Network {
|
||||
port: 8080,
|
||||
address: "0.0.0.0".to_owned(),
|
||||
|
97
src/db.rs
97
src/db.rs
@@ -13,6 +13,7 @@ use rusqlite::OpenFlags;
|
||||
use crate::config::SETTINGS;
|
||||
use std::path::Path;
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
use tokio::task;
|
||||
|
||||
/// Database file
|
||||
@@ -122,14 +123,18 @@ pub async fn db_writer(
|
||||
mut shutdown: tokio::sync::broadcast::Receiver<()>,
|
||||
) -> tokio::task::JoinHandle<Result<()>> {
|
||||
task::spawn_blocking(move || {
|
||||
// get database configuration settings
|
||||
let config = SETTINGS.read().unwrap();
|
||||
let db_dir = &config.database.data_directory;
|
||||
let full_path = Path::new(db_dir).join(DB_FILE);
|
||||
// create a connection
|
||||
let mut conn = Connection::open_with_flags(
|
||||
Path::new(DB_FILE),
|
||||
&full_path,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
)?;
|
||||
info!("opened database for writing");
|
||||
info!("opened database {:?} for writing", full_path);
|
||||
upgrade_db(&mut conn)?;
|
||||
// get rate limit settings
|
||||
let config = SETTINGS.read().unwrap();
|
||||
let rps_setting = config.limits.messages_per_sec;
|
||||
let mut lim_opt = None;
|
||||
let clock = governor::clock::QuantaClock::default();
|
||||
@@ -141,7 +146,7 @@ pub async fn db_writer(
|
||||
}
|
||||
}
|
||||
loop {
|
||||
if let Ok(_) = shutdown.try_recv() {
|
||||
if shutdown.try_recv().is_ok() {
|
||||
info!("shutting down database writer");
|
||||
break;
|
||||
}
|
||||
@@ -153,12 +158,17 @@ pub async fn db_writer(
|
||||
}
|
||||
let mut event_write = false;
|
||||
let event = next_event.unwrap();
|
||||
let start = Instant::now();
|
||||
match write_event(&mut conn, &event) {
|
||||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
debug!("ignoring duplicate event");
|
||||
} else {
|
||||
info!("persisted event: {}", event.get_event_id_prefix());
|
||||
info!(
|
||||
"persisted event: {} in {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
event_write = true;
|
||||
// send this out to all clients
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
@@ -298,35 +308,52 @@ fn query_from_sub(sub: &Subscription) -> String {
|
||||
filter_components.push(authors_clause);
|
||||
}
|
||||
// Query for Kind
|
||||
if f.kind.is_some() {
|
||||
if let Some(ks) = &f.kinds {
|
||||
// kind is number, no escaping needed
|
||||
let kind_clause = format!("kind = {}", f.kind.unwrap());
|
||||
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
|
||||
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
|
||||
filter_components.push(kind_clause);
|
||||
}
|
||||
// Query for event
|
||||
if f.id.is_some() {
|
||||
let id_str = f.id.as_ref().unwrap();
|
||||
if is_hex(id_str) {
|
||||
let id_clause = format!("event_hash = x'{}'", id_str);
|
||||
filter_components.push(id_clause);
|
||||
}
|
||||
if f.ids.is_some() {
|
||||
let ids_escaped: Vec<String> = f
|
||||
.ids
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|&x| is_hex(x))
|
||||
.map(|x| format!("x'{}'", x))
|
||||
.collect();
|
||||
let id_clause = format!("event_hash IN ({})", ids_escaped.join(", "));
|
||||
filter_components.push(id_clause);
|
||||
}
|
||||
// Query for referenced event
|
||||
if f.event.is_some() {
|
||||
let ev_str = f.event.as_ref().unwrap();
|
||||
if is_hex(ev_str) {
|
||||
let ev_clause = format!("referenced_event = x'{}'", ev_str);
|
||||
filter_components.push(ev_clause);
|
||||
}
|
||||
if f.events.is_some() {
|
||||
let events_escaped: Vec<String> = f
|
||||
.events
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|&x| is_hex(x))
|
||||
.map(|x| format!("x'{}'", x))
|
||||
.collect();
|
||||
let events_clause = format!("referenced_event IN ({})", events_escaped.join(", "));
|
||||
filter_components.push(events_clause);
|
||||
}
|
||||
// Query for referenced pet name pubkey
|
||||
if f.pubkey.is_some() {
|
||||
let pet_str = f.pubkey.as_ref().unwrap();
|
||||
if is_hex(pet_str) {
|
||||
let pet_clause = format!("referenced_pubkey = x'{}'", pet_str);
|
||||
filter_components.push(pet_clause);
|
||||
}
|
||||
// Query for referenced pubkey
|
||||
if f.pubkeys.is_some() {
|
||||
let pubkeys_escaped: Vec<String> = f
|
||||
.pubkeys
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|&x| is_hex(x))
|
||||
.map(|x| format!("x'{}'", x))
|
||||
.collect();
|
||||
let pubkeys_clause = format!("referenced_pubkey IN ({})", pubkeys_escaped.join(", "));
|
||||
filter_components.push(pubkeys_clause);
|
||||
}
|
||||
|
||||
// Query for timestamp
|
||||
if f.since.is_some() {
|
||||
let created_clause = format!("created_at > {}", f.since.unwrap());
|
||||
@@ -346,7 +373,7 @@ fn query_from_sub(sub: &Subscription) -> String {
|
||||
filter_clauses.push(fc);
|
||||
} else {
|
||||
// never display hidden events
|
||||
filter_clauses.push("hidden!=FALSE".to_owned());
|
||||
filter_clauses.push("hidden!=TRUE".to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -373,11 +400,16 @@ pub async fn db_query(
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
task::spawn_blocking(move || {
|
||||
let config = SETTINGS.read().unwrap();
|
||||
let db_dir = &config.database.data_directory;
|
||||
let full_path = Path::new(db_dir).join(DB_FILE);
|
||||
|
||||
let conn =
|
||||
Connection::open_with_flags(Path::new(DB_FILE), OpenFlags::SQLITE_OPEN_READ_ONLY)
|
||||
.unwrap();
|
||||
Connection::open_with_flags(&full_path, OpenFlags::SQLITE_OPEN_READ_ONLY).unwrap();
|
||||
debug!("opened database for reading");
|
||||
debug!("going to query for: {:?}", sub);
|
||||
let mut row_count: usize = 0;
|
||||
let start = Instant::now();
|
||||
// generate SQL query
|
||||
let q = query_from_sub(&sub);
|
||||
// execute the query
|
||||
@@ -389,6 +421,7 @@ pub async fn db_query(
|
||||
debug!("query aborted");
|
||||
return;
|
||||
}
|
||||
row_count += 1;
|
||||
// TODO: check before unwrapping
|
||||
let event_json = row.get(0).unwrap();
|
||||
query_tx
|
||||
@@ -398,6 +431,10 @@ pub async fn db_query(
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
debug!("query completed");
|
||||
debug!(
|
||||
"query completed ({} rows) in {:?}",
|
||||
row_count,
|
||||
start.elapsed()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
@@ -21,6 +21,8 @@ pub enum Error {
|
||||
CloseParseFailed,
|
||||
#[error("Event validation failed")]
|
||||
EventInvalid,
|
||||
#[error("Event too large")]
|
||||
EventMaxLengthError(usize),
|
||||
#[error("Subscription identifier max length exceeded")]
|
||||
SubIdMaxLengthError,
|
||||
#[error("Maximum concurrent subscription count reached")]
|
||||
@@ -36,6 +38,8 @@ pub enum Error {
|
||||
SqlError(rusqlite::Error),
|
||||
#[error("Config error")]
|
||||
ConfigError(config::ConfigError),
|
||||
#[error("Data directory does not exist")]
|
||||
DatabaseDirError,
|
||||
}
|
||||
|
||||
impl From<rusqlite::Error> for Error {
|
||||
|
22
src/event.rs
22
src/event.rs
@@ -3,14 +3,19 @@ use crate::config;
|
||||
use crate::error::Error::*;
|
||||
use crate::error::Result;
|
||||
use bitcoin_hashes::{sha256, Hash};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use secp256k1::{schnorrsig, Secp256k1};
|
||||
use secp256k1::{schnorr, Secp256k1, VerifyOnly, XOnlyPublicKey};
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use serde_json::value::Value;
|
||||
use serde_json::Number;
|
||||
use std::str::FromStr;
|
||||
use std::time::SystemTime;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref SECP: Secp256k1<VerifyOnly> = Secp256k1::verification_only();
|
||||
}
|
||||
|
||||
/// Event command in network format
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
|
||||
pub struct EventCmd {
|
||||
@@ -109,12 +114,15 @@ impl Event {
|
||||
return false;
|
||||
}
|
||||
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
|
||||
let secp = Secp256k1::new();
|
||||
let sig = schnorrsig::Signature::from_str(&self.sig).unwrap();
|
||||
let message = secp256k1::Message::from(digest);
|
||||
let pubkey = schnorrsig::PublicKey::from_str(&self.pubkey).unwrap();
|
||||
let verify = secp.schnorrsig_verify(&sig, &message, &pubkey);
|
||||
matches!(verify, Ok(()))
|
||||
let sig = schnorr::Signature::from_str(&self.sig).unwrap();
|
||||
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
|
||||
let pubkey = XOnlyPublicKey::from_str(&self.pubkey).unwrap();
|
||||
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
|
||||
matches!(verify, Ok(()))
|
||||
} else {
|
||||
warn!("Error converting digest to secp256k1 message");
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert event to canonical representation for signing.
|
||||
|
60
src/info.rs
Normal file
60
src/info.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use crate::config;
|
||||
/// Relay Info
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct RelayInfo {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub description: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pubkey: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub email: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub supported_nips: Option<Vec<i64>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub software: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub version: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for RelayInfo {
|
||||
fn default() -> Self {
|
||||
RelayInfo {
|
||||
id: None,
|
||||
name: None,
|
||||
description: None,
|
||||
pubkey: None,
|
||||
email: None,
|
||||
supported_nips: Some(vec![1]),
|
||||
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
|
||||
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert an Info struct into Relay Info json string
|
||||
pub fn relay_info_json(info: &config::Info) -> String {
|
||||
// get a default RelayInfo
|
||||
let mut r = RelayInfo::default();
|
||||
// update fields from Info, if present
|
||||
r.id = info.relay_url.clone();
|
||||
r.name = info.name.clone();
|
||||
r.description = info.description.clone();
|
||||
r.pubkey = info.pubkey.clone();
|
||||
r.email = info.email.clone();
|
||||
r.to_json()
|
||||
}
|
||||
|
||||
impl RelayInfo {
|
||||
pub fn to_json(self) -> String {
|
||||
serde_json::to_string_pretty(&self).unwrap()
|
||||
}
|
||||
}
|
@@ -4,5 +4,6 @@ pub mod conn;
|
||||
pub mod db;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod info;
|
||||
pub mod protostream;
|
||||
pub mod subscription;
|
||||
|
200
src/main.rs
200
src/main.rs
@@ -1,6 +1,12 @@
|
||||
//! Server process
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use hyper::header::ACCEPT;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::upgrade::Upgraded;
|
||||
use hyper::{
|
||||
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
|
||||
};
|
||||
use log::*;
|
||||
use nostr_rs_relay::close::Close;
|
||||
use nostr_rs_relay::config;
|
||||
@@ -8,31 +14,159 @@ use nostr_rs_relay::conn;
|
||||
use nostr_rs_relay::db;
|
||||
use nostr_rs_relay::error::{Error, Result};
|
||||
use nostr_rs_relay::event::Event;
|
||||
use nostr_rs_relay::info::relay_info_json;
|
||||
use nostr_rs_relay::protostream;
|
||||
use nostr_rs_relay::protostream::NostrMessage::*;
|
||||
use nostr_rs_relay::protostream::NostrResponse::*;
|
||||
use std::collections::HashMap;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use std::convert::Infallible;
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use tokio::runtime::Builder;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tungstenite::handshake;
|
||||
use tungstenite::protocol::WebSocketConfig;
|
||||
|
||||
fn db_from_args(args: Vec<String>) -> Option<String> {
|
||||
if args.len() == 3 && args.get(1) == Some(&"--db".to_owned()) {
|
||||
return args.get(2).map(|x| x.to_owned());
|
||||
}
|
||||
None
|
||||
}
|
||||
async fn handle_web_request(
|
||||
mut request: Request<Body>,
|
||||
remote_addr: SocketAddr,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
shutdown: Receiver<()>,
|
||||
) -> Result<Response<Body>, Infallible> {
|
||||
match (
|
||||
request.uri().path(),
|
||||
request.headers().contains_key(header::UPGRADE),
|
||||
) {
|
||||
// Request for / as websocket
|
||||
("/", true) => {
|
||||
debug!("websocket with upgrade request");
|
||||
//assume request is a handshake, so create the handshake response
|
||||
let response = match handshake::server::create_response_with_body(&request, || {
|
||||
Body::empty()
|
||||
}) {
|
||||
Ok(response) => {
|
||||
//in case the handshake response creation succeeds,
|
||||
//spawn a task to handle the websocket connection
|
||||
tokio::spawn(async move {
|
||||
//using the hyper feature of upgrading a connection
|
||||
match upgrade::on(&mut request).await {
|
||||
//if successfully upgraded
|
||||
Ok(upgraded) => {
|
||||
//create a websocket stream from the upgraded object
|
||||
let ws_stream = WebSocketStream::from_raw_socket(
|
||||
//pass the upgraded object
|
||||
//as the base layer stream of the Websocket
|
||||
upgraded,
|
||||
tokio_tungstenite::tungstenite::protocol::Role::Server,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
tokio::spawn(nostr_server(
|
||||
ws_stream, broadcast, event_tx, shutdown,
|
||||
));
|
||||
}
|
||||
Err(e) => println!(
|
||||
"error when trying to upgrade connection \
|
||||
from address {} to websocket connection. \
|
||||
Error is: {}",
|
||||
remote_addr, e
|
||||
),
|
||||
}
|
||||
});
|
||||
//return the response to the handshake request
|
||||
response
|
||||
}
|
||||
Err(error) => {
|
||||
warn!("websocket response failed");
|
||||
let mut res =
|
||||
Response::new(Body::from(format!("Failed to create websocket: {}", error)));
|
||||
*res.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return Ok(res);
|
||||
}
|
||||
};
|
||||
Ok::<_, Infallible>(response)
|
||||
}
|
||||
// Request for Relay info
|
||||
("/", false) => {
|
||||
// handle request at root with no upgrade header
|
||||
// Check if this is a nostr server info request
|
||||
let accept_header = &request.headers().get(ACCEPT);
|
||||
// check if application/nostr+json is included
|
||||
if let Some(media_types) = accept_header {
|
||||
if let Ok(mt_str) = media_types.to_str() {
|
||||
if mt_str.contains("application/nostr+json") {
|
||||
let config = config::SETTINGS.read().unwrap();
|
||||
// build a relay info response
|
||||
debug!("Responding to server info request");
|
||||
let b = Body::from(relay_info_json(&config.info));
|
||||
return Ok(Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "application/nostr+json")
|
||||
.body(b)
|
||||
.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Response::new(Body::from(
|
||||
"Please use a Nostr client to connect.",
|
||||
)))
|
||||
}
|
||||
(_, _) => {
|
||||
//handle any other url
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::from("Nothing here."))
|
||||
.unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn shutdown_signal() {
|
||||
// Wait for the CTRL+C signal
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
.expect("failed to install CTRL+C signal handler");
|
||||
}
|
||||
|
||||
/// Start running a Nostr relay server.
|
||||
fn main() -> Result<(), Error> {
|
||||
// setup logger
|
||||
let _ = env_logger::try_init();
|
||||
// get database directory from args
|
||||
let args: Vec<String> = env::args().collect();
|
||||
let db_dir: Option<String> = db_from_args(args);
|
||||
{
|
||||
let mut settings = config::SETTINGS.write().unwrap();
|
||||
// replace default settings with those read from config.toml
|
||||
let c = config::Settings::new();
|
||||
let mut c = config::Settings::new();
|
||||
// update with database location
|
||||
if let Some(db) = db_dir {
|
||||
c.database.data_directory = db;
|
||||
}
|
||||
*settings = c;
|
||||
}
|
||||
|
||||
let config = config::SETTINGS.read().unwrap();
|
||||
// do some config validation.
|
||||
if !Path::new(&config.database.data_directory).is_dir() {
|
||||
error!("Database directory does not exist");
|
||||
return Err(Error::DatabaseDirError);
|
||||
}
|
||||
debug!("config: {:?}", config);
|
||||
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
|
||||
let socket_addr = addr.parse().expect("listening address not valid");
|
||||
// configure tokio runtime
|
||||
let rt = Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
@@ -42,8 +176,7 @@ fn main() -> Result<(), Error> {
|
||||
// start tokio
|
||||
rt.block_on(async {
|
||||
let settings = config::SETTINGS.read().unwrap();
|
||||
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
|
||||
info!("listening on: {}", addr);
|
||||
info!("listening on: {}", socket_addr);
|
||||
// all client-submitted valid events are broadcast to every
|
||||
// other client on this channel. This should be large enough
|
||||
// to accomodate slower readers (messages are dropped if
|
||||
@@ -56,7 +189,7 @@ fn main() -> Result<(), Error> {
|
||||
// requested server shutdown.
|
||||
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
|
||||
let ctrl_c_shutdown = invoke_shutdown.clone();
|
||||
// listen for ctrl-c interruupts
|
||||
// // listen for ctrl-c interruupts
|
||||
tokio::spawn(async move {
|
||||
tokio::signal::ctrl_c().await.unwrap();
|
||||
info!("shutting down due to SIGINT");
|
||||
@@ -66,28 +199,35 @@ fn main() -> Result<(), Error> {
|
||||
// writing events, and for publishing events that have been
|
||||
// written (to all connected clients).
|
||||
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
|
||||
|
||||
// track unique client connection count
|
||||
let mut client_accept_count: usize = 0;
|
||||
let mut stop_listening = invoke_shutdown.subscribe();
|
||||
// handle new client connection requests, or SIGINT signals.
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = stop_listening.recv() => {
|
||||
break;
|
||||
}
|
||||
Ok((stream, _)) = listener.accept() => {
|
||||
client_accept_count += 1;
|
||||
info!("creating new connection for client #{}",client_accept_count);
|
||||
tokio::spawn(nostr_server(
|
||||
stream,
|
||||
bcast_tx.clone(),
|
||||
event_tx.clone(),
|
||||
invoke_shutdown.subscribe(),
|
||||
));
|
||||
}
|
||||
info!("db writer created");
|
||||
// A `Service` is needed for every connection, so this
|
||||
// creates one from our `handle_request` function.
|
||||
let make_svc = make_service_fn(|conn: &AddrStream| {
|
||||
let remote_addr = conn.remote_addr();
|
||||
let bcast = bcast_tx.clone();
|
||||
let event = event_tx.clone();
|
||||
let stop = invoke_shutdown.clone();
|
||||
async move {
|
||||
// service_fn converts our function into a `Service`
|
||||
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
|
||||
handle_web_request(
|
||||
request,
|
||||
remote_addr,
|
||||
bcast.clone(),
|
||||
event.clone(),
|
||||
stop.subscribe(),
|
||||
)
|
||||
}))
|
||||
}
|
||||
});
|
||||
let server = Server::bind(&socket_addr)
|
||||
.serve(make_svc)
|
||||
.with_graceful_shutdown(shutdown_signal());
|
||||
// run hyper
|
||||
if let Err(e) = server.await {
|
||||
eprintln!("server error: {}", e);
|
||||
}
|
||||
// our code
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
@@ -95,7 +235,7 @@ fn main() -> Result<(), Error> {
|
||||
/// Handle new client connections. This runs through an event loop
|
||||
/// for all client communication.
|
||||
async fn nostr_server(
|
||||
stream: TcpStream,
|
||||
ws_stream: WebSocketStream<Upgraded>,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
mut shutdown: Receiver<()>,
|
||||
@@ -109,8 +249,8 @@ async fn nostr_server(
|
||||
config.max_frame_size = settings.limits.max_ws_frame_bytes;
|
||||
}
|
||||
// upgrade the TCP connection to WebSocket
|
||||
let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
|
||||
let ws_stream = conn.expect("websocket handshake error");
|
||||
//let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
|
||||
//let ws_stream = conn.expect("websocket handshake error");
|
||||
// wrap websocket into a stream & sink of Nostr protocol messages
|
||||
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);
|
||||
// Track internal client state
|
||||
@@ -230,6 +370,10 @@ async fn nostr_server(
|
||||
debug!("got connection close/error, disconnecting client: {}",cid);
|
||||
break;
|
||||
}
|
||||
Some(Err(Error::EventMaxLengthError(s))) => {
|
||||
info!("client {} sent event larger ({} bytes) than max size", cid, s);
|
||||
nostr_stream.send(NoticeRes("event exceeded max size".to_owned())).await.ok();
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
info!("got non-fatal error from client: {}, error: {:?}", cid, e);
|
||||
},
|
||||
|
@@ -1,5 +1,6 @@
|
||||
//! Nostr protocol layered over WebSocket
|
||||
use crate::close::CloseCmd;
|
||||
use crate::config;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::event::EventCmd;
|
||||
use crate::subscription::Subscription;
|
||||
@@ -8,9 +9,9 @@ use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use futures::task::Context;
|
||||
use futures::task::Poll;
|
||||
use hyper::upgrade::Upgraded;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tungstenite::error::Error as WsError;
|
||||
use tungstenite::protocol::Message;
|
||||
@@ -39,11 +40,11 @@ pub enum NostrResponse {
|
||||
|
||||
/// A Nostr protocol stream is layered on top of a Websocket stream.
|
||||
pub struct NostrStream {
|
||||
ws_stream: WebSocketStream<TcpStream>,
|
||||
ws_stream: WebSocketStream<Upgraded>,
|
||||
}
|
||||
|
||||
/// Given a websocket, return a protocol stream wrapper.
|
||||
pub fn wrap_ws_in_nostr(ws: WebSocketStream<TcpStream>) -> NostrStream {
|
||||
pub fn wrap_ws_in_nostr(ws: WebSocketStream<Upgraded>) -> NostrStream {
|
||||
NostrStream { ws_stream: ws }
|
||||
}
|
||||
|
||||
@@ -51,16 +52,26 @@ pub fn wrap_ws_in_nostr(ws: WebSocketStream<TcpStream>) -> NostrStream {
|
||||
impl Stream for NostrStream {
|
||||
type Item = Result<NostrMessage>;
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// get the configuration
|
||||
/// Convert Message to NostrMessage
|
||||
fn convert(msg: String) -> Result<NostrMessage> {
|
||||
debug!("raw msg: {}", msg);
|
||||
let event_size = msg.len();
|
||||
debug!("event size is {} bytes", event_size);
|
||||
let config = config::SETTINGS.read().unwrap();
|
||||
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
|
||||
match parsed_res {
|
||||
Ok(m) => Ok(m),
|
||||
Ok(m) => {
|
||||
if let NostrMessage::EventMsg(_) = m {
|
||||
if let Some(max_size) = config.limits.max_event_bytes {
|
||||
// check length, ensure that some max size is set.
|
||||
if msg.len() > max_size && max_size > 0 {
|
||||
return Err(Error::EventMaxLengthError(msg.len()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("proto parse error: {:?}", e);
|
||||
debug!("parse error on message: {}", msg.trim());
|
||||
Err(Error::ProtoParseError)
|
||||
}
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@
|
||||
use crate::error::Result;
|
||||
use crate::event::Event;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use std::collections::HashSet;
|
||||
|
||||
/// Subscription identifier and set of request filters
|
||||
#[derive(Serialize, PartialEq, Debug, Clone)]
|
||||
@@ -17,16 +18,16 @@ pub struct Subscription {
|
||||
/// absent ([`None`]) if it should be ignored.
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
|
||||
pub struct ReqFilter {
|
||||
/// Event hash
|
||||
pub id: Option<String>,
|
||||
/// Event kind
|
||||
pub kind: Option<u64>,
|
||||
/// Event hashes
|
||||
pub ids: Option<Vec<String>>,
|
||||
/// Event kinds
|
||||
pub kinds: Option<Vec<u64>>,
|
||||
/// Referenced event hash
|
||||
#[serde(rename = "#e")]
|
||||
pub event: Option<String>,
|
||||
pub events: Option<Vec<String>>,
|
||||
/// Referenced public key for a petname
|
||||
#[serde(rename = "#p")]
|
||||
pub pubkey: Option<String>,
|
||||
pub pubkeys: Option<Vec<String>>,
|
||||
/// Events published after this time
|
||||
pub since: Option<u64>,
|
||||
/// Events published before this time
|
||||
@@ -105,8 +106,13 @@ impl Subscription {
|
||||
|
||||
impl ReqFilter {
|
||||
/// Check for a match within the authors list.
|
||||
// TODO: Ambiguity; what if the array is empty? Should we
|
||||
// consider that the same as null?
|
||||
fn ids_match(&self, event: &Event) -> bool {
|
||||
self.ids
|
||||
.as_ref()
|
||||
.map(|vs| vs.contains(&event.id.to_owned()))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
fn authors_match(&self, event: &Event) -> bool {
|
||||
self.authors
|
||||
.as_ref()
|
||||
@@ -115,29 +121,47 @@ impl ReqFilter {
|
||||
}
|
||||
/// Check if this filter either matches, or does not care about the event tags.
|
||||
fn event_match(&self, event: &Event) -> bool {
|
||||
self.event
|
||||
.as_ref()
|
||||
.map(|t| event.event_tag_match(t))
|
||||
.unwrap_or(true)
|
||||
// This needs to be analyzed for performance; building these
|
||||
// hash sets for each active subscription isn't great.
|
||||
if let Some(es) = &self.events {
|
||||
let event_refs =
|
||||
HashSet::<_>::from_iter(event.get_event_tags().iter().map(|x| x.to_owned()));
|
||||
let filter_refs = HashSet::<_>::from_iter(es.iter().map(|x| &x[..]));
|
||||
let cardinality = event_refs.intersection(&filter_refs).count();
|
||||
cardinality > 0
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if this filter either matches, or does not care about
|
||||
/// the pubkey/petname tags.
|
||||
fn pubkey_match(&self, event: &Event) -> bool {
|
||||
self.pubkey
|
||||
.as_ref()
|
||||
.map(|t| event.pubkey_tag_match(t))
|
||||
.unwrap_or(true)
|
||||
// This needs to be analyzed for performance; building these
|
||||
// hash sets for each active subscription isn't great.
|
||||
if let Some(ps) = &self.pubkeys {
|
||||
let pubkey_refs =
|
||||
HashSet::<_>::from_iter(event.get_pubkey_tags().iter().map(|x| x.to_owned()));
|
||||
let filter_refs = HashSet::<_>::from_iter(ps.iter().map(|x| &x[..]));
|
||||
let cardinality = pubkey_refs.intersection(&filter_refs).count();
|
||||
cardinality > 0
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if this filter either matches, or does not care about the kind.
|
||||
fn kind_match(&self, kind: u64) -> bool {
|
||||
self.kind.map(|v| v == kind).unwrap_or(true)
|
||||
self.kinds
|
||||
.as_ref()
|
||||
.map(|ks| ks.contains(&kind))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Determine if all populated fields in this filter match the provided event.
|
||||
pub fn interested_in_event(&self, event: &Event) -> bool {
|
||||
self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
|
||||
// self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
|
||||
self.ids_match(event)
|
||||
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
|
||||
&& self.kind_match(event.kind)
|
||||
&& self.authors_match(event)
|
||||
|
Reference in New Issue
Block a user