mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
47 Commits
feat-43-de
...
0.7.5
Author | SHA1 | Date | |
---|---|---|---|
|
e8557d421b | ||
|
7ca9c864f2 | ||
|
838aafd079 | ||
|
e554b10ac2 | ||
|
b0bfaa48fc | ||
|
2e9b1b6ba7 | ||
|
4d9012d94c | ||
|
ffe7aac066 | ||
|
f9695bd0a9 | ||
|
7c4bf5cc8f | ||
|
e2de162931 | ||
|
4f606615eb | ||
|
84a58ebbcd | ||
|
c48e45686d | ||
|
bbe359364a | ||
|
9e9c494367 | ||
|
5fa24bc9f1 | ||
|
4de7490d97 | ||
|
d0f63dc66e | ||
|
06078648c8 | ||
|
cc0fcc5d66 | ||
|
dfb2096653 | ||
|
486508d192 | ||
|
84b43c144b | ||
|
110500bb46 | ||
|
83f6b11de7 | ||
|
6d1244434b | ||
|
5a91419d34 | ||
|
7adc5c9af7 | ||
|
9dd4571bee | ||
|
9db5a26b9c | ||
|
ac345b5744 | ||
|
675662c7fb | ||
|
505b0cb71f | ||
|
e8aa450802 | ||
|
5a8860bb09 | ||
|
11e43eccf9 | ||
|
50577b2dfa | ||
|
a6cb6f8486 | ||
|
ae5bf98d87 | ||
|
1cf9d719f0 | ||
|
311f4b5283 | ||
|
14b5a51e3a | ||
|
8ecce3f566 | ||
|
caffbbbede | ||
|
81045ad3d0 | ||
|
72f8a1aa5c |
562
Cargo.lock
generated
562
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
11
Cargo.toml
11
Cargo.toml
@@ -1,7 +1,15 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.6.2"
|
||||
version = "0.7.5"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
readme = "README.md"
|
||||
homepage = "https://sr.ht/~gheartsfield/nostr-rs-relay/"
|
||||
repository = "https://git.sr.ht/~gheartsfield/nostr-rs-relay"
|
||||
license = "MIT"
|
||||
keywords = ["nostr", "server"]
|
||||
categories = ["network-programming", "web-programming"]
|
||||
|
||||
[dependencies]
|
||||
tracing = "0.1.36"
|
||||
@@ -32,6 +40,7 @@ http = { version = "0.2" }
|
||||
parse_duration = "2"
|
||||
rand = "0.8"
|
||||
const_format = "0.2.28"
|
||||
regex = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1"
|
||||
|
14
Dockerfile
14
Dockerfile
@@ -1,18 +1,24 @@
|
||||
FROM docker.io/library/rust:1.64.0@sha256:5cf09a76cb9baf4990d121221bbad64927cc5690ee54f246487e302ddc2ba300 as builder
|
||||
FROM docker.io/library/rust:1.66.0@sha256:359949280cebefe93ccb33089fe25111a3aadfe99eac4b6cbe8ec3e1b571dacb as builder
|
||||
|
||||
RUN USER=root cargo install cargo-auditable
|
||||
RUN USER=root cargo new --bin nostr-rs-relay
|
||||
WORKDIR ./nostr-rs-relay
|
||||
COPY ./Cargo.toml ./Cargo.toml
|
||||
COPY ./Cargo.lock ./Cargo.lock
|
||||
RUN cargo build --release
|
||||
# build dependencies only (caching)
|
||||
RUN cargo auditable build --release --locked
|
||||
# get rid of starter project code
|
||||
RUN rm src/*.rs
|
||||
|
||||
# copy project source code
|
||||
COPY ./src ./src
|
||||
|
||||
# build auditable release using locked deps
|
||||
RUN rm ./target/release/deps/nostr*relay*
|
||||
RUN cargo build --release
|
||||
RUN cargo auditable build --release --locked
|
||||
|
||||
FROM docker.io/library/debian:bullseye-20221205-slim@sha256:25f10b4f1ded5341a3ca0a30290ff3cd5639415f0c5a2222d5e7d5dd72952aa1
|
||||
|
||||
FROM docker.io/library/debian:bullseye-20221004-slim@sha256:8b702518a671c926b5ece4efe386a476eb4777646a36d996d4bd50944f2f11a2
|
||||
ARG APP=/usr/src/app
|
||||
ARG APP_DATA=/usr/src/app/db
|
||||
RUN apt-get update \
|
||||
|
37
README.md
37
README.md
@@ -1,8 +1,8 @@
|
||||
# [nostr-rs-relay](https://git.sr.ht/~gheartsfield/nostr-rs-relay)
|
||||
|
||||
This is a [nostr](https://github.com/nostr-protocol/nostr) relay, written in
|
||||
Rust. It currently supports the entire relay protocol, and has a
|
||||
SQLite persistence layer.
|
||||
This is a [nostr](https://github.com/nostr-protocol/nostr) relay,
|
||||
written in Rust. It currently supports the entire relay protocol, and
|
||||
persists data with SQLite.
|
||||
|
||||
The project master repository is available on
|
||||
[sourcehut](https://sr.ht/~gheartsfield/nostr-rs-relay/), and is
|
||||
@@ -26,7 +26,9 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
|
||||
- [x] NIP-12: [Generic Tag Queries](https://github.com/nostr-protocol/nips/blob/master/12.md)
|
||||
- [x] NIP-15: [End of Stored Events Notice](https://github.com/nostr-protocol/nips/blob/master/15.md)
|
||||
- [x] NIP-16: [Event Treatment](https://github.com/nostr-protocol/nips/blob/master/16.md)
|
||||
- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
|
||||
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
|
||||
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
|
||||
|
||||
## Quick Start
|
||||
|
||||
@@ -35,15 +37,32 @@ application. Use a bind mount to store the SQLite database outside of
|
||||
the container image, and map the container's 8080 port to a host port
|
||||
(7000 in the example below).
|
||||
|
||||
The examples below start a rootless podman container, mapping a local
|
||||
data directory and config file.
|
||||
|
||||
```console
|
||||
$ docker build -t nostr-rs-relay .
|
||||
$ podman build -t nostr-rs-relay .
|
||||
|
||||
$ docker run -it -p 7000:8080 \
|
||||
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind nostr-rs-relay
|
||||
$ mkdir data
|
||||
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay] listening on: 0.0.0.0:8080
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] opened database "/usr/src/app/db/nostr.db" for writing
|
||||
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] DB version = 2
|
||||
$ podman unshare chown 100:100 data
|
||||
|
||||
$ podman run -it --rm -p 7000:8080 \
|
||||
--user=100:100 \
|
||||
-v $(pwd)/data:/usr/src/app/db:Z \
|
||||
-v $(pwd)/config.toml:/usr/src/app/config.toml:ro,Z \
|
||||
--name nostr-relay nostr-rs-relay:latest
|
||||
|
||||
Nov 19 15:31:15.013 INFO nostr_rs_relay: Starting up from main
|
||||
Nov 19 15:31:15.017 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
|
||||
Nov 19 15:31:15.019 INFO nostr_rs_relay::server: db writer created
|
||||
Nov 19 15:31:15.019 INFO nostr_rs_relay::server: control message listener started
|
||||
Nov 19 15:31:15.019 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=4)
|
||||
Nov 19 15:31:15.019 INFO nostr_rs_relay::db: opened database "/usr/src/app/db/nostr.db" for writing
|
||||
Nov 19 15:31:15.019 INFO nostr_rs_relay::schema: DB version = 0
|
||||
Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: database pragma/schema initialized to v7, and ready
|
||||
Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: All migration scripts completed successfully. Welcome to v7.
|
||||
Nov 19 15:31:15.521 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=128)
|
||||
```
|
||||
|
||||
Use a `nostr` client such as
|
||||
|
@@ -46,6 +46,14 @@ address = "0.0.0.0"
|
||||
# Listen on this port
|
||||
port = 8080
|
||||
|
||||
# If present, read this HTTP header for logging client IP addresses.
|
||||
# Examples for common proxies, cloudflare:
|
||||
#remote_ip_header = "x-forwarded-for"
|
||||
#remote_ip_header = "cf-connecting-ip"
|
||||
|
||||
# Websocket ping interval in seconds, defaults to 5 minutes
|
||||
#ping_interval = 300
|
||||
|
||||
[options]
|
||||
# Reject events that have timestamps greater than this many seconds in
|
||||
# the future. Recommended to reject anything greater than 30 minutes
|
||||
|
@@ -28,9 +28,10 @@ pub struct Database {
|
||||
pub struct Network {
|
||||
pub port: u16,
|
||||
pub address: String,
|
||||
pub remote_ip_header: Option<String>, // retrieve client IP from this HTTP header if present
|
||||
pub ping_interval_seconds: u32,
|
||||
}
|
||||
|
||||
//
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Options {
|
||||
@@ -207,7 +208,9 @@ impl Default for Settings {
|
||||
},
|
||||
network: Network {
|
||||
port: 8080,
|
||||
ping_interval_seconds: 300,
|
||||
address: "0.0.0.0".to_owned(),
|
||||
remote_ip_header: None,
|
||||
},
|
||||
limits: Limits {
|
||||
messages_per_sec: None,
|
||||
|
40
src/conn.rs
40
src/conn.rs
@@ -2,7 +2,6 @@
|
||||
use crate::close::Close;
|
||||
use crate::error::Error;
|
||||
use crate::error::Result;
|
||||
use crate::event::Event;
|
||||
|
||||
use crate::subscription::Subscription;
|
||||
use std::collections::HashMap;
|
||||
@@ -14,6 +13,8 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
|
||||
|
||||
/// State for a client connection
|
||||
pub struct ClientConn {
|
||||
/// Client IP (either from socket, or configured proxy header
|
||||
client_ip: String,
|
||||
/// Unique client identifier generated at connection time
|
||||
client_id: Uuid,
|
||||
/// The current set of active client subscriptions
|
||||
@@ -24,22 +25,27 @@ pub struct ClientConn {
|
||||
|
||||
impl Default for ClientConn {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
Self::new("unknown".to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientConn {
|
||||
/// Create a new, empty connection state.
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
pub fn new(client_ip: String) -> Self {
|
||||
let client_id = Uuid::new_v4();
|
||||
ClientConn {
|
||||
client_ip,
|
||||
client_id,
|
||||
subscriptions: HashMap::new(),
|
||||
max_subs: 32,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
|
||||
&self.subscriptions
|
||||
}
|
||||
|
||||
/// Get a short prefix of the client's unique identifier, suitable
|
||||
/// for logging.
|
||||
#[must_use]
|
||||
@@ -47,16 +53,9 @@ impl ClientConn {
|
||||
self.client_id.to_string().chars().take(8).collect()
|
||||
}
|
||||
|
||||
/// Find all matching subscriptions.
|
||||
#[must_use]
|
||||
pub fn get_matching_subscriptions(&self, e: &Event) -> Vec<&str> {
|
||||
let mut v: Vec<&str> = vec![];
|
||||
for (id, sub) in &self.subscriptions {
|
||||
if sub.interested_in_event(e) {
|
||||
v.push(id);
|
||||
}
|
||||
}
|
||||
v
|
||||
pub fn ip(&self) -> &str {
|
||||
&self.client_ip
|
||||
}
|
||||
|
||||
/// Add a new subscription for this connection.
|
||||
@@ -79,8 +78,12 @@ impl ClientConn {
|
||||
// check if an existing subscription exists, and replace if so
|
||||
if self.subscriptions.contains_key(&k) {
|
||||
self.subscriptions.remove(&k);
|
||||
self.subscriptions.insert(k, s);
|
||||
debug!("replaced existing subscription");
|
||||
self.subscriptions.insert(k, s.clone());
|
||||
debug!(
|
||||
"replaced existing subscription (cid: {}, sub: {:?})",
|
||||
self.get_client_prefix(),
|
||||
s.get_id()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -91,8 +94,9 @@ impl ClientConn {
|
||||
// add subscription
|
||||
self.subscriptions.insert(k, s);
|
||||
debug!(
|
||||
"registered new subscription, currently have {} active subs",
|
||||
self.subscriptions.len()
|
||||
"registered new subscription, currently have {} active subs (cid: {})",
|
||||
self.subscriptions.len(),
|
||||
self.get_client_prefix(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -102,9 +106,9 @@ impl ClientConn {
|
||||
// TODO: return notice if subscription did not exist.
|
||||
self.subscriptions.remove(&c.id);
|
||||
debug!(
|
||||
"removed subscription, currently have {} active subs (cid={})",
|
||||
"removed subscription, currently have {} active subs (cid: {})",
|
||||
self.subscriptions.len(),
|
||||
self.client_id
|
||||
self.get_client_prefix(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
136
src/db.rs
136
src/db.rs
@@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event};
|
||||
use crate::hexrange::hex_range;
|
||||
use crate::hexrange::HexSearch;
|
||||
use crate::nip05;
|
||||
use crate::notice::Notice;
|
||||
use crate::schema::{upgrade_db, STARTUP_SQL};
|
||||
use crate::subscription::ReqFilter;
|
||||
use crate::subscription::Subscription;
|
||||
@@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
|
||||
/// Events submitted from a client, with a return channel for notices
|
||||
pub struct SubmittedEvent {
|
||||
pub event: Event,
|
||||
pub notice_tx: tokio::sync::mpsc::Sender<String>,
|
||||
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
|
||||
}
|
||||
|
||||
/// Database file
|
||||
@@ -142,12 +143,15 @@ pub async fn db_writer(
|
||||
if next_event.is_none() {
|
||||
break;
|
||||
}
|
||||
// track if an event write occurred; this is used to
|
||||
// update the rate limiter
|
||||
let mut event_write = false;
|
||||
let subm_event = next_event.unwrap();
|
||||
let event = subm_event.event;
|
||||
let notice_tx = subm_event.notice_tx;
|
||||
// check if this event is authorized.
|
||||
if let Some(allowed_addrs) = whitelist {
|
||||
// TODO: incorporate delegated pubkeys
|
||||
// if the event address is not in allowed_addrs.
|
||||
if !allowed_addrs.contains(&event.pubkey) {
|
||||
info!(
|
||||
@@ -155,7 +159,10 @@ pub async fn db_writer(
|
||||
event.get_event_id_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send("pubkey is not allowed to publish to this relay".to_owned())
|
||||
.try_send(Notice::blocked(
|
||||
event.id,
|
||||
"pubkey is not allowed to publish to this relay",
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
@@ -186,10 +193,10 @@ pub async fn db_writer(
|
||||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send(
|
||||
"NIP-05 verification is no longer valid (expired/wrong domain)"
|
||||
.to_owned(),
|
||||
)
|
||||
.try_send(Notice::blocked(
|
||||
event.id,
|
||||
"NIP-05 verification is no longer valid (expired/wrong domain)",
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
@@ -200,7 +207,10 @@ pub async fn db_writer(
|
||||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send("NIP-05 verification needed to publish events".to_owned())
|
||||
.try_send(Notice::blocked(
|
||||
event.id,
|
||||
"NIP-05 verification needed to publish events",
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
@@ -213,22 +223,23 @@ pub async fn db_writer(
|
||||
// TODO: cache recent list of authors to remove a DB call.
|
||||
let start = Instant::now();
|
||||
if event.kind >= 20000 && event.kind < 30000 {
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
info!(
|
||||
"published ephemeral event {:?} from {:?} in {:?}",
|
||||
"published ephemeral event: {:?} from: {:?} in: {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
event.get_author_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
event_write = true
|
||||
} else {
|
||||
match write_event(&mut pool.get()?, &event) {
|
||||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
trace!("ignoring duplicate or deleted event");
|
||||
notice_tx.try_send(Notice::duplicate(event.id)).ok();
|
||||
} else {
|
||||
info!(
|
||||
"persisted event {:?} from {:?} in {:?}",
|
||||
"persisted event: {:?} from: {:?} in: {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
event.get_author_prefix(),
|
||||
start.elapsed()
|
||||
@@ -236,16 +247,13 @@ pub async fn db_writer(
|
||||
event_write = true;
|
||||
// send this out to all clients
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
notice_tx.try_send(Notice::saved(event.id)).ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("event insert failed: {:?}", err);
|
||||
notice_tx
|
||||
.try_send(
|
||||
"relay experienced an error trying to publish the latest event"
|
||||
.to_owned(),
|
||||
)
|
||||
.ok();
|
||||
let msg = "relay experienced an error trying to publish the latest event";
|
||||
notice_tx.try_send(Notice::error(event.id, msg)).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -284,12 +292,13 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
let tx = conn.transaction()?;
|
||||
// get relevant fields from event and convert to blobs.
|
||||
let id_blob = hex::decode(&e.id).ok();
|
||||
let pubkey_blob = hex::decode(&e.pubkey).ok();
|
||||
let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
|
||||
let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
|
||||
let event_str = serde_json::to_string(&e).ok();
|
||||
// ignore if the event hash is a duplicate.
|
||||
let mut ins_count = tx.execute(
|
||||
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
|
||||
params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
|
||||
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
|
||||
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
|
||||
)?;
|
||||
if ins_count == 0 {
|
||||
// if the event was a duplicate, no need to insert event or
|
||||
@@ -312,7 +321,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(&tagval).ok()],
|
||||
params![ev_id, &tagname, hex::decode(tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
tx.execute(
|
||||
@@ -426,7 +435,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
return (empty_query, empty_params);
|
||||
}
|
||||
|
||||
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned();
|
||||
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e".to_owned();
|
||||
// query parameters for SQLite
|
||||
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
||||
|
||||
@@ -439,16 +448,22 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
for auth in authvec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
auth_searches.push("author=?".to_owned());
|
||||
auth_searches.push("author=? OR delegated_by=?".to_owned());
|
||||
params.push(Box::new(ex.clone()));
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
auth_searches.push("(author>? AND author<?)".to_owned());
|
||||
auth_searches.push(
|
||||
"(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
|
||||
);
|
||||
params.push(Box::new(lower.clone()));
|
||||
params.push(Box::new(upper.clone()));
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
auth_searches.push("author>?".to_owned());
|
||||
auth_searches.push("author>? OR delegated_by>?".to_owned());
|
||||
params.push(Box::new(lower.clone()));
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
@@ -456,8 +471,14 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
let authors_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(authors_clause);
|
||||
if !authvec.is_empty() {
|
||||
let authors_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(authors_clause);
|
||||
} else {
|
||||
// if the authors list was empty, we should never return
|
||||
// any results.
|
||||
filter_components.push("false".to_owned());
|
||||
}
|
||||
}
|
||||
// Query for Kind
|
||||
if let Some(ks) = &f.kinds {
|
||||
@@ -490,8 +511,14 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
let id_clause = format!("({})", id_searches.join(" OR "));
|
||||
filter_components.push(id_clause);
|
||||
if !idvec.is_empty() {
|
||||
let id_clause = format!("({})", id_searches.join(" OR "));
|
||||
filter_components.push(id_clause);
|
||||
} else {
|
||||
// if the ids list was empty, we should never return
|
||||
// any results.
|
||||
filter_components.push("false".to_owned());
|
||||
}
|
||||
}
|
||||
// Query for tags
|
||||
if let Some(map) = &f.tags {
|
||||
@@ -500,7 +527,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
for v in val {
|
||||
if (v.len() % 2 == 0) && is_lower_hex(v) {
|
||||
if let Ok(h) = hex::decode(&v) {
|
||||
if let Ok(h) = hex::decode(v) {
|
||||
blob_vals.push(Box::new(h));
|
||||
}
|
||||
} else {
|
||||
@@ -567,10 +594,18 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
.map(|s| format!("SELECT content, created_at FROM ({})", s))
|
||||
.collect();
|
||||
let query: String = subqueries_selects.join(" UNION ");
|
||||
debug!("final query string: {}", query);
|
||||
(query, params)
|
||||
}
|
||||
|
||||
fn log_pool_stats(pool: &SqlitePool) {
|
||||
let state: r2d2::State = pool.state();
|
||||
let in_use_cxns = state.connections - state.idle_connections;
|
||||
debug!(
|
||||
"DB pool usage (in_use: {}, available: {})",
|
||||
in_use_cxns, state.connections
|
||||
);
|
||||
}
|
||||
|
||||
/// Perform a database query using a subscription.
|
||||
///
|
||||
/// The [`Subscription`] is converted into a SQL query. Each result
|
||||
@@ -585,14 +620,15 @@ pub async fn db_query(
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
task::spawn_blocking(move || {
|
||||
debug!("going to query for: {:?}", sub);
|
||||
let mut row_count: usize = 0;
|
||||
let start = Instant::now();
|
||||
// generate SQL query
|
||||
let (q, p) = query_from_sub(&sub);
|
||||
debug!("SQL generated in {:?}", start.elapsed());
|
||||
trace!("SQL generated in {:?}", start.elapsed());
|
||||
// show pool stats
|
||||
debug!("DB pool stats: {:?}", pool.state());
|
||||
log_pool_stats(&pool);
|
||||
// cutoff for displaying slow queries
|
||||
let slow_cutoff = Duration::from_millis(1000);
|
||||
let start = Instant::now();
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
@@ -601,18 +637,36 @@ pub async fn db_query(
|
||||
let mut first_result = true;
|
||||
while let Some(row) = event_rows.next()? {
|
||||
if first_result {
|
||||
let first_result_elapsed = start.elapsed();
|
||||
// logging for slow queries; show sub and SQL
|
||||
if first_result_elapsed >= slow_cutoff {
|
||||
info!(
|
||||
"going to query for: {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
info!(
|
||||
"final query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
"going to query for: {:?} (cid: {}, sub: {:?})",
|
||||
sub,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
trace!("final query string: {}", q);
|
||||
}
|
||||
debug!(
|
||||
"time to first result: {:?} (cid={}, sub={:?})",
|
||||
start.elapsed(),
|
||||
client_id,
|
||||
sub.id
|
||||
"first result in {:?} (cid: {}, sub: {:?})",
|
||||
first_result_elapsed, client_id, sub.id
|
||||
);
|
||||
first_result = false;
|
||||
}
|
||||
// check if this is still active
|
||||
// TODO: check every N rows
|
||||
if abandon_query_rx.try_recv().is_ok() {
|
||||
debug!("query aborted (sub={:?})", sub.id);
|
||||
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
|
||||
return Ok(());
|
||||
}
|
||||
row_count += 1;
|
||||
@@ -631,11 +685,11 @@ pub async fn db_query(
|
||||
})
|
||||
.ok();
|
||||
debug!(
|
||||
"query completed ({} rows) in {:?} (cid={}, sub={:?})",
|
||||
row_count,
|
||||
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
|
||||
start.elapsed(),
|
||||
client_id,
|
||||
sub.id
|
||||
sub.id,
|
||||
row_count
|
||||
);
|
||||
} else {
|
||||
warn!("Could not get a database connection for querying");
|
||||
|
416
src/delegation.rs
Normal file
416
src/delegation.rs
Normal file
@@ -0,0 +1,416 @@
|
||||
//! Event parsing and validation
|
||||
use crate::error::Error;
|
||||
use crate::error::Result;
|
||||
use crate::event::Event;
|
||||
use bitcoin_hashes::{sha256, Hash};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use secp256k1::{schnorr, Secp256k1, VerifyOnly, XOnlyPublicKey};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::str::FromStr;
|
||||
use tracing::{debug, info};
|
||||
|
||||
// This handles everything related to delegation, in particular the
|
||||
// condition/rune parsing and logic.
|
||||
|
||||
// Conditions are poorly specified, so we will implement the minimum
|
||||
// necessary for now.
|
||||
|
||||
// fields MUST be either "kind" or "created_at".
|
||||
// operators supported are ">", "<", "=", "!".
|
||||
// no operations on 'content' are supported.
|
||||
|
||||
// this allows constraints for:
|
||||
// valid date ranges (valid from X->Y dates).
|
||||
// specific kinds (publish kind=1,5)
|
||||
// kind ranges (publish ephemeral events, kind>19999&kind<30001)
|
||||
|
||||
// for more complex scenarios (allow delegatee to publish ephemeral
|
||||
// AND replacement events), it may be necessary to generate and use
|
||||
// different condition strings, since we do not support grouping or
|
||||
// "OR" logic.
|
||||
|
||||
lazy_static! {
|
||||
/// Secp256k1 verification instance.
|
||||
pub static ref SECP: Secp256k1<VerifyOnly> = Secp256k1::verification_only();
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub enum Field {
|
||||
Kind,
|
||||
CreatedAt,
|
||||
}
|
||||
|
||||
impl FromStr for Field {
|
||||
type Err = Error;
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
if value == "kind" {
|
||||
Ok(Field::Kind)
|
||||
} else if value == "created_at" {
|
||||
Ok(Field::CreatedAt)
|
||||
} else {
|
||||
Err(Error::DelegationParseError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub enum Operator {
|
||||
LessThan,
|
||||
GreaterThan,
|
||||
Equals,
|
||||
NotEquals,
|
||||
}
|
||||
impl FromStr for Operator {
|
||||
type Err = Error;
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
if value == "<" {
|
||||
Ok(Operator::LessThan)
|
||||
} else if value == ">" {
|
||||
Ok(Operator::GreaterThan)
|
||||
} else if value == "=" {
|
||||
Ok(Operator::Equals)
|
||||
} else if value == "!" {
|
||||
Ok(Operator::NotEquals)
|
||||
} else {
|
||||
Err(Error::DelegationParseError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct ConditionQuery {
|
||||
pub(crate) conditions: Vec<Condition>,
|
||||
}
|
||||
|
||||
impl ConditionQuery {
|
||||
pub fn allows_event(&self, event: &Event) -> bool {
|
||||
// check each condition, to ensure that the event complies
|
||||
// with the restriction.
|
||||
for c in &self.conditions {
|
||||
if !c.allows_event(event) {
|
||||
// any failing conditions invalidates the delegation
|
||||
// on this event
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// delegation was permitted unconditionally, or all conditions
|
||||
// were true
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that the delegator approved the delegation; return a ConditionQuery if so.
|
||||
pub fn validate_delegation(
|
||||
delegator: &str,
|
||||
delegatee: &str,
|
||||
cond_query: &str,
|
||||
sigstr: &str,
|
||||
) -> Option<ConditionQuery> {
|
||||
// form the token
|
||||
let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
|
||||
// form SHA256 hash
|
||||
let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
|
||||
let sig = schnorr::Signature::from_str(sigstr).unwrap();
|
||||
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
|
||||
if let Ok(pubkey) = XOnlyPublicKey::from_str(delegator) {
|
||||
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
|
||||
if verify.is_ok() {
|
||||
// return the parsed condition query
|
||||
cond_query.parse::<ConditionQuery>().ok()
|
||||
} else {
|
||||
debug!("client sent an delegation signature that did not validate");
|
||||
None
|
||||
}
|
||||
} else {
|
||||
debug!("client sent malformed delegation pubkey");
|
||||
None
|
||||
}
|
||||
} else {
|
||||
info!("error converting delegation digest to secp256k1 message");
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Parsed delegation condition
|
||||
/// see https://github.com/nostr-protocol/nips/pull/28#pullrequestreview-1084903800
|
||||
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct Condition {
|
||||
pub(crate) field: Field,
|
||||
pub(crate) operator: Operator,
|
||||
pub(crate) values: Vec<u64>,
|
||||
}
|
||||
|
||||
impl Condition {
|
||||
/// Check if this condition allows the given event to be delegated
|
||||
pub fn allows_event(&self, event: &Event) -> bool {
|
||||
// determine what the right-hand side of the operator is
|
||||
let resolved_field = match &self.field {
|
||||
Field::Kind => event.kind,
|
||||
Field::CreatedAt => event.created_at,
|
||||
};
|
||||
match &self.operator {
|
||||
Operator::LessThan => {
|
||||
// the less-than operator is only valid for single values.
|
||||
if self.values.len() == 1 {
|
||||
if let Some(v) = self.values.first() {
|
||||
return resolved_field < *v;
|
||||
}
|
||||
}
|
||||
}
|
||||
Operator::GreaterThan => {
|
||||
// the greater-than operator is only valid for single values.
|
||||
if self.values.len() == 1 {
|
||||
if let Some(v) = self.values.first() {
|
||||
return resolved_field > *v;
|
||||
}
|
||||
}
|
||||
}
|
||||
Operator::Equals => {
|
||||
// equals is interpreted as "must be equal to at least one provided value"
|
||||
return self.values.iter().any(|&x| resolved_field == x);
|
||||
}
|
||||
Operator::NotEquals => {
|
||||
// not-equals is interpreted as "must not be equal to any provided value"
|
||||
// this is the one case where an empty list of values could be allowed; even though it is a pointless restriction.
|
||||
return self.values.iter().all(|&x| resolved_field != x);
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn str_to_condition(cs: &str) -> Option<Condition> {
|
||||
// a condition is a string (alphanum+underscore), an operator (<>=!), and values (num+comma)
|
||||
lazy_static! {
|
||||
static ref RE: Regex = Regex::new("([[:word:]]+)([<>=!]+)([,[[:digit:]]]*)").unwrap();
|
||||
}
|
||||
// match against the regex
|
||||
let caps = RE.captures(cs)?;
|
||||
let field = caps.get(1)?.as_str().parse::<Field>().ok()?;
|
||||
let operator = caps.get(2)?.as_str().parse::<Operator>().ok()?;
|
||||
// values are just comma separated numbers, but all must be parsed
|
||||
let rawvals = caps.get(3)?.as_str();
|
||||
let values = rawvals
|
||||
.split_terminator(',')
|
||||
.map(|n| n.parse::<u64>().ok())
|
||||
.collect::<Option<Vec<_>>>()?;
|
||||
// convert field string into Field
|
||||
Some(Condition {
|
||||
field,
|
||||
operator,
|
||||
values,
|
||||
})
|
||||
}
|
||||
|
||||
/// Parse a condition query from a string slice
|
||||
impl FromStr for ConditionQuery {
|
||||
type Err = Error;
|
||||
fn from_str(value: &str) -> Result<Self, Self::Err> {
|
||||
// split the string with '&'
|
||||
let mut conditions = vec![];
|
||||
let condstrs = value.split_terminator('&');
|
||||
// parse each individual condition
|
||||
for c in condstrs {
|
||||
conditions.push(str_to_condition(c).ok_or(Error::DelegationParseError)?);
|
||||
}
|
||||
Ok(ConditionQuery { conditions })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// parse condition strings
|
||||
#[test]
|
||||
fn parse_empty() -> Result<()> {
|
||||
// given an empty condition query, produce an empty vector
|
||||
let empty_cq = ConditionQuery { conditions: vec![] };
|
||||
let parsed = "".parse::<ConditionQuery>()?;
|
||||
assert_eq!(parsed, empty_cq);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// parse field 'kind'
|
||||
#[test]
|
||||
fn test_kind_field_parse() -> Result<()> {
|
||||
let field = "kind".parse::<Field>()?;
|
||||
assert_eq!(field, Field::Kind);
|
||||
Ok(())
|
||||
}
|
||||
// parse field 'created_at'
|
||||
#[test]
|
||||
fn test_created_at_field_parse() -> Result<()> {
|
||||
let field = "created_at".parse::<Field>()?;
|
||||
assert_eq!(field, Field::CreatedAt);
|
||||
Ok(())
|
||||
}
|
||||
// parse unknown field
|
||||
#[test]
|
||||
fn unknown_field_parse() {
|
||||
let field = "unk".parse::<Field>();
|
||||
assert!(field.is_err());
|
||||
}
|
||||
|
||||
// parse a full conditional query with an empty array
|
||||
#[test]
|
||||
fn parse_kind_equals_empty() -> Result<()> {
|
||||
// given an empty condition query, produce an empty vector
|
||||
let kind_cq = ConditionQuery {
|
||||
conditions: vec![Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::Equals,
|
||||
values: vec![],
|
||||
}],
|
||||
};
|
||||
let parsed = "kind=".parse::<ConditionQuery>()?;
|
||||
assert_eq!(parsed, kind_cq);
|
||||
Ok(())
|
||||
}
|
||||
// parse a full conditional query with a single value
|
||||
#[test]
|
||||
fn parse_kind_equals_singleval() -> Result<()> {
|
||||
// given an empty condition query, produce an empty vector
|
||||
let kind_cq = ConditionQuery {
|
||||
conditions: vec![Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::Equals,
|
||||
values: vec![1],
|
||||
}],
|
||||
};
|
||||
let parsed = "kind=1".parse::<ConditionQuery>()?;
|
||||
assert_eq!(parsed, kind_cq);
|
||||
Ok(())
|
||||
}
|
||||
// parse a full conditional query with multiple values
|
||||
#[test]
|
||||
fn parse_kind_equals_multival() -> Result<()> {
|
||||
// given an empty condition query, produce an empty vector
|
||||
let kind_cq = ConditionQuery {
|
||||
conditions: vec![Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::Equals,
|
||||
values: vec![1, 2, 4],
|
||||
}],
|
||||
};
|
||||
let parsed = "kind=1,2,4".parse::<ConditionQuery>()?;
|
||||
assert_eq!(parsed, kind_cq);
|
||||
Ok(())
|
||||
}
|
||||
// parse multiple conditions
|
||||
#[test]
|
||||
fn parse_multi_conditions() -> Result<()> {
|
||||
// given an empty condition query, produce an empty vector
|
||||
let cq = ConditionQuery {
|
||||
conditions: vec![
|
||||
Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::GreaterThan,
|
||||
values: vec![10000],
|
||||
},
|
||||
Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::LessThan,
|
||||
values: vec![20000],
|
||||
},
|
||||
Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::NotEquals,
|
||||
values: vec![10001],
|
||||
},
|
||||
Condition {
|
||||
field: Field::CreatedAt,
|
||||
operator: Operator::LessThan,
|
||||
values: vec![1665867123],
|
||||
},
|
||||
],
|
||||
};
|
||||
let parsed =
|
||||
"kind>10000&kind<20000&kind!10001&created_at<1665867123".parse::<ConditionQuery>()?;
|
||||
assert_eq!(parsed, cq);
|
||||
Ok(())
|
||||
}
|
||||
fn simple_event() -> Event {
|
||||
Event {
|
||||
id: "0".to_owned(),
|
||||
pubkey: "0".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: vec![],
|
||||
content: "".to_owned(),
|
||||
sig: "0".to_owned(),
|
||||
tagidx: None,
|
||||
}
|
||||
}
|
||||
// Check for condition logic on event w/ empty values
|
||||
#[test]
|
||||
fn condition_with_empty_values() {
|
||||
let mut c = Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::GreaterThan,
|
||||
values: vec![],
|
||||
};
|
||||
let e = simple_event();
|
||||
assert!(!c.allows_event(&e));
|
||||
c.operator = Operator::LessThan;
|
||||
assert!(!c.allows_event(&e));
|
||||
c.operator = Operator::Equals;
|
||||
assert!(!c.allows_event(&e));
|
||||
// Not Equals applied to an empty list *is* allowed
|
||||
// (pointless, but logically valid).
|
||||
c.operator = Operator::NotEquals;
|
||||
assert!(c.allows_event(&e));
|
||||
}
|
||||
|
||||
// Check for condition logic on event w/ single value
|
||||
#[test]
|
||||
fn condition_kind_gt_event_single() {
|
||||
let c = Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::GreaterThan,
|
||||
values: vec![10],
|
||||
};
|
||||
let mut e = simple_event();
|
||||
// kind is not greater than 10, not allowed
|
||||
e.kind = 1;
|
||||
assert!(!c.allows_event(&e));
|
||||
// kind is greater than 10, allowed
|
||||
e.kind = 100;
|
||||
assert!(c.allows_event(&e));
|
||||
// kind is 10, not allowed
|
||||
e.kind = 10;
|
||||
assert!(!c.allows_event(&e));
|
||||
}
|
||||
// Check for condition logic on event w/ multi values
|
||||
#[test]
|
||||
fn condition_with_multi_values() {
|
||||
let mut c = Condition {
|
||||
field: Field::Kind,
|
||||
operator: Operator::Equals,
|
||||
values: vec![0, 10, 20],
|
||||
};
|
||||
let mut e = simple_event();
|
||||
// Allow if event kind is in list for Equals
|
||||
e.kind = 10;
|
||||
assert!(c.allows_event(&e));
|
||||
// Deny if event kind is not in list for Equals
|
||||
e.kind = 11;
|
||||
assert!(!c.allows_event(&e));
|
||||
// Deny if event kind is in list for NotEquals
|
||||
e.kind = 10;
|
||||
c.operator = Operator::NotEquals;
|
||||
assert!(!c.allows_event(&e));
|
||||
// Allow if event kind is not in list for NotEquals
|
||||
e.kind = 99;
|
||||
c.operator = Operator::NotEquals;
|
||||
assert!(c.allows_event(&e));
|
||||
// Always deny if GreaterThan/LessThan for a list
|
||||
c.operator = Operator::LessThan;
|
||||
assert!(!c.allows_event(&e));
|
||||
c.operator = Operator::GreaterThan;
|
||||
assert!(!c.allows_event(&e));
|
||||
}
|
||||
}
|
14
src/error.rs
14
src/error.rs
@@ -17,10 +17,16 @@ pub enum Error {
|
||||
ConnWriteError,
|
||||
#[error("EVENT parse failed")]
|
||||
EventParseFailed,
|
||||
#[error("ClOSE message parse failed")]
|
||||
#[error("CLOSE message parse failed")]
|
||||
CloseParseFailed,
|
||||
#[error("Event validation failed")]
|
||||
EventInvalid,
|
||||
#[error("Event invalid signature")]
|
||||
EventInvalidSignature,
|
||||
#[error("Event invalid id")]
|
||||
EventInvalidId,
|
||||
#[error("Event malformed pubkey")]
|
||||
EventMalformedPubkey,
|
||||
#[error("Event could not canonicalize")]
|
||||
EventCouldNotCanonicalize,
|
||||
#[error("Event too large")]
|
||||
EventMaxLengthError(usize),
|
||||
#[error("Subscription identifier max length exceeded")]
|
||||
@@ -50,6 +56,8 @@ pub enum Error {
|
||||
HyperError(hyper::Error),
|
||||
#[error("Hex encoding error")]
|
||||
HexError(hex::FromHexError),
|
||||
#[error("Delegation parse error")]
|
||||
DelegationParseError,
|
||||
#[error("Unknown/Undocumented")]
|
||||
UnknownError,
|
||||
}
|
||||
|
113
src/event.rs
113
src/event.rs
@@ -1,4 +1,5 @@
|
||||
//! Event parsing and validation
|
||||
use crate::delegation::validate_delegation;
|
||||
use crate::error::Error::*;
|
||||
use crate::error::Result;
|
||||
use crate::nip05;
|
||||
@@ -26,11 +27,19 @@ pub struct EventCmd {
|
||||
event: Event,
|
||||
}
|
||||
|
||||
impl EventCmd {
|
||||
pub fn event_id(&self) -> &str {
|
||||
&self.event.id
|
||||
}
|
||||
}
|
||||
|
||||
/// Parsed nostr event.
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct Event {
|
||||
pub id: String,
|
||||
pub(crate) pubkey: String,
|
||||
#[serde(skip)]
|
||||
pub(crate) delegated_by: Option<String>,
|
||||
pub(crate) created_at: u64,
|
||||
pub(crate) kind: u64,
|
||||
#[serde(deserialize_with = "tag_from_string")]
|
||||
@@ -80,12 +89,13 @@ impl From<EventCmd> for Result<Event> {
|
||||
// ensure command is correct
|
||||
if ec.cmd != "EVENT" {
|
||||
Err(CommandUnknownError)
|
||||
} else if ec.event.is_valid() {
|
||||
let mut e = ec.event;
|
||||
e.build_index();
|
||||
Ok(e)
|
||||
} else {
|
||||
Err(EventInvalid)
|
||||
ec.event.validate().map(|_| {
|
||||
let mut e = ec.event;
|
||||
e.build_index();
|
||||
e.update_delegation();
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -110,6 +120,50 @@ impl Event {
|
||||
None
|
||||
}
|
||||
|
||||
// is this event delegated (properly)?
|
||||
// does the signature match, and are conditions valid?
|
||||
// if so, return an alternate author for the event
|
||||
pub fn delegated_author(&self) -> Option<String> {
|
||||
// is there a delegation tag?
|
||||
let delegation_tag: Vec<String> = self
|
||||
.tags
|
||||
.iter()
|
||||
.filter(|x| x.len() == 4)
|
||||
.filter(|x| x.get(0).unwrap() == "delegation")
|
||||
.take(1)
|
||||
.next()?
|
||||
.to_vec(); // get first tag
|
||||
|
||||
//let delegation_tag = self.tag_values_by_name("delegation");
|
||||
// delegation tags should have exactly 3 elements after the name (pubkey, condition, sig)
|
||||
// the event is signed by the delagatee
|
||||
let delegatee = &self.pubkey;
|
||||
// the delegation tag references the claimed delagator
|
||||
let delegator: &str = delegation_tag.get(1)?;
|
||||
let querystr: &str = delegation_tag.get(2)?;
|
||||
let sig: &str = delegation_tag.get(3)?;
|
||||
|
||||
// attempt to get a condition query; this requires the delegation to have a valid signature.
|
||||
if let Some(cond_query) = validate_delegation(delegator, delegatee, querystr, sig) {
|
||||
// The signature was valid, now we ensure the delegation
|
||||
// condition is valid for this event:
|
||||
if cond_query.allows_event(self) {
|
||||
// since this is allowed, we will provide the delegatee
|
||||
Some(delegator.into())
|
||||
} else {
|
||||
debug!("an event failed to satisfy delegation conditions");
|
||||
None
|
||||
}
|
||||
} else {
|
||||
debug!("event had had invalid delegation signature");
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Update delegation status
|
||||
fn update_delegation(&mut self) {
|
||||
self.delegated_by = self.delegated_author();
|
||||
}
|
||||
/// Build an event tag index
|
||||
fn build_index(&mut self) {
|
||||
// if there are no tags; just leave the index as None
|
||||
@@ -145,7 +199,7 @@ impl Event {
|
||||
self.pubkey.chars().take(8).collect()
|
||||
}
|
||||
|
||||
/// Retrieve tag values
|
||||
/// Retrieve tag initial values across all tags matching the name
|
||||
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
|
||||
self.tags
|
||||
.iter()
|
||||
@@ -172,7 +226,7 @@ impl Event {
|
||||
}
|
||||
|
||||
/// Check if this event has a valid signature.
|
||||
fn is_valid(&self) -> bool {
|
||||
fn validate(&self) -> Result<()> {
|
||||
// TODO: return a Result with a reason for invalid events
|
||||
// validation is performed by:
|
||||
// * parsing JSON string into event fields
|
||||
@@ -181,8 +235,8 @@ impl Event {
|
||||
// * serialize with no spaces/newlines
|
||||
let c_opt = self.to_canonical();
|
||||
if c_opt.is_none() {
|
||||
debug!("event could not be canonicalized");
|
||||
return false;
|
||||
debug!("could not canonicalize");
|
||||
return Err(EventCouldNotCanonicalize);
|
||||
}
|
||||
let c = c_opt.unwrap();
|
||||
// * compute the sha256sum.
|
||||
@@ -191,21 +245,21 @@ impl Event {
|
||||
// * ensure the id matches the computed sha256sum.
|
||||
if self.id != hex_digest {
|
||||
debug!("event id does not match digest");
|
||||
return false;
|
||||
return Err(EventInvalidId);
|
||||
}
|
||||
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
|
||||
let sig = schnorr::Signature::from_str(&self.sig).unwrap();
|
||||
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
|
||||
if let Ok(pubkey) = XOnlyPublicKey::from_str(&self.pubkey) {
|
||||
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
|
||||
matches!(verify, Ok(()))
|
||||
SECP.verify_schnorr(&sig, &msg, &pubkey)
|
||||
.map_err(|_| EventInvalidSignature)
|
||||
} else {
|
||||
debug!("client sent malformed pubkey");
|
||||
false
|
||||
Err(EventMalformedPubkey)
|
||||
}
|
||||
} else {
|
||||
info!("error converting digest to secp256k1 message");
|
||||
false
|
||||
Err(EventInvalidSignature)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,6 +323,7 @@ mod tests {
|
||||
Event {
|
||||
id: "0".to_owned(),
|
||||
pubkey: "0".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: vec![],
|
||||
@@ -350,6 +405,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "999".to_owned(),
|
||||
pubkey: "012345".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 501234,
|
||||
kind: 1,
|
||||
tags: vec![],
|
||||
@@ -367,6 +423,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "999".to_owned(),
|
||||
pubkey: "012345".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 501234,
|
||||
kind: 1,
|
||||
tags: vec![
|
||||
@@ -388,11 +445,39 @@ mod tests {
|
||||
assert_eq!(v, vec!["foo", "bar", "baz"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_no_tag_select() {
|
||||
let e = Event {
|
||||
id: "999".to_owned(),
|
||||
pubkey: "012345".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 501234,
|
||||
kind: 1,
|
||||
tags: vec![
|
||||
vec!["j".to_owned(), "abc".to_owned()],
|
||||
vec!["e".to_owned(), "foo".to_owned()],
|
||||
vec!["e".to_owned(), "baz".to_owned()],
|
||||
vec![
|
||||
"p".to_owned(),
|
||||
"aaaa".to_owned(),
|
||||
"ws://example.com".to_owned(),
|
||||
],
|
||||
],
|
||||
content: "this is a test".to_owned(),
|
||||
sig: "abcde".to_owned(),
|
||||
tagidx: None,
|
||||
};
|
||||
let v = e.tag_values_by_name("x");
|
||||
// asking for tags that don't exist just returns zero-length vector
|
||||
assert_eq!(v.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_canonical_with_tags() {
|
||||
let e = Event {
|
||||
id: "999".to_owned(),
|
||||
pubkey: "012345".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 501234,
|
||||
kind: 1,
|
||||
tags: vec![
|
||||
|
@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
|
||||
description: i.description,
|
||||
pubkey: i.pubkey,
|
||||
contact: i.contact,
|
||||
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 22]),
|
||||
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 26]),
|
||||
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
|
||||
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
|
||||
}
|
||||
|
@@ -2,11 +2,13 @@ pub mod close;
|
||||
pub mod config;
|
||||
pub mod conn;
|
||||
pub mod db;
|
||||
pub mod delegation;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod hexrange;
|
||||
pub mod info;
|
||||
pub mod nip05;
|
||||
pub mod notice;
|
||||
pub mod schema;
|
||||
pub mod subscription;
|
||||
pub mod utils;
|
||||
|
@@ -34,11 +34,11 @@ fn main() {
|
||||
// enable tracing with tokio-console
|
||||
ConsoleLayer::builder().with_default_env().init();
|
||||
}
|
||||
|
||||
// update with database location
|
||||
if let Some(db) = db_dir {
|
||||
settings.database.data_directory = db;
|
||||
}
|
||||
|
||||
let (_, ctrl_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel();
|
||||
// run this in a new thread
|
||||
let handle = thread::spawn(|| {
|
||||
|
86
src/notice.rs
Normal file
86
src/notice.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
pub enum EventResultStatus {
|
||||
Saved,
|
||||
Duplicate,
|
||||
Invalid,
|
||||
Blocked,
|
||||
RateLimited,
|
||||
Error,
|
||||
}
|
||||
|
||||
pub struct EventResult {
|
||||
pub id: String,
|
||||
pub msg: String,
|
||||
pub status: EventResultStatus,
|
||||
}
|
||||
|
||||
pub enum Notice {
|
||||
Message(String),
|
||||
EventResult(EventResult),
|
||||
}
|
||||
|
||||
impl EventResultStatus {
|
||||
pub fn to_bool(&self) -> bool {
|
||||
match self {
|
||||
Self::Saved => true,
|
||||
Self::Duplicate => true,
|
||||
Self::Invalid => false,
|
||||
Self::Blocked => false,
|
||||
Self::RateLimited => false,
|
||||
Self::Error => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prefix(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Saved => "saved",
|
||||
Self::Duplicate => "duplicate",
|
||||
Self::Invalid => "invalid",
|
||||
Self::Blocked => "blocked",
|
||||
Self::RateLimited => "rate-limited",
|
||||
Self::Error => "error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Notice {
|
||||
//pub fn err(err: error::Error, id: String) -> Notice {
|
||||
// Notice::err_msg(format!("{}", err), id)
|
||||
//}
|
||||
|
||||
pub fn message(msg: String) -> Notice {
|
||||
Notice::Message(msg)
|
||||
}
|
||||
|
||||
fn prefixed(id: String, msg: &str, status: EventResultStatus) -> Notice {
|
||||
let msg = format!("{}: {}", status.prefix(), msg);
|
||||
Notice::EventResult(EventResult { id, msg, status })
|
||||
}
|
||||
|
||||
pub fn invalid(id: String, msg: &str) -> Notice {
|
||||
Notice::prefixed(id, msg, EventResultStatus::Invalid)
|
||||
}
|
||||
|
||||
pub fn blocked(id: String, msg: &str) -> Notice {
|
||||
Notice::prefixed(id, msg, EventResultStatus::Blocked)
|
||||
}
|
||||
|
||||
pub fn rate_limited(id: String, msg: &str) -> Notice {
|
||||
Notice::prefixed(id, msg, EventResultStatus::RateLimited)
|
||||
}
|
||||
|
||||
pub fn duplicate(id: String) -> Notice {
|
||||
Notice::prefixed(id, "", EventResultStatus::Duplicate)
|
||||
}
|
||||
|
||||
pub fn error(id: String, msg: &str) -> Notice {
|
||||
Notice::prefixed(id, msg, EventResultStatus::Error)
|
||||
}
|
||||
|
||||
pub fn saved(id: String) -> Notice {
|
||||
Notice::EventResult(EventResult {
|
||||
id,
|
||||
msg: "".into(),
|
||||
status: EventResultStatus::Saved,
|
||||
})
|
||||
}
|
||||
}
|
@@ -20,7 +20,7 @@ pragma mmap_size = 536870912; -- 512MB of mmap
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
pub const DB_VERSION: usize = 6;
|
||||
pub const DB_VERSION: usize = 9;
|
||||
|
||||
/// Schema definition
|
||||
const INIT_SQL: &str = formatcp!(
|
||||
@@ -40,6 +40,7 @@ event_hash BLOB NOT NULL, -- 4-byte hash
|
||||
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
|
||||
created_at INTEGER NOT NULL, -- when the event was authored
|
||||
author BLOB NOT NULL, -- author pubkey
|
||||
delegated_by BLOB, -- delegator pubkey (NIP-26)
|
||||
kind INTEGER NOT NULL, -- event kind
|
||||
hidden INTEGER, -- relevant for queries
|
||||
content TEXT NOT NULL -- serialized json of event object
|
||||
@@ -47,9 +48,10 @@ content TEXT NOT NULL -- serialized json of event object
|
||||
|
||||
-- Event Indexes
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
|
||||
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
|
||||
CREATE INDEX IF NOT EXISTS author_index ON event(author);
|
||||
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
|
||||
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
|
||||
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
|
||||
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
|
||||
|
||||
-- Tag Table
|
||||
-- Tag values are stored as either a BLOB (if they come in as a
|
||||
@@ -152,6 +154,16 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
if curr_version == 5 {
|
||||
curr_version = mig_5_to_6(conn)?;
|
||||
}
|
||||
if curr_version == 6 {
|
||||
curr_version = mig_6_to_7(conn)?;
|
||||
}
|
||||
if curr_version == 7 {
|
||||
curr_version = mig_7_to_8(conn)?;
|
||||
}
|
||||
if curr_version == 8 {
|
||||
curr_version = mig_8_to_9(conn)?;
|
||||
}
|
||||
|
||||
if curr_version == DB_VERSION {
|
||||
info!(
|
||||
"All migration scripts completed successfully. Welcome to v{}.",
|
||||
@@ -327,7 +339,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
|
||||
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
|
||||
tx.execute(
|
||||
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
||||
params![event_id, tagname, hex::decode(&tagval).ok()],
|
||||
params![event_id, tagname, hex::decode(tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
// otherwise, insert as text
|
||||
@@ -348,3 +360,64 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
|
||||
Ok(6)
|
||||
}
|
||||
|
||||
fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 6->7");
|
||||
// only change is adding a hidden column to events.
|
||||
let upgrade_sql = r##"
|
||||
ALTER TABLE event ADD delegated_by BLOB;
|
||||
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
|
||||
PRAGMA user_version = 7;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v6 -> v7");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(7)
|
||||
}
|
||||
|
||||
fn mig_7_to_8(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 7->8");
|
||||
// Remove redundant indexes, and add a better multi-column index.
|
||||
let upgrade_sql = r##"
|
||||
DROP INDEX IF EXISTS created_at_index;
|
||||
DROP INDEX IF EXISTS kind_index;
|
||||
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
|
||||
PRAGMA user_version = 8;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v7 -> v8");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(8)
|
||||
}
|
||||
|
||||
fn mig_8_to_9(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 8->9");
|
||||
// Those old indexes were actually helpful...
|
||||
let upgrade_sql = r##"
|
||||
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
|
||||
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
|
||||
PRAGMA user_version = 9;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v8 -> v9");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(8)
|
||||
}
|
||||
|
169
src/server.rs
169
src/server.rs
@@ -10,9 +10,11 @@ use crate::event::Event;
|
||||
use crate::event::EventCmd;
|
||||
use crate::info::RelayInfo;
|
||||
use crate::nip05;
|
||||
use crate::notice::Notice;
|
||||
use crate::subscription::Subscription;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use http::header::HeaderMap;
|
||||
use hyper::header::ACCEPT;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::upgrade::Upgraded;
|
||||
@@ -84,11 +86,32 @@ async fn handle_web_request(
|
||||
Some(config),
|
||||
)
|
||||
.await;
|
||||
|
||||
let user_agent = get_header_string("user-agent", request.headers());
|
||||
// determine the remote IP from headers if the exist
|
||||
let header_ip = settings
|
||||
.network
|
||||
.remote_ip_header
|
||||
.as_ref()
|
||||
.and_then(|x| get_header_string(x, request.headers()));
|
||||
// use the socket addr as a backup
|
||||
let remote_ip =
|
||||
header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
|
||||
let client_info = ClientInfo {
|
||||
remote_ip,
|
||||
user_agent,
|
||||
};
|
||||
// spawn a nostr server with our websocket
|
||||
tokio::spawn(nostr_server(
|
||||
pool, settings, ws_stream, broadcast, event_tx, shutdown,
|
||||
pool,
|
||||
client_info,
|
||||
settings,
|
||||
ws_stream,
|
||||
broadcast,
|
||||
event_tx,
|
||||
shutdown,
|
||||
));
|
||||
}
|
||||
// todo: trace, don't print...
|
||||
Err(e) => println!(
|
||||
"error when trying to upgrade connection \
|
||||
from address {} to websocket connection. \
|
||||
@@ -148,6 +171,12 @@ async fn handle_web_request(
|
||||
}
|
||||
}
|
||||
|
||||
fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
|
||||
headers
|
||||
.get(header)
|
||||
.and_then(|x| x.to_str().ok().map(|x| x.to_string()))
|
||||
}
|
||||
|
||||
// return on a control-c or internally requested shutdown signal
|
||||
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
|
||||
let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
@@ -167,7 +196,6 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
|
||||
info!("Shutting down webserver due to SIGTERM");
|
||||
break;
|
||||
},
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -283,6 +311,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
controlled_shutdown.send(()).ok();
|
||||
}
|
||||
Err(std::sync::mpsc::RecvError) => {
|
||||
// FIXME: spurious error on startup?
|
||||
debug!("shutdown requestor is disconnected");
|
||||
}
|
||||
};
|
||||
@@ -378,36 +407,50 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage>
|
||||
}
|
||||
|
||||
/// Turn a string into a NOTICE message ready to send over a WebSocket
|
||||
fn make_notice_message(msg: &str) -> Message {
|
||||
Message::text(json!(["NOTICE", msg]).to_string())
|
||||
fn make_notice_message(notice: Notice) -> Message {
|
||||
let json = match notice {
|
||||
Notice::Message(ref msg) => json!(["NOTICE", msg]),
|
||||
Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
|
||||
};
|
||||
|
||||
Message::text(json.to_string())
|
||||
}
|
||||
|
||||
struct ClientInfo {
|
||||
remote_ip: String,
|
||||
user_agent: Option<String>,
|
||||
}
|
||||
|
||||
/// Handle new client connections. This runs through an event loop
|
||||
/// for all client communication.
|
||||
async fn nostr_server(
|
||||
pool: db::SqlitePool,
|
||||
client_info: ClientInfo,
|
||||
settings: Settings,
|
||||
mut ws_stream: WebSocketStream<Upgraded>,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: mpsc::Sender<SubmittedEvent>,
|
||||
mut shutdown: Receiver<()>,
|
||||
) {
|
||||
// the time this websocket nostr server started
|
||||
let orig_start = Instant::now();
|
||||
// get a broadcast channel for clients to communicate on
|
||||
let mut bcast_rx = broadcast.subscribe();
|
||||
// Track internal client state
|
||||
let mut conn = conn::ClientConn::new();
|
||||
let mut conn = conn::ClientConn::new(client_info.remote_ip);
|
||||
// Use the remote IP as the client identifier
|
||||
let cid = conn.get_client_prefix();
|
||||
// Create a channel for receiving query results from the database.
|
||||
// we will send out the tx handle to any query we generate.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
|
||||
// Create channel for receiving NOTICEs
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
|
||||
|
||||
// last time this client sent data (message, ping, etc.)
|
||||
let mut last_message_time = Instant::now();
|
||||
|
||||
// ping interval (every 5 minutes)
|
||||
let default_ping_dur = Duration::from_secs(300);
|
||||
let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
|
||||
|
||||
// disconnect after 20 minutes without a ping response or event.
|
||||
let max_quiet_time = Duration::from_secs(60 * 20);
|
||||
@@ -419,16 +462,20 @@ async fn nostr_server(
|
||||
// when these subscriptions are cancelled, make a message
|
||||
// available to the executing query so it knows to stop.
|
||||
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
|
||||
|
||||
// keep track of the subscriptions we have
|
||||
let mut current_subs: Vec<Subscription> = Vec::new();
|
||||
// for stats, keep track of how many events the client published,
|
||||
// and how many it received from queries.
|
||||
let mut client_published_event_count: usize = 0;
|
||||
let mut client_received_event_count: usize = 0;
|
||||
info!("new connection for client: {:?}", cid);
|
||||
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
|
||||
if let Some(ua) = client_info.user_agent {
|
||||
debug!("cid: {}, user-agent: {:?}", cid, ua);
|
||||
}
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.recv() => {
|
||||
info!("Shutting client connection down due to shutdown: {:?}", cid);
|
||||
info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
|
||||
// server shutting down, exit loop
|
||||
break;
|
||||
},
|
||||
@@ -443,7 +490,7 @@ async fn nostr_server(
|
||||
ws_stream.send(Message::Ping(Vec::new())).await.ok();
|
||||
},
|
||||
Some(notice_msg) = notice_rx.recv() => {
|
||||
ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
|
||||
ws_stream.send(make_notice_message(notice_msg)).await.ok();
|
||||
},
|
||||
Some(query_result) = query_rx.recv() => {
|
||||
// database informed us of a query result we asked for
|
||||
@@ -462,12 +509,15 @@ async fn nostr_server(
|
||||
Ok(global_event) = bcast_rx.recv() => {
|
||||
// an event has been broadcast to all clients
|
||||
// first check if there is a subscription for this event.
|
||||
let matching_subs = conn.get_matching_subscriptions(&global_event);
|
||||
for s in matching_subs {
|
||||
for (s, sub) in conn.subscriptions() {
|
||||
if !sub.interested_in_event(&global_event) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: serialize at broadcast time, instead of
|
||||
// once for each consumer.
|
||||
if let Ok(event_str) = serde_json::to_string(&global_event) {
|
||||
debug!("sub match for client: {:?}, sub: {:?}, event: {:?}",
|
||||
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
|
||||
cid, s,
|
||||
global_event.get_event_id_prefix());
|
||||
// create an event response and send it
|
||||
@@ -488,7 +538,7 @@ async fn nostr_server(
|
||||
},
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message("binary messages are not accepted")).await.ok();
|
||||
make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok();
|
||||
continue;
|
||||
},
|
||||
Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
|
||||
@@ -498,8 +548,7 @@ async fn nostr_server(
|
||||
},
|
||||
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message(
|
||||
&format!("message too large ({} > {})",size, max_size))).await.ok();
|
||||
make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
|
||||
continue;
|
||||
},
|
||||
None |
|
||||
@@ -507,17 +556,17 @@ async fn nostr_server(
|
||||
Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
|
||||
WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
|
||||
=> {
|
||||
debug!("websocket close from client: {:?}",cid);
|
||||
debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
|
||||
break;
|
||||
},
|
||||
Some(Err(WsError::Io(e))) => {
|
||||
// IO errors are considered fatal
|
||||
warn!("IO error (client: {:?}): {:?}", cid, e);
|
||||
warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
|
||||
break;
|
||||
}
|
||||
x => {
|
||||
// default condition on error is to close the client connection
|
||||
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
|
||||
info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -527,11 +576,12 @@ async fn nostr_server(
|
||||
Ok(NostrMessage::EventMsg(ec)) => {
|
||||
// An EventCmd needs to be validated to be converted into an Event
|
||||
// handle each type of message
|
||||
let evid = ec.event_id().to_owned();
|
||||
let parsed : Result<Event> = Result::<Event>::from(ec);
|
||||
match parsed {
|
||||
Ok(e) => {
|
||||
let id_prefix:String = e.id.chars().take(8).collect();
|
||||
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
|
||||
debug!("successfully parsed/validated event: {:?} (cid: {})", id_prefix, cid);
|
||||
// check if the event is too far in the future.
|
||||
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
|
||||
// Write this to the database.
|
||||
@@ -539,44 +589,57 @@ async fn nostr_server(
|
||||
event_tx.send(submit_event).await.ok();
|
||||
client_published_event_count += 1;
|
||||
} else {
|
||||
info!("client {:?} sent a far future-dated event", cid);
|
||||
info!("client: {} sent a far future-dated event", cid);
|
||||
if let Some(fut_sec) = settings.options.reject_future_seconds {
|
||||
ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok();
|
||||
let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
|
||||
let notice = Notice::invalid(e.id, &msg);
|
||||
ws_stream.send(make_notice_message(notice)).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
info!("client {:?} sent an invalid event", cid);
|
||||
ws_stream.send(make_notice_message("event was invalid")).await.ok();
|
||||
Err(e) => {
|
||||
info!("client sent an invalid event (cid: {})", cid);
|
||||
ws_stream.send(make_notice_message(Notice::invalid(evid, &format!("{}", e)))).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(NostrMessage::SubMsg(s)) => {
|
||||
debug!("client {} requesting a subscription", cid);
|
||||
debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
|
||||
// subscription handling consists of:
|
||||
// * registering the subscription so future events can be matched
|
||||
// * making a channel to cancel to request later
|
||||
// * sending a request for a SQL query
|
||||
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
|
||||
match conn.subscribe(s.clone()) {
|
||||
Ok(()) => {
|
||||
// when we insert, if there was a previous query running with the same name, cancel it.
|
||||
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
// start a database query
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {}", e);
|
||||
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
|
||||
// Do nothing if the sub already exists.
|
||||
if !current_subs.contains(&s) {
|
||||
current_subs.push(s.clone());
|
||||
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
|
||||
match conn.subscribe(s.clone()) {
|
||||
Ok(()) => {
|
||||
// when we insert, if there was a previous query running with the same name, cancel it.
|
||||
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
// start a database query
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {}", e);
|
||||
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("client send duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
|
||||
}
|
||||
},
|
||||
Ok(NostrMessage::CloseMsg(cc)) => {
|
||||
// closing a request simply removes the subscription.
|
||||
let parsed : Result<Close> = Result::<Close>::from(cc);
|
||||
if let Ok(c) = parsed {
|
||||
// remove from the list of known subs
|
||||
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
|
||||
current_subs.remove(pos);
|
||||
}
|
||||
|
||||
// check if a query is currently
|
||||
// running, and remove it if so.
|
||||
let stop_tx = running_queries.remove(&c.id);
|
||||
@@ -588,23 +651,23 @@ async fn nostr_server(
|
||||
conn.unsubscribe(&c);
|
||||
} else {
|
||||
info!("invalid command ignored");
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
|
||||
}
|
||||
},
|
||||
Err(Error::ConnError) => {
|
||||
debug!("got connection close/error, disconnecting client: {:?}",cid);
|
||||
debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip());
|
||||
break;
|
||||
}
|
||||
Err(Error::EventMaxLengthError(s)) => {
|
||||
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
|
||||
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
|
||||
info!("client sent event larger ({} bytes) than max size (cid: {})", s, cid);
|
||||
ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok();
|
||||
},
|
||||
Err(Error::ProtoParseError) => {
|
||||
info!("client {:?} sent event that could not be parsed", cid);
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
info!("client sent event that could not be parsed (cid: {})", cid);
|
||||
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
|
||||
},
|
||||
Err(e) => {
|
||||
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
|
||||
info!("got non-fatal error from client (cid: {}, error: {:?}", cid, e);
|
||||
},
|
||||
}
|
||||
},
|
||||
@@ -615,7 +678,11 @@ async fn nostr_server(
|
||||
stop_tx.send(()).ok();
|
||||
}
|
||||
info!(
|
||||
"stopping connection for client: {:?} (client sent {} event(s), received {})",
|
||||
cid, client_published_event_count, client_received_event_count
|
||||
"stopping client connection (cid: {}, ip: {:?}, sent: {} events, recv: {} events, connected: {:?})",
|
||||
cid,
|
||||
conn.ip(),
|
||||
client_published_event_count,
|
||||
client_received_event_count,
|
||||
orig_start.elapsed()
|
||||
);
|
||||
}
|
||||
|
@@ -37,6 +37,9 @@ pub struct ReqFilter {
|
||||
#[serde(skip)]
|
||||
pub tags: Option<HashMap<char, HashSet<String>>>,
|
||||
/// Force no matches due to malformed data
|
||||
// we can't represent it in the req filter, so we don't want to
|
||||
// erroneously match. This basically indicates the req tried to
|
||||
// do something invalid.
|
||||
pub force_no_match: bool,
|
||||
}
|
||||
|
||||
@@ -217,6 +220,17 @@ impl ReqFilter {
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
fn delegated_authors_match(&self, event: &Event) -> bool {
|
||||
if let Some(delegated_pubkey) = &event.delegated_by {
|
||||
self.authors
|
||||
.as_ref()
|
||||
.map(|vs| prefix_match(vs, delegated_pubkey))
|
||||
.unwrap_or(true)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn tag_match(&self, event: &Event) -> bool {
|
||||
// get the hashset from the filter.
|
||||
if let Some(map) = &self.tags {
|
||||
@@ -248,7 +262,7 @@ impl ReqFilter {
|
||||
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
|
||||
&& self.until.map(|t| event.created_at < t).unwrap_or(true)
|
||||
&& self.kind_match(event.kind)
|
||||
&& self.authors_match(event)
|
||||
&& (self.authors_match(event) || self.delegated_authors_match(event))
|
||||
&& self.tag_match(event)
|
||||
&& !self.force_no_match
|
||||
}
|
||||
@@ -308,6 +322,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "foo".to_owned(),
|
||||
pubkey: "abcd".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -326,6 +341,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abcd".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -344,6 +360,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abcde".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -363,6 +380,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 50,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -386,6 +404,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 150,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -407,6 +426,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 50,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -425,6 +445,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 1001,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -443,6 +464,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -461,6 +483,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "123".to_owned(),
|
||||
pubkey: "abc".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -479,6 +502,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "123".to_owned(),
|
||||
pubkey: "bcd".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
@@ -497,6 +521,7 @@ mod tests {
|
||||
let e = Event {
|
||||
id: "123".to_owned(),
|
||||
pubkey: "xyz".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
|
Reference in New Issue
Block a user