Compare commits

...

18 Commits
0.2.0 ... 0.3.2

Author SHA1 Message Date
Greg Heartsfield
3024e9fba4 build: bump version to 0.3.2 2022-01-03 18:43:17 -05:00
Greg Heartsfield
d3da4eb009 feat: implementation of proposed NIP-11 (server metadata) 2022-01-03 18:42:24 -05:00
Greg Heartsfield
19637d612e build: bump version to 0.3.1 2022-01-01 19:26:15 -06:00
Greg Heartsfield
afc9a0096a improvement: logging failed queries and timing 2022-01-01 19:25:09 -06:00
Greg Heartsfield
3d56262386 build: bump version to 0.3.0 2022-01-01 18:40:57 -06:00
Greg Heartsfield
6673fcfd11 feat: implement multi-valued filter searching
NIP-01 now uses arrays instead of scalars.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/17
2022-01-01 18:38:52 -06:00
Greg Heartsfield
b5da3fa2b0 docs: link to docker hub 2022-01-01 12:27:09 -06:00
Greg Heartsfield
850957213e build: bump version to 0.2.3 2022-01-01 09:13:13 -06:00
Greg Heartsfield
1aa5a5458d improvement: event signature validation is 100x faster
Switched to latest (git) release of secp256k1, which has more
efficient verification-only context for Schnorr.  Switched to single
pre-instantiated instance of the verifier.
2022-01-01 09:08:19 -06:00
Greg Heartsfield
620e227699 fix: connection issues with Firefox
This adds Hyper, and a 200 response code.  Prior to this, Firefox
would fail to connect.  There is also a text document displayed at the
root URL to indicate this is a Nostr relay.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/15
2022-01-01 08:11:20 -06:00
Greg Heartsfield
14e59ed278 build: bump version to 0.2.2 2021-12-31 16:34:52 -06:00
Greg Heartsfield
5ad383f257 fix: incorrect logic on empty filters for hidden events 2021-12-31 16:34:10 -06:00
Greg Heartsfield
9710ea27aa build: bump version to 0.2.1 2021-12-31 15:38:58 -06:00
Greg Heartsfield
783a6e1042 docs: fix docker examples 2021-12-31 15:28:26 -06:00
Greg Heartsfield
4171a8870e feat: reject events that are too large
A new configuration setting controls the maximum size of event
messages, and sends a notice to the client if they exceed it.

Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/14
2021-12-31 15:19:35 -06:00
Greg Heartsfield
8f3891c781 docs: docker and config updates 2021-12-31 14:08:04 -06:00
Greg Heartsfield
415d32299b fix: docker run references the correct database file 2021-12-31 14:05:11 -06:00
Greg Heartsfield
5a19a8876f feat: allow database directory configuration
Adds configuration options for database directory, either on command
line through (--db dir-name) or the config.toml file.

Fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/13
2021-12-31 11:51:57 -06:00
14 changed files with 625 additions and 118 deletions

156
Cargo.lock generated
View File

