Compare commits

...

15 Commits

Author SHA1 Message Date
Greg Heartsfield
675662c7fb improvement: upgrade docker builder and base images 2022-11-05 13:24:17 -05:00
Greg Heartsfield
505b0cb71f improvement: upgrade multiple dependencies
Updating anyhow v1.0.65 -> v1.0.66
Updating async-trait v0.1.57 -> v0.1.58
Updating axum v0.5.16 -> v0.5.17
Updating axum-core v0.2.8 -> v0.2.9
Updating base64 v0.13.0 -> v0.13.1
Updating bumpalo v3.11.0 -> v3.11.1
Updating cc v1.0.73 -> v1.0.74
Updating cxx v1.0.79 -> v1.0.80
Updating cxx-build v1.0.79 -> v1.0.80
Updating cxxbridge-flags v1.0.79 -> v1.0.80
Updating cxxbridge-macro v1.0.79 -> v1.0.80
Updating futures v0.3.24 -> v0.3.25
Updating futures-channel v0.3.24 -> v0.3.25
Updating futures-core v0.3.24 -> v0.3.25
Updating futures-executor v0.3.24 -> v0.3.25
Updating futures-io v0.3.24 -> v0.3.25
Updating futures-macro v0.3.24 -> v0.3.25
Updating futures-sink v0.3.24 -> v0.3.25
Updating futures-task v0.3.24 -> v0.3.25
Updating futures-util v0.3.24 -> v0.3.25
Updating getrandom v0.2.7 -> v0.2.8
Updating h2 v0.3.14 -> v0.3.15
Updating hyper v0.14.20 -> v0.14.22
Updating iana-time-zone v0.1.51 -> v0.1.53
Updating libc v0.2.135 -> v0.2.137
Updating mio v0.8.4 -> v0.8.5
Updating native-tls v0.2.10 -> v0.2.11
Updating num_cpus v1.13.1 -> v1.14.0
Updating once_cell v1.15.0 -> v1.16.0
Updating openssl-sys v0.9.76 -> v0.9.77
Updating parking_lot_core v0.9.3 -> v0.9.4
Updating pest v2.4.0 -> v2.4.1
Updating pest_derive v2.4.0 -> v2.4.1
Updating pest_generator v2.4.0 -> v2.4.1
Updating pest_meta v2.4.0 -> v2.4.1
Updating pkg-config v0.3.25 -> v0.3.26
Updating ppv-lite86 v0.2.16 -> v0.2.17
Updating prost v0.11.0 -> v0.11.2
Updating prost-derive v0.11.0 -> v0.11.2
Updating prost-types v0.11.1 -> v0.11.2
Updating serde v1.0.145 -> v1.0.147
Updating serde_derive v1.0.145 -> v1.0.147
Updating serde_json v1.0.86 -> v1.0.87
Updating syn v1.0.102 -> v1.0.103
  Adding windows-sys v0.42.0
  Adding windows_aarch64_gnullvm v0.42.0
  Adding windows_aarch64_msvc v0.42.0
  Adding windows_i686_gnu v0.42.0
  Adding windows_i686_msvc v0.42.0
  Adding windows_x86_64_gnu v0.42.0
  Adding windows_x86_64_gnullvm v0.42.0
  Adding windows_x86_64_msvc v0.42.0
2022-11-05 10:59:03 -05:00
Greg Heartsfield
e8aa450802 build: bump version to 0.7.1 2022-11-05 10:35:38 -05:00
Greg Heartsfield
5a8860bb09 feat: log user-agent if present 2022-11-05 10:29:25 -05:00
Greg Heartsfield
11e43eccf9 refactor: add unit to ping_interval config 2022-11-05 07:42:08 -05:00
William Casarin
50577b2dfa feat: add network.ping_interval setting
Add a ping interval setting that allows you to customize the websocket
ping interval. The default of 5 minutes may be too high for some proxy
servers that disconnect connections that are held open for too long.
2022-11-05 07:40:28 -05:00
William Casarin
a6cb6f8486 refactor: rename get_header_remote_ip -> get_header_string
This function has nothing to do with remote ips!
2022-11-05 07:37:18 -05:00
Greg Heartsfield
ae5bf98d87 feat: retrieve client IP from header in config.toml
If the config.toml has defined a HTTP header to look for a remote IP,
that will be logged.  Otherwise, the socket address IP will be used.

closes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/47
2022-11-04 18:05:01 -05:00
William Casarin
1cf9d719f0 feat: look for proxied ip headers
This enables support for using the proxied IP from cloudflare. The damus
relay is behind cloudflare, so to get accurate remote ip logging we need
to look at the headers instead of the socket address.

