Compare commits

...

47 Commits

Author SHA1 Message Date
Greg Heartsfield
e8557d421b build: bump version to 0.7.5 2022-12-16 17:21:00 -06:00
Greg Heartsfield
7ca9c864f2 improvement: DB pool logging shows used connections directly 2022-12-16 17:01:49 -06:00
Greg Heartsfield
838aafd079 improvement: consistent log messages for client/sub ids 2022-12-16 15:22:27 -06:00
Greg Heartsfield
e554b10ac2 improvement: tweak sub/sql logging for slow queries 2022-12-16 14:55:45 -06:00
Greg Heartsfield
b0bfaa48fc improvement: ignore duplicate REQ messages 2022-12-16 14:37:02 -06:00
Greg Heartsfield
2e9b1b6ba7 docs: comment reason for force_no_match 2022-12-16 14:35:21 -06:00
Greg Heartsfield
4d9012d94c improvement: upgrade docker builder and base images 2022-12-16 14:33:08 -06:00
Greg Heartsfield
ffe7aac066 improvement: upgrade multiple dependencies
Updating aho-corasick v0.7.19 -> v0.7.20
Updating async-trait v0.1.58 -> v0.1.59
Updating axum v0.5.17 -> v0.6.1
Updating axum-core v0.2.9 -> v0.3.0
Updating bytes v1.2.1 -> v1.3.0
Updating cc v1.0.76 -> v1.0.78
Updating crossbeam-utils v0.8.12 -> v0.8.14
Updating cxx v1.0.82 -> v1.0.83
Updating cxx-build v1.0.82 -> v1.0.83
Updating cxxbridge-flags v1.0.82 -> v1.0.83
Updating cxxbridge-macro v1.0.82 -> v1.0.83
Updating flate2 v1.0.24 -> v1.0.25
Updating libc v0.2.137 -> v0.2.138
Updating matchit v0.5.0 -> v0.7.0
Updating miniz_oxide v0.5.4 -> v0.6.2
Updating openssl v0.10.42 -> v0.10.44
Updating openssl-sys v0.9.77 -> v0.9.79
Updating parking_lot_core v0.9.4 -> v0.9.5
Updating pest v2.4.1 -> v2.5.1
Updating pest_derive v2.4.1 -> v2.5.1
Updating pest_generator v2.4.1 -> v2.5.1
Updating pest_meta v2.4.1 -> v2.5.1
Updating prost v0.11.2 -> v0.11.3
Adding rustversion v1.0.9
Updating serde v1.0.147 -> v1.0.150
Updating serde_derive v1.0.147 -> v1.0.150
Updating serde_json v1.0.88 -> v1.0.89
Updating sha-1 v0.10.0 -> v0.10.1
Updating syn v1.0.103 -> v1.0.105
Updating tokio v1.22.0 -> v1.23.0
Updating tokio-macros v1.8.0 -> v1.8.2
Updating toml v0.5.9 -> v0.5.10
Updating tonic v0.8.2 -> v0.8.3
Updating tower-http v0.3.4 -> v0.3.5
Updating typenum v1.15.0 -> v1.16.0
2022-12-16 11:17:05 -06:00
Greg Heartsfield
f9695bd0a9 fix: db schema version updates correctly for v9 2022-12-16 10:01:49 -06:00
Greg Heartsfield
7c4bf5cc8f fix: run db migration for v9 2022-12-16 08:21:00 -06:00
Greg Heartsfield
e2de162931 feat: only show SQL in logs for slow queries unless tracing 2022-12-16 08:17:39 -06:00
Greg Heartsfield
4f606615eb perf: indexing improvement 2022-12-16 08:16:49 -06:00
Greg Heartsfield
84a58ebbcd build: bump version to 0.7.3 2022-12-16 06:32:00 -06:00
Greg Heartsfield
c48e45686d perf: schema updates for better event indexing 2022-12-15 08:48:35 -06:00
Greg Heartsfield
bbe359364a refactor: clippy warnings 2022-12-15 08:43:36 -06:00
Greg Heartsfield
9e9c494367 perf: significant query speedup when using kinds.
fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/51
2022-12-14 21:04:49 -06:00
Greg Heartsfield
5fa24bc9f1 fix: send EOSE when ids list is empty in subscriptions
Fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/50
2022-11-19 10:35:00 -06:00
Greg Heartsfield
4de7490d97 fix: send EOSE when authors list is empty in subscriptions
Fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/49
2022-11-19 10:00:38 -06:00
Greg Heartsfield
d0f63dc66e docs: update container instructions for rootless podman 2022-11-19 09:32:26 -06:00
Greg Heartsfield
06078648c8 build: bump version to 0.7.2 2022-11-19 07:55:52 -06:00
Greg Heartsfield
cc0fcc5d66 docs: add Cargo package metadata 2022-11-19 07:32:17 -06:00
Greg Heartsfield
dfb2096653 improvement: build auditable binary in docker 2022-11-19 07:11:39 -06:00
Greg Heartsfield
486508d192 improvement: upgrade multiple dependencies
Updating crates.io index
Updating cc v1.0.74 -> v1.0.76
Updating chrono v0.4.22 -> v0.4.23
Updating cxx v1.0.80 -> v1.0.82
Updating cxx-build v1.0.80 -> v1.0.82
Updating cxxbridge-flags v1.0.80 -> v1.0.82
Updating cxxbridge-macro v1.0.80 -> v1.0.82
Updating digest v0.10.5 -> v0.10.6
Updating hyper v0.14.22 -> v0.14.23
Updating indexmap v1.9.1 -> v1.9.2
Updating regex v1.6.0 -> v1.7.0
Updating regex-syntax v0.6.27 -> v0.6.28
Updating serde_json v1.0.87 -> v1.0.88
Updating tokio v1.21.2 -> v1.22.0
Updating uuid v1.2.1 -> v1.2.2
2022-11-19 06:52:06 -06:00
Greg Heartsfield
84b43c144b improvement: use locked cargo packages to build container images 2022-11-19 06:29:13 -06:00
Greg Heartsfield
110500bb46 feat(NIP-20): advertise support for NIP-20 in relay info/readme 2022-11-12 09:22:43 -06:00
Greg Heartsfield
83f6b11de7 refactor: clippy fix 2022-11-12 09:22:24 -06:00
William Casarin
6d1244434b feat(NIP-20): improve invalid event error messages
Instead of returning a NOTICE for invalid events, return a `OK false`
command result with a reason as to why the event is invalid.
2022-11-12 09:13:22 -06:00
William Casarin
5a91419d34 feat(NIP-20): send command results to clients
When submitting events to relays, clients currently have no way to know
if an event was successfully committed to the database. This NIP
introduces the concept of command results which are like NOTICE's except
provide more information about if an event was accepted or rejected.

A command result is a JSON object with the following structure that is
returned when an event is successfully saved to the database or
rejected:

	["OK", <event_id>, <true|false>, <message>]