@@ -66,6 +66,12 @@ dependencies = [
"serde 1.0.131",
]
[[package]]
name = "bitcoin_hashes"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "006cc91e1a1d99819bc5b8214be3555c1f0611b169f527a1fdc54ed1f2b745b0"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -355,6 +361,25 @@ dependencies = [
"smallvec",
]
[[package]]
name = "h2"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
@@ -399,18 +424,59 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
[[package]]
name = "httpdate"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "idna"
version = "0.2.3"
@@ -422,6 +488,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg 1.0.1",
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -573,15 +649,16 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.2.0"
version = "0.3.2"
dependencies = [
"bitcoin_hashes",
"bitcoin_hashes 0.9.7",
"config",
"env_logger",
"futures",
"futures-util",
"governor",
"hex",
"hyper",
"lazy_static",
"log",
"nonzero_ext",
@@ -960,10 +1037,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "secp256k1"
version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d03ceae636d0fed5bae6a7f4f664354c5f4fcedf6eef053fef17e49f837d0a"
source = "git+https://github.com/rust-bitcoin/rust-secp256k1.git?rev=50034ccb18fdd84904ab3aa6c84a12fcced33209#50034ccb18fdd84904ab3aa6c84a12fcced33209"
dependencies = [
"bitcoin_hashes",
"bitcoin_hashes 0.10.0",
"rand 0.6.5",
"secp256k1-sys",
"serde 1.0.131",
@@ -972,8 +1048,7 @@ dependencies = [
[[package]]
name = "secp256k1-sys"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "827cb7cce42533829c792fc51b82fbf18b125b45a702ef2c8be77fce65463a7b"
source = "git+https://github.com/rust-bitcoin/rust-secp256k1.git?rev=50034ccb18fdd84904ab3aa6c84a12fcced33209#50034ccb18fdd84904ab3aa6c84a12fcced33209"
dependencies = [
"cc",
]
@@ -1022,6 +1097,7 @@ version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde 1.0.131",
@@ -1061,6 +1137,16 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "socket2"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -1165,6 +1251,20 @@ dependencies = [
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
@@ -1174,6 +1274,38 @@ dependencies = [
"serde 1.0.131",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.16.0"
@@ -1259,6 +1391,16 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.2.0"
version = "0.3.2"
edition = "2021"
[dependencies]
@@ -15,11 +15,12 @@ thiserror = "^1"
uuid = { version = "^0.8", features = ["v4"] }
config = { version = "0.11", features = ["toml"] }
bitcoin_hashes = { version = "^0.9", features = ["serde"] }
secp256k1 = { version = "^0.20", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
secp256k1 = {git = "https://github.com/rust-bitcoin/rust-secp256k1.git", rev = "50034ccb18fdd84904ab3aa6c84a12fcced33209", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
serde_json = {version = "^1.0", features = ["preserve_order"]}
hex = "^0.4"
rusqlite = "^0.26"
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"
hyper={ version="0.14", features=["server","http1","http2","tcp"] }

View File

@@ -15,7 +15,6 @@ RUN cargo build --release
FROM debian:buster-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \
&& apt-get install -y ca-certificates tzdata sqlite3 \
&& rm -rf /var/lib/apt/lists/*
@@ -35,9 +34,9 @@ COPY --from=builder /nostr-rs-relay/target/release/nostr-rs-relay ${APP}/nostr-r
RUN chown -R $APP_USER:$APP_USER ${APP}
USER $APP_USER
WORKDIR ${APP_DATA}
WORKDIR ${APP}
ENV RUST_LOG=info
ENV APP_DATA=${APP_DATA}
CMD ["../nostr-rs-relay"]
CMD ./nostr-rs-relay --db ${APP_DATA}

View File

@@ -10,17 +10,25 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
## Quick Start
The provided `Dockerfile` will compile and build the server application. Use a bind mount to store the SQLite database outside of the container image, and map the container's 8080 port to a host port (8090 in the example below).
The provided `Dockerfile` will compile and build the server
application. Use a bind mount to store the SQLite database outside of
the container image, and map the container's 8080 port to a host port
(7000 in the example below).
```console
$ docker build -t nostr-rs-relay .
$ docker run -p 8090:8080 --mount src=$(pwd)/nostr_data,target=/usr/src/app/db,type=bind nostr-rs-relay
[2021-12-12T04:20:47Z INFO nostr_rs_relay] Listening on: 0.0.0.0:8080
[2021-12-12T04:20:47Z INFO nostr_rs_relay::db] Opened database for writing
[2021-12-12T04:20:47Z INFO nostr_rs_relay::db] init completed
$ docker run -it -p 7000:8080 \
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind nostr-rs-relay
[2021-12-31T19:58:31Z INFO nostr_rs_relay] listening on: 0.0.0.0:8080
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] opened database "/usr/src/app/db/nostr.db" for writing
[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] DB version = 2
```
Use a `nostr` client such as [`noscl`](https://github.com/fiatjaf/noscl) to publish and query events.
Use a `nostr` client such as
[`noscl`](https://github.com/fiatjaf/noscl) to publish and query
events.
```console
$ noscl publish "hello world"
@@ -31,6 +39,25 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
hello world
```
A pre-built container is also available on DockerHub:
https://hub.docker.com/repository/docker/scsibug/nostr-rs-relay
## Configuration
The sample `[config.toml](config.toml)` file demonstrates the
configuration available to the relay. This file is optional, but may
be mounted into a docker container like so:
```console
$ docker run -it -p 7000:8080 \
--mount src=$(pwd)/config.toml,target=/usr/src/app/config.toml,type=bind \
--mount src=$(pwd)/data,target=/usr/src/app/db,type=bind \
nostr-rs-relay
```
Options include rate-limiting, event size limits, and network address
settings.
License
---
This project is MIT licensed.

View File

@@ -1,5 +1,22 @@
# Nostr-rs-relay configuration
[info]
# Relay information for clients. Put your unique server name here.
name = "nostr-rs-relay"
# Description
description = "A newly created nostr-rs-relay.\n\nCustomize this with your own info."
# Administrative contact pubkey
#pubkey = "0c2d168a4ae8ca58c9f1ab237b5df682599c6c7ab74307ea8b05684b60405d41"
# Administrative contact email
#email = "contact@example.com"
[database]
# Directory for SQLite files. Defaults to the current directory. Can
# also be specified (and overriden) with the "--db dirname" command
# line option.
data_directory = "."
[network]
# Bind to this network address
address = "0.0.0.0"
@@ -10,23 +27,27 @@ port = 8080
# Reject events that have timestamps greater than this many seconds in
# the future. Defaults to rejecting anything greater than 30 minutes
# from the current time.
#reject_future_seconds = 1800
reject_future_seconds = 1800
[limits]
# Limit events created per second, averaged over one minute. Must be
# an integer. If not set (or set to 0), defaults to unlimited.
messages_per_sec = 0
# Maximum WebSocket message in bytes. Defaults to 128k.
#max_ws_message_bytes = 131072
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
# Set to 0 for unlimited.
max_event_bytes = 131072
# Maximum WebSocket frame size in bytes. Defaults to 128k.
#max_ws_frame_bytes = 131072
# Maximum WebSocket message in bytes. Defaults to 128 KB.
max_ws_message_bytes = 131072
# Maximum WebSocket frame size in bytes. Defaults to 128 KB.
max_ws_frame_bytes = 131072
# Broadcast buffer size, in number of events. This prevents slow
# readers from consuming memory. Defaults to 4096.
#broadcast_buffer = 4096
broadcast_buffer = 4096
# Event persistence buffer size, in number of events. This provides
# backpressure to senders if writes are slow. Defaults to 16.
#event_persist_buffer = 16
event_persist_buffer = 16

View File

@@ -8,6 +8,22 @@ lazy_static! {
pub static ref SETTINGS: RwLock<Settings> = RwLock::new(Settings::default());
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Info {
pub name: Option<String>,
#[serde(rename = "description")]
pub descr: Option<String>,
pub pubkey: Option<String>,
pub email: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Database {
pub data_directory: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Network {
@@ -36,7 +52,7 @@ pub struct Retention {
#[allow(unused)]
pub struct Limits {
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub max_event_bytes: Option<usize>,
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>,
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
@@ -46,6 +62,8 @@ pub struct Limits {
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Settings {
pub info: Info,
pub database: Database,
pub network: Network,
pub limits: Limits,
pub retention: Retention,
@@ -82,6 +100,15 @@ impl Settings {
impl Default for Settings {
fn default() -> Self {
Settings {
info: Info {
name: Some("Unnamed nostr-rs-relay".to_owned()),
descr: None,
pubkey: None,
email: None,
},
database: Database {
data_directory: ".".to_owned(),
},
network: Network {
port: 8080,
address: "0.0.0.0".to_owned(),

View File

@@ -13,6 +13,7 @@ use rusqlite::OpenFlags;
use crate::config::SETTINGS;
use std::path::Path;
use std::thread;
use std::time::Instant;
use tokio::task;
/// Database file
@@ -122,14 +123,18 @@ pub async fn db_writer(
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
task::spawn_blocking(move || {
// get database configuration settings
let config = SETTINGS.read().unwrap();
let db_dir = &config.database.data_directory;
let full_path = Path::new(db_dir).join(DB_FILE);
// create a connection
let mut conn = Connection::open_with_flags(
Path::new(DB_FILE),
&full_path,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
)?;
info!("opened database for writing");
info!("opened database {:?} for writing", full_path);
upgrade_db(&mut conn)?;
// get rate limit settings
let config = SETTINGS.read().unwrap();
let rps_setting = config.limits.messages_per_sec;
let mut lim_opt = None;
let clock = governor::clock::QuantaClock::default();
@@ -141,7 +146,7 @@ pub async fn db_writer(
}
}
loop {
if let Ok(_) = shutdown.try_recv() {
if shutdown.try_recv().is_ok() {
info!("shutting down database writer");
break;
}
@@ -153,12 +158,17 @@ pub async fn db_writer(
}
let mut event_write = false;
let event = next_event.unwrap();
let start = Instant::now();
match write_event(&mut conn, &event) {
Ok(updated) => {
if updated == 0 {
debug!("ignoring duplicate event");
} else {
info!("persisted event: {}", event.get_event_id_prefix());
info!(
"persisted event: {} in {:?}",
event.get_event_id_prefix(),
start.elapsed()
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
@@ -298,35 +308,52 @@ fn query_from_sub(sub: &Subscription) -> String {
filter_components.push(authors_clause);
}
// Query for Kind
if f.kind.is_some() {
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let kind_clause = format!("kind = {}", f.kind.unwrap());
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
filter_components.push(kind_clause);
}
// Query for event
if f.id.is_some() {
let id_str = f.id.as_ref().unwrap();
if is_hex(id_str) {
let id_clause = format!("event_hash = x'{}'", id_str);
filter_components.push(id_clause);
}
if f.ids.is_some() {
let ids_escaped: Vec<String> = f
.ids
.as_ref()
.unwrap()
.iter()
.filter(|&x| is_hex(x))
.map(|x| format!("x'{}'", x))
.collect();
let id_clause = format!("event_hash IN ({})", ids_escaped.join(", "));
filter_components.push(id_clause);
}
// Query for referenced event
if f.event.is_some() {
let ev_str = f.event.as_ref().unwrap();
if is_hex(ev_str) {
let ev_clause = format!("referenced_event = x'{}'", ev_str);
filter_components.push(ev_clause);
}
if f.events.is_some() {
let events_escaped: Vec<String> = f
.events
.as_ref()
.unwrap()
.iter()
.filter(|&x| is_hex(x))
.map(|x| format!("x'{}'", x))
.collect();
let events_clause = format!("referenced_event IN ({})", events_escaped.join(", "));
filter_components.push(events_clause);
}
// Query for referenced pet name pubkey
if f.pubkey.is_some() {
let pet_str = f.pubkey.as_ref().unwrap();
if is_hex(pet_str) {
let pet_clause = format!("referenced_pubkey = x'{}'", pet_str);
filter_components.push(pet_clause);
}
// Query for referenced pubkey
if f.pubkeys.is_some() {
let pubkeys_escaped: Vec<String> = f
.pubkeys
.as_ref()
.unwrap()
.iter()
.filter(|&x| is_hex(x))
.map(|x| format!("x'{}'", x))
.collect();
let pubkeys_clause = format!("referenced_pubkey IN ({})", pubkeys_escaped.join(", "));
filter_components.push(pubkeys_clause);
}
// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
@@ -346,7 +373,7 @@ fn query_from_sub(sub: &Subscription) -> String {
filter_clauses.push(fc);
} else {
// never display hidden events
filter_clauses.push("hidden!=FALSE".to_owned());
filter_clauses.push("hidden!=TRUE".to_owned());
}
}
@@ -373,11 +400,16 @@ pub async fn db_query(
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
task::spawn_blocking(move || {
let config = SETTINGS.read().unwrap();
let db_dir = &config.database.data_directory;
let full_path = Path::new(db_dir).join(DB_FILE);
let conn =
Connection::open_with_flags(Path::new(DB_FILE), OpenFlags::SQLITE_OPEN_READ_ONLY)
.unwrap();
Connection::open_with_flags(&full_path, OpenFlags::SQLITE_OPEN_READ_ONLY).unwrap();
debug!("opened database for reading");
debug!("going to query for: {:?}", sub);
let mut row_count: usize = 0;
let start = Instant::now();
// generate SQL query
let q = query_from_sub(&sub);
// execute the query
@@ -389,6 +421,7 @@ pub async fn db_query(
debug!("query aborted");
return;
}
row_count += 1;
// TODO: check before unwrapping
let event_json = row.get(0).unwrap();
query_tx
@@ -398,6 +431,10 @@ pub async fn db_query(
})
.ok();
}
debug!("query completed");
debug!(
"query completed ({} rows) in {:?}",
row_count,
start.elapsed()
);
});
}

View File

@@ -21,6 +21,8 @@ pub enum Error {
CloseParseFailed,
#[error("Event validation failed")]
EventInvalid,
#[error("Event too large")]
EventMaxLengthError(usize),
#[error("Subscription identifier max length exceeded")]
SubIdMaxLengthError,
#[error("Maximum concurrent subscription count reached")]
@@ -36,6 +38,8 @@ pub enum Error {
SqlError(rusqlite::Error),
#[error("Config error")]
ConfigError(config::ConfigError),
#[error("Data directory does not exist")]
DatabaseDirError,
}
impl From<rusqlite::Error> for Error {

View File

@@ -3,14 +3,19 @@ use crate::config;
use crate::error::Error::*;
use crate::error::Result;
use bitcoin_hashes::{sha256, Hash};
use lazy_static::lazy_static;
use log::*;
use secp256k1::{schnorrsig, Secp256k1};
use secp256k1::{schnorr, Secp256k1, VerifyOnly, XOnlyPublicKey};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::value::Value;
use serde_json::Number;
use std::str::FromStr;
use std::time::SystemTime;
lazy_static! {
pub static ref SECP: Secp256k1<VerifyOnly> = Secp256k1::verification_only();
}
/// Event command in network format
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct EventCmd {
@@ -109,12 +114,15 @@ impl Event {
return false;
}
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
let secp = Secp256k1::new();
let sig = schnorrsig::Signature::from_str(&self.sig).unwrap();
let message = secp256k1::Message::from(digest);
let pubkey = schnorrsig::PublicKey::from_str(&self.pubkey).unwrap();
let verify = secp.schnorrsig_verify(&sig, &message, &pubkey);
matches!(verify, Ok(()))
let sig = schnorr::Signature::from_str(&self.sig).unwrap();
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
let pubkey = XOnlyPublicKey::from_str(&self.pubkey).unwrap();
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
matches!(verify, Ok(()))
} else {
warn!("Error converting digest to secp256k1 message");
false
}
}
/// Convert event to canonical representation for signing.

61
src/info.rs Normal file
View File

@@ -0,0 +1,61 @@
use crate::config;
/// Relay Info
use serde::{Deserialize, Serialize};
use serde_json::value::Value;
const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct RelayInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub descr: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pubkey: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub email: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub supported_nips: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub software: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
}
impl Default for RelayInfo {
fn default() -> Self {
RelayInfo {
name: None,
descr: None,
pubkey: None,
email: None,
supported_nips: Some(vec!["NIP-01".to_owned()]),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
}
}
}
/// Convert an Info struct into Relay Info json string
pub fn relay_info_json(info: &config::Info) -> String {
// get a default RelayInfo
let mut r = RelayInfo::default();
// update fields from Info, if present
r.name = info.name.clone();
r.descr = info.descr.clone();
r.pubkey = info.pubkey.clone();
r.email = info.email.clone();
r.to_json()
}
impl RelayInfo {
pub fn to_json(self) -> String {
// create the info ARRAY
let mut info_arr: Vec<Value> = vec![];
info_arr.push(Value::String("NOSTR_SERVER_INFO".to_owned()));
info_arr.push(serde_json::to_value(&self).unwrap());
serde_json::to_string_pretty(&info_arr).unwrap()
}
}

View File

@@ -4,5 +4,6 @@ pub mod conn;
pub mod db;
pub mod error;
pub mod event;
pub mod info;
pub mod protostream;
pub mod subscription;

View File

@@ -1,6 +1,12 @@
//! Server process
use futures::SinkExt;
use futures::StreamExt;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
use hyper::{
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use log::*;
use nostr_rs_relay::close::Close;
use nostr_rs_relay::config;
@@ -8,31 +14,159 @@ use nostr_rs_relay::conn;
use nostr_rs_relay::db;
use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::event::Event;
use nostr_rs_relay::info::relay_info_json;
use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
use std::collections::HashMap;
use tokio::net::{TcpListener, TcpStream};
use std::convert::Infallible;
use std::env;
use std::net::SocketAddr;
use std::path::Path;
use tokio::runtime::Builder;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_tungstenite::WebSocketStream;
use tungstenite::handshake;
use tungstenite::protocol::WebSocketConfig;
fn db_from_args(args: Vec<String>) -> Option<String> {
if args.len() == 3 && args.get(1) == Some(&"--db".to_owned()) {
return args.get(2).map(|x| x.to_owned());
}
None
}
async fn handle_web_request(
mut request: Request<Body>,
remote_addr: SocketAddr,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<Event>,
shutdown: Receiver<()>,
) -> Result<Response<Body>, Infallible> {
match (
request.uri().path(),
request.headers().contains_key(header::UPGRADE),
) {
// Request for / as websocket
("/", true) => {
debug!("websocket with upgrade request");
//assume request is a handshake, so create the handshake response
let response = match handshake::server::create_response_with_body(&request, || {
Body::empty()
}) {
Ok(response) => {
//in case the handshake response creation succeeds,
//spawn a task to handle the websocket connection
tokio::spawn(async move {
//using the hyper feature of upgrading a connection
match upgrade::on(&mut request).await {
//if successfully upgraded
Ok(upgraded) => {
//create a websocket stream from the upgraded object
let ws_stream = WebSocketStream::from_raw_socket(
//pass the upgraded object
//as the base layer stream of the Websocket
upgraded,
tokio_tungstenite::tungstenite::protocol::Role::Server,
None,
)
.await;
tokio::spawn(nostr_server(
ws_stream, broadcast, event_tx, shutdown,
));
}
Err(e) => println!(
"error when trying to upgrade connection \
from address {} to websocket connection. \
Error is: {}",
remote_addr, e
),
}
});
//return the response to the handshake request
response
}
Err(error) => {
warn!("websocket response failed");
let mut res =
Response::new(Body::from(format!("Failed to create websocket: {}", error)));
*res.status_mut() = StatusCode::BAD_REQUEST;
return Ok(res);
}
};
Ok::<_, Infallible>(response)
}
// Request for Relay info
("/", false) => {
// handle request at root with no upgrade header
// Check if this is a nostr server info request
let accept_header = &request.headers().get(ACCEPT);
// check if application/nostr+json is included
if let Some(media_types) = accept_header {
if let Ok(mt_str) = media_types.to_str() {
if mt_str.contains("application/nostr+json") {
let config = config::SETTINGS.read().unwrap();
// build a relay info response
debug!("Responding to server info request");
let b = Body::from(relay_info_json(&config.info));
return Ok(Response::builder()
.status(200)
.header("Content-Type", "application/nostr+json")
.body(b)
.unwrap());
}
}
}
return Ok(Response::new(Body::from(
"Please use a Nostr client to connect.",
)));
}
(_, _) => {
//handle any other url
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Nothing here."))
.unwrap())
}
}
}
async fn shutdown_signal() {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
}
/// Start running a Nostr relay server.
fn main() -> Result<(), Error> {
// setup logger
let _ = env_logger::try_init();
// get database directory from args
let args: Vec<String> = env::args().collect();
let db_dir: Option<String> = db_from_args(args);
{
let mut settings = config::SETTINGS.write().unwrap();
// replace default settings with those read from config.toml
let c = config::Settings::new();
let mut c = config::Settings::new();
// update with database location
if let Some(db) = db_dir {
c.database.data_directory = db;
}
*settings = c;
}
let config = config::SETTINGS.read().unwrap();
// do some config validation.
if !Path::new(&config.database.data_directory).is_dir() {
error!("Database directory does not exist");
return Err(Error::DatabaseDirError);
}
debug!("config: {:?}", config);
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
let socket_addr = addr.parse().expect("listening address not valid");
// configure tokio runtime
let rt = Builder::new_multi_thread()
.enable_all()
@@ -42,8 +176,7 @@ fn main() -> Result<(), Error> {
// start tokio
rt.block_on(async {
let settings = config::SETTINGS.read().unwrap();
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("listening on: {}", addr);
info!("listening on: {}", socket_addr);
// all client-submitted valid events are broadcast to every
// other client on this channel. This should be large enough
// to accomodate slower readers (messages are dropped if
@@ -56,7 +189,7 @@ fn main() -> Result<(), Error> {
// requested server shutdown.
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
let ctrl_c_shutdown = invoke_shutdown.clone();
// listen for ctrl-c interruupts
// // listen for ctrl-c interruupts
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("shutting down due to SIGINT");
@@ -66,28 +199,35 @@ fn main() -> Result<(), Error> {
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
// track unique client connection count
let mut client_accept_count: usize = 0;
let mut stop_listening = invoke_shutdown.subscribe();
// handle new client connection requests, or SIGINT signals.
loop {
tokio::select! {
_ = stop_listening.recv() => {
break;
}
Ok((stream, _)) = listener.accept() => {
client_accept_count += 1;
info!("creating new connection for client #{}",client_accept_count);
tokio::spawn(nostr_server(
stream,
bcast_tx.clone(),
event_tx.clone(),
invoke_shutdown.subscribe(),
));
}
info!("db writer created");
// A `Service` is needed for every connection, so this
// creates one from our `handle_request` function.
let make_svc = make_service_fn(|conn: &AddrStream| {
let remote_addr = conn.remote_addr();
let bcast = bcast_tx.clone();
let event = event_tx.clone();
let stop = invoke_shutdown.clone();
async move {
// service_fn converts our function into a `Service`
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
handle_web_request(
request,
remote_addr,
bcast.clone(),
event.clone(),
stop.subscribe(),
)
}))
}
});
let server = Server::bind(&socket_addr)
.serve(make_svc)
.with_graceful_shutdown(shutdown_signal());
// run hyper
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
// our code
});
Ok(())
}
@@ -95,7 +235,7 @@ fn main() -> Result<(), Error> {
/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
stream: TcpStream,
ws_stream: WebSocketStream<Upgraded>,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<Event>,
mut shutdown: Receiver<()>,
@@ -109,8 +249,8 @@ async fn nostr_server(
config.max_frame_size = settings.limits.max_ws_frame_bytes;
}
// upgrade the TCP connection to WebSocket
let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
let ws_stream = conn.expect("websocket handshake error");
//let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
//let ws_stream = conn.expect("websocket handshake error");
// wrap websocket into a stream & sink of Nostr protocol messages
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);
// Track internal client state
@@ -230,6 +370,10 @@ async fn nostr_server(
debug!("got connection close/error, disconnecting client: {}",cid);
break;
}
Some(Err(Error::EventMaxLengthError(s))) => {
info!("client {} sent event larger ({} bytes) than max size", cid, s);
nostr_stream.send(NoticeRes("event exceeded max size".to_owned())).await.ok();
},
Some(Err(e)) => {
info!("got non-fatal error from client: {}, error: {:?}", cid, e);
},

View File

@@ -1,5 +1,6 @@
//! Nostr protocol layered over WebSocket
use crate::close::CloseCmd;
use crate::config;
use crate::error::{Error, Result};
use crate::event::EventCmd;
use crate::subscription::Subscription;
@@ -8,9 +9,9 @@ use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use hyper::upgrade::Upgraded;
use log::*;
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use tungstenite::error::Error as WsError;
use tungstenite::protocol::Message;
@@ -39,11 +40,11 @@ pub enum NostrResponse {
/// A Nostr protocol stream is layered on top of a Websocket stream.
pub struct NostrStream {
ws_stream: WebSocketStream<TcpStream>,
ws_stream: WebSocketStream<Upgraded>,
}
/// Given a websocket, return a protocol stream wrapper.
pub fn wrap_ws_in_nostr(ws: WebSocketStream<TcpStream>) -> NostrStream {
pub fn wrap_ws_in_nostr(ws: WebSocketStream<Upgraded>) -> NostrStream {
NostrStream { ws_stream: ws }
}
@@ -51,16 +52,26 @@ pub fn wrap_ws_in_nostr(ws: WebSocketStream<TcpStream>) -> NostrStream {
impl Stream for NostrStream {
type Item = Result<NostrMessage>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// get the configuration
/// Convert Message to NostrMessage
fn convert(msg: String) -> Result<NostrMessage> {
debug!("raw msg: {}", msg);
let event_size = msg.len();
debug!("event size is {} bytes", event_size);
let config = config::SETTINGS.read().unwrap();
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
match parsed_res {
Ok(m) => Ok(m),
Ok(m) => {
if let NostrMessage::EventMsg(_) = m {
if let Some(max_size) = config.limits.max_event_bytes {
// check length, ensure that some max size is set.
if msg.len() > max_size && max_size > 0 {
return Err(Error::EventMaxLengthError(msg.len()));
}
}
}
Ok(m)
}
Err(e) => {
debug!("proto parse error: {:?}", e);
debug!("parse error on message: {}", msg.trim());
Err(Error::ProtoParseError)
}
}

View File

@@ -2,6 +2,7 @@
use crate::error::Result;
use crate::event::Event;
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::HashSet;
/// Subscription identifier and set of request filters
#[derive(Serialize, PartialEq, Debug, Clone)]
@@ -17,16 +18,16 @@ pub struct Subscription {
/// absent ([`None`]) if it should be ignored.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct ReqFilter {
/// Event hash
pub id: Option<String>,
/// Event kind
pub kind: Option<u64>,
/// Event hashes
pub ids: Option<Vec<String>>,
/// Event kinds
pub kinds: Option<Vec<u64>>,
/// Referenced event hash
#[serde(rename = "#e")]
pub event: Option<String>,
pub events: Option<Vec<String>>,
/// Referenced public key for a petname
#[serde(rename = "#p")]
pub pubkey: Option<String>,
pub pubkeys: Option<Vec<String>>,
/// Events published after this time
pub since: Option<u64>,
/// Events published before this time
@@ -105,8 +106,13 @@ impl Subscription {
impl ReqFilter {
/// Check for a match within the authors list.
// TODO: Ambiguity; what if the array is empty? Should we
// consider that the same as null?
fn ids_match(&self, event: &Event) -> bool {
self.ids
.as_ref()
.map(|vs| vs.contains(&event.id.to_owned()))
.unwrap_or(true)
}
fn authors_match(&self, event: &Event) -> bool {
self.authors
.as_ref()
@@ -115,29 +121,47 @@ impl ReqFilter {
}
/// Check if this filter either matches, or does not care about the event tags.
fn event_match(&self, event: &Event) -> bool {
self.event
.as_ref()
.map(|t| event.event_tag_match(t))
.unwrap_or(true)
// This needs to be analyzed for performance; building these
// hash sets for each active subscription isn't great.
if let Some(es) = &self.events {
let event_refs =
HashSet::<_>::from_iter(event.get_event_tags().iter().map(|x| x.to_owned()));
let filter_refs = HashSet::<_>::from_iter(es.iter().map(|x| &x[..]));
let cardinality = event_refs.intersection(&filter_refs).count();
cardinality > 0
} else {
true
}
}
/// Check if this filter either matches, or does not care about
/// the pubkey/petname tags.
fn pubkey_match(&self, event: &Event) -> bool {
self.pubkey
.as_ref()
.map(|t| event.pubkey_tag_match(t))
.unwrap_or(true)
// This needs to be analyzed for performance; building these
// hash sets for each active subscription isn't great.
if let Some(ps) = &self.pubkeys {
let pubkey_refs =
HashSet::<_>::from_iter(event.get_pubkey_tags().iter().map(|x| x.to_owned()));
let filter_refs = HashSet::<_>::from_iter(ps.iter().map(|x| &x[..]));
let cardinality = pubkey_refs.intersection(&filter_refs).count();
cardinality > 0
} else {
true
}
}
/// Check if this filter either matches, or does not care about the kind.
fn kind_match(&self, kind: u64) -> bool {
self.kind.map(|v| v == kind).unwrap_or(true)
self.kinds
.as_ref()
.map(|ks| ks.contains(&kind))
.unwrap_or(true)
}
/// Determine if all populated fields in this filter match the provided event.
pub fn interested_in_event(&self, event: &Event) -> bool {
self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
// self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
self.ids_match(event)
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
&& self.kind_match(event.kind)
&& self.authors_match(event)