Signed-off-by: William Casarin <jb55@jb55.com>
2022-11-04 17:09:28 -05:00
William Casarin
311f4b5283 refactor: switch new connections to debug log
These are pretty spammy on busy relays. I've been using the info log to
monitor spam attacks, and these are the least useful info log.

Leave the "stopping connection" log because it at least provides useful
sent/received information.

Signed-off-by: William Casarin <jb55@jb55.com>
2022-11-04 07:59:53 -05:00
Greg Heartsfield
14b5a51e3a fix: log ephemeral events after send 2022-11-04 07:55:38 -05:00
Greg Heartsfield
8ecce3f566 feat: show client IP in logs 2022-11-02 18:33:44 -05:00
Greg Heartsfield
caffbbbede build: bump version to 0.7.0 2022-10-16 15:42:11 -05:00
Greg Heartsfield
81045ad3d0 improvement: upgrade multiple dependencies
Updating anyhow v1.0.64 -> v1.0.65
  Adding codespan-reporting v0.11.1
Updating const_format v0.2.28 -> v0.2.30
Updating const_format_proc_macros v0.2.22 -> v0.2.29
Updating crossbeam-utils v0.8.11 -> v0.8.12
  Adding cxx v1.0.79
  Adding cxx-build v1.0.79
  Adding cxxbridge-flags v1.0.79
  Adding cxxbridge-macro v1.0.79
Updating digest v0.10.3 -> v0.10.5
Updating hdrhistogram v7.5.1 -> v7.5.2
Updating iana-time-zone v0.1.50 -> v0.1.51
  Adding iana-time-zone-haiku v0.1.1
Updating itertools v0.10.3 -> v0.10.5
Updating itoa v1.0.3 -> v1.0.4
Updating js-sys v0.3.59 -> v0.3.60
Updating libc v0.2.132 -> v0.2.135
  Adding link-cplusplus v1.0.7
Updating lock_api v0.4.8 -> v0.4.9
Updating once_cell v1.14.0 -> v1.15.0
Updating openssl v0.10.41 -> v0.10.42
Updating openssl-sys v0.9.75 -> v0.9.76
Updating pest v2.3.0 -> v2.4.0
Updating pest_derive v2.3.0 -> v2.4.0
Updating pest_generator v2.3.0 -> v2.4.0
Updating pest_meta v2.3.0 -> v2.4.0
Updating proc-macro2 v1.0.43 -> v1.0.47
Updating rand_core v0.6.3 -> v0.6.4
Updating raw-cpuid v10.5.0 -> v10.6.0
  Adding scratch v1.0.2
Updating serde v1.0.144 -> v1.0.145
Updating serde_derive v1.0.144 -> v1.0.145
Updating serde_json v1.0.85 -> v1.0.86
  Adding sha1 v0.10.5
Updating smallvec v1.9.0 -> v1.10.0
Updating syn v1.0.99 -> v1.0.102
  Adding termcolor v1.1.3
Updating thiserror v1.0.34 -> v1.0.37
Updating thiserror-impl v1.0.34 -> v1.0.37
Updating tokio v1.21.0 -> v1.21.2
Updating tokio-stream v0.1.9 -> v0.1.11
Updating tonic v0.8.1 -> v0.8.2
Updating tower-layer v0.3.1 -> v0.3.2
Updating tracing v0.1.36 -> v0.1.37
Updating tracing-attributes v0.1.22 -> v0.1.23
Updating tracing-core v0.1.29 -> v0.1.30
Updating tracing-subscriber v0.3.15 -> v0.3.16
Updating unicode-ident v1.0.3 -> v1.0.5
Updating unicode-normalization v0.1.21 -> v0.1.22
  Adding unicode-width v0.1.10
Updating uuid v1.1.2 -> v1.2.1
Updating wasm-bindgen v0.2.82 -> v0.2.83
Updating wasm-bindgen-backend v0.2.82 -> v0.2.83
Updating wasm-bindgen-macro v0.2.82 -> v0.2.83
Updating wasm-bindgen-macro-support v0.2.82 -> v0.2.83
Updating wasm-bindgen-shared v0.2.82 -> v0.2.83
Updating web-sys v0.3.59 -> v0.3.60
  Adding winapi-util v0.1.5
2022-10-16 15:33:11 -05:00
Greg Heartsfield
72f8a1aa5c feat(NIP-26): allow searches for delegated public keys
Implements core NIP-26 delegated event functionality.  Events can
include a `delegation` tag that provides a signature and restrictions
on which events can be delegated.

Notable points on the implementation so far:

* Schema has been upgraded to include an index and new column.
* Basic rune parsing/evaluation to implement the example event in the
  NIP, but no more.
* No special logic for deletion.
* No migration logic for determining delegated authors for
  already-stored events.