nip20: https://github.com/nostr-protocol/nips/pull/62
2022-11-12 09:12:35 -06:00
William Casarin
7adc5c9af7 perf: dont create intermediate vecs when matching subs
Avoid creating intermediate vectors when matching subscriptions. We can
just iterate over the hashmap directly.
2022-11-09 07:30:43 -06:00
Greg Heartsfield
9dd4571bee refactor: reduce level of some common DB logs 2022-11-06 13:49:32 -06:00
Greg Heartsfield
9db5a26b9c refactor: more consistent logging messages 2022-11-05 16:11:20 -05:00
Greg Heartsfield
ac345b5744 refactor: do not quote server-generated client id in logs 2022-11-05 15:59:39 -05:00
Greg Heartsfield
675662c7fb improvement: upgrade docker builder and base images 2022-11-05 13:24:17 -05:00
Greg Heartsfield
505b0cb71f improvement: upgrade multiple dependencies
Updating anyhow v1.0.65 -> v1.0.66
Updating async-trait v0.1.57 -> v0.1.58
Updating axum v0.5.16 -> v0.5.17
Updating axum-core v0.2.8 -> v0.2.9
Updating base64 v0.13.0 -> v0.13.1
Updating bumpalo v3.11.0 -> v3.11.1
Updating cc v1.0.73 -> v1.0.74
Updating cxx v1.0.79 -> v1.0.80
Updating cxx-build v1.0.79 -> v1.0.80
Updating cxxbridge-flags v1.0.79 -> v1.0.80
Updating cxxbridge-macro v1.0.79 -> v1.0.80
Updating futures v0.3.24 -> v0.3.25
Updating futures-channel v0.3.24 -> v0.3.25
Updating futures-core v0.3.24 -> v0.3.25
Updating futures-executor v0.3.24 -> v0.3.25
Updating futures-io v0.3.24 -> v0.3.25
Updating futures-macro v0.3.24 -> v0.3.25
Updating futures-sink v0.3.24 -> v0.3.25
Updating futures-task v0.3.24 -> v0.3.25
Updating futures-util v0.3.24 -> v0.3.25
Updating getrandom v0.2.7 -> v0.2.8
Updating h2 v0.3.14 -> v0.3.15
Updating hyper v0.14.20 -> v0.14.22
Updating iana-time-zone v0.1.51 -> v0.1.53
Updating libc v0.2.135 -> v0.2.137
Updating mio v0.8.4 -> v0.8.5
Updating native-tls v0.2.10 -> v0.2.11
Updating num_cpus v1.13.1 -> v1.14.0
Updating once_cell v1.15.0 -> v1.16.0
Updating openssl-sys v0.9.76 -> v0.9.77
Updating parking_lot_core v0.9.3 -> v0.9.4
Updating pest v2.4.0 -> v2.4.1
Updating pest_derive v2.4.0 -> v2.4.1
Updating pest_generator v2.4.0 -> v2.4.1
Updating pest_meta v2.4.0 -> v2.4.1
Updating pkg-config v0.3.25 -> v0.3.26
Updating ppv-lite86 v0.2.16 -> v0.2.17
Updating prost v0.11.0 -> v0.11.2
Updating prost-derive v0.11.0 -> v0.11.2
Updating prost-types v0.11.1 -> v0.11.2
Updating serde v1.0.145 -> v1.0.147
Updating serde_derive v1.0.145 -> v1.0.147
Updating serde_json v1.0.86 -> v1.0.87
Updating syn v1.0.102 -> v1.0.103
  Adding windows-sys v0.42.0
  Adding windows_aarch64_gnullvm v0.42.0
  Adding windows_aarch64_msvc v0.42.0
  Adding windows_i686_gnu v0.42.0
  Adding windows_i686_msvc v0.42.0
  Adding windows_x86_64_gnu v0.42.0
  Adding windows_x86_64_gnullvm v0.42.0
  Adding windows_x86_64_msvc v0.42.0
2022-11-05 10:59:03 -05:00
Greg Heartsfield
e8aa450802 build: bump version to 0.7.1 2022-11-05 10:35:38 -05:00
Greg Heartsfield
5a8860bb09 feat: log user-agent if present 2022-11-05 10:29:25 -05:00
Greg Heartsfield
11e43eccf9 refactor: add unit to ping_interval config 2022-11-05 07:42:08 -05:00
William Casarin
50577b2dfa feat: add network.ping_interval setting
Add a ping interval setting that allows you to customize the websocket
ping interval. The default of 5 minutes may be too high for some proxy
servers that disconnect connections that are held open for too long.
2022-11-05 07:40:28 -05:00
William Casarin
a6cb6f8486 refactor: rename get_header_remote_ip -> get_header_string
This function has nothing to do with remote ips!
2022-11-05 07:37:18 -05:00
Greg Heartsfield
ae5bf98d87 feat: retrieve client IP from header in config.toml
If the config.toml has defined a HTTP header to look for a remote IP,
that will be logged.  Otherwise, the socket address IP will be used.

closes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/47
2022-11-04 18:05:01 -05:00
William Casarin
1cf9d719f0 feat: look for proxied ip headers
This enables support for using the proxied IP from cloudflare. The damus
relay is behind cloudflare, so to get accurate remote ip logging we need
to look at the headers instead of the socket address.

Signed-off-by: William Casarin <jb55@jb55.com>
2022-11-04 17:09:28 -05:00
William Casarin
311f4b5283 refactor: switch new connections to debug log
These are pretty spammy on busy relays. I've been using the info log to
monitor spam attacks, and these are the least useful info log.

Leave the "stopping connection" log because it at least provides useful
sent/received information.

Signed-off-by: William Casarin <jb55@jb55.com>
2022-11-04 07:59:53 -05:00
Greg Heartsfield
14b5a51e3a fix: log ephemeral events after send 2022-11-04 07:55:38 -05:00
Greg Heartsfield
8ecce3f566 feat: show client IP in logs 2022-11-02 18:33:44 -05:00
Greg Heartsfield
caffbbbede build: bump version to 0.7.0 2022-10-16 15:42:11 -05:00
Greg Heartsfield
81045ad3d0 improvement: upgrade multiple dependencies
Updating anyhow v1.0.64 -> v1.0.65
  Adding codespan-reporting v0.11.1
Updating const_format v0.2.28 -> v0.2.30
Updating const_format_proc_macros v0.2.22 -> v0.2.29
Updating crossbeam-utils v0.8.11 -> v0.8.12
  Adding cxx v1.0.79
  Adding cxx-build v1.0.79
  Adding cxxbridge-flags v1.0.79
  Adding cxxbridge-macro v1.0.79
Updating digest v0.10.3 -> v0.10.5
Updating hdrhistogram v7.5.1 -> v7.5.2
Updating iana-time-zone v0.1.50 -> v0.1.51
  Adding iana-time-zone-haiku v0.1.1
Updating itertools v0.10.3 -> v0.10.5
Updating itoa v1.0.3 -> v1.0.4
Updating js-sys v0.3.59 -> v0.3.60
Updating libc v0.2.132 -> v0.2.135
  Adding link-cplusplus v1.0.7
Updating lock_api v0.4.8 -> v0.4.9
Updating once_cell v1.14.0 -> v1.15.0
Updating openssl v0.10.41 -> v0.10.42
Updating openssl-sys v0.9.75 -> v0.9.76
Updating pest v2.3.0 -> v2.4.0
Updating pest_derive v2.3.0 -> v2.4.0
Updating pest_generator v2.3.0 -> v2.4.0
Updating pest_meta v2.3.0 -> v2.4.0
Updating proc-macro2 v1.0.43 -> v1.0.47
Updating rand_core v0.6.3 -> v0.6.4
Updating raw-cpuid v10.5.0 -> v10.6.0
  Adding scratch v1.0.2
Updating serde v1.0.144 -> v1.0.145
Updating serde_derive v1.0.144 -> v1.0.145
Updating serde_json v1.0.85 -> v1.0.86
  Adding sha1 v0.10.5
Updating smallvec v1.9.0 -> v1.10.0
Updating syn v1.0.99 -> v1.0.102
  Adding termcolor v1.1.3
Updating thiserror v1.0.34 -> v1.0.37
Updating thiserror-impl v1.0.34 -> v1.0.37
Updating tokio v1.21.0 -> v1.21.2
Updating tokio-stream v0.1.9 -> v0.1.11
Updating tonic v0.8.1 -> v0.8.2
Updating tower-layer v0.3.1 -> v0.3.2
Updating tracing v0.1.36 -> v0.1.37
Updating tracing-attributes v0.1.22 -> v0.1.23
Updating tracing-core v0.1.29 -> v0.1.30
Updating tracing-subscriber v0.3.15 -> v0.3.16
Updating unicode-ident v1.0.3 -> v1.0.5
Updating unicode-normalization v0.1.21 -> v0.1.22
  Adding unicode-width v0.1.10
