Compare commits


39 Commits
0.5.0 ... 0.6.1

Author SHA1 Message Date
Greg Heartsfield
f4ecd43708 build: bump version to 0.6.1 2022-07-04 17:41:16 -05:00
Greg Heartsfield
a8f465fdc8 improvement: upgrade docker base images (and specify explicit repository) 2022-07-04 17:35:17 -05:00
Greg Heartsfield
1c14adc766 fix(NIP-01): allow limits on a per-filter basis
The original implementation applied the subscription limit to the
entire query instead of to each filter.  Now, each filter gets its
own query limit: when a limit is applied, the most recent N events
are returned; otherwise, all matching events are returned, earliest
first.
2022-07-04 17:25:32 -05:00
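To illustrate the per-filter behavior, a minimal sketch in the style of the deserialization tests in src/subscription.rs (the subscription id and kinds here are made up):

    use nostr_rs_relay::subscription::Subscription;

    #[test]
    fn per_filter_limits() -> serde_json::Result<()> {
        // After this fix, each filter's limit applies to its own subquery:
        // the newest 5 kind-1 events, plus the newest single kind-0 event.
        let s: Subscription = serde_json::from_str(
            r#"["REQ","xyz",{"kinds":[1],"limit":5},{"kinds":[0],"limit":1}]"#,
        )?;
        assert_eq!(s.filters.len(), 2);
        Ok(())
    }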
Greg Heartsfield
e894a86566 docs: NIP-15, NIP-16 feature notes in README 2022-07-04 13:10:48 -05:00
Greg Heartsfield
bedc378624 improvement: upgrade multiple dependencies
Updating async-trait v0.1.53 -> v0.1.56
Updating bumpalo v3.9.1 -> v3.10.0
Updating crossbeam-utils v0.8.8 -> v0.8.10
Updating crypto-common v0.1.3 -> v0.1.4
Updating getrandom v0.2.6 -> v0.2.7
Updating http v0.2.7 -> v0.2.8
Updating indexmap v1.8.2 -> v1.9.1
Updating js-sys v0.3.57 -> v0.3.58
Updating linked-hash-map v0.5.4 -> v0.5.6
Updating mio v0.8.3 -> v0.8.4
Updating once_cell v1.12.0 -> v1.12.1
Updating openssl-sys v0.9.73 -> v0.9.74
Removing parking_lot v0.11.2
Removing parking_lot_core v0.8.5
Updating proc-macro2 v1.0.39 -> v1.0.40
Updating quote v1.0.18 -> v1.0.20
Updating r2d2 v0.8.9 -> v0.8.10
Updating ron v0.7.0 -> v0.7.1
Updating serde v1.0.137 -> v1.0.138
Updating serde_derive v1.0.137 -> v1.0.138
Updating serde_json v1.0.81 -> v1.0.82
Updating smallvec v1.8.0 -> v1.9.0
Updating syn v1.0.95 -> v1.0.98
Updating tokio v1.18.2 -> v1.19.2
Updating tokio-macros v1.7.0 -> v1.8.0
Updating tokio-util v0.7.2 -> v0.7.3
Updating tower-service v0.3.1 -> v0.3.2
Updating tracing v0.1.34 -> v0.1.35
Removing tracing-attributes v0.1.21
Updating tracing-core v0.1.26 -> v0.1.28
Updating unicode-ident v1.0.0 -> v1.0.1
Updating unicode-normalization v0.1.19 -> v0.1.21
Updating wasm-bindgen v0.2.80 -> v0.2.81
Updating wasm-bindgen-backend v0.2.80 -> v0.2.81
Updating wasm-bindgen-macro v0.2.80 -> v0.2.81
Updating wasm-bindgen-macro-support v0.2.80 -> v0.2.81
Updating wasm-bindgen-shared v0.2.80 -> v0.2.81
Updating web-sys v0.3.57 -> v0.3.58
2022-07-04 12:56:10 -05:00
Greg Heartsfield
e1c2a6b758 improvement: upgrade docker base image 2022-05-30 21:53:46 -05:00
Greg Heartsfield
990bb656e8 improvement: upgrade multiple dependencies
Cargo updated the following dependencies:

Updating dashmap v5.3.3 -> v5.3.4
Updating http-body v0.4.4 -> v0.4.5
Updating hyper v0.14.18 -> v0.14.19
Updating indexmap v1.8.1 -> v1.8.2
Updating itoa v1.0.1 -> v1.0.2
Updating libc v0.2.125 -> v0.2.126
Updating once_cell v1.10.0 -> v1.12.0
Updating parking_lot v0.12.0 -> v0.12.1
Updating proc-macro2 v1.0.38 -> v1.0.39
Updating regex v1.5.5 -> v1.5.6
Updating regex-syntax v0.6.25 -> v0.6.26
Updating ryu v1.0.9 -> v1.0.10
Updating schannel v0.1.19 -> v0.1.20
Updating scheduled-thread-pool v0.2.5 -> v0.2.6
Updating syn v1.0.93 -> v1.0.95
Updating tokio-util v0.7.1 -> v0.7.2

Adding unicode-ident v1.0.0

Removing unicode-xid v0.2.3
2022-05-30 21:47:24 -05:00
Semisol
168cfc3b26 feat(NIP-16): Implement NIP16
NIP16 introduces replaceable and ephemeral event ranges:
[10000..20000) for replaceable and [20000..30000) for ephemeral.
2022-05-30 21:43:06 -05:00
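A minimal sketch of the kind classification this implies (mirroring the checks that land in src/db.rs below; kinds 0 and 3 were already treated as replaceable):

    // Sketch only; the relay inlines these checks rather than using helpers.
    fn is_replaceable(kind: u64) -> bool {
        kind == 0 || kind == 3 || (10000..20000).contains(&kind)
    }

    fn is_ephemeral(kind: u64) -> bool {
        // Ephemeral events are broadcast to subscribers but never stored.
        (20000..30000).contains(&kind)
    }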
Semisol
a36ad378f6 feat(NIP-15): Implement NIP15
With NIP15, the relay sends an EOSE notice to clients after all stored
events have been sent, enabling loading indicators and other use cases.
2022-05-30 21:43:00 -05:00
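The EOSE frame itself is a two-element JSON array; a sketch of what a client sees for a subscription with id "xyz" once stored events are exhausted:

    fn main() {
        // Sketch: the end-of-stored-events notice for subscription "xyz".
        let eose = serde_json::json!(["EOSE", "xyz"]).to_string();
        assert_eq!(eose, r#"["EOSE","xyz"]"#);
    }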
Greg Heartsfield
538d139ebf improvement: upgrade docker base image 2022-05-10 21:24:22 -05:00
Greg Heartsfield
23f7730fea build: bump version to 0.6.0 2022-05-10 21:19:21 -05:00
Greg Heartsfield
8aa1256254 improvement: upgrade multiple dependencies 2022-05-10 17:07:18 -05:00
Greg Heartsfield
9ed3391b46 fix(NIP-09): correct WHERE clause for event deletion 2022-05-10 16:50:52 -05:00
William Casarin
4ad483090e feat(NIP-01): Implement limit
This was quickly sneaked in by fiatjaf per my request[0]; it makes many
queries more efficient and allows for paging when combined with until.

It is a bit weird to have multiple limits on each filter... for now we
just choose any or the last limit seen.

[0]: a4aea5337f

Signed-off-by: William Casarin <jb55@jb55.com>
2022-05-10 16:47:56 -05:00
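A hedged sketch of the paging pattern described above (subscription id and timestamp are made up): fetch the newest page, then re-issue the query with until set to the oldest created_at received.

    fn main() {
        // Hypothetical paging flow: page 1 takes the newest 20 events; page 2
        // repeats the query with `until` set to the oldest created_at seen.
        let _page1 = r#"["REQ","page",{"kinds":[1],"limit":20}]"#;
        let _page2 = r#"["REQ","page",{"kinds":[1],"limit":20,"until":1652220000}]"#;
    }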
Greg Heartsfield
9b351aab9b docs: update devel discussion link 2022-02-28 17:19:24 -06:00
Greg Heartsfield
597749890e improvement: remove unnecessary event logging 2022-02-27 19:30:48 -06:00
Greg Heartsfield
1d499cf12b feat: handle NIP-09 for deletion events 2022-02-27 11:35:23 -06:00
Greg Heartsfield
ed3a6b9692 refactor: simplify NOTICE messages 2022-02-26 17:34:58 -06:00
Greg Heartsfield
048199e30b build: bump version to 0.5.2 2022-02-26 11:22:16 -06:00
Greg Heartsfield
414e83f696 refactor: import cleanup for config 2022-02-26 11:16:12 -06:00
Greg Heartsfield
225c8f762e improvement: upgrade dependencies; config, tungstenite, tokio 2022-02-26 09:55:12 -06:00
Greg Heartsfield
887fc28ab2 fix: until filters in subscriptions now used 2022-02-26 09:15:45 -06:00
Greg Heartsfield
294d3b99c3 fix: correct imports for test cases 2022-02-26 09:07:07 -06:00
Greg Heartsfield
53990672ae improvement: move db pool operations closer to query, do not panic on failure 2022-02-23 16:38:16 -06:00
Greg Heartsfield
9c1b21cbfe improvement: more granular perf logging for SQL queries 2022-02-21 09:03:05 -06:00
Greg Heartsfield
2f63417646 improvement: better logging for connection resets 2022-02-21 08:57:07 -06:00
Greg Heartsfield
3b25160852 fix: abort on connection IO errors 2022-02-21 08:50:46 -06:00
Greg Heartsfield
34ad549cde fix: update event buffer size comment in config 2022-02-20 11:46:24 -06:00
Greg Heartsfield
f8b1fe5035 docs: line up comments with code 2022-02-17 16:18:05 -06:00
Greg Heartsfield
f2001dc34a build: bump version to 0.5.1 2022-02-13 09:38:45 -06:00
Greg Heartsfield
b593001229 fix: remove setting from example config 2022-02-13 09:37:05 -06:00
Greg Heartsfield
5913b9f87a feat: send notices when authorization checks fail 2022-02-13 09:35:54 -06:00
Greg Heartsfield
77f35f9f43 feat: server-side pings and disconnects 2022-02-12 16:57:26 -06:00
Greg Heartsfield
9e06cc9482 improvement: better error messages on parse failures 2022-02-12 16:33:29 -06:00
Greg Heartsfield
e66fa4ac42 refactor: remove unnecessary Option wrapping 2022-02-12 16:29:27 -06:00
Greg Heartsfield
99e117f620 improvement: better handling of out-of-protocol messages 2022-02-12 16:26:55 -06:00
Greg Heartsfield
8250e00f05 fix: remove protostream module, and missing NOTICE 2022-02-12 16:22:12 -06:00
Greg Heartsfield
c9f87ec563 docs: NIP-05 feature note in README 2022-02-12 16:19:46 -06:00
Greg Heartsfield
ceaa01e8b4 fix: removed manual nostr stream, so websocket pings work 2022-02-12 16:19:10 -06:00
15 changed files with 999 additions and 722 deletions

Cargo.lock (generated, 785 lines changed)

File diff suppressed because it is too large.

Cargo.toml

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.5.0"
version = "0.6.1"
edition = "2021"
[dependencies]
@@ -9,12 +9,12 @@ env_logger = "^0.9"
tokio = { version = "^1.16", features = ["full"] }
futures = "^0.3"
futures-util = "^0.3"
tokio-tungstenite = "^0.16"
tungstenite = "^0.16"
tokio-tungstenite = "^0.17"
tungstenite = "^0.17"
thiserror = "^1"
uuid = { version = "^0.8", features = ["v4"] }
config = { version = "0.11", features = ["toml"] }
bitcoin_hashes = { version = "^0.9", features = ["serde"] }
config = { version = "^0.12", features = ["toml"] }
bitcoin_hashes = { version = "^0.10", features = ["serde"] }
secp256k1 = {version = "^0.21", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = {version = "^1.0", features = ["preserve_order"]}

Dockerfile

@@ -1,4 +1,4 @@
FROM rust:1.58.1 as builder
FROM docker.io/library/rust:1.62.0@sha256:8220e4fbb22a07b78e6472cdf8f5fb8913a04974c26b130177b73a8a64334541 as builder
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@@ -12,7 +12,7 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo build --release
FROM debian:bullseye-20220125-slim
FROM docker.io/library/debian:bullseye-20220622-slim@sha256:89e9d812b34f393bddc3ff289f0036a3d9efc7e2f7289ec902c6427b69f39649
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \

README.md

@@ -17,10 +17,12 @@ NIPs with a relay-specific implementation are listed here.
- [x] NIP-01: Id/Author prefix search (_experimental_)
- [x] NIP-02: Hide old contact list events
- [ ] NIP-03: OpenTimestamps
- [ ] NIP-05: Mapping Nostr keys to DNS identifiers
- [ ] NIP-09: Event deletion
- [x] NIP-05: Mapping Nostr keys to DNS identifiers
- [x] NIP-09: Event deletion
- [x] NIP-11: Relay information document
- [x] NIP-12: Generic tag search (_experimental_)
- [x] NIP-15: End of stored events notice
- [x] NIP-16: Replaceable and ephemeral events
## Quick Start
@@ -79,8 +81,10 @@ termination, load balancing, and other features), see [Reverse
Proxy](reverse-proxy.md).
## Dev Channel
The current dev discussions for this project is happening at https://discord.gg/ufG6fH52Vk.
Drop in to query any development related questions.
For development discussions, please feel free to use the [sourcehut
mailing list](https://lists.sr.ht/~gheartsfield/nostr-rs-relay-devel).
Or, drop by the [Nostr Telegram Channel](https://t.me/nostr_protocol).
License
---

config.toml

@@ -63,8 +63,8 @@ reject_future_seconds = 1800
#broadcast_buffer = 16384
# Event persistence buffer size, in number of events. This provides
# backpressure to senders if writes are slow. Defaults to 16.
#event_persist_buffer = 16
# backpressure to senders if writes are slow.
#event_persist_buffer = 4096
[authorization]
# Pubkey addresses in this array are whitelisted for event publishing.
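The backpressure described in that comment comes from the bounded tokio channel used for event persistence; a minimal standalone sketch (buffer size 4096 to match the example value):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // A bounded channel provides backpressure: once 4096 events are queued
        // and the database writer is slow, `send` suspends the sender.
        let (tx, mut rx) = mpsc::channel::<String>(4096);
        tx.send("event json".to_owned()).await.ok();
        assert_eq!(rx.recv().await.as_deref(), Some("event json"));
    }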

src/config.rs

@@ -1,4 +1,5 @@
//! Configuration file and settings management
use config::{Config, ConfigError, File};
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
@@ -138,27 +139,29 @@ pub struct Settings {
impl Settings {
pub fn new() -> Self {
let d = Self::default();
let default_settings = Self::default();
// attempt to construct settings with file
// Self::new_from_default(&d).unwrap_or(d)
let from_file = Self::new_from_default(&d);
let from_file = Self::new_from_default(&default_settings);
match from_file {
Ok(f) => f,
Err(e) => {
warn!("Error reading config file ({:?})", e);
d
default_settings
}
}
}
fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
let config: config::Config = config::Config::new();
let mut settings: Settings = config
fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
let builder = Config::builder();
let config: Config = builder
// use defaults
.with_merged(config::Config::try_from(default).unwrap())?
.add_source(Config::try_from(default)?)
// override with file contents
.with_merged(config::File::with_name("config"))?
.try_into()?;
.add_source(File::with_name("config"))
.build()?
.try_into()
.unwrap();
let mut settings: Settings = config.try_deserialize()?;
// ensure connection pool size is logical
if settings.database.min_conn > settings.database.max_conn {
panic!(
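For reference, the config 0.12 builder flow used above as a standalone sketch; the Settings struct here is a stand-in for the crate's real one:

    use config::{Config, ConfigError, File};
    use serde::{Deserialize, Serialize};

    // Stand-in for the crate's Settings (illustrative field only).
    #[derive(Serialize, Deserialize, Default)]
    struct Settings {
        database_dir: Option<String>,
    }

    // Loading order: serialized defaults first, then config.toml overrides,
    // then a single deserialization pass at the end.
    fn load_settings() -> Result<Settings, ConfigError> {
        Config::builder()
            .add_source(Config::try_from(&Settings::default())?)
            .add_source(File::with_name("config"))
            .build()?
            .try_deserialize()
    }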

src/db.rs (432 lines changed)

@@ -7,6 +7,7 @@ use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05;
use crate::schema::{upgrade_db, STARTUP_SQL};
use crate::subscription::ReqFilter;
use crate::subscription::Subscription;
use crate::utils::is_hex;
use governor::clock::Clock;
@@ -27,6 +28,13 @@ use tokio::task;
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<String>,
}
/// Database file
pub const DB_FILE: &str = "nostr.db";
@@ -76,7 +84,7 @@ pub fn build_conn(flags: OpenFlags) -> Result<Connection> {
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
metadata_tx: tokio::sync::broadcast::Sender<Event>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
@@ -131,18 +139,20 @@ pub async fn db_writer(
break;
}
let mut event_write = false;
let event = next_event.unwrap();
let subm_event = next_event.unwrap();
let event = subm_event.event;
let notice_tx = subm_event.notice_tx;
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
debug!("Checking against pubkey whitelist");
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
info!(
"Rejecting event {}, unauthorized author",
event.get_event_id_prefix()
);
// TODO: define a channel that can send NOTICEs back to the client.
notice_tx
.try_send("pubkey is not allowed to publish to this relay".to_owned())
.ok();
continue;
}
}
@@ -171,6 +181,12 @@ pub async fn db_writer(
uv.name.to_string(),
event.get_author_prefix()
);
notice_tx
.try_send(
"NIP-05 verification is no longer valid (expired/wrong domain)"
.to_owned(),
)
.ok();
continue;
}
}
@@ -179,6 +195,9 @@ pub async fn db_writer(
"no verification records found for pubkey: {:?}",
event.get_author_prefix()
);
notice_tx
.try_send("NIP-05 verification needed to publish events".to_owned())
.ok();
continue;
}
Err(e) => {
@@ -189,24 +208,41 @@ pub async fn db_writer(
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate event");
} else {
info!(
"persisted event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
if event.kind >= 20000 && event.kind < 30000 {
info!(
"published ephemeral event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
bcast_tx.send(event.clone()).ok();
event_write = true
} else {
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate event");
} else {
info!(
"persisted event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
notice_tx
.try_send(
"relay experienced an error trying to publish the latest event"
.to_owned(),
)
.ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
}
}
@@ -278,35 +314,45 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
}
}
}
// if this event is for a metadata update, hide every other kind=0
// event from the same author that was issued earlier than this.
if e.kind == 0 {
// if this event is replaceable update, hide every other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!(
"hid {} older metadata events for author {:?}",
"hid {} older replaceable kind {} events for author {:?}",
update_count,
e.kind,
e.get_author_prefix()
);
}
}
// if this event is for a contact update, hide every other kind=3
// event from the same author that was issued earlier than this.
if e.kind == 3 {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!(
"hid {} older contact events for author {:?}",
update_count,
e.get_author_prefix()
);
}
// if this event is a deletion, hide the referenced events from the same author.
if e.kind == 5 {
let event_candidates = e.tag_values_by_name("e");
let mut params: Vec<Box<dyn ToSql>> = vec![];
// first parameter will be author
params.push(Box::new(hex::decode(&e.pubkey)?));
event_candidates
.iter()
.filter(|x| is_hex(x) && x.len() == 64)
.filter_map(|x| hex::decode(x).ok())
.for_each(|x| params.push(Box::new(x)));
let query = format!(
"UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
repeat_vars(params.len() - 1)
);
let mut stmt = tx.prepare(&query)?;
let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
info!(
"hid {} deleted events for author {:?}",
update_count,
e.get_author_prefix()
);
}
tx.commit()?;
Ok(ins_count)
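The deletion handling above builds its IN (...) list with repeat_vars, whose body is mostly elided in the next hunk; a plausible shape for it, following the rusqlite cookbook pattern:

    // Plausible sketch: produce "?,?,?" placeholders for a parameterized
    // SQL IN clause; repeat_vars(3) == "?,?,?".
    fn repeat_vars(count: usize) -> String {
        if count == 0 {
            return "".to_owned();
        }
        let mut s = "?,".repeat(count);
        s.pop(); // drop the trailing comma
        s
    }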
@@ -332,142 +378,156 @@ fn repeat_vars(count: usize) -> String {
s
}
/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// Create a dynamic SQL subquery and params from a subscription filter.
fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
let mut query =
"SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
// parameters
"SELECT DISTINCT(e.content), e.created_at FROM event e LEFT JOIN tag t ON e.id=t.event_id "
.to_owned();
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];
// for every filter in the subscription, generate a where clause
let mut filter_clauses: Vec<String> = Vec::new();
for f in sub.filters.iter() {
// individual filter components
let mut filter_components: Vec<String> = Vec::new();
// Query for "authors", allowing prefix matches
if let Some(authvec) = &f.authors {
// take each author and convert to a hexsearch
let mut auth_searches: Vec<String> = vec![];
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from author {:?}", auth);
}
// individual filter components (single conditions such as an author or event ID)
let mut filter_components: Vec<String> = Vec::new();
// Query for "authors", allowing prefix matches
if let Some(authvec) = &f.authors {
// take each author and convert to a hexsearch
let mut auth_searches: Vec<String> = vec![];
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from author {:?}", auth);
}
}
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
}
// Query for Kind
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
filter_components.push(kind_clause);
}
// Query for event, allowing prefix matches
if let Some(idvec) = &f.ids {
// take each author and convert to a hexsearch
let mut id_searches: Vec<String> = vec![];
for id in idvec {
match hex_range(id) {
Some(HexSearch::Exact(ex)) => {
id_searches.push("event_hash=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
id_searches.push("event_hash>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from id {:?}", id);
}
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
}
// Query for Kind
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
filter_components.push(kind_clause);
}
// Query for event, allowing prefix matches
if let Some(idvec) = &f.ids {
// take each author and convert to a hexsearch
let mut id_searches: Vec<String> = vec![];
for id in idvec {
match hex_range(id) {
Some(HexSearch::Exact(ex)) => {
id_searches.push("event_hash=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
id_searches.push("event_hash>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from id {:?}", id);
}
}
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
}
// Query for tags
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if is_hex(v) {
if let Ok(h) = hex::decode(&v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.to_owned()));
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
}
// Query for tags
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if is_hex(v) {
if let Ok(h) = hex::decode(&v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.to_owned()));
}
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
// add the tag name as the first parameter
params.push(Box::new(key.to_owned()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
}
// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
filter_components.push(created_clause);
}
// Query for timestamp
if f.until.is_some() {
let until_clause = format!("created_at < {}", f.until.unwrap());
filter_components.push(until_clause);
}
// combine all clauses, and add to filter_clauses
if !filter_components.is_empty() {
let mut fc = "( ".to_owned();
fc.push_str(&filter_components.join(" AND "));
fc.push_str(" )");
filter_clauses.push(fc);
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
// add the tag name as the first parameter
params.push(Box::new(key.to_owned()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
}
// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
filter_components.push(created_clause);
}
// Query for timestamp
if f.until.is_some() {
let until_clause = format!("created_at < {}", f.until.unwrap());
filter_components.push(until_clause);
}
// never display hidden events
query.push_str(" WHERE hidden!=TRUE ");
// combine all filters with OR clauses, if any exist
if !filter_clauses.is_empty() {
query.push_str(" AND (");
query.push_str(&filter_clauses.join(" OR "));
query.push_str(") ");
query.push_str(" WHERE hidden!=TRUE");
// build filter component conditions
if !filter_components.is_empty() {
query.push_str(" AND ");
query.push_str(&filter_components.join(" AND "));
}
// add order clause
query.push_str(" ORDER BY created_at ASC");
debug!("query string: {}", query);
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some(lim) = f.limit {
query.push_str(&format!(" ORDER BY e.created_at DESC LIMIT {}", lim))
} else {
query.push_str(" ORDER BY e.created_at ASC")
}
(query, params)
}
/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
// build a dynamic SQL query for an entire subscription, based on
// SQL subqueries for filters.
let mut subqueries: Vec<String> = Vec::new();
// subquery params
let mut params: Vec<Box<dyn ToSql>> = vec![];
// for every filter in the subscription, generate a subquery
for f in sub.filters.iter() {
let (f_subquery, mut f_params) = query_from_filter(&f);
subqueries.push(f_subquery);
params.append(&mut f_params);
}
// encapsulate subqueries into select statements
let subqueries_selects: Vec<String> = subqueries
.iter()
.map(|s| {
return format!("SELECT content, created_at FROM ({})", s);
})
.collect();
let query: String = subqueries_selects.join(" UNION ");
info!("final query string: {}", query);
(query, params)
}
@@ -479,7 +539,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// query is immediately aborted.
pub async fn db_query(
sub: Subscription,
conn: PooledConnection,
pool: SqlitePool,
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
@@ -489,29 +549,49 @@ pub async fn db_query(
let start = Instant::now();
// generate SQL query
let (q, p) = query_from_sub(&sub);
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
while let Some(row) = event_rows.next()? {
// check if this is still active (we could do this every N rows)
if abandon_query_rx.try_recv().is_ok() {
debug!("query aborted");
return Ok(());
debug!("SQL generated in {:?}", start.elapsed());
// show pool stats
debug!("DB pool stats: {:?}", pool.state());
let start = Instant::now();
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
let mut first_result = true;
while let Some(row) = event_rows.next()? {
if first_result {
debug!("time to first result: {:?}", start.elapsed());
first_result = false;
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
debug!("query aborted");
return Ok(());
}
row_count += 1;
let event_json = row.get(0)?;
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
})
.ok();
}
row_count += 1;
let event_json = row.get(0)?;
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: event_json,
event: "EOSE".to_string(),
})
.ok();
debug!(
"query completed ({} rows) in {:?}",
row_count,
start.elapsed()
);
} else {
warn!("Could not get a database connection for querying");
}
debug!(
"query completed ({} rows) in {:?}",
row_count,
start.elapsed()
);
let ok: Result<()> = Ok(());
ok
});
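To make the new per-filter structure concrete, a sketch of the SQL that query_from_sub now emits for a two-filter subscription (one filter with a limit, one without; predicates and spacing are illustrative):

    fn main() {
        // Illustrative only: each filter becomes its own ordered (and possibly
        // limited) subquery, and the subqueries are stitched together by UNION.
        let _shape = "SELECT content, created_at FROM ( \
             SELECT DISTINCT(e.content), e.created_at FROM event e \
             LEFT JOIN tag t ON e.id=t.event_id \
             WHERE hidden!=TRUE AND kind IN (1) \
             ORDER BY e.created_at DESC LIMIT 5 ) \
         UNION \
         SELECT content, created_at FROM ( \
             SELECT DISTINCT(e.content), e.created_at FROM event e \
             LEFT JOIN tag t ON e.id=t.event_id \
             WHERE hidden!=TRUE AND kind IN (0) \
             ORDER BY e.created_at ASC )";
    }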

src/error.rs

@@ -48,6 +48,8 @@ pub enum Error {
JoinError,
#[error("Hyper Client error")]
HyperError(hyper::Error),
#[error("Hex encoding error")]
HexError(hex::FromHexError),
#[error("Unknown/Undocumented")]
UnknownError,
}
@@ -58,6 +60,12 @@ pub enum Error {
// }
//}
impl From<hex::FromHexError> for Error {
fn from(h: hex::FromHexError) -> Self {
Error::HexError(h)
}
}
impl From<hyper::Error> for Error {
fn from(h: hyper::Error) -> Self {
Error::HyperError(h)
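The new From impl is what lets ? convert hex decoding failures into the crate's error type; a hypothetical usage sketch (assuming the crate's Result alias from src/error.rs):

    use nostr_rs_relay::error::Result;

    // Hypothetical helper: with From<hex::FromHexError> in place, `?` maps a
    // hex decoding failure straight into Error::HexError.
    fn decode_pubkey(pk: &str) -> Result<Vec<u8>> {
        Ok(hex::decode(pk)?)
    }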

src/event.rs

@@ -124,6 +124,16 @@ impl Event {
self.pubkey.chars().take(8).collect()
}
/// Retrieve tag values
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
self.tags
.iter()
.filter(|x| x.len() > 1)
.filter(|x| x.get(0).unwrap() == tag_name)
.map(|x| x.get(1).unwrap().to_owned())
.collect()
}
/// Check if this event has a valid signature.
fn is_valid(&self) -> bool {
// TODO: return a Result with a reason for invalid events
@@ -136,7 +146,7 @@ impl Event {
if curr_time + (allowable_future as u64) < self.created_at {
let delta = self.created_at - curr_time;
debug!(
"Event is too far in the future ({} seconds), rejecting",
"event is too far in the future ({} seconds), rejecting",
delta
);
return false;
@@ -169,11 +179,11 @@ impl Event {
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
matches!(verify, Ok(()))
} else {
debug!("Client sent malformed pubkey");
debug!("client sent malformed pubkey");
false
}
} else {
info!("Error converting digest to secp256k1 message");
info!("error converting digest to secp256k1 message");
false
}
}
@@ -332,6 +342,32 @@ mod tests {
assert_eq!(c, expected);
}
#[test]
fn event_tag_select() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
created_at: 501234,
kind: 1,
tags: vec![
vec!["j".to_owned(), "abc".to_owned()],
vec!["e".to_owned(), "foo".to_owned()],
vec!["e".to_owned(), "bar".to_owned()],
vec!["e".to_owned(), "baz".to_owned()],
vec![
"p".to_owned(),
"aaaa".to_owned(),
"ws://example.com".to_owned(),
],
],
content: "this is a test".to_owned(),
sig: "abcde".to_owned(),
tagidx: None,
};
let v = e.tag_values_by_name("e");
assert_eq!(v, vec!["foo", "bar", "baz"]);
}
#[test]
fn event_canonical_with_tags() {
let e = Event {

src/hexrange.rs

@@ -80,6 +80,7 @@ pub fn hex_range(s: &str) -> Option<HexSearch> {
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
#[test]
fn hex_range_exact() -> Result<()> {
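For context, a conceptual sketch of how src/db.rs consumes hex_range results when turning an id or author prefix into SQL (the variant names match src/hexrange.rs, and the SQL fragments match the query builder above):

    // Conceptual sketch only; the real HexSearch lives in src/hexrange.rs.
    enum HexSearch {
        Exact(Vec<u8>),          // full-length hex value
        Range(Vec<u8>, Vec<u8>), // lower and upper bounds for a prefix
        LowerOnly(Vec<u8>),      // prefix with no distinct upper bound
    }

    fn author_clause(h: &HexSearch) -> &'static str {
        match h {
            HexSearch::Exact(_) => "author=?",
            HexSearch::Range(_, _) => "(author>? AND author<?)",
            HexSearch::LowerOnly(_) => "author>?",
        }
    }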

src/info.rs

@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 11]),
supported_nips: Some(vec![1, 2, 11, 15, 16]),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
}

src/lib.rs

@@ -2,12 +2,11 @@ pub mod close;
pub mod config;
pub mod conn;
pub mod db;
pub mod schema;
pub mod error;
pub mod event;
pub mod hexrange;
pub mod info;
pub mod nip05;
pub mod protostream;
pub mod schema;
pub mod subscription;
pub mod utils;

src/main.rs

@@ -9,27 +9,34 @@ use hyper::{
};
use log::*;
use nostr_rs_relay::close::Close;
use nostr_rs_relay::close::CloseCmd;
use nostr_rs_relay::config;
use nostr_rs_relay::conn;
use nostr_rs_relay::db;
use nostr_rs_relay::db::SubmittedEvent;
use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::event::Event;
use nostr_rs_relay::event::EventCmd;
use nostr_rs_relay::info::RelayInfo;
use nostr_rs_relay::nip05;
use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
use nostr_rs_relay::subscription::Subscription;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::env;
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use std::time::Instant;
use tokio::runtime::Builder;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_tungstenite::WebSocketStream;
use tungstenite::error::Error as WsError;
use tungstenite::handshake;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;
/// Return a requested DB name from command line arguments.
@@ -46,7 +53,7 @@ async fn handle_web_request(
pool: db::SqlitePool,
remote_addr: SocketAddr,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
shutdown: Receiver<()>,
) -> Result<Response<Body>, Infallible> {
match (
@@ -55,7 +62,7 @@ async fn handle_web_request(
) {
// Request for / as websocket
("/", true) => {
debug!("websocket with upgrade request");
trace!("websocket with upgrade request");
//assume request is a handshake, so create the handshake response
let response = match handshake::server::create_response_with_body(&request, || {
Body::empty()
@@ -227,7 +234,8 @@ fn main() -> Result<(), Error> {
let (bcast_tx, _) = broadcast::channel::<Event>(settings.limits.broadcast_buffer);
// validated events that need to be persisted are sent to the
// database on via this channel.
let (event_tx, event_rx) = mpsc::channel::<Event>(settings.limits.event_persist_buffer);
let (event_tx, event_rx) =
mpsc::channel::<SubmittedEvent>(settings.limits.event_persist_buffer);
// establish a channel for letting all threads know about a
// requested server shutdown.
let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
@@ -312,32 +320,84 @@ fn main() -> Result<(), Error> {
Ok(())
}
/// Nostr protocol messages from a client
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
/// An `EVENT` message
EventMsg(EventCmd),
/// A `REQ` message
SubMsg(Subscription),
/// A `CLOSE` message
CloseMsg(CloseCmd),
}
/// Convert Message to NostrMessage
fn convert_to_msg(msg: String) -> Result<NostrMessage> {
let config = config::SETTINGS.read().unwrap();
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
match parsed_res {
Ok(m) => {
if let NostrMessage::EventMsg(_) = m {
if let Some(max_size) = config.limits.max_event_bytes {
// check length, ensure that some max size is set.
if msg.len() > max_size && max_size > 0 {
return Err(Error::EventMaxLengthError(msg.len()));
}
}
}
Ok(m)
}
Err(e) => {
debug!("proto parse error: {:?}", e);
debug!("parse error on message: {}", msg.trim());
Err(Error::ProtoParseError)
}
}
}
/// Turn a string into a NOTICE message ready to send over a WebSocket
fn make_notice_message(msg: &str) -> Message {
Message::text(json!(["NOTICE", msg]).to_string())
}
/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
pool: db::SqlitePool,
ws_stream: WebSocketStream<Upgraded>,
mut ws_stream: WebSocketStream<Upgraded>,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<Event>,
event_tx: mpsc::Sender<SubmittedEvent>,
mut shutdown: Receiver<()>,
) {
// get a broadcast channel for clients to communicate on
let mut bcast_rx = broadcast.subscribe();
// upgrade the TCP connection to WebSocket
//let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
//let ws_stream = conn.expect("websocket handshake error");
// wrap websocket into a stream & sink of Nostr protocol messages
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);
// Track internal client state
let mut conn = conn::ClientConn::new();
let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database.
// we will send out the tx handle to any query we generate.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
// Create channel for receiving NOTICEs
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
// last time this client sent data (message, ping, etc.)
let mut last_message_time = Instant::now();
// ping interval (every 5 minutes)
let default_ping_dur = Duration::from_secs(300);
// disconnect after 20 minutes without a ping response or event.
let max_quiet_time = Duration::from_secs(60 * 20);
let start = tokio::time::Instant::now() + default_ping_dur;
let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
// maintain a hashmap of a oneshot channel for active subscriptions.
// when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// for stats, keep track of how many events the client published,
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
@@ -349,11 +409,31 @@ async fn nostr_server(
// server shutting down, exit loop
break;
},
_ = ping_interval.tick() => {
// check how long since we talked to client
// if it has been too long, disconnect
if last_message_time.elapsed() > max_quiet_time {
debug!("ending connection due to lack of client ping response");
break;
}
// Send a ping
ws_stream.send(Message::Ping(Vec::new())).await.ok();
},
Some(notice_msg) = notice_rx.recv() => {
ws_stream.send(make_notice_message(&notice_msg)).await.ok();
},
Some(query_result) = query_rx.recv() => {
// database informed us of a query result we asked for
let res = EventRes(query_result.sub_id,query_result.event);
client_received_event_count += 1;
nostr_stream.send(res).await.ok();
let subesc = query_result.sub_id.replace("\"", "");
if query_result.event == "EOSE" {
let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
ws_stream.send(Message::Text(send_str)).await.ok();
} else {
client_received_event_count += 1;
// send a result
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
ws_stream.send(Message::Text(send_str)).await.ok();
}
},
// TODO: consider logging the LaggedRecv error
Ok(global_event) = bcast_rx.recv() => {
@@ -368,17 +448,55 @@ async fn nostr_server(
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it
let res = EventRes(s.to_owned(),event_str);
nostr_stream.send(res).await.ok();
let subesc = s.replace("\"", "");
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
//nostr_stream.send(res).await.ok();
} else {
warn!("could not serialize event {:?}", global_event.get_event_id_prefix());
}
}
},
// check if this client has a subscription
proto_next = nostr_stream.next() => {
match proto_next {
Some(Ok(EventMsg(ec))) => {
ws_next = ws_stream.next() => {
// update most recent message time for client
last_message_time = Instant::now();
// Consume text messages from the client, parse into Nostr messages.
let nostr_msg = match ws_next {
Some(Ok(Message::Text(m))) => {
convert_to_msg(m)
},
Some(Ok(Message::Binary(_))) => {
ws_stream.send(make_notice_message("binary messages are not accepted")).await.ok();
continue;
},
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {
// get a ping/pong, ignore. tungstenite will
// send responses automatically.
continue;
},
None |
Some(Ok(Message::Close(_))) |
Some(Err(WsError::AlreadyClosed)) |
Some(Err(WsError::ConnectionClosed)) |
Some(Err(WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
=> {
debug!("websocket close from client: {:?}",cid);
break;
},
Some(Err(WsError::Io(e))) => {
// IO errors are considered fatal
warn!("IO error (client: {:?}): {:?}", cid, e);
break;
}
x => {
// default condition on error is to close the client connection
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
break;
}
};
// convert ws_next into proto_next
match nostr_msg {
Ok(NostrMessage::EventMsg(ec)) => {
// An EventCmd needs to be validated to be converted into an Event
// handle each type of message
let parsed : Result<Event> = Result::<Event>::from(ec);
@@ -386,23 +504,18 @@ async fn nostr_server(
Ok(e) => {
let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
// TODO: consider moving some/all
// authorization checks here, instead
// of the DB module, so we can send a
// proper NOTICE back to the client if
// they are unable to write.
// Write this to the database
event_tx.send(e.clone()).await.ok();
// Write this to the database.
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
event_tx.send(submit_event).await.ok();
client_published_event_count += 1;
},
Err(_) => {
info!("client {:?} sent an invalid event", cid);
nostr_stream.send(NoticeRes("event was invalid".to_owned())).await.ok();
ws_stream.send(make_notice_message("event was invalid")).await.ok();
}
}
},
Some(Ok(SubMsg(s))) => {
Ok(NostrMessage::SubMsg(s)) => {
debug!("client {} requesting a subscription", cid);
// subscription handling consists of:
// * registering the subscription so future events can be matched
@@ -416,18 +529,15 @@ async fn nostr_server(
previous_query.send(()).ok();
}
// start a database query
// show pool stats
debug!("DB pool stats: {:?}", pool.state());
db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await;
db::db_query(s, pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
nostr_stream.send(NoticeRes(format!("{}",e))).await.ok();
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
}
}
},
Some(Ok(CloseMsg(cc))) => {
Ok(NostrMessage::CloseMsg(cc)) => {
// closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc);
match parsed {
@@ -444,23 +554,23 @@ async fn nostr_server(
},
Err(_) => {
info!("invalid command ignored");
ws_stream.send(make_notice_message("could not parse command")).await.ok();
}
}
},
None => {
debug!("normal websocket close from client: {:?}",cid);
break;
},
Some(Err(Error::ConnError)) => {
Err(Error::ConnError) => {
debug!("got connection close/error, disconnecting client: {:?}",cid);
break;
}
Some(Err(Error::EventMaxLengthError(s))) => {
Err(Error::EventMaxLengthError(s)) => {
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
nostr_stream.send(NoticeRes("event exceeded max size".to_owned())).await.ok();
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
},
Some(Err(e)) => {
Err(Error::ProtoParseError) => {
info!("client {:?} sent event that could not be parsed", cid);
ws_stream.send(make_notice_message("could not parse command")).await.ok();
},
Err(e) => {
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
},
}
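A usage sketch for make_notice_message above: serde_json performs the escaping, so quotes inside a notice cannot break the frame (something the removed protostream module had a TODO about):

    use serde_json::json;
    use tungstenite::protocol::Message;

    fn make_notice_message(msg: &str) -> Message {
        Message::text(json!(["NOTICE", msg]).to_string())
    }

    fn main() {
        let m = make_notice_message(r#"event was "invalid""#);
        assert_eq!(m.into_text().unwrap(), r#"["NOTICE","event was \"invalid\""]"#);
    }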

src/protostream.rs (deleted)

@@ -1,141 +0,0 @@
//! Nostr protocol layered over WebSocket
use crate::close::CloseCmd;
use crate::config;
use crate::error::{Error, Result};
use crate::event::EventCmd;
use crate::subscription::Subscription;
use core::pin::Pin;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use hyper::upgrade::Upgraded;
use log::*;
use serde::{Deserialize, Serialize};
use tokio_tungstenite::WebSocketStream;
use tungstenite::error::Error as WsError;
use tungstenite::protocol::Message;
/// Nostr protocol messages from a client
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
/// An `EVENT` message
EventMsg(EventCmd),
/// A `REQ` message
SubMsg(Subscription),
/// A `CLOSE` message
CloseMsg(CloseCmd),
}
/// Nostr protocol messages from a relay/server
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
pub enum NostrResponse {
/// A `NOTICE` response
NoticeRes(String),
/// An `EVENT` response, composed of the subscription identifier,
/// and serialized event JSON
EventRes(String, String),
}
/// A Nostr protocol stream is layered on top of a Websocket stream.
pub struct NostrStream {
ws_stream: WebSocketStream<Upgraded>,
}
/// Given a websocket, return a protocol stream wrapper.
pub fn wrap_ws_in_nostr(ws: WebSocketStream<Upgraded>) -> NostrStream {
NostrStream { ws_stream: ws }
}
/// Implement the [`Stream`] interface to produce Nostr messages.
impl Stream for NostrStream {
type Item = Result<NostrMessage>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// get the configuration
/// Convert Message to NostrMessage
fn convert(msg: String) -> Result<NostrMessage> {
let config = config::SETTINGS.read().unwrap();
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
match parsed_res {
Ok(m) => {
if let NostrMessage::EventMsg(_) = m {
if let Some(max_size) = config.limits.max_event_bytes {
// check length, ensure that some max size is set.
if msg.len() > max_size && max_size > 0 {
return Err(Error::EventMaxLengthError(msg.len()));
}
}
}
Ok(m)
}
Err(e) => {
debug!("proto parse error: {:?}", e);
debug!("parse error on message: {}", msg.trim());
Err(Error::ProtoParseError)
}
}
}
match Pin::new(&mut self.ws_stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(v)) => match v {
Ok(Message::Text(vs)) => Poll::Ready(Some(convert(vs))),
Ok(Message::Ping(x)) => {
debug!("client ping ({:?})", x);
//Pin::new(&mut self.ws_stream).start_send(Message::Pong(x));
// TODO: restructure this so that Pongs work
//Pin::new(&mut self.ws_stream).write_pending();
//info!("sent pong");
Poll::Pending
}
Ok(Message::Binary(_)) => Poll::Ready(Some(Err(Error::ProtoParseError))),
Ok(Message::Pong(_)) => Poll::Pending,
Ok(Message::Close(_)) => Poll::Ready(None),
Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None),
Err(_) => Poll::Ready(Some(Err(Error::ConnError))),
},
}
}
}
/// Implement the [`Sink`] interface to produce Nostr responses.
impl Sink<NostrResponse> for NostrStream {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// map the error type
match Pin::new(&mut self.ws_stream).poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::ConnWriteError)),
Poll::Pending => Poll::Pending,
}
}
fn start_send(mut self: Pin<&mut Self>, item: NostrResponse) -> Result<(), Self::Error> {
// TODO: do real escaping for these - at least on NOTICE,
// which surely has some problems if arbitrary text is sent.
let send_str = match item {
NostrResponse::NoticeRes(msg) => {
let s = msg.replace("\"", "");
format!("[\"NOTICE\",\"{}\"]", s)
}
NostrResponse::EventRes(sub, eventstr) => {
let subesc = sub.replace("\"", "");
format!("[\"EVENT\",\"{}\",{}]", subesc, eventstr)
}
};
match Pin::new(&mut self.ws_stream).start_send(Message::Text(send_str)) {
Ok(()) => Ok(()),
Err(_) => Err(Error::ConnWriteError),
}
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}

src/subscription.rs

@@ -31,6 +31,8 @@ pub struct ReqFilter {
pub until: Option<u64>,
/// List of author public keys
pub authors: Option<Vec<String>>,
/// Limit number of results
pub limit: Option<u64>,
/// Set of tags
#[serde(skip)]
pub tags: Option<HashMap<String, HashSet<String>>>,
@@ -54,6 +56,7 @@ impl<'de> Deserialize<'de> for ReqFilter {
since: None,
until: None,
authors: None,
limit: None,
tags: None,
};
let mut ts = None;
@@ -68,6 +71,8 @@ impl<'de> Deserialize<'de> for ReqFilter {
rf.since = Deserialize::deserialize(val).ok();
} else if key == "until" {
rf.until = Deserialize::deserialize(val).ok();
} else if key == "limit" {
rf.limit = Deserialize::deserialize(val).ok();
} else if key == "authors" {
rf.authors = Deserialize::deserialize(val).ok();
} else if key.starts_with('#') && key.len() > 1 && val.is_array() {
@@ -214,6 +219,7 @@ impl ReqFilter {
// self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
self.ids_match(event)
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
&& self.until.map(|t| event.created_at < t).unwrap_or(true)
&& self.kind_match(event.kind)
&& self.authors_match(event)
&& self.tag_match(event)
@@ -321,6 +327,50 @@ mod tests {
Ok(())
}
#[test]
fn interest_until() -> Result<()> {
// subscription with a filter for ID and time
let s: Subscription =
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "until": 1000}]"#)?;
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
created_at: 50,
kind: 0,
tags: Vec::new(),
content: "".to_owned(),
sig: "".to_owned(),
tagidx: None,
};
assert!(s.interested_in_event(&e));
Ok(())
}
#[test]
fn interest_range() -> Result<()> {
// subscription with a filter for ID and time
let s_in: Subscription =
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 200}]"#)?;
let s_before: Subscription =
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 140}]"#)?;
let s_after: Subscription =
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 160, "until": 200}]"#)?;
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
created_at: 150,
kind: 0,
tags: Vec::new(),
content: "".to_owned(),
sig: "".to_owned(),
tagidx: None,
};
assert!(s_in.interested_in_event(&e));
assert!(!s_before.interested_in_event(&e));
assert!(!s_after.interested_in_event(&e));
Ok(())
}
#[test]
fn interest_time_and_id() -> Result<()> {
// subscription with a filter for ID and time