2022-10-16 15:25:06 -05:00
17 changed files with 986 additions and 198 deletions

495
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.6.2"
version = "0.7.1"
edition = "2021"
[dependencies]
@@ -32,6 +32,7 @@ http = { version = "0.2" }
parse_duration = "2"
rand = "0.8"
const_format = "0.2.28"
regex = "1"
[dev-dependencies]
anyhow = "1"

View File

@@ -1,4 +1,4 @@
FROM docker.io/library/rust:1.64.0@sha256:5cf09a76cb9baf4990d121221bbad64927cc5690ee54f246487e302ddc2ba300 as builder
FROM docker.io/library/rust:1.65.0@sha256:1bca14676a365d0ed37a1e2a1da86c2bcf883fdf6e6886469434763d94d4afd5 as builder
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@@ -12,7 +12,8 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo build --release
FROM docker.io/library/debian:bullseye-20221004-slim@sha256:8b702518a671c926b5ece4efe386a476eb4777646a36d996d4bd50944f2f11a2
FROM docker.io/library/debian:bullseye-20221024-slim@sha256:76cdda8fe5eb597ef5e712e4c9a9f5f1fb119e69f353daaa7bd6d0f6e66e541d
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \

View File

@@ -27,6 +27,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [x] NIP-15: [End of Stored Events Notice](https://github.com/nostr-protocol/nips/blob/master/15.md)
- [x] NIP-16: [Event Treatment](https://github.com/nostr-protocol/nips/blob/master/16.md)
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
## Quick Start

View File

@@ -46,6 +46,14 @@ address = "0.0.0.0"
# Listen on this port
port = 8080
# If present, read this HTTP header for logging client IP addresses.
# Examples for common proxies, cloudflare:
#remote_ip_header = "x-forwarded-for"
#remote_ip_header = "cf-connecting-ip"
# Websocket ping interval in seconds, defaults to 5 minutes
#ping_interval = 300
[options]
# Reject events that have timestamps greater than this many seconds in
# the future. Recommended to reject anything greater than 30 minutes

View File

@@ -28,9 +28,10 @@ pub struct Database {
pub struct Network {
pub port: u16,
pub address: String,
pub remote_ip_header: Option<String>, // retrieve client IP from this HTTP header if present
pub ping_interval_seconds: u32,
}
//
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Options {
@@ -207,7 +208,9 @@ impl Default for Settings {
},
network: Network {
port: 8080,
ping_interval_seconds: 300,
address: "0.0.0.0".to_owned(),
remote_ip_header: None,
},
limits: Limits {
messages_per_sec: None,

View File

@@ -14,6 +14,8 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
/// State for a client connection
pub struct ClientConn {
/// Client IP (either from socket, or configured proxy header
client_ip: String,
/// Unique client identifier generated at connection time
client_id: Uuid,
/// The current set of active client subscriptions
@@ -24,16 +26,17 @@ pub struct ClientConn {
impl Default for ClientConn {
fn default() -> Self {
Self::new()
Self::new("unknown".to_owned())
}
}
impl ClientConn {
/// Create a new, empty connection state.
#[must_use]
pub fn new() -> Self {
pub fn new(client_ip: String) -> Self {
let client_id = Uuid::new_v4();
ClientConn {
client_ip,
client_id,
subscriptions: HashMap::new(),
max_subs: 32,
@@ -47,6 +50,11 @@ impl ClientConn {
self.client_id.to_string().chars().take(8).collect()
}
#[must_use]
pub fn ip(&self) -> &str {
&self.client_ip
}
/// Find all matching subscriptions.
#[must_use]
pub fn get_matching_subscriptions(&self, e: &Event) -> Vec<&str> {
@@ -102,7 +110,7 @@ impl ClientConn {
// TODO: return notice if subscription did not exist.
self.subscriptions.remove(&c.id);
debug!(
"removed subscription, currently have {} active subs (cid={})",
"removed subscription, currently have {} active subs (cid={:?})",
self.subscriptions.len(),
self.client_id
);

View File

@@ -142,12 +142,15 @@ pub async fn db_writer(
if next_event.is_none() {
break;
}
// track if an event write occurred; this is used to
// update the rate limiter
let mut event_write = false;
let subm_event = next_event.unwrap();
let event = subm_event.event;
let notice_tx = subm_event.notice_tx;
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
// TODO: incorporate delegated pubkeys
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
info!(
@@ -213,13 +216,13 @@ pub async fn db_writer(
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
if event.kind >= 20000 && event.kind < 30000 {
bcast_tx.send(event.clone()).ok();
info!(
"published ephemeral event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
bcast_tx.send(event.clone()).ok();
event_write = true
} else {
match write_event(&mut pool.get()?, &event) {
@@ -284,12 +287,13 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
let tx = conn.transaction()?;
// get relevant fields from event and convert to blobs.
let id_blob = hex::decode(&e.id).ok();
let pubkey_blob = hex::decode(&e.pubkey).ok();
let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
let event_str = serde_json::to_string(&e).ok();
// ignore if the event hash is a duplicate.
let mut ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
)?;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
@@ -312,7 +316,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(&tagval).ok()],
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
@@ -439,16 +443,22 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
auth_searches.push("author=? OR delegated_by=?".to_owned());
params.push(Box::new(ex.clone()));
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
auth_searches.push(
"(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
);
params.push(Box::new(lower.clone()));
params.push(Box::new(upper.clone()));
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
auth_searches.push("author>? OR delegated_by>?".to_owned());
params.push(Box::new(lower.clone()));
params.push(Box::new(lower));
}
None => {
@@ -500,7 +510,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if (v.len() % 2 == 0) && is_lower_hex(v) {
if let Ok(h) = hex::decode(&v) {
if let Ok(h) = hex::decode(v) {
blob_vals.push(Box::new(h));
}
} else {

416
src/delegation.rs Normal file
View File

@@ -0,0 +1,416 @@
//! Event parsing and validation
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use bitcoin_hashes::{sha256, Hash};
use lazy_static::lazy_static;
use regex::Regex;
use secp256k1::{schnorr, Secp256k1, VerifyOnly, XOnlyPublicKey};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use tracing::{debug, info};
// This handles everything related to delegation, in particular the
// condition/rune parsing and logic.
// Conditions are poorly specified, so we will implement the minimum
// necessary for now.
// fields MUST be either "kind" or "created_at".
// operators supported are ">", "<", "=", "!".
// no operations on 'content' are supported.
// this allows constraints for:
// valid date ranges (valid from X->Y dates).
// specific kinds (publish kind=1,5)
// kind ranges (publish ephemeral events, kind>19999&kind<30001)
// for more complex scenarios (allow delegatee to publish ephemeral
// AND replacement events), it may be necessary to generate and use
// different condition strings, since we do not support grouping or
// "OR" logic.
lazy_static! {
/// Secp256k1 verification instance.
pub static ref SECP: Secp256k1<VerifyOnly> = Secp256k1::verification_only();
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub enum Field {
Kind,
CreatedAt,
}
impl FromStr for Field {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
if value == "kind" {
Ok(Field::Kind)
} else if value == "created_at" {
Ok(Field::CreatedAt)
} else {
Err(Error::DelegationParseError)
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub enum Operator {
LessThan,
GreaterThan,
Equals,
NotEquals,
}
impl FromStr for Operator {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
if value == "<" {
Ok(Operator::LessThan)
} else if value == ">" {
Ok(Operator::GreaterThan)
} else if value == "=" {
Ok(Operator::Equals)
} else if value == "!" {
Ok(Operator::NotEquals)
} else {
Err(Error::DelegationParseError)
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ConditionQuery {
pub(crate) conditions: Vec<Condition>,
}
impl ConditionQuery {
pub fn allows_event(&self, event: &Event) -> bool {
// check each condition, to ensure that the event complies
// with the restriction.
for c in &self.conditions {
if !c.allows_event(event) {
// any failing conditions invalidates the delegation
// on this event
return false;
}
}
// delegation was permitted unconditionally, or all conditions
// were true
true
}
}
// Verify that the delegator approved the delegation; return a ConditionQuery if so.
pub fn validate_delegation(
delegator: &str,
delegatee: &str,
cond_query: &str,
sigstr: &str,
) -> Option<ConditionQuery> {
// form the token
let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
// form SHA256 hash
let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
let sig = schnorr::Signature::from_str(sigstr).unwrap();
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
if let Ok(pubkey) = XOnlyPublicKey::from_str(delegator) {
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
if verify.is_ok() {
// return the parsed condition query
cond_query.parse::<ConditionQuery>().ok()
} else {
debug!("client sent an delegation signature that did not validate");
None
}
} else {
debug!("client sent malformed delegation pubkey");
None
}
} else {
info!("error converting delegation digest to secp256k1 message");
None
}
}
/// Parsed delegation condition
/// see https://github.com/nostr-protocol/nips/pull/28#pullrequestreview-1084903800
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Condition {
pub(crate) field: Field,
pub(crate) operator: Operator,
pub(crate) values: Vec<u64>,
}
impl Condition {
/// Check if this condition allows the given event to be delegated
pub fn allows_event(&self, event: &Event) -> bool {
// determine what the right-hand side of the operator is
let resolved_field = match &self.field {
Field::Kind => event.kind,
Field::CreatedAt => event.created_at,
};
match &self.operator {
Operator::LessThan => {
// the less-than operator is only valid for single values.
if self.values.len() == 1 {
if let Some(v) = self.values.first() {
return resolved_field < *v;
}
}
}
Operator::GreaterThan => {
// the greater-than operator is only valid for single values.
if self.values.len() == 1 {
if let Some(v) = self.values.first() {
return resolved_field > *v;
}
}
}
Operator::Equals => {
// equals is interpreted as "must be equal to at least one provided value"
return self.values.iter().any(|&x| resolved_field == x);
}
Operator::NotEquals => {
// not-equals is interpreted as "must not be equal to any provided value"
// this is the one case where an empty list of values could be allowed; even though it is a pointless restriction.
return self.values.iter().all(|&x| resolved_field != x);
}
}
false
}
}
fn str_to_condition(cs: &str) -> Option<Condition> {
// a condition is a string (alphanum+underscore), an operator (<>=!), and values (num+comma)
lazy_static! {
static ref RE: Regex = Regex::new("([[:word:]]+)([<>=!]+)([,[[:digit:]]]*)").unwrap();
}
// match against the regex
let caps = RE.captures(cs)?;
let field = caps.get(1)?.as_str().parse::<Field>().ok()?;
let operator = caps.get(2)?.as_str().parse::<Operator>().ok()?;
// values are just comma separated numbers, but all must be parsed
let rawvals = caps.get(3)?.as_str();
let values = rawvals
.split_terminator(',')
.map(|n| n.parse::<u64>().ok())
.collect::<Option<Vec<_>>>()?;
// convert field string into Field
Some(Condition {
field,
operator,
values,
})
}
/// Parse a condition query from a string slice
impl FromStr for ConditionQuery {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
// split the string with '&'
let mut conditions = vec![];
let condstrs = value.split_terminator('&');
// parse each individual condition
for c in condstrs {
conditions.push(str_to_condition(c).ok_or(Error::DelegationParseError)?);
}
Ok(ConditionQuery { conditions })
}
}
#[cfg(test)]
mod tests {
use super::*;
// parse condition strings
#[test]
fn parse_empty() -> Result<()> {
// given an empty condition query, produce an empty vector
let empty_cq = ConditionQuery { conditions: vec![] };
let parsed = "".parse::<ConditionQuery>()?;
assert_eq!(parsed, empty_cq);
Ok(())
}
// parse field 'kind'
#[test]
fn test_kind_field_parse() -> Result<()> {
let field = "kind".parse::<Field>()?;
assert_eq!(field, Field::Kind);
Ok(())
}
// parse field 'created_at'
#[test]
fn test_created_at_field_parse() -> Result<()> {
let field = "created_at".parse::<Field>()?;
assert_eq!(field, Field::CreatedAt);
Ok(())
}
// parse unknown field
#[test]
fn unknown_field_parse() {
let field = "unk".parse::<Field>();
assert!(field.is_err());
}
// parse a full conditional query with an empty array
#[test]
fn parse_kind_equals_empty() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![],
}],
};
let parsed = "kind=".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse a full conditional query with a single value
#[test]
fn parse_kind_equals_singleval() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![1],
}],
};
let parsed = "kind=1".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse a full conditional query with multiple values
#[test]
fn parse_kind_equals_multival() -> Result<()> {
// given an empty condition query, produce an empty vector
let kind_cq = ConditionQuery {
conditions: vec![Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![1, 2, 4],
}],
};
let parsed = "kind=1,2,4".parse::<ConditionQuery>()?;
assert_eq!(parsed, kind_cq);
Ok(())
}
// parse multiple conditions
#[test]
fn parse_multi_conditions() -> Result<()> {
// given an empty condition query, produce an empty vector
let cq = ConditionQuery {
conditions: vec![
Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![10000],
},
Condition {
field: Field::Kind,
operator: Operator::LessThan,
values: vec![20000],
},
Condition {
field: Field::Kind,
operator: Operator::NotEquals,
values: vec![10001],
},
Condition {
field: Field::CreatedAt,
operator: Operator::LessThan,
values: vec![1665867123],
},
],
};
let parsed =
"kind>10000&kind<20000&kind!10001&created_at<1665867123".parse::<ConditionQuery>()?;
assert_eq!(parsed, cq);
Ok(())
}
fn simple_event() -> Event {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
}
}
// Check for condition logic on event w/ empty values
#[test]
fn condition_with_empty_values() {
let mut c = Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![],
};
let e = simple_event();
assert!(!c.allows_event(&e));
c.operator = Operator::LessThan;
assert!(!c.allows_event(&e));
c.operator = Operator::Equals;
assert!(!c.allows_event(&e));
// Not Equals applied to an empty list *is* allowed
// (pointless, but logically valid).
c.operator = Operator::NotEquals;
assert!(c.allows_event(&e));
}
// Check for condition logic on event w/ single value
#[test]
fn condition_kind_gt_event_single() {
let c = Condition {
field: Field::Kind,
operator: Operator::GreaterThan,
values: vec![10],
};
let mut e = simple_event();
// kind is not greater than 10, not allowed
e.kind = 1;
assert!(!c.allows_event(&e));
// kind is greater than 10, allowed
e.kind = 100;
assert!(c.allows_event(&e));
// kind is 10, not allowed
e.kind = 10;
assert!(!c.allows_event(&e));
}
// Check for condition logic on event w/ multi values
#[test]
fn condition_with_multi_values() {
let mut c = Condition {
field: Field::Kind,
operator: Operator::Equals,
values: vec![0, 10, 20],
};
let mut e = simple_event();
// Allow if event kind is in list for Equals
e.kind = 10;
assert!(c.allows_event(&e));
// Deny if event kind is not in list for Equals
e.kind = 11;
assert!(!c.allows_event(&e));
// Deny if event kind is in list for NotEquals
e.kind = 10;
c.operator = Operator::NotEquals;
assert!(!c.allows_event(&e));
// Allow if event kind is not in list for NotEquals
e.kind = 99;
c.operator = Operator::NotEquals;
assert!(c.allows_event(&e));
// Always deny if GreaterThan/LessThan for a list
c.operator = Operator::LessThan;
assert!(!c.allows_event(&e));
c.operator = Operator::GreaterThan;
assert!(!c.allows_event(&e));
}
}

View File

@@ -50,6 +50,8 @@ pub enum Error {
HyperError(hyper::Error),
#[error("Hex encoding error")]
HexError(hex::FromHexError),
#[error("Delegation parse error")]
DelegationParseError,
#[error("Unknown/Undocumented")]
UnknownError,
}

View File

@@ -1,4 +1,5 @@
//! Event parsing and validation
use crate::delegation::validate_delegation;
use crate::error::Error::*;
use crate::error::Result;
use crate::nip05;
@@ -31,6 +32,8 @@ pub struct EventCmd {
pub struct Event {
pub id: String,
pub(crate) pubkey: String,
#[serde(skip)]
pub(crate) delegated_by: Option<String>,
pub(crate) created_at: u64,
pub(crate) kind: u64,
#[serde(deserialize_with = "tag_from_string")]
@@ -83,6 +86,7 @@ impl From<EventCmd> for Result<Event> {
} else if ec.event.is_valid() {
let mut e = ec.event;
e.build_index();
e.update_delegation();
Ok(e)
} else {
Err(EventInvalid)
@@ -110,6 +114,50 @@ impl Event {
None
}
// is this event delegated (properly)?
// does the signature match, and are conditions valid?
// if so, return an alternate author for the event
pub fn delegated_author(&self) -> Option<String> {
// is there a delegation tag?
let delegation_tag: Vec<String> = self
.tags
.iter()
.filter(|x| x.len() == 4)
.filter(|x| x.get(0).unwrap() == "delegation")
.take(1)
.next()?
.to_vec(); // get first tag
//let delegation_tag = self.tag_values_by_name("delegation");
// delegation tags should have exactly 3 elements after the name (pubkey, condition, sig)
// the event is signed by the delagatee
let delegatee = &self.pubkey;
// the delegation tag references the claimed delagator
let delegator: &str = delegation_tag.get(1)?;
let querystr: &str = delegation_tag.get(2)?;
let sig: &str = delegation_tag.get(3)?;
// attempt to get a condition query; this requires the delegation to have a valid signature.
if let Some(cond_query) = validate_delegation(delegator, delegatee, querystr, sig) {
// The signature was valid, now we ensure the delegation
// condition is valid for this event:
if cond_query.allows_event(self) {
// since this is allowed, we will provide the delegatee
Some(delegator.into())
} else {
debug!("an event failed to satisfy delegation conditions");
None
}
} else {
debug!("event had had invalid delegation signature");
None
}
}
/// Update delegation status
fn update_delegation(&mut self) {
self.delegated_by = self.delegated_author();
}
/// Build an event tag index
fn build_index(&mut self) {
// if there are no tags; just leave the index as None
@@ -145,7 +193,7 @@ impl Event {
self.pubkey.chars().take(8).collect()
}
/// Retrieve tag values
/// Retrieve tag initial values across all tags matching the name
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
self.tags
.iter()
@@ -269,6 +317,7 @@ mod tests {
Event {
id: "0".to_owned(),
pubkey: "0".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: vec![],
@@ -350,6 +399,7 @@ mod tests {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![],
@@ -367,6 +417,7 @@ mod tests {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![
@@ -388,11 +439,39 @@ mod tests {
assert_eq!(v, vec!["foo", "bar", "baz"]);
}
#[test]
fn event_no_tag_select() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![
vec!["j".to_owned(), "abc".to_owned()],
vec!["e".to_owned(), "foo".to_owned()],
vec!["e".to_owned(), "baz".to_owned()],
vec![
"p".to_owned(),
"aaaa".to_owned(),
"ws://example.com".to_owned(),
],
],
content: "this is a test".to_owned(),
sig: "abcde".to_owned(),
tagidx: None,
};
let v = e.tag_values_by_name("x");
// asking for tags that don't exist just returns zero-length vector
assert_eq!(v.len(), 0);
}
#[test]
fn event_canonical_with_tags() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
delegated_by: None,
created_at: 501234,
kind: 1,
tags: vec![

View File

@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 22]),
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 22, 26]),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
}

View File

@@ -2,6 +2,7 @@ pub mod close;
pub mod config;
pub mod conn;
pub mod db;
pub mod delegation;
pub mod error;
pub mod event;
pub mod hexrange;

View File

@@ -34,11 +34,11 @@ fn main() {
// enable tracing with tokio-console
ConsoleLayer::builder().with_default_env().init();
}
// update with database location
if let Some(db) = db_dir {
settings.database.data_directory = db;
}
let (_, ctrl_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel();
// run this in a new thread
let handle = thread::spawn(|| {

View File

@@ -20,7 +20,7 @@ pragma mmap_size = 536870912; -- 512MB of mmap
"##;
/// Latest database version
pub const DB_VERSION: usize = 6;
pub const DB_VERSION: usize = 7;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -40,6 +40,7 @@ event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
delegated_by BLOB, -- delegator pubkey (NIP-26)
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
@@ -49,6 +50,7 @@ content TEXT NOT NULL -- serialized json of event object
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
-- Tag Table
@@ -152,6 +154,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
if curr_version == 5 {
curr_version = mig_5_to_6(conn)?;
}
if curr_version == 6 {
curr_version = mig_6_to_7(conn)?;
}
if curr_version == DB_VERSION {
info!(
"All migration scripts completed successfully. Welcome to v{}.",
@@ -327,7 +332,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tagname, hex::decode(&tagval).ok()],
params![event_id, tagname, hex::decode(tagval).ok()],
)?;
} else {
// otherwise, insert as text
@@ -348,3 +353,23 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
Ok(6)
}
fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 6->7");
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD delegated_by BLOB;
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
PRAGMA user_version = 7;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v6 -> v7");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(7)
}

View File

@@ -13,6 +13,7 @@ use crate::nip05;
use crate::subscription::Subscription;
use futures::SinkExt;
use futures::StreamExt;
use http::header::HeaderMap;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
@@ -84,11 +85,32 @@ async fn handle_web_request(
Some(config),
)
.await;
let user_agent = get_header_string("user-agent", request.headers());
// determine the remote IP from headers if the exist
let header_ip = settings
.network
.remote_ip_header
.as_ref()
.and_then(|x| get_header_string(x, request.headers()));
// use the socket addr as a backup
let remote_ip =
header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
let client_info = ClientInfo {
remote_ip,
user_agent,
};
// spawn a nostr server with our websocket
tokio::spawn(nostr_server(
pool, settings, ws_stream, broadcast, event_tx, shutdown,
pool,
client_info,
settings,
ws_stream,
broadcast,
event_tx,
shutdown,
));
}
// todo: trace, don't print...
Err(e) => println!(
"error when trying to upgrade connection \
from address {} to websocket connection. \
@@ -148,6 +170,12 @@ async fn handle_web_request(
}
}
fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
headers
.get(header)
.and_then(|x| x.to_str().ok().map(|x| x.to_string()))
}
// return on a control-c or internally requested shutdown signal
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
@@ -167,7 +195,6 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
info!("Shutting down webserver due to SIGTERM");
break;
},
}
}
}
@@ -382,10 +409,16 @@ fn make_notice_message(msg: &str) -> Message {
Message::text(json!(["NOTICE", msg]).to_string())
}
struct ClientInfo {
remote_ip: String,
user_agent: Option<String>,
}
/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
pool: db::SqlitePool,
client_info: ClientInfo,
settings: Settings,
mut ws_stream: WebSocketStream<Upgraded>,
broadcast: Sender<Event>,
@@ -395,7 +428,8 @@ async fn nostr_server(
// get a broadcast channel for clients to communicate on
let mut bcast_rx = broadcast.subscribe();
// Track internal client state
let mut conn = conn::ClientConn::new();
let mut conn = conn::ClientConn::new(client_info.remote_ip);
// Use the remote IP as the client identifier
let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database.
// we will send out the tx handle to any query we generate.
@@ -407,7 +441,7 @@ async fn nostr_server(
let mut last_message_time = Instant::now();
// ping interval (every 5 minutes)
let default_ping_dur = Duration::from_secs(300);
let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
// disconnect after 20 minutes without a ping response or event.
let max_quiet_time = Duration::from_secs(60 * 20);
@@ -424,11 +458,14 @@ async fn nostr_server(
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
info!("new connection for client: {:?}", cid);
debug!("new connection for client: {:?}, ip: {:?}", cid, conn.ip());
if let Some(ua) = client_info.user_agent {
debug!("client: {:?} has user-agent: {:?}", cid, ua);
}
loop {
tokio::select! {
_ = shutdown.recv() => {
info!("Shutting client connection down due to shutdown: {:?}", cid);
info!("Shutting client connection down due to shutdown: {:?}, ip: {:?}", cid, conn.ip());
// server shutting down, exit loop
break;
},
@@ -507,17 +544,17 @@ async fn nostr_server(
Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
=> {
debug!("websocket close from client: {:?}",cid);
debug!("websocket close from client: {:?}, ip: {:?}",cid, conn.ip());
break;
},
Some(Err(WsError::Io(e))) => {
// IO errors are considered fatal
warn!("IO error (client: {:?}): {:?}", cid, e);
warn!("IO error (client: {:?}, ip: {:?}): {:?}", cid, conn.ip(), e);
break;
}
x => {
// default condition on error is to close the client connection
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
info!("unknown error (client: {:?}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
break;
}
};
@@ -546,7 +583,7 @@ async fn nostr_server(
}
},
Err(_) => {
info!("client {:?} sent an invalid event", cid);
info!("client: {:?} sent an invalid event", cid);
ws_stream.send(make_notice_message("event was invalid")).await.ok();
}
}
@@ -592,7 +629,7 @@ async fn nostr_server(
}
},
Err(Error::ConnError) => {
debug!("got connection close/error, disconnecting client: {:?}",cid);
debug!("got connection close/error, disconnecting client: {:?}, ip: {:?}",cid, conn.ip());
break;
}
Err(Error::EventMaxLengthError(s)) => {
@@ -615,7 +652,10 @@ async fn nostr_server(
stop_tx.send(()).ok();
}
info!(
"stopping connection for client: {:?} (client sent {} event(s), received {})",
cid, client_published_event_count, client_received_event_count
"stopping connection for client: {:?}, ip: {:?} (client sent {} event(s), received {})",
cid,
conn.ip(),
client_published_event_count,
client_received_event_count
);
}

View File

@@ -217,6 +217,17 @@ impl ReqFilter {
.unwrap_or(true)
}
fn delegated_authors_match(&self, event: &Event) -> bool {
if let Some(delegated_pubkey) = &event.delegated_by {
self.authors
.as_ref()
.map(|vs| prefix_match(vs, delegated_pubkey))
.unwrap_or(true)
} else {
false
}
}
fn tag_match(&self, event: &Event) -> bool {
// get the hashset from the filter.
if let Some(map) = &self.tags {
@@ -248,7 +259,7 @@ impl ReqFilter {
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
&& self.until.map(|t| event.created_at < t).unwrap_or(true)
&& self.kind_match(event.kind)
&& self.authors_match(event)
&& (self.authors_match(event) || self.delegated_authors_match(event))
&& self.tag_match(event)
&& !self.force_no_match
}
@@ -308,6 +319,7 @@ mod tests {
let e = Event {
id: "foo".to_owned(),
pubkey: "abcd".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -326,6 +338,7 @@ mod tests {
let e = Event {
id: "abcd".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -344,6 +357,7 @@ mod tests {
let e = Event {
id: "abcde".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -363,6 +377,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 50,
kind: 0,
tags: Vec::new(),
@@ -386,6 +401,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 150,
kind: 0,
tags: Vec::new(),
@@ -407,6 +423,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 50,
kind: 0,
tags: Vec::new(),
@@ -425,6 +442,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 1001,
kind: 0,
tags: Vec::new(),
@@ -443,6 +461,7 @@ mod tests {
let e = Event {
id: "abc".to_owned(),
pubkey: "".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -461,6 +480,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "abc".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -479,6 +499,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "bcd".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),
@@ -497,6 +518,7 @@ mod tests {
let e = Event {
id: "123".to_owned(),
pubkey: "xyz".to_owned(),
delegated_by: None,
created_at: 0,
kind: 0,
tags: Vec::new(),