Compare commits


19 Commits
0.5.1 ... 0.6.0

Author SHA1 Message Date
Greg Heartsfield  23f7730fea  build: bump version to 0.6.0  2022-05-10 21:19:21 -05:00
Greg Heartsfield  8aa1256254  improvement: upgrade multiple dependencies  2022-05-10 17:07:18 -05:00
Greg Heartsfield  9ed3391b46  fix(NIP-09): correct WHERE clause for event deletion  2022-05-10 16:50:52 -05:00
William Casarin  4ad483090e  feat(NIP-01): Implement limit  2022-05-10 16:47:56 -05:00

    This was quickly sneaked in by fiatjaf per my request[0]; it makes many
    queries more efficient and allows for paging when combined with until.

    It is a bit weird to have multiple limits on each filter... for now we
    just choose any or the last limit seen.

    [0]: a4aea5337f

    Signed-off-by: William Casarin <jb55@jb55.com>
Greg Heartsfield  9b351aab9b  docs: update devel discussion link  2022-02-28 17:19:24 -06:00
Greg Heartsfield  597749890e  improvement: remove unnecessary event logging  2022-02-27 19:30:48 -06:00
Greg Heartsfield  1d499cf12b  feat: handle NIP-09 for deletion events  2022-02-27 11:35:23 -06:00
Greg Heartsfield  ed3a6b9692  refactor: simplify NOTICE messages  2022-02-26 17:34:58 -06:00
Greg Heartsfield  048199e30b  build: bump version to 0.5.2  2022-02-26 11:22:16 -06:00
Greg Heartsfield  414e83f696  refactor: import cleanup for config  2022-02-26 11:16:12 -06:00
Greg Heartsfield  225c8f762e  improvement: upgrade dependencies; config, tungstenite, tokio  2022-02-26 09:55:12 -06:00
Greg Heartsfield  887fc28ab2  fix: until filters in subscriptions now used  2022-02-26 09:15:45 -06:00
Greg Heartsfield  294d3b99c3  fix: correct imports for test cases  2022-02-26 09:07:07 -06:00
Greg Heartsfield  53990672ae  improvement: move db pool operations closer to query, do not panic on failure  2022-02-23 16:38:16 -06:00
Greg Heartsfield  9c1b21cbfe  improvement: more granular perf logging for SQL queries  2022-02-21 09:03:05 -06:00
Greg Heartsfield  2f63417646  improvement: better logging for connection resets  2022-02-21 08:57:07 -06:00
Greg Heartsfield  3b25160852  fix: abort on connection IO errors  2022-02-21 08:50:46 -06:00
Greg Heartsfield  34ad549cde  fix: update event buffer size comment in config  2022-02-20 11:46:24 -06:00
Greg Heartsfield  f8b1fe5035  docs: line up comments with code  2022-02-17 16:18:05 -06:00
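
A note on the limit feature from 4ad483090e (see src/subscription.rs and src/db.rs below): when a limit is present, the query is ordered newest-first, so a client can page backwards by re-issuing the same filter with until set to the created_at of the oldest event it received (the bound is exclusive). A sketch of the two filter messages built with serde_json; the subscription id, author pubkey, and timestamp are hypothetical:

    use serde_json::json;

    fn main() {
        // First page: the 100 most recent events from one author
        // (subscription id, pubkey, and timestamp are hypothetical).
        let page1 = json!(["REQ", "sub1", {"authors": ["1234abcd"], "limit": 100}]);
        // Suppose the oldest event on page 1 had created_at = 1652000000.
        // The next page asks for events strictly older than that; the
        // relay matches with created_at < until.
        let page2 = json!(["REQ", "sub1", {"authors": ["1234abcd"], "until": 1652000000, "limit": 100}]);
        println!("{}", page1);
        println!("{}", page2);
    }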
12 changed files with 665 additions and 341 deletions

Cargo.lock (generated, 705 changed lines): diff suppressed because it is too large.

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "nostr-rs-relay"
-version = "0.5.1"
+version = "0.6.0"
 edition = "2021"
 
 [dependencies]
@@ -9,12 +9,12 @@ env_logger = "^0.9"
 tokio = { version = "^1.16", features = ["full"] }
 futures = "^0.3"
 futures-util = "^0.3"
-tokio-tungstenite = "^0.16"
-tungstenite = "^0.16"
+tokio-tungstenite = "^0.17"
+tungstenite = "^0.17"
 thiserror = "^1"
 uuid = { version = "^0.8", features = ["v4"] }
-config = { version = "0.11", features = ["toml"] }
-bitcoin_hashes = { version = "^0.9", features = ["serde"] }
+config = { version = "^0.12", features = ["toml"] }
+bitcoin_hashes = { version = "^0.10", features = ["serde"] }
 secp256k1 = {version = "^0.21", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
 serde = { version = "^1.0", features = ["derive"] }
 serde_json = {version = "^1.0", features = ["preserve_order"]}

Dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.58.1 as builder
+FROM rust:1.59.0 as builder
 
 RUN USER=root cargo new --bin nostr-rs-relay
 WORKDIR ./nostr-rs-relay

README.md

@@ -18,7 +18,7 @@ NIPs with a relay-specific implementation are listed here.
 - [x] NIP-02: Hide old contact list events
 - [ ] NIP-03: OpenTimestamps
 - [x] NIP-05: Mapping Nostr keys to DNS identifiers
-- [ ] NIP-09: Event deletion
+- [x] NIP-09: Event deletion
 - [x] NIP-11: Relay information document
 - [x] NIP-12: Generic tag search (_experimental_)
@@ -79,8 +79,10 @@ termination, load balancing, and other features), see [Reverse
 Proxy](reverse-proxy.md).
 
 ## Dev Channel
 
-The current dev discussions for this project is happening at https://discord.gg/ufG6fH52Vk.
-Drop in to query any development related questions.
+For development discussions, please feel free to use the [sourcehut
+mailing list](https://lists.sr.ht/~gheartsfield/nostr-rs-relay-devel).
+Or, drop by the [Nostr Telegram Channel](https://t.me/nostr_protocol).
 
 License
 ---

config.toml

@@ -63,8 +63,8 @@ reject_future_seconds = 1800
 #broadcast_buffer = 16384
 
 # Event persistence buffer size, in number of events. This provides
-# backpressure to senders if writes are slow. Defaults to 16.
-#event_persist_buffer = 16
+# backpressure to senders if writes are slow.
+#event_persist_buffer = 4096
 
 [authorization]
 # Pubkey addresses in this array are whitelisted for event publishing.
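
The comment in this hunk describes classic bounded-queue backpressure: once event_persist_buffer events are waiting to be written, senders wait instead of queueing without bound. A standalone sketch of that mechanism using tokio's bounded mpsc channel; the capacity and the writer delay are illustrative, not the relay's actual wiring:

    use std::time::Duration;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // the channel capacity plays the role of event_persist_buffer
        let (event_tx, mut event_rx) = mpsc::channel::<String>(16);

        // slow consumer, standing in for the database writer
        let writer = tokio::spawn(async move {
            while let Some(event) = event_rx.recv().await {
                tokio::time::sleep(Duration::from_millis(10)).await;
                println!("persisted: {}", event);
            }
        });

        for i in 0..100 {
            // once 16 events are queued, send().await parks this task
            // until the writer drains the buffer: that is the
            // backpressure the config comment describes
            event_tx.send(format!("event-{}", i)).await.unwrap();
        }
        drop(event_tx);
        writer.await.unwrap();
    }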

src/config.rs

@@ -1,4 +1,5 @@
 //! Configuration file and settings management
+use config::{Config, ConfigError, File};
 use lazy_static::lazy_static;
 use log::*;
 use serde::{Deserialize, Serialize};
@@ -138,27 +139,29 @@ pub struct Settings {
 impl Settings {
     pub fn new() -> Self {
-        let d = Self::default();
+        let default_settings = Self::default();
         // attempt to construct settings with file
-        // Self::new_from_default(&d).unwrap_or(d)
-        let from_file = Self::new_from_default(&d);
+        let from_file = Self::new_from_default(&default_settings);
         match from_file {
             Ok(f) => f,
             Err(e) => {
                 warn!("Error reading config file ({:?})", e);
-                d
+                default_settings
             }
         }
     }
 
-    fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
-        let config: config::Config = config::Config::new();
-        let mut settings: Settings = config
+    fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
+        let builder = Config::builder();
+        let config: Config = builder
             // use defaults
-            .with_merged(config::Config::try_from(default).unwrap())?
+            .add_source(Config::try_from(default)?)
             // override with file contents
-            .with_merged(config::File::with_name("config"))?
-            .try_into()?;
+            .add_source(File::with_name("config"))
+            .build()?
+            .try_into()
+            .unwrap();
+        let mut settings: Settings = config.try_deserialize()?;
         // ensure connection pool size is logical
         if settings.database.min_conn > settings.database.max_conn {
             panic!(
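
For readers unfamiliar with the config 0.12 API this hunk migrates to: the deprecated Config::new()/merge() flow becomes a builder, sources are layered in order (later sources override earlier ones), and deserialization happens once after build(). A minimal self-contained sketch under those assumptions; the Settings struct and its field here are stand-ins for illustration, not the relay's:

    use config::{Config, ConfigError, File};
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Default, Serialize, Deserialize)]
    struct Settings {
        // hypothetical field for illustration
        data_directory: Option<String>,
    }

    fn load() -> Result<Settings, ConfigError> {
        Config::builder()
            // layer 1: defaults, produced by serializing the struct itself
            .add_source(Config::try_from(&Settings::default())?)
            // layer 2: config.toml, overriding any keys it defines
            .add_source(File::with_name("config").required(false))
            .build()?
            // deserialize the merged tree back into the struct
            .try_deserialize()
    }

    fn main() {
        match load() {
            Ok(s) => println!("loaded: {:?}", s),
            Err(e) => eprintln!("config error: {:?}", e),
        }
    }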

src/db.rs

@@ -332,6 +332,29 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
             );
         }
     }
+    // if this event is a deletion, hide the referenced events from the same author.
+    if e.kind == 5 {
+        let event_candidates = e.tag_values_by_name("e");
+        let mut params: Vec<Box<dyn ToSql>> = vec![];
+        // first parameter will be author
+        params.push(Box::new(hex::decode(&e.pubkey)?));
+        event_candidates
+            .iter()
+            .filter(|x| is_hex(x) && x.len() == 64)
+            .filter_map(|x| hex::decode(x).ok())
+            .for_each(|x| params.push(Box::new(x)));
+        let query = format!(
+            "UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
+            repeat_vars(params.len() - 1)
+        );
+        let mut stmt = tx.prepare(&query)?;
+        let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
+        info!(
+            "hid {} deleted events for author {:?}",
+            update_count,
+            e.get_author_prefix()
+        );
+    }
     tx.commit()?;
     Ok(ins_count)
 }
@@ -362,6 +385,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
     // (sqli-safe), or a string that is filtered to only contain
     // hexadecimal characters. Strings that require escaping (tag
     // names/values) use parameters.
+    let mut limit: Option<u32> = None;
     let mut query =
         "SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
     // parameters
@@ -399,6 +423,9 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
             let authors_clause = format!("({})", auth_searches.join(" OR "));
             filter_components.push(authors_clause);
         }
+        if let Some(lim) = f.limit {
+            limit = Some(lim)
+        }
         // Query for Kind
         if let Some(ks) = &f.kinds {
             // kind is number, no escaping needed
@@ -490,7 +517,13 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
         query.push_str(") ");
     }
     // add order clause
-    query.push_str(" ORDER BY created_at ASC");
+    query.push_str(&format!(
+        " ORDER BY created_at {}",
+        limit.map_or("ASC", |_| "DESC")
+    ));
+    if let Some(lim) = limit {
+        query.push_str(&format!(" LIMIT {}", lim))
+    }
     debug!("query string: {}", query);
     (query, params)
 }
@@ -503,7 +536,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
 /// query is immediately aborted.
 pub async fn db_query(
     sub: Subscription,
-    conn: PooledConnection,
+    pool: SqlitePool,
     query_tx: tokio::sync::mpsc::Sender<QueryResult>,
     mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
 ) {
@@ -513,29 +546,43 @@ pub async fn db_query(
         let start = Instant::now();
         // generate SQL query
         let (q, p) = query_from_sub(&sub);
-        // execute the query. Don't cache, since queries vary so much.
-        let mut stmt = conn.prepare(&q)?;
-        let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
-        while let Some(row) = event_rows.next()? {
-            // check if this is still active (we could do this every N rows)
-            if abandon_query_rx.try_recv().is_ok() {
-                debug!("query aborted");
-                return Ok(());
-            }
-            row_count += 1;
-            let event_json = row.get(0)?;
-            query_tx
-                .blocking_send(QueryResult {
-                    sub_id: sub.get_id(),
-                    event: event_json,
-                })
-                .ok();
-        }
-        debug!(
-            "query completed ({} rows) in {:?}",
-            row_count,
-            start.elapsed()
-        );
+        debug!("SQL generated in {:?}", start.elapsed());
+        // show pool stats
+        debug!("DB pool stats: {:?}", pool.state());
+        let start = Instant::now();
+        if let Ok(conn) = pool.get() {
+            // execute the query. Don't cache, since queries vary so much.
+            let mut stmt = conn.prepare(&q)?;
+            let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
+            let mut first_result = true;
+            while let Some(row) = event_rows.next()? {
+                if first_result {
+                    debug!("time to first result: {:?}", start.elapsed());
+                    first_result = false;
+                }
+                // check if this is still active
+                // TODO: check every N rows
+                if abandon_query_rx.try_recv().is_ok() {
+                    debug!("query aborted");
+                    return Ok(());
+                }
+                row_count += 1;
+                let event_json = row.get(0)?;
+                query_tx
+                    .blocking_send(QueryResult {
+                        sub_id: sub.get_id(),
+                        event: event_json,
+                    })
+                    .ok();
+            }
+            debug!(
+                "query completed ({} rows) in {:?}",
                row_count,
                start.elapsed()
            );
+        } else {
+            warn!("Could not get a database connection for querying");
+        }
         let ok: Result<()> = Ok(());
         ok
     });
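
The deletion hunk above calls repeat_vars(), a helper this compare view does not show. Its role is to emit one ? placeholder per event hash so the IN (...) list stays fully parameterized, in line with the sqli-safe comment in query_from_sub. A plausible sketch of such a helper (the rusqlite documentation suggests the same pattern); illustrative, not necessarily the repository's exact code:

    /// Produce a comma-separated list of SQL placeholders, e.g. 3 -> "?,?,?"
    fn repeat_vars(count: usize) -> String {
        if count == 0 {
            return "".to_owned();
        }
        let mut s = "?,".repeat(count);
        // remove the trailing comma
        s.pop();
        s
    }

    fn main() {
        let query = format!(
            "UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
            repeat_vars(3)
        );
        assert_eq!(
            query,
            "UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN (?,?,?)"
        );
        println!("{}", query);
    }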

src/error.rs

@@ -48,6 +48,8 @@ pub enum Error {
     JoinError,
     #[error("Hyper Client error")]
     HyperError(hyper::Error),
+    #[error("Hex encoding error")]
+    HexError(hex::FromHexError),
     #[error("Unknown/Undocumented")]
     UnknownError,
 }
@@ -58,6 +60,12 @@ pub enum Error {
 //    }
 //}
+impl From<hex::FromHexError> for Error {
+    fn from(h: hex::FromHexError) -> Self {
+        Error::HexError(h)
+    }
+}
+
 impl From<hyper::Error> for Error {
     fn from(h: hyper::Error) -> Self {
         Error::HyperError(h)

src/event.rs

@@ -124,6 +124,16 @@ impl Event {
         self.pubkey.chars().take(8).collect()
     }
 
+    /// Retrieve tag values
+    pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
+        self.tags
+            .iter()
+            .filter(|x| x.len() > 1)
+            .filter(|x| x.get(0).unwrap() == tag_name)
+            .map(|x| x.get(1).unwrap().to_owned())
+            .collect()
+    }
+
     /// Check if this event has a valid signature.
     fn is_valid(&self) -> bool {
         // TODO: return a Result with a reason for invalid events
@@ -136,7 +146,7 @@ impl Event {
         if curr_time + (allowable_future as u64) < self.created_at {
             let delta = self.created_at - curr_time;
             debug!(
-                "Event is too far in the future ({} seconds), rejecting",
+                "event is too far in the future ({} seconds), rejecting",
                 delta
             );
             return false;
@@ -169,11 +179,11 @@ impl Event {
             let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
             matches!(verify, Ok(()))
         } else {
-            debug!("Client sent malformed pubkey");
+            debug!("client sent malformed pubkey");
             false
         }
     } else {
-        info!("Error converting digest to secp256k1 message");
+        info!("error converting digest to secp256k1 message");
         false
     }
 }
@@ -332,6 +342,32 @@ mod tests {
         assert_eq!(c, expected);
     }
 
+    #[test]
+    fn event_tag_select() {
+        let e = Event {
+            id: "999".to_owned(),
+            pubkey: "012345".to_owned(),
+            created_at: 501234,
+            kind: 1,
+            tags: vec![
+                vec!["j".to_owned(), "abc".to_owned()],
+                vec!["e".to_owned(), "foo".to_owned()],
+                vec!["e".to_owned(), "bar".to_owned()],
+                vec!["e".to_owned(), "baz".to_owned()],
+                vec![
+                    "p".to_owned(),
+                    "aaaa".to_owned(),
+                    "ws://example.com".to_owned(),
+                ],
+            ],
+            content: "this is a test".to_owned(),
+            sig: "abcde".to_owned(),
+            tagidx: None,
+        };
+        let v = e.tag_values_by_name("e");
+        assert_eq!(v, vec!["foo", "bar", "baz"]);
+    }
+
     #[test]
     fn event_canonical_with_tags() {
         let e = Event {

src/hexrange.rs

@@ -80,6 +80,7 @@ pub fn hex_range(s: &str) -> Option<HexSearch> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::error::Result;
 
     #[test]
     fn hex_range_exact() -> Result<()> {

src/main.rs

@@ -21,6 +21,7 @@ use nostr_rs_relay::info::RelayInfo;
 use nostr_rs_relay::nip05;
 use nostr_rs_relay::subscription::Subscription;
 use serde::{Deserialize, Serialize};
+use serde_json::json;
 use std::collections::HashMap;
 use std::convert::Infallible;
 use std::env;
@@ -61,7 +62,7 @@ async fn handle_web_request(
     ) {
         // Request for / as websocket
         ("/", true) => {
-            debug!("websocket with upgrade request");
+            trace!("websocket with upgrade request");
             //assume request is a handshake, so create the handshake response
             let response = match handshake::server::create_response_with_body(&request, || {
                 Body::empty()
@@ -355,6 +356,11 @@ fn convert_to_msg(msg: String) -> Result<NostrMessage> {
     }
 }
 
+/// Turn a string into a NOTICE message ready to send over a WebSocket
+fn make_notice_message(msg: &str) -> Message {
+    Message::text(json!(["NOTICE", msg]).to_string())
+}
+
 /// Handle new client connections. This runs through an event loop
 /// for all client communication.
 async fn nostr_server(
@@ -375,11 +381,7 @@ async fn nostr_server(
     // Create channel for receiving NOTICEs
     let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
-    // maintain a hashmap of a oneshot channel for active subscriptions.
-    // when these subscriptions are cancelled, make a message
-    // available to the executing query so it knows to stop.
-
-    // last time this client sent data
+    // last time this client sent data (message, ping, etc.)
     let mut last_message_time = Instant::now();
 
     // ping interval (every 5 minutes)
@@ -391,7 +393,11 @@ async fn nostr_server(
     let start = tokio::time::Instant::now() + default_ping_dur;
     let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
+    // maintain a hashmap of a oneshot channel for active subscriptions.
+    // when these subscriptions are cancelled, make a message
+    // available to the executing query so it knows to stop.
     let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
     // for stats, keep track of how many events the client published,
     // and how many it received from queries.
     let mut client_published_event_count: usize = 0;
@@ -414,8 +420,7 @@ async fn nostr_server(
                 ws_stream.send(Message::Ping(Vec::new())).await.ok();
             },
             Some(notice_msg) = notice_rx.recv() => {
-                let n = notice_msg.to_string().replace("\"", "");
-                ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", n))).await.ok();
+                ws_stream.send(make_notice_message(&notice_msg)).await.ok();
             },
             Some(query_result) = query_rx.recv() => {
                 // database informed us of a query result we asked for
@@ -455,20 +460,32 @@ async fn nostr_server(
                         convert_to_msg(m)
                     },
                     Some(Ok(Message::Binary(_))) => {
-                        ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "binary messages are not accepted"))).await.ok();
+                        ws_stream.send(make_notice_message("binary messages are not accepted")).await.ok();
                         continue;
                     },
                     Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {
-                        // get a ping/pong, ignore
+                        // get a ping/pong, ignore. tungstenite will
+                        // send responses automatically.
                         continue;
                     },
-                    None | Some(Ok(Message::Close(_))) | Some(Err(WsError::AlreadyClosed)) | Some(Err(WsError::ConnectionClosed)) => {
-                        debug!("normal websocket close from client: {:?}",cid);
+                    None |
+                    Some(Ok(Message::Close(_))) |
+                    Some(Err(WsError::AlreadyClosed)) |
+                    Some(Err(WsError::ConnectionClosed)) |
+                    Some(Err(WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
+                    => {
+                        debug!("websocket close from client: {:?}",cid);
                         break;
                     },
+                    Some(Err(WsError::Io(e))) => {
+                        // IO errors are considered fatal
+                        warn!("IO error (client: {:?}): {:?}", cid, e);
+                        break;
+                    }
                    x => {
-                        info!("message was: {:?} (ignoring)", x);
-                        continue;
+                        // default condition on error is to close the client connection
+                        info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
+                        break;
                    }
                };
@@ -489,7 +506,7 @@ async fn nostr_server(
                             },
                             Err(_) => {
                                 info!("client {:?} sent an invalid event", cid);
-                                ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "event was invalid"))).await.ok();
+                                ws_stream.send(make_notice_message("event was invalid")).await.ok();
                             }
                         }
                     },
@@ -507,14 +524,11 @@ async fn nostr_server(
                                 previous_query.send(()).ok();
                             }
                             // start a database query
-                            // show pool stats
-                            debug!("DB pool stats: {:?}", pool.state());
-                            db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await;
+                            db::db_query(s, pool.clone(), query_tx.clone(), abandon_query_rx).await;
                         },
                         Err(e) => {
                             info!("Subscription error: {}", e);
-                            let s = e.to_string().replace("\"", "");
-                            ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", s))).await.ok();
+                            ws_stream.send(make_notice_message(&e.to_string())).await.ok();
                         }
                     }
                 },
@@ -535,7 +549,7 @@ async fn nostr_server(
                     },
                     Err(_) => {
                         info!("invalid command ignored");
-                        ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "could not parse command"))).await.ok();
+                        ws_stream.send(make_notice_message("could not parse command")).await.ok();
                    }
                },
@@ -545,11 +559,11 @@ async fn nostr_server(
             }
             Err(Error::EventMaxLengthError(s)) => {
                 info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
-                ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "event exceeded max size"))).await.ok();
+                ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
             },
             Err(Error::ProtoParseError) => {
                 info!("client {:?} sent event that could not be parsed", cid);
-                ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "could not parse command"))).await.ok();
+                ws_stream.send(make_notice_message("could not parse command")).await.ok();
             },
             Err(e) => {
                 info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);

src/subscription.rs

@@ -31,6 +31,8 @@ pub struct ReqFilter {
     pub until: Option<u64>,
     /// List of author public keys
     pub authors: Option<Vec<String>>,
+    /// Limit number of results
+    pub limit: Option<u32>,
     /// Set of tags
     #[serde(skip)]
     pub tags: Option<HashMap<String, HashSet<String>>>,
@@ -54,6 +56,7 @@ impl<'de> Deserialize<'de> for ReqFilter {
             since: None,
             until: None,
             authors: None,
+            limit: None,
             tags: None,
         };
         let mut ts = None;
@@ -68,6 +71,8 @@ impl<'de> Deserialize<'de> for ReqFilter {
                 rf.since = Deserialize::deserialize(val).ok();
             } else if key == "until" {
                 rf.until = Deserialize::deserialize(val).ok();
+            } else if key == "limit" {
+                rf.limit = Deserialize::deserialize(val).ok();
             } else if key == "authors" {
                 rf.authors = Deserialize::deserialize(val).ok();
             } else if key.starts_with('#') && key.len() > 1 && val.is_array() {
@@ -214,6 +219,7 @@ impl ReqFilter {
         // self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
         self.ids_match(event)
             && self.since.map(|t| event.created_at > t).unwrap_or(true)
+            && self.until.map(|t| event.created_at < t).unwrap_or(true)
             && self.kind_match(event.kind)
             && self.authors_match(event)
             && self.tag_match(event)
@@ -321,6 +327,50 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn interest_until() -> Result<()> {
+        // subscription with a filter for ID and time
+        let s: Subscription =
+            serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "until": 1000}]"#)?;
+        let e = Event {
+            id: "abc".to_owned(),
+            pubkey: "".to_owned(),
+            created_at: 50,
+            kind: 0,
+            tags: Vec::new(),
+            content: "".to_owned(),
+            sig: "".to_owned(),
+            tagidx: None,
+        };
+        assert!(s.interested_in_event(&e));
+        Ok(())
+    }
+
+    #[test]
+    fn interest_range() -> Result<()> {
+        // subscription with a filter for ID and time
+        let s_in: Subscription =
+            serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 200}]"#)?;
+        let s_before: Subscription =
+            serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 140}]"#)?;
+        let s_after: Subscription =
+            serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 160, "until": 200}]"#)?;
+        let e = Event {
+            id: "abc".to_owned(),
+            pubkey: "".to_owned(),
+            created_at: 150,
+            kind: 0,
+            tags: Vec::new(),
+            content: "".to_owned(),
+            sig: "".to_owned(),
+            tagidx: None,
+        };
+        assert!(s_in.interested_in_event(&e));
+        assert!(!s_before.interested_in_event(&e));
+        assert!(!s_after.interested_in_event(&e));
+        Ok(())
+    }
+
     #[test]
     fn interest_time_and_id() -> Result<()> {
         // subscription with a filter for ID and time