Updating uuid v1.1.2 -> v1.2.1
Updating wasm-bindgen v0.2.82 -> v0.2.83
Updating wasm-bindgen-backend v0.2.82 -> v0.2.83
Updating wasm-bindgen-macro v0.2.82 -> v0.2.83
Updating wasm-bindgen-macro-support v0.2.82 -> v0.2.83
Updating wasm-bindgen-shared v0.2.82 -> v0.2.83
Updating web-sys v0.3.59 -> v0.3.60
  Adding winapi-util v0.1.5
2022-10-16 15:33:11 -05:00
Greg Heartsfield
72f8a1aa5c feat(NIP-26): allow searches for delegated public keys
Implements core NIP-26 delegated event functionality.  Events can
include a `delegation` tag that provides a signature and restrictions
on which events can be delegated.

Notable points on the implementation so far:

* Schema has been upgraded to include an index and new column.
* Basic rune parsing/evaluation to implement the example event in the
  NIP, but no more.
* No special logic for deletion.
* No migration logic for determining delegated authors for
  already-stored events.
2022-10-16 15:25:06 -05:00
18 changed files with 1384 additions and 341 deletions

562
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,15 @@
[package]
name = "nostr-rs-relay"
version = "0.6.2"
version = "0.7.5"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
readme = "README.md"
homepage = "https://sr.ht/~gheartsfield/nostr-rs-relay/"
repository = "https://git.sr.ht/~gheartsfield/nostr-rs-relay"
license = "MIT"
keywords = ["nostr", "server"]
categories = ["network-programming", "web-programming"]
[dependencies]
tracing = "0.1.36"
@@ -32,6 +40,7 @@ http = { version = "0.2" }
parse_duration = "2"
rand = "0.8"
const_format = "0.2.28"
regex = "1"
[dev-dependencies]
anyhow = "1"

View File

@@ -1,18 +1,24 @@
FROM docker.io/library/rust:1.64.0@sha256:5cf09a76cb9baf4990d121221bbad64927cc5690ee54f246487e302ddc2ba300 as builder
FROM docker.io/library/rust:1.66.0@sha256:359949280cebefe93ccb33089fe25111a3aadfe99eac4b6cbe8ec3e1b571dacb as builder
RUN USER=root cargo install cargo-auditable
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
COPY ./Cargo.toml ./Cargo.toml
COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release
# build dependencies only (caching)
RUN cargo auditable build --release --locked
# get rid of starter project code
RUN rm src/*.rs
# copy project source code
COPY ./src ./src
# build auditable release using locked deps
RUN rm ./target/release/deps/nostr*relay*
RUN cargo build --release
RUN cargo auditable build --release --locked
FROM docker.io/library/debian:bullseye-20221205-slim@sha256:25f10b4f1ded5341a3ca0a30290ff3cd5639415f0c5a2222d5e7d5dd72952aa1
FROM docker.io/library/debian:bullseye-20221004-slim@sha256:8b702518a671c926b5ece4efe386a476eb4777646a36d996d4bd50944f2f11a2
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \

View File

@@ -1,8 +1,8 @@
# [nostr-rs-relay](https://git.sr.ht/~gheartsfield/nostr-rs-relay)
This is a [nostr](https://github.com/nostr-protocol/nostr) relay, written in
Rust. It currently supports the entire relay protocol, and has a
SQLite persistence layer.
This is a [nostr](https://github.com/nostr-protocol/nostr) relay,
written in Rust. It currently supports the entire relay protocol, and
persists data with SQLite.
The project master repository is available on
[sourcehut](https://sr.ht/~gheartsfield/nostr-rs-relay/), and is
@@ -26,7 +26,9 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [x] NIP-12: [Generic Tag Queries](https://github.com/nostr-protocol/nips/blob/master/12.md)
- [x] NIP-15: [End of Stored Events Notice](https://github.com/nostr-protocol/nips/blob/master/15.md)
- [x] NIP-16: [Event Treatment](https://github.com/nostr-protocol/nips/blob/master/16.md)
- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
## Quick Start
@@ -35,15 +37,32 @@ application. Use a bind mount to store the SQLite database outside of
the container image, and map the container's 8080 port to a host port
(7000 in the example below).
The examples below start a rootless podman container, mapping a local
data directory and config file.
```console
$ docker build -t nostr-rs-relay .
$ podman build -t nostr-rs-relay .
$ docker run -it -p 7000:8080 \
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind nostr-rs-relay
$ mkdir data
[2021-12-31T19:58:31Z INFO nostr_rs_relay] listening on: 0.0.0.0:8080
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] opened database "/usr/src/app/db/nostr.db" for writing
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] DB version = 2
$ podman unshare chown 100:100 data
$ podman run -it --rm -p 7000:8080 \
--user=100:100 \
-v $(pwd)/data:/usr/src/app/db:Z \
-v $(pwd)/config.toml:/usr/src/app/config.toml:ro,Z \
--name nostr-relay nostr-rs-relay:latest
Nov 19 15:31:15.013 INFO nostr_rs_relay: Starting up from main
Nov 19 15:31:15.017 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
Nov 19 15:31:15.019 INFO nostr_rs_relay::server: db writer created
Nov 19 15:31:15.019 INFO nostr_rs_relay::server: control message listener started
Nov 19 15:31:15.019 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=4)
Nov 19 15:31:15.019 INFO nostr_rs_relay::db: opened database "/usr/src/app/db/nostr.db" for writing
Nov 19 15:31:15.019 INFO nostr_rs_relay::schema: DB version = 0
Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: database pragma/schema initialized to v7, and ready
Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: All migration scripts completed successfully. Welcome to v7.
Nov 19 15:31:15.521 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=128)
```
Use a `nostr` client such as

View File

@@ -46,6 +46,14 @@ address = "0.0.0.0"
# Listen on this port
port = 8080
# If present, read this HTTP header for logging client IP addresses.
# Examples for common proxies, cloudflare:
#remote_ip_header = "x-forwarded-for"
#remote_ip_header = "cf-connecting-ip"
# Websocket ping interval in seconds, defaults to 5 minutes
#ping_interval = 300
[options]
# Reject events that have timestamps greater than this many seconds in
# the future. Recommended to reject anything greater than 30 minutes

View File

@@ -28,9 +28,10 @@ pub struct Database {
pub struct Network {
pub port: u16,
pub address: String,
pub remote_ip_header: Option<String>, // retrieve client IP from this HTTP header if present
pub ping_interval_seconds: u32,
}
//
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Options {
@@ -207,7 +208,9 @@ impl Default for Settings {
},
network: Network {
port: 8080,
ping_interval_seconds: 300,
address: "0.0.0.0".to_owned(),
remote_ip_header: None,
},
limits: Limits {
messages_per_sec: None,

View File

@@ -2,7 +2,6 @@
use crate::close::Close;
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription;
use std::collections::HashMap;
@@ -14,6 +13,8 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
/// State for a client connection
pub struct ClientConn {
/// Client IP (either from socket, or configured proxy header
client_ip: String,
/// Unique client identifier generated at connection time
client_id: Uuid,
/// The current set of active client subscriptions
@@ -24,22 +25,27 @@ pub struct ClientConn {
impl Default for ClientConn {
fn default() -> Self {
Self::new()
Self::new("unknown".to_owned())
}
}
impl ClientConn {
/// Create a new, empty connection state.
#[must_use]
pub fn new() -> Self {
pub fn new(client_ip: String) -> Self {
let client_id = Uuid::new_v4();
ClientConn {
client_ip,
client_id,
subscriptions: HashMap::new(),
max_subs: 32,
}
}
pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
&self.subscriptions
}
/// Get a short prefix of the client's unique identifier, suitable
/// for logging.
#[must_use]
@@ -47,16 +53,9 @@ impl ClientConn {
self.client_id.to_string().chars().take(8).collect()
}
/// Find all matching subscriptions.
#[must_use]
pub fn get_matching_subscriptions(&self, e: &Event) -> Vec<&str> {
let mut v: Vec<&str> = vec![];
for (id, sub) in &self.subscriptions {
if sub.interested_in_event(e) {
v.push(id);
}
}
v
pub fn ip(&self) -> &str {
&self.client_ip
}
/// Add a new subscription for this connection.
@@ -79,8 +78,12 @@ impl ClientConn {
// check if an existing subscription exists, and replace if so
if self.subscriptions.contains_key(&k) {
self.subscriptions.remove(&k);
self.subscriptions.insert(k, s);
debug!("replaced existing subscription");
self.subscriptions.insert(k, s.clone());
debug!(
"replaced existing subscription (cid: {}, sub: {:?})",
self.get_client_prefix(),
s.get_id()
);
return Ok(());
}
@@ -91,8 +94,9 @@ impl ClientConn {
// add subscription
self.subscriptions.insert(k, s);
debug!(
"registered new subscription, currently have {} active subs",
self.subscriptions.len()
"registered new subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.get_client_prefix(),
);
Ok(())
}
@@ -102,9 +106,9 @@ impl ClientConn {
// TODO: return notice if subscription did not exist.
self.subscriptions.remove(&c.id);
debug!(
"removed subscription, currently have {} active subs (cid={})",
"removed subscription, currently have {} active subs (cid: {})",
self.subscriptions.len(),
self.client_id
self.get_client_prefix(),
);
}
}

136
src/db.rs
View File

@@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05;
use crate::notice::Notice;
use crate::schema::{upgrade_db, STARTUP_SQL};
use crate::subscription::ReqFilter;
use crate::subscription::Subscription;
@@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<String>,
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
}
/// Database file
@@ -142,12 +143,15 @@ pub async fn db_writer(
if next_event.is_none() {
break;
}
// track if an event write occurred; this is used to
// update the rate limiter
let mut event_write = false;
let subm_event = next_event.unwrap();
let event = subm_event.event;
let notice_tx = subm_event.notice_tx;
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
// TODO: incorporate delegated pubkeys
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
info!(
@@ -155,7 +159,10 @@ pub async fn db_writer(
event.get_event_id_prefix()
);
notice_tx
.try_send("pubkey is not allowed to publish to this relay".to_owned())
.try_send(Notice::blocked(
event.id,
"pubkey is not allowed to publish to this relay",
))
.ok();
continue;
}
@@ -186,10 +193,10 @@ pub async fn db_writer(
event.get_author_prefix()
);
notice_tx
.try_send(
"NIP-05 verification is no longer valid (expired/wrong domain)"
.to_owned(),
)
.try_send(Notice::blocked(
event.id,
"NIP-05 verification is no longer valid (expired/wrong domain)",
))
.ok();
continue;
}
@@ -200,7 +207,10 @@ pub async fn db_writer(
event.get_author_prefix()
);
notice_tx
.try_send("NIP-05 verification needed to publish events".to_owned())
.try_send(Notice::blocked(
event.id,
"NIP-05 verification needed to publish events",
))
.ok();
continue;
}
@@ -213,22 +223,23 @@ pub async fn db_writer(
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
if event.kind >= 20000 && event.kind < 30000 {
bcast_tx.send(event.clone()).ok();
info!(
"published ephemeral event {:?} from {:?} in {:?}",
"published ephemeral event: {:?} from: {:?} in: {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
bcast_tx.send(event.clone()).ok();
event_write = true
} else {
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate or deleted event");
notice_tx.try_send(Notice::duplicate(event.id)).ok();
} else {
info!(
"persisted event {:?} from {:?} in {:?}",
"persisted event: {:?} from: {:?} in: {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
@@ -236,16 +247,13 @@ pub async fn db_writer(
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
notice_tx.try_send(Notice::saved(event.id)).ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
notice_tx
.try_send(
"relay experienced an error trying to publish the latest event"
.to_owned(),
)
.ok();
let msg = "relay experienced an error trying to publish the latest event";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
}
@@ -284,12 +292,13 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
let tx = conn.transaction()?;
// get relevant fields from event and convert to blobs.
let id_blob = hex::decode(&e.id).ok();
let pubkey_blob = hex::decode(&e.pubkey).ok();
let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
let event_str = serde_json::to_string(&e).ok();
// ignore if the event hash is a duplicate.
let mut ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
)?;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
@@ -312,7 +321,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(&tagval).ok()],
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
@@ -426,7 +435,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
return (empty_query, empty_params);
}
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned();
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e".to_owned();
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -439,16 +448,22 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
auth_searches.push("author=? OR delegated_by=?".to_owned());
params.push(Box::new(ex.clone()));
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
auth_searches.push(
"(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
);
params.push(Box::new(lower.clone()));
params.push(Box::new(upper.clone()));
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
auth_searches.push("author>? OR delegated_by>?".to_owned());
params.push(Box::new(lower.clone()));
params.push(Box::new(lower));
}
None => {
@@ -456,8 +471,14 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
}
}
}
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
if !authvec.is_empty() {
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
} else {
// if the authors list was empty, we should never return
// any results.
filter_components.push("false".to_owned());
}
}
// Query for Kind
if let Some(ks) = &f.kinds {
@@ -490,8 +511,14 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
}
}
}
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
if !idvec.is_empty() {
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
} else {
// if the ids list was empty, we should never return
// any results.
filter_components.push("false".to_owned());
}
}
// Query for tags
if let Some(map) = &f.tags {
@@ -500,7 +527,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if (v.len() % 2 == 0) && is_lower_hex(v) {
if let Ok(h) = hex::decode(&v) {
if let Ok(h) = hex::decode(v) {
blob_vals.push(Box::new(h));
}
} else {
@@ -567,10 +594,18 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
.map(|s| format!("SELECT content, created_at FROM ({})", s))
.collect();
let query: String = subqueries_selects.join(" UNION ");
debug!("final query string: {}", query);
(query, params)
}
fn log_pool_stats(pool: &SqlitePool) {
let state: r2d2::State = pool.state();
let in_use_cxns = state.connections - state.idle_connections;
debug!(
"DB pool usage (in_use: {}, available: {})",
in_use_cxns, state.connections
);
}
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
@@ -585,14 +620,15 @@ pub async fn db_query(
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
task::spawn_blocking(move || {
debug!("going to query for: {:?}", sub);
let mut row_count: usize = 0;
let start = Instant::now();
// generate SQL query
let (q, p) = query_from_sub(&sub);
debug!("SQL generated in {:?}", start.elapsed());
trace!("SQL generated in {:?}", start.elapsed());
// show pool stats
debug!("DB pool stats: {:?}", pool.state());
log_pool_stats(&pool);
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(1000);
let start = Instant::now();
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
@@ -601,18 +637,36 @@ pub async fn db_query(
let mut first_result = true;
while let Some(row) = event_rows.next()? {
if first_result {
let first_result_elapsed = start.elapsed();
// logging for slow queries; show sub and SQL
if first_result_elapsed >= slow_cutoff {
info!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
"final query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!("final query string: {}", q);
}
debug!(
"time to first result: {:?} (cid={}, sub={:?})",
start.elapsed(),
client_id,
sub.id
"first result in {:?} (cid: {}, sub: {:?})",
first_result_elapsed, client_id, sub.id
);
first_result = false;
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
debug!("query aborted (sub={:?})", sub.id);
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(());
}
row_count += 1;
@@ -631,11 +685,11 @@ pub async fn db_query(
})
.ok();
debug!(
"query completed ({} rows) in {:?} (cid={}, sub={:?})",
row_count,
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
start.elapsed(),
client_id,
sub.id
sub.id,
row_count
);
} else {
warn!("Could not get a database connection for querying");

416
src/delegation.rs Normal file
View File

@@ -0,0 +1,416 @@
//! Event parsing and validation
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use bitcoin_hashes::{sha256, Hash};
use lazy_static::lazy_static;
use regex::Regex;
use secp256k1::{schnorr, Secp256k1, VerifyOnly, XOnlyPublicKey};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use tracing::{debug, info};
// This handles everything related to delegation, in particular the
// condition/rune parsing and logic.
// Conditions are poorly specified, so we will implement the minimum
// necessary for now.
// fields MUST be either "kind" or "created_at".
// operators supported are ">", "<", "=", "!".
// no operations on 'content' are supported.
// this allows constraints for:
// valid date ranges (valid from X->Y dates).
// specific kinds (publish kind=1,5)
// kind ranges (publish ephemeral events, kind>19999&kind<30001)
// for more complex scenarios (allow delegatee to publish ephemeral
// AND replacement events), it may be necessary to generate and use
// different condition strings, since we do not support grouping or
// "OR" logic.
lazy_static! {
/// Secp256k1 verification instance.
pub static ref SECP: Secp256k1<VerifyOnly> = Secp256k1::verification_only();
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub enum Field {
Kind,
CreatedAt,
}
impl FromStr for Field {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
if value == "kind" {
Ok(Field::Kind)
} else if value == "created_at" {
Ok(Field::CreatedAt)
} else {
Err(Error::DelegationParseError)
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub enum Operator {
LessThan,
GreaterThan,
Equals,
NotEquals,
}
impl FromStr for Operator {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
if value == "<" {
Ok(Operator::LessThan)
} else if value == ">" {
Ok(Operator::GreaterThan)
} else if value == "=" {
Ok(Operator::Equals)
} else if value == "!" {
Ok(Operator::NotEquals)
} else {
Err(Error::DelegationParseError)
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ConditionQuery {
pub(crate) conditions: Vec<Condition>,
}
impl ConditionQuery {
pub fn allows_event(&self, event: &Event) -> bool {
// check each condition, to ensure that the event complies
// with the restriction.
for c in &self.conditions {
if !c.allows_event(event) {
// any failing conditions invalidates the delegation
// on this event
return false;
}
}
// delegation was permitted unconditionally, or all conditions
// were true
true
}
}
// Verify that the delegator approved the delegation; return a ConditionQuery if so.
pub fn validate_delegation(
delegator: &str,
delegatee: &str,
cond_query: &str,
sigstr: &str,
) -> Option<ConditionQuery> {
// form the token
let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
// form SHA256 hash
let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
let sig = schnorr::Signature::from_str(sigstr).unwrap();
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
if let Ok(pubkey) = XOnlyPublicKey::from_str(delegator) {
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
if verify.is_ok() {
// return the parsed condition query
cond_query.parse::<ConditionQuery>().ok()
} else {
debug!("client sent an delegation signature that did not validate");
None
}
} else {
debug!("client sent malformed delegation pubkey");
None
}
} else {
info!("error converting delegation digest to secp256k1 message");
None
}
}
/// Parsed delegation condition
/// see https://github.com/nostr-protocol/nips/pull/28#pullrequestreview-1084903800
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Condition {
pub(crate) field: Field,
pub(crate) operator: Operator,
pub(crate) values: Vec<u64>,
}
impl Condition {
/// Check if this condition allows the given event to be delegated
pub fn allows_event(&self, event: &Event) -> bool {
// determine what the right-hand side of the operator is
let resolved_field = match &self.field {
Field::Kind => event.kind,
Field::CreatedAt => event.created_at,
};
match &self.operator {
Operator::LessThan => {
// the less-than operator is only valid for single values.
if self.values.len() == 1 {
if let Some(v) = self.values.first() {
return resolved_field < *v;
}
}
}
Operator::GreaterThan => {
// the greater-than operator is only valid for single values.
if self.values.len() == 1 {
if let Some(v) = self.values.first() {
return resolved_field > *v;
}
}
}
Operator::Equals => {
// equals is interpreted as "must be equal to at least one provided value"
return self.values.iter().any(|&x| resolved_field == x);
}
Operator::NotEquals => {
// not-equals is interpreted as "must not be equal to any provided value"
// this is the one case where an empty list of values could be allowed; even though it is a pointless restriction.
return self.values.iter().all(|&x| resolved_field != x);
}
}
false
}
}
fn str_to_condition(cs: &str) -> Option<Condition> {
// a condition is a string (alphanum+underscore), an operator (<>=!), and values (num+comma)
lazy_static! {
static ref RE: Regex = Regex::new("([[:word:]]+)([<>=!]+)([,[[:digit:]]]*)").unwrap();
}
// match against the regex
let caps = RE.captures(cs)?;
let field = caps.get(1)?.as_str().parse::<Field>().ok()?;
let operator = caps.get(2)?.as_str().parse::<Operator>().ok()?;
// values are just comma separated numbers, but all must be parsed
let rawvals = caps.get(3)?.as_str();
let values = rawvals
.split_terminator(',')
.map(|n| n.parse::<u64>().ok())
.collect::<Option<Vec<_>>>()?;
// convert field string into Field
Some(Condition {
field,
operator,
values,
})
}
/// Parse a condition query from a string slice
impl FromStr for ConditionQuery {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
// split the string with '&'
let mut conditions = vec![];
let condstrs = value.split_terminator('&');
// parse each individual condition
for c in condstrs {
conditions.push(str_to_condition(c).ok_or(Error::DelegationParseError)?);
}
Ok(ConditionQuery { conditions })
}
}
#[cfg(test)]
mod tests {
use super::*;
// parse condition strings
#[test]
fn parse_empty() -> Result<()> {
// given an empty condition query, produce an empty vector
let empty_cq = ConditionQuery { conditions: vec![] };
let parsed = "".parse::<ConditionQuery>()?;
assert_eq!(parsed, empty_cq);
Ok(())
}
// parse field 'kind'
#[test]
fn test_kind_field_parse() -> Result<()> {
let field = "kind".parse::<Field>()?;
assert_eq!(field, Field::Kind);
Ok(())
}
// parse field 'created_at'
#[test]
fn test_created_at_field_parse() -> Result<()> {
let field = "created_at".parse::<Field>()?;
assert_eq!(field, Field::CreatedAt);
Ok(())
}
// parse unknown field
#[test]
fn unknown_field_parse() {
let field = "unk".parse::<Field>();
assert!(field.is_err());
}
// parse a full conditional query with an empty array
#[test]
fn parse_kind_equals_empty() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![],
}],
};
let parsed = "kind=".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse a full conditional query with a single value
#[test]
fn parse_kind_equals_singleval() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![1],
}],
};
let parsed = "kind=1".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse a full conditional query with multiple values
#[test]
fn parse_kind_equals_multival() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![1, 2, 4],
}],
};
let parsed = "kind=1,2,4".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse multiple conditions
#[test]
fn parse_multi_conditions() -> Result<()> {
// given an empty condition query, produce an empty vector
let cq = ConditionQuery {
conditions: vec![
Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![10000],
},
Condition {
field: Field::Kind,
operator: Operator::LessThan,
values: vec![20000],
},
Condition {
field: Field::Kind,
operator: Operator::NotEquals,
values: vec![10001],
},
Condition {
field: Field::CreatedAt,
operator: Operator::LessThan,
values: vec![1665867123],
},
],
};
let parsed =
"kind>10000&kind<20000&kind!10001&created_at<1665867123".parse::<ConditionQuery>()?;
assert_eq!(parsed, cq);
Ok(())
}
fn simple_event() -> Event {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
}
}
// Check for condition logic on event w/ empty values
#[test]
fn condition_with_empty_values() {
let mut c = Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![],
};
let e = simple_event();
assert!(!c.allows_event(&e));
c.operator = Operator::LessThan;
assert!(!c.allows_event(&e));
c.operator = Operator::Equals;
assert!(!c.allows_event(&e));
// Not Equals applied to an empty list *is* allowed
// (pointless, but logically valid).
c.operator = Operator::NotEquals;
assert!(c.allows_event(&e));
}
// Check for condition logic on event w/ single value
#[test]
fn condition_kind_gt_event_single() {
let c = Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![10],
};
let mut e = simple_event();
// kind is not greater than 10, not allowed
e.kind = 1;
assert!(!c.allows_event(&e));
// kind is greater than 10, allowed
e.kind = 100;
assert!(c.allows_event(&e));
// kind is 10, not allowed
e.kind = 10;
assert!(!c.allows_event(&e));
}
// Check for condition logic on event w/ multi values
#[test]
fn condition_with_multi_values() {
let mut c = Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![0, 10, 20],
};
let mut e = simple_event();
// Allow if event kind is in list for Equals
e.kind = 10;
assert!(c.allows_event(&e));
// Deny if event kind is not in list for Equals
e.kind = 11;
assert!(!c.allows_event(&e));
// Deny if event kind is in list for NotEquals
e.kind = 10;
c.operator = Operator::NotEquals;
assert!(!c.allows_event(&e));
// Allow if event kind is not in list for NotEquals
e.kind = 99;
c.operator = Operator::NotEquals;
assert!(c.allows_event(&e));
// Always deny if GreaterThan/LessThan for a list
c.operator = Operator::LessThan;
assert!(!c.allows_event(&e));
c.operator = Operator::GreaterThan;
assert!(!c.allows_event(&e));
}
}

View File

@@ -17,10 +17,16 @@ pub enum Error {
ConnWriteError,
#[error("EVENT parse failed")]
EventParseFailed,
#[error("ClOSE message parse failed")]
#[error("CLOSE message parse failed")]
CloseParseFailed,
#[error("Event validation failed")]
EventInvalid,
#[error("Event invalid signature")]
EventInvalidSignature,
#[error("Event invalid id")]
EventInvalidId,
#[error("Event malformed pubkey")]
EventMalformedPubkey,
#[error("Event could not canonicalize")]
EventCouldNotCanonicalize,
#[error("Event too large")]
EventMaxLengthError(usize),
#[error("Subscription identifier max length exceeded")]
@@ -50,6 +56,8 @@ pub enum Error {
HyperError(hyper::Error),
#[error("Hex encoding error")]
HexError(hex::FromHexError),
#[error("Delegation parse error")]
DelegationParseError,
#[error("Unknown/Undocumented")]
UnknownError,
}

View File

@@ -1,4 +1,5 @@
//! Event parsing and validation
use crate::delegation::validate_delegation;
use crate::error::Error::*;
use crate::error::Result;
use crate::nip05;
@@ -26,11 +27,19 @@ pub struct EventCmd {
event: Event,
}
impl EventCmd {
pub fn event_id(&self) -> &str {
&self.event.id
}
}
/// Parsed nostr event.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Event {
pub id: String,
pub(crate) pubkey: String,
#[serde(skip)]
pub(crate) delegated_by: Option<String>,
pub(crate) created_at: u64,
pub(crate) kind: u64,
#[serde(deserialize_with = "tag_from_string")]
@@ -80,12 +89,13 @@ impl From<EventCmd> for Result<Event> {
// ensure command is correct
if ec.cmd != "EVENT" {
Err(CommandUnknownError)
} else if ec.event.is_valid() {
let mut e = ec.event;
e.build_index();
Ok(e)
} else {
Err(EventInvalid)
ec.event.validate().map(|_| {
let mut e = ec.event;
e.build_index();
e.update_delegation();
e
})
}
}
}
@@ -110,6 +120,50 @@ impl Event {
None
}
// is this event delegated (properly)?
// does the signature match, and are conditions valid?
// if so, return an alternate author for the event
pub fn delegated_author(&self) -> Option<String> {
// is there a delegation tag?
let delegation_tag: Vec<String> = self
.tags
.iter()
.filter(|x| x.len() == 4)
.filter(|x| x.get(0).unwrap() == "delegation")
.take(1)
.next()?
.to_vec(); // get first tag
//let delegation_tag = self.tag_values_by_name("delegation");
// delegation tags should have exactly 3 elements after the name (pubkey, condition, sig)
// the event is signed by the delagatee
let delegatee = &self.pubkey;
// the delegation tag references the claimed delagator
let delegator: &str = delegation_tag.get(1)?;
let querystr: &str = delegation_tag.get(2)?;
let sig: &str = delegation_tag.get(3)?;
// attempt to get a condition query; this requires the delegation to have a valid signature.
if let Some(cond_query) = validate_delegation(delegator, delegatee, querystr, sig) {
// The signature was valid, now we ensure the delegation
// condition is valid for this event:
if cond_query.allows_event(self) {
// since this is allowed, we will provide the delegatee
Some(delegator.into())
} else {
debug!("an event failed to satisfy delegation conditions");
None
}
} else {
debug!("event had had invalid delegation signature");
None
}
}
/// Update delegation status
fn update_delegation(&mut self) {
self.delegated_by = self.delegated_author();
}
/// Build an event tag index
fn build_index(&mut self) {
// if there are no tags; just leave the index as None
@@ -145,7 +199,7 @@ impl Event {
self.pubkey.chars().take(8).collect()
}
/// Retrieve tag values
/// Retrieve tag initial values across all tags matching the name
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
self.tags
.iter()
@@ -172,7 +226,7 @@ impl Event {
}
/// Check if this event has a valid signature.
fn is_valid(&self) -> bool {
fn validate(&self) -> Result<()> {
// TODO: return a Result with a reason for invalid events
// validation is performed by:
// * parsing JSON string into event fields
@@ -181,8 +235,8 @@ impl Event {
// * serialize with no spaces/newlines
let c_opt = self.to_canonical();
if c_opt.is_none() {
debug!("event could not be canonicalized");
return false;
debug!("could not canonicalize");
return Err(EventCouldNotCanonicalize);
}
let c = c_opt.unwrap();
// * compute the sha256sum.
@@ -191,21 +245,21 @@ impl Event {
// * ensure the id matches the computed sha256sum.
if self.id != hex_digest {
debug!("event id does not match digest");
return false;
return Err(EventInvalidId);
}
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
let sig = schnorr::Signature::from_str(&self.sig).unwrap();
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
if let Ok(pubkey) = XOnlyPublicKey::from_str(&self.pubkey) {
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
matches!(verify, Ok(()))
SECP.verify_schnorr(&sig, &msg, &pubkey)
.map_err(|_| EventInvalidSignature)
} else {
debug!("client sent malformed pubkey");
false
Err(EventMalformedPubkey)
}
} else {
info!("error converting digest to secp256k1 message");
false
Err(EventInvalidSignature)
}
}
@@ -269,6 +323,7 @@ mod tests {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
@@ -350,6 +405,7 @@ mod tests {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![],
@@ -367,6 +423,7 @@ mod tests {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![
@@ -388,11 +445,39 @@ mod tests {
assert_eq!(v, vec!["foo", "bar", "baz"]);
}
#[test]
fn event_no_tag_select() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![
vec!["j".to_owned(), "abc".to_owned()],
vec!["e".to_owned(), "foo".to_owned()],
vec!["e".to_owned(), "baz".to_owned()],
vec![
"p".to_owned(),
"aaaa".to_owned(),
"ws://example.com".to_owned(),
],
],
content: "this is a test".to_owned(),
sig: "abcde".to_owned(),
tagidx: None,
};
let v = e.tag_values_by_name("x");
// asking for tags that don't exist just returns zero-length vector
assert_eq!(v.len(), 0);
}
#[test]
fn event_canonical_with_tags() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![

View File

@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 22]),
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 26]),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
}

View File

@@ -2,11 +2,13 @@ pub mod close;
pub mod config;
pub mod conn;
pub mod db;
pub mod delegation;
pub mod error;
pub mod event;
pub mod hexrange;
pub mod info;
pub mod nip05;
pub mod notice;
pub mod schema;
pub mod subscription;
pub mod utils;

View File

@@ -34,11 +34,11 @@ fn main() {
// enable tracing with tokio-console
ConsoleLayer::builder().with_default_env().init();
}
// update with database location
if let Some(db) = db_dir {
settings.database.data_directory = db;
}
let (_, ctrl_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel();
// run this in a new thread
let handle = thread::spawn(|| {

86
src/notice.rs Normal file
View File

@@ -0,0 +1,86 @@
pub enum EventResultStatus {
Saved,
Duplicate,
Invalid,
Blocked,
RateLimited,
Error,
}
pub struct EventResult {
pub id: String,
pub msg: String,
pub status: EventResultStatus,
}
pub enum Notice {
Message(String),
EventResult(EventResult),
}
impl EventResultStatus {
pub fn to_bool(&self) -> bool {
match self {
Self::Saved => true,
Self::Duplicate => true,
Self::Invalid => false,
Self::Blocked => false,
Self::RateLimited => false,
Self::Error => false,
}
}
pub fn prefix(&self) -> &'static str {
match self {
Self::Saved => "saved",
Self::Duplicate => "duplicate",
Self::Invalid => "invalid",
Self::Blocked => "blocked",
Self::RateLimited => "rate-limited",
Self::Error => "error",
}
}
}
impl Notice {
//pub fn err(err: error::Error, id: String) -> Notice {
// Notice::err_msg(format!("{}", err), id)
//}
pub fn message(msg: String) -> Notice {
Notice::Message(msg)
}
fn prefixed(id: String, msg: &str, status: EventResultStatus) -> Notice {
let msg = format!("{}: {}", status.prefix(), msg);
Notice::EventResult(EventResult { id, msg, status })
}
pub fn invalid(id: String, msg: &str) -> Notice {
Notice::prefixed(id, msg, EventResultStatus::Invalid)
}
pub fn blocked(id: String, msg: &str) -> Notice {
Notice::prefixed(id, msg, EventResultStatus::Blocked)
}
pub fn rate_limited(id: String, msg: &str) -> Notice {
Notice::prefixed(id, msg, EventResultStatus::RateLimited)
}
pub fn duplicate(id: String) -> Notice {
Notice::prefixed(id, "", EventResultStatus::Duplicate)
}
pub fn error(id: String, msg: &str) -> Notice {
Notice::prefixed(id, msg, EventResultStatus::Error)
}
pub fn saved(id: String) -> Notice {
Notice::EventResult(EventResult {
id,
msg: "".into(),
status: EventResultStatus::Saved,
})
}
}

View File

@@ -20,7 +20,7 @@ pragma mmap_size = 536870912; -- 512MB of mmap
"##;
/// Latest database version
pub const DB_VERSION: usize = 6;
pub const DB_VERSION: usize = 9;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -40,6 +40,7 @@ event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
delegated_by BLOB, -- delegator pubkey (NIP-26)
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
@@ -47,9 +48,10 @@ content TEXT NOT NULL -- serialized json of event object
-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
@@ -152,6 +154,16 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
if curr_version == 5 {
curr_version = mig_5_to_6(conn)?;
}
if curr_version == 6 {
curr_version = mig_6_to_7(conn)?;
}
if curr_version == 7 {
curr_version = mig_7_to_8(conn)?;
}
if curr_version == 8 {
curr_version = mig_8_to_9(conn)?;
}
if curr_version == DB_VERSION {
info!(
"All migration scripts completed successfully. Welcome to v{}.",
@@ -327,7 +339,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tagname, hex::decode(&tagval).ok()],
params![event_id, tagname, hex::decode(tagval).ok()],
)?;
} else {
// otherwise, insert as text
@@ -348,3 +360,64 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
Ok(6)
}
fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 6->7");
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD delegated_by BLOB;
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
PRAGMA user_version = 7;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v6 -> v7");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(7)
}
fn mig_7_to_8(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 7->8");
// Remove redundant indexes, and add a better multi-column index.
let upgrade_sql = r##"
DROP INDEX IF EXISTS created_at_index;
DROP INDEX IF EXISTS kind_index;
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
PRAGMA user_version = 8;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v7 -> v8");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(8)
}
fn mig_8_to_9(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 8->9");
// Those old indexes were actually helpful...
let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
PRAGMA user_version = 9;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v8 -> v9");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(8)
}

View File

@@ -10,9 +10,11 @@ use crate::event::Event;
use crate::event::EventCmd;
use crate::info::RelayInfo;
use crate::nip05;
use crate::notice::Notice;
use crate::subscription::Subscription;
use futures::SinkExt;
use futures::StreamExt;
use http::header::HeaderMap;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
@@ -84,11 +86,32 @@ async fn handle_web_request(
Some(config),
)
.await;
let user_agent = get_header_string("user-agent", request.headers());
// determine the remote IP from headers if the exist
let header_ip = settings
.network
.remote_ip_header
.as_ref()
.and_then(|x| get_header_string(x, request.headers()));
// use the socket addr as a backup
let remote_ip =
header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
let client_info = ClientInfo {
remote_ip,
user_agent,
};
// spawn a nostr server with our websocket
tokio::spawn(nostr_server(
pool, settings, ws_stream, broadcast, event_tx, shutdown,
pool,
client_info,
settings,
ws_stream,
broadcast,
event_tx,
shutdown,
));
}
// todo: trace, don't print...
Err(e) => println!(
"error when trying to upgrade connection \
from address {} to websocket connection. \
@@ -148,6 +171,12 @@ async fn handle_web_request(
}
}
fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
headers
.get(header)
.and_then(|x| x.to_str().ok().map(|x| x.to_string()))
}
// return on a control-c or internally requested shutdown signal
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
@@ -167,7 +196,6 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
info!("Shutting down webserver due to SIGTERM");
break;
},
}
}
}
@@ -283,6 +311,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
controlled_shutdown.send(()).ok();
}
Err(std::sync::mpsc::RecvError) => {
// FIXME: spurious error on startup?
debug!("shutdown requestor is disconnected");
}
};
@@ -378,36 +407,50 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage>
}
/// Turn a string into a NOTICE message ready to send over a WebSocket
fn make_notice_message(msg: &str) -> Message {
Message::text(json!(["NOTICE", msg]).to_string())
fn make_notice_message(notice: Notice) -> Message {
let json = match notice {
Notice::Message(ref msg) => json!(["NOTICE", msg]),
Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
};
Message::text(json.to_string())
}
struct ClientInfo {
remote_ip: String,
user_agent: Option<String>,
}
/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
pool: db::SqlitePool,
client_info: ClientInfo,
settings: Settings,
mut ws_stream: WebSocketStream<Upgraded>,
broadcast: Sender<Event>,
event_tx: mpsc::Sender<SubmittedEvent>,
mut shutdown: Receiver<()>,
) {
// the time this websocket nostr server started
let orig_start = Instant::now();
// get a broadcast channel for clients to communicate on
let mut bcast_rx = broadcast.subscribe();
// Track internal client state
let mut conn = conn::ClientConn::new();
let mut conn = conn::ClientConn::new(client_info.remote_ip);
// Use the remote IP as the client identifier
let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database.
// we will send out the tx handle to any query we generate.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
// Create channel for receiving NOTICEs
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
// last time this client sent data (message, ping, etc.)
let mut last_message_time = Instant::now();
// ping interval (every 5 minutes)
let default_ping_dur = Duration::from_secs(300);
let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
// disconnect after 20 minutes without a ping response or event.
let max_quiet_time = Duration::from_secs(60 * 20);
@@ -419,16 +462,20 @@ async fn nostr_server(
// when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// keep track of the subscriptions we have
let mut current_subs: Vec<Subscription> = Vec::new();
// for stats, keep track of how many events the client published,
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
info!("new connection for client: {:?}", cid);
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
if let Some(ua) = client_info.user_agent {
debug!("cid: {}, user-agent: {:?}", cid, ua);
}
loop {
tokio::select! {
_ = shutdown.recv() => {
info!("Shutting client connection down due to shutdown: {:?}", cid);
info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
// server shutting down, exit loop
break;
},
@@ -443,7 +490,7 @@ async fn nostr_server(
ws_stream.send(Message::Ping(Vec::new())).await.ok();
},
Some(notice_msg) = notice_rx.recv() => {
ws_stream.send(make_notice_message(&notice_msg)).await.ok();
ws_stream.send(make_notice_message(notice_msg)).await.ok();
},
Some(query_result) = query_rx.recv() => {
// database informed us of a query result we asked for
@@ -462,12 +509,15 @@ async fn nostr_server(
Ok(global_event) = bcast_rx.recv() => {
// an event has been broadcast to all clients
// first check if there is a subscription for this event.
let matching_subs = conn.get_matching_subscriptions(&global_event);
for s in matching_subs {
for (s, sub) in conn.subscriptions() {
if !sub.interested_in_event(&global_event) {
continue;
}
// TODO: serialize at broadcast time, instead of
// once for each consumer.
if let Ok(event_str) = serde_json::to_string(&global_event) {
debug!("sub match for client: {:?}, sub: {:?}, event: {:?}",
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it
@@ -488,7 +538,7 @@ async fn nostr_server(
},
Some(Ok(Message::Binary(_))) => {
ws_stream.send(
make_notice_message("binary messages are not accepted")).await.ok();
make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok();
continue;
},
Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
@@ -498,8 +548,7 @@ async fn nostr_server(
},
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
ws_stream.send(
make_notice_message(
&format!("message too large ({} > {})",size, max_size))).await.ok();
make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
continue;
},
None |
@@ -507,17 +556,17 @@ async fn nostr_server(
Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
=> {
debug!("websocket close from client: {:?}",cid);
debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
break;
},
Some(Err(WsError::Io(e))) => {
// IO errors are considered fatal
warn!("IO error (client: {:?}): {:?}", cid, e);
warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
break;
}
x => {
// default condition on error is to close the client connection
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
break;
}
};
@@ -527,11 +576,12 @@ async fn nostr_server(
Ok(NostrMessage::EventMsg(ec)) => {
// An EventCmd needs to be validated to be converted into an Event
// handle each type of message
let evid = ec.event_id().to_owned();
let parsed : Result<Event> = Result::<Event>::from(ec);
match parsed {
Ok(e) => {
let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
debug!("successfully parsed/validated event: {:?} (cid: {})", id_prefix, cid);
// check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database.
@@ -539,44 +589,57 @@ async fn nostr_server(
event_tx.send(submit_event).await.ok();
client_published_event_count += 1;
} else {
info!("client {:?} sent a far future-dated event", cid);
info!("client: {} sent a far future-dated event", cid);
if let Some(fut_sec) = settings.options.reject_future_seconds {
ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok();
let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
let notice = Notice::invalid(e.id, &msg);
ws_stream.send(make_notice_message(notice)).await.ok();
}
}
},
Err(_) => {
info!("client {:?} sent an invalid event", cid);
ws_stream.send(make_notice_message("event was invalid")).await.ok();
Err(e) => {
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(Notice::invalid(evid, &format!("{}", e)))).await.ok();
}
}
},
Ok(NostrMessage::SubMsg(s)) => {
debug!("client {} requesting a subscription", cid);
debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
// subscription handling consists of:
// * registering the subscription so future events can be matched
// * making a channel to cancel to request later
// * sending a request for a SQL query
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) {
Ok(()) => {
// when we insert, if there was a previous query running with the same name, cancel it.
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
previous_query.send(()).ok();
}
// start a database query
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
// Do nothing if the sub already exists.
if !current_subs.contains(&s) {
current_subs.push(s.clone());
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) {
Ok(()) => {
// when we insert, if there was a previous query running with the same name, cancel it.
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
previous_query.send(()).ok();
}
// start a database query
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
}
}
}
} else {
info!("client send duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
}
},
Ok(NostrMessage::CloseMsg(cc)) => {
// closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed {
// remove from the list of known subs
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
current_subs.remove(pos);
}
// check if a query is currently
// running, and remove it if so.
let stop_tx = running_queries.remove(&c.id);
@@ -588,23 +651,23 @@ async fn nostr_server(
conn.unsubscribe(&c);
} else {
info!("invalid command ignored");
ws_stream.send(make_notice_message("could not parse command")).await.ok();
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
}
},
Err(Error::ConnError) => {
debug!("got connection close/error, disconnecting client: {:?}",cid);
debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip());
break;
}
Err(Error::EventMaxLengthError(s)) => {
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
info!("client sent event larger ({} bytes) than max size (cid: {})", s, cid);
ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok();
},
Err(Error::ProtoParseError) => {
info!("client {:?} sent event that could not be parsed", cid);
ws_stream.send(make_notice_message("could not parse command")).await.ok();
info!("client sent event that could not be parsed (cid: {})", cid);
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
},
Err(e) => {
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
info!("got non-fatal error from client (cid: {}, error: {:?}", cid, e);
},
}
},
@@ -615,7 +678,11 @@ async fn nostr_server(
stop_tx.send(()).ok();
}
info!(
"stopping connection for client: {:?} (client sent {} event(s), received {})",
cid, client_published_event_count, client_received_event_count
"stopping client connection (cid: {}, ip: {:?}, sent: {} events, recv: {} events, connected: {:?})",
cid,
conn.ip(),
client_published_event_count,
client_received_event_count,
orig_start.elapsed()
);
}

View File

@@ -37,6 +37,9 @@ pub struct ReqFilter {
#[serde(skip)]
pub tags: Option<HashMap<char, HashSet<String>>>,
/// Force no matches due to malformed data
// we can't represent it in the req filter, so we don't want to
// erroneously match. This basically indicates the req tried to
// do something invalid.
pub force_no_match: bool,
}
@@ -217,6 +220,17 @@ impl ReqFilter {
.unwrap_or(true)
}
fn delegated_authors_match(&self, event: &Event) -> bool {
if let Some(delegated_pubkey) = &event.delegated_by {
self.authors
.as_ref()
.map(|vs| prefix_match(vs, delegated_pubkey))
.unwrap_or(true)
} else {
false
}
}
fn tag_match(&self, event: &Event) -> bool {
// get the hashset from the filter.
if let Some(map) = &self.tags {
@@ -248,7 +262,7 @@ impl ReqFilter {
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
&& self.until.map(|t| event.created_at < t).unwrap_or(true)
&& self.kind_match(event.kind)
&& self.authors_match(event)
&& (self.authors_match(event) || self.delegated_authors_match(event))
&& self.tag_match(event)
&& !self.force_no_match
}
@@ -308,6 +322,7 @@ mod tests {
let e = Event {
id: "foo".to_owned(),
pubkey: "abcd".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -326,6 +341,7 @@ mod tests {
let e = Event {
id: "abcd".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -344,6 +360,7 @@ mod tests {
let e = Event {
id: "abcde".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -363,6 +380,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 50,
kind: 0,
tags: Vec::new(),
@@ -386,6 +404,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 150,
kind: 0,
tags: Vec::new(),
@@ -407,6 +426,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 50,
kind: 0,
tags: Vec::new(),
@@ -425,6 +445,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 1001,
kind: 0,
tags: Vec::new(),
@@ -443,6 +464,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -461,6 +483,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "abc".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -479,6 +502,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "bcd".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -497,6 +521,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "xyz".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),