Mirror of https://github.com/scsibug/nostr-rs-relay.git (synced 2025-09-01 03:40:46 -04:00)

Compare commits: 48 commits, 6a8c4ed1b5 through cf144d503d.
.pre-commit-config.yaml
@@ -11,6 +11,6 @@ repos:
  - repo: https://github.com/doublify/pre-commit-rust
    rev: v1.0
    hooks:
-     - id: fmt
+     # - id: fmt
      - id: cargo-check
      - id: clippy
Cargo.lock (generated, 2 changed lines)
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"

[[package]]
name = "nostr-rs-relay"
-version = "0.7.10"
+version = "0.7.14"
dependencies = [
 "anyhow",
 "bitcoin_hashes",
Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
-version = "0.7.10"
+version = "0.7.14"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
@@ -28,7 +28,7 @@ secp256k1 = {version = "0.21", features = ["rand", "rand-std", "serde", "bitcoin
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0", features = ["preserve_order"]}
hex = "0.4"
-rusqlite = { version = "0.26", features = ["limits","bundled"]}
+rusqlite = { version = "0.26", features = ["limits","bundled","modern_sqlite", "trace"]}
r2d2 = "0.8"
r2d2_sqlite = "0.19"
lazy_static = "1.4"
Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/library/rust:1.66.0@sha256:359949280cebefe93ccb33089fe25111a3aadfe99eac4b6cbe8ec3e1b571dacb as builder
+FROM docker.io/library/rust:1.66.0 as builder

RUN USER=root cargo install cargo-auditable
RUN USER=root cargo new --bin nostr-rs-relay
@@ -17,7 +17,7 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo auditable build --release --locked

-FROM docker.io/library/debian:bullseye-20221205-slim@sha256:25f10b4f1ded5341a3ca0a30290ff3cd5639415f0c5a2222d5e7d5dd72952aa1
+FROM docker.io/library/debian:bullseye-slim

ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
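With the pinned image digests removed, the Dockerfile builds against whatever platform the `rust:1.66.0` and `bullseye-slim` base images provide. As a usage sketch (illustrative, not from the repository documentation; assumes Docker is installed and the command runs from the repository root):

```console
$ docker build -t nostr-rs-relay .
```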
README.md (33 changed lines)
@@ -29,6 +29,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
- [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)

## Quick Start

@@ -81,6 +82,38 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
A pre-built container is also available on DockerHub:
https://hub.docker.com/r/scsibug/nostr-rs-relay

## Build and Run (without Docker)

Building `nostr-rs-relay` requires an installation of Cargo & Rust: https://www.rust-lang.org/tools/install

Clone this repository, and then build a release version of the relay:

```console
$ git clone -q https://git.sr.ht/\~gheartsfield/nostr-rs-relay
$ cd nostr-rs-relay
$ cargo build -q -r
```

The relay executable is now located in
`target/release/nostr-rs-relay`. In order to run it with logging
enabled, execute it with the `RUST_LOG` variable set:

```console
$ RUST_LOG=warn,nostr_rs_relay=info ./target/release/nostr-rs-relay
Dec 26 10:31:56.455 INFO nostr_rs_relay: Starting up from main
Dec 26 10:31:56.464 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
Dec 26 10:31:56.466 INFO nostr_rs_relay::server: db writer created
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=2)
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: opened database "./nostr.db" for writing
Dec 26 10:31:56.466 INFO nostr_rs_relay::schema: DB version = 11
Dec 26 10:31:56.467 INFO nostr_rs_relay::db: Built a connection pool "maintenance writer" (min=1, max=2)
Dec 26 10:31:56.467 INFO nostr_rs_relay::server: control message listener started
Dec 26 10:31:56.468 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=8)
```

You now have a running relay, on port `8080`. Use a `nostr` client or
`websocat` to connect and send/query for events.

## Configuration

The sample [`config.toml`](config.toml) file demonstrates the
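For a quick manual check of the relay started above, a NIP-01 subscription can be typed into an interactive `websocat` session. This is an illustrative sketch only (it assumes the default port 8080, a locally installed `websocat`, and a made-up subscription id); the relay streams back matching `EVENT` messages for the subscription:

```console
$ websocat ws://127.0.0.1:8080
["REQ","example-sub",{"kinds":[1],"limit":2}]
["CLOSE","example-sub"]
```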
config.toml
@@ -36,8 +36,9 @@ data_directory = "."
# Minimum number of SQLite reader connections
#min_conn = 4

-# Maximum number of SQLite reader connections
-#max_conn = 128
+# Maximum number of SQLite reader connections. Recommend setting this
+# to approx the number of cores.
+#max_conn = 8

[network]
# Bind to this network address
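Applying the new guidance means uncommenting `max_conn` and sizing it to the host. A minimal sketch (illustrative values only; assumes a 4-core machine and the `[database]` section of the sample config):

```toml
[database]
data_directory = "."
# roughly one reader connection per core
max_conn = 4
```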
Deleted file (Dockerfile digest-stripping helper script)
@@ -1,3 +0,0 @@
-#!/usr/bin/env bash
-sed -E 's/@sha256:[[:alnum:]]+//g' Dockerfile > Dockerfile.any-platform
-echo "Created platform-agnostic Dockerfile in 'Dockerfile.any-platform'"
rustfmt.toml
@@ -1 +1,4 @@
edition = "2021"
+#max_width = 140
+#chain_width = 100
+#fn_call_width = 100
src/config.rs
@@ -207,7 +207,7 @@ impl Default for Settings {
                data_directory: ".".to_owned(),
                in_memory: false,
                min_conn: 4,
-               max_conn: 128,
+               max_conn: 8,
            },
            network: Network {
                port: 8080,
src/conn.rs (10 changed lines)
@@ -5,7 +5,7 @@ use crate::error::Result;

use crate::subscription::Subscription;
use std::collections::HashMap;
-use tracing::{debug, info};
+use tracing::{debug, trace};
use uuid::Uuid;

/// A subscription identifier has a maximum length
@@ -74,7 +74,7 @@ impl ClientConn {
        // prevent arbitrarily long subscription identifiers from
        // being used.
        if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
-           info!(
+           debug!(
                "ignoring sub request with excessive length: ({})",
                sub_id_len
            );
@@ -84,7 +84,7 @@
        if self.subscriptions.contains_key(&k) {
            self.subscriptions.remove(&k);
            self.subscriptions.insert(k, s.clone());
-           debug!(
+           trace!(
                "replaced existing subscription (cid: {}, sub: {:?})",
                self.get_client_prefix(),
                s.get_id()
@@ -98,7 +98,7 @@
        }
        // add subscription
        self.subscriptions.insert(k, s);
-       debug!(
+       trace!(
            "registered new subscription, currently have {} active subs (cid: {})",
            self.subscriptions.len(),
            self.get_client_prefix(),
@@ -110,7 +110,7 @@
    pub fn unsubscribe(&mut self, c: &Close) {
        // TODO: return notice if subscription did not exist.
        self.subscriptions.remove(&c.id);
-       debug!(
+       trace!(
            "removed subscription, currently have {} active subs (cid: {})",
            self.subscriptions.len(),
            self.get_client_prefix(),
src/db.rs (269 changed lines)
@@ -38,8 +38,13 @@ pub struct SubmittedEvent {

/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How many persisted events before optimization is triggered
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;

/// How frequently to attempt checkpointing
pub const CHECKPOINT_FREQ_SEC: u64 = 60;

/// How many persisted events before we pause for backups.
/// It isn't clear this is enough to make the online backup API work yet.
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;

/// Build a database connection pool.
/// # Panics
@@ -78,7 +83,7 @@ pub fn build_pool(
        .test_on_check_out(true) // no noticeable performance hit
        .min_idle(Some(min_size))
        .max_size(max_size)
-       .max_lifetime(Some(Duration::from_secs(60)))
+       .max_lifetime(Some(Duration::from_secs(30)))
        .build(manager)
        .unwrap();
    info!(
@@ -88,11 +93,55 @@
    pool
}

/// Display database pool stats every 1 minute
pub async fn monitor_pool(name: &str, pool: SqlitePool) {
    let sleep_dur = Duration::from_secs(60);
    loop {
        log_pool_stats(name, &pool);
        tokio::time::sleep(sleep_dur).await;
    }
}

/// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
    let start = Instant::now();
    conn.execute_batch("PRAGMA optimize;")?;
    info!("optimize ran in {:?}", start.elapsed());
    Ok(())
}

#[derive(Debug)]
enum SqliteStatus {
    Ok,
    Busy,
    Error,
    Other(u64),
}

/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
    let query = "PRAGMA wal_checkpoint(TRUNCATE);";
    let start = Instant::now();
    let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
        let checkpoint_result: u64 = row.get(0)?;
        let wal_size: u64 = row.get(1)?;
        let frames_checkpointed: u64 = row.get(2)?;
        Ok((checkpoint_result, wal_size, frames_checkpointed))
    })?;
    let result = match cp_result {
        0 => SqliteStatus::Ok,
        1 => SqliteStatus::Busy,
        2 => SqliteStatus::Error,
        x => SqliteStatus::Other(x),
    };
    info!(
        "checkpoint ran in {:?} (result: {:?}, WAL size: {})",
        start.elapsed(),
        result,
        wal_size
    );
    Ok(wal_size as usize)
}

/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
@@ -116,7 +165,7 @@ pub async fn db_writer(
        &settings,
        OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
        1,
-       4,
+       2,
        false,
    );
    if settings.database.in_memory {
@@ -133,8 +182,10 @@
    let rps_setting = settings.limits.messages_per_sec;
    let mut most_recent_rate_limit = Instant::now();
    let mut lim_opt = None;
    // Keep rough track of events so we can run optimize eventually.
    let mut optimize_counter: usize = 0;
    // Constant writing has interfered with online backups. Keep
    // track of how long since we've given the backups a chance to
    // run.
    let mut backup_pause_counter: usize = 0;
    let clock = governor::clock::QuantaClock::default();
    if let Some(rps) = rps_setting {
        if rps > 0 {
@@ -199,9 +250,10 @@
                        event.get_author_prefix()
                    );
                } else {
-                   info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
-                       uv.name.to_string(),
-                       event.get_author_prefix()
+                   info!(
+                       "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
+                       uv.name.to_string(),
+                       event.get_author_prefix()
                    );
                    notice_tx
                        .try_send(Notice::blocked(
@@ -243,6 +295,7 @@
                    );
                    event_write = true
                } else {
                    log_pool_stats("writer", &pool);
                    match write_event(&mut pool.get()?, &event) {
                        Ok(updated) => {
                            if updated == 0 {
@@ -267,12 +320,11 @@
                            notice_tx.try_send(Notice::error(event.id, msg)).ok();
                        }
                    }
                    // Use this as a trigger to do optimization
                    optimize_counter += 1;
                    if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
                        info!("running database optimizer");
                        optimize_counter = 0;
                        optimize_db(&mut pool.get()?).ok();
                    backup_pause_counter += 1;
                    if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
                        info!("pausing db write thread for a moment...");
                        thread::sleep(Duration::from_millis(500));
                        backup_pause_counter = 0
                    }
                }
@@ -320,7 +372,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
    )?;
    if ins_count == 0 {
        // if the event was a duplicate, no need to insert event or
-       // pubkey references. This will abort the txn.
+       // pubkey references.
+       tx.rollback().ok();
        return Ok(ins_count);
    }
    // remember primary key of the event most recently inserted.
@@ -338,9 +391,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
            // if tagvalue is lowercase hex;
            if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
                tx.execute(
-                       "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
-                       params![ev_id, &tagname, hex::decode(tagval).ok()],
-                   )?;
+                   "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
+                   params![ev_id, &tagname, hex::decode(tagval).ok()],
+               )?;
            } else {
                tx.execute(
                    "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
@@ -355,7 +408,7 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
    // if this event is replaceable update, hide every other replaceable
    // event with the same kind from the same author that was issued
    // earlier than this.
-   if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
+   if e.kind == 0 || e.kind == 3 || e.kind == 41 || (e.kind >= 10000 && e.kind < 20000) {
        let update_count = tx.execute(
            "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
            params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
@@ -555,7 +608,10 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
            let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
            let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
            // find evidence of the target tag name/value existing for this event.
-           let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
+           let tag_clause = format!(
+               "e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
+               str_clause, blob_clause
+           );
            // add the tag name as the first parameter
            params.push(Box::new(key.to_string()));
            // add all tag values that are plain strings as params
@@ -614,15 +670,80 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
    (query, params)
}

-fn log_pool_stats(pool: &SqlitePool) {
/// Check if the pool is fully utilized
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
    let state: r2d2::State = pool.state();
    state.idle_connections == 0
}

/// Log pool stats
+fn log_pool_stats(name: &str, pool: &SqlitePool) {
    let state: r2d2::State = pool.state();
    let in_use_cxns = state.connections - state.idle_connections;
    debug!(
-       "DB pool usage (in_use: {}, available: {})",
-       in_use_cxns, state.connections
+       "DB pool {:?} usage (in_use: {}, available: {}, max: {})",
+       name,
+       in_use_cxns,
+       state.connections,
+       pool.max_size()
    );
}

/// Perform database maintenance on a regular basis
pub async fn db_optimize(pool: SqlitePool) {
    tokio::task::spawn(async move {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(60*60)) => {
                    if let Ok(mut conn) = pool.get() {
                        // the busy timer will block writers, so don't set
                        // this any higher than you want max latency for event
                        // writes.
                        info!("running database optimizer");
                        optimize_db(&mut conn).ok();
                    }
                }
            };
        }
    });
}

/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint(pool: SqlitePool) {
    tokio::task::spawn(async move {
        // WAL size in pages.
        let mut current_wal_size = 0;
        // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
        let wal_threshold = 1000*10;
        // default threshold for the busy timer
        let busy_wait_default = Duration::from_secs(1);
        // if the WAL file is getting too big, switch to this
        let busy_wait_default_long = Duration::from_secs(5);
        loop {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => {
                    if let Ok(mut conn) = pool.get() {
                        // the busy timer will block writers, so don't set
                        // this any higher than you want max latency for event
                        // writes.
                        if current_wal_size <= wal_threshold {
                            conn.busy_timeout(busy_wait_default).ok();
                        } else {
                            // if the wal size has exceeded a threshold, increase the busy timeout.
                            conn.busy_timeout(busy_wait_default_long).ok();
                        }
                        debug!("running wal_checkpoint(TRUNCATE)");
                        if let Ok(new_size) = checkpoint_db(&mut conn) {
                            current_wal_size = new_size;
                        }
                    }
                }
            };
        }
    });
}

/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query. Each result
@@ -636,24 +757,47 @@ pub async fn db_query(
    query_tx: tokio::sync::mpsc::Sender<QueryResult>,
    mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
-   let start = Instant::now();
+   let pre_spawn_start = Instant::now();
    task::spawn_blocking(move || {
-       debug!("moved DB query to thread in {:?}", start.elapsed());
+       let db_queue_time = pre_spawn_start.elapsed();
+       // if the queue time was very long (>5 seconds), spare the DB and abort.
+       if db_queue_time > Duration::from_secs(5) {
+           info!(
+               "shedding DB query load from {:?} (cid: {}, sub: {:?})",
+               db_queue_time, client_id, sub.id
+           );
+           return Ok(());
+       }
+       // otherwise, report queuing time if it is slow
+       else if db_queue_time > Duration::from_secs(1) {
+           debug!(
+               "(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
+               db_queue_time, client_id, sub.id
+           );
+       }
+       let start = Instant::now();
        let mut row_count: usize = 0;
        // generate SQL query
        let (q, p) = query_from_sub(&sub);
-       debug!("SQL generated in {:?}", start.elapsed());
-       // show pool stats
-       log_pool_stats(&pool);
+       let sql_gen_elapsed = start.elapsed();
+       if sql_gen_elapsed > Duration::from_millis(10) {
+           debug!("SQL (slow) generated in {:?}", start.elapsed());
+       }
        // cutoff for displaying slow queries
-       let slow_cutoff = Duration::from_millis(1000);
+       let slow_cutoff = Duration::from_millis(2000);
+       // any client that doesn't cause us to generate new rows in 5
+       // seconds gets dropped.
+       let abort_cutoff = Duration::from_secs(5);
        let start = Instant::now();
        let mut slow_first_event;
-       if let Ok(conn) = pool.get() {
-           // execute the query. Don't cache, since queries vary so much.
-           let mut stmt = conn.prepare(&q)?;
+       let mut last_successful_send = Instant::now();
+       if let Ok(mut conn) = pool.get() {
+           // execute the query.
+           // make the actual SQL query (with parameters inserted) available
+           conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
+           let mut stmt = conn.prepare_cached(&q)?;
            let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;

            let mut first_result = true;
            while let Some(row) = event_rows.next()? {
                let first_event_elapsed = start.elapsed();
@@ -665,45 +809,49 @@
                );
                first_result = false;
            }
-           // logging for slow queries; show sub and SQL
-           //
-           if slow_first_event {
-               info!(
+           // logging for slow queries; show sub and SQL.
+           // to reduce logging; only show 1/16th of clients (leading 0)
+           if row_count == 0 && slow_first_event && client_id.starts_with('0') {
+               debug!(
                    "query req (slow): {:?} (cid: {}, sub: {:?})",
                    sub, client_id, sub.id
                );
                info!(
                    "query string (slow): {} (cid: {}, sub: {:?})",
                    q, client_id, sub.id
                );
            } else {
                trace!(
                    "query req: {:?} (cid: {}, sub: {:?})",
                    sub,
                    client_id,
                    sub.id
                );
                trace!(
                    "query string: {} (cid: {}, sub: {:?})",
                    q,
                    client_id,
                    sub.id
                );
            }
            // check if this is still active
            // TODO: check every N rows
            if abandon_query_rx.try_recv().is_ok() {
            }
            // check if this is still active; every 100 rows
            if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
                debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
                return Ok(());
            }
            row_count += 1;
            let event_json = row.get(0)?;
            loop {
                if query_tx.capacity() != 0 {
                    // we have capacity to add another item
                    break;
                } else {
                    // the queue is full
                    trace!("db reader thread is stalled");
                    if last_successful_send + abort_cutoff < Instant::now() {
                        // the queue has been full for too long, abort
                        info!("aborting database query due to slow client (cid: {}, sub: {:?})",
                            client_id, sub.id);
                        let ok: Result<()> = Ok(());
                        return ok;
                    }
                    // give the queue a chance to clear before trying again
                    thread::sleep(Duration::from_millis(100));
                }
            }
            // TODO: we could use try_send, but we'd have to juggle
            // getting the query result back as part of the error
            // result.
            query_tx
                .blocking_send(QueryResult {
                    sub_id: sub.get_id(),
                    event: event_json,
                })
                .ok();
            last_successful_send = Instant::now();
        }
        query_tx
            .blocking_send(QueryResult {
@@ -712,10 +860,11 @@
            })
            .ok();
        debug!(
-           "query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
-           start.elapsed(),
+           "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
+           pre_spawn_start.elapsed(),
            client_id,
            sub.id,
+           start.elapsed(),
            row_count
        );
    } else {
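A note on the `checkpoint_db` function added above: `PRAGMA wal_checkpoint(TRUNCATE)` returns a single row with three integer columns (a busy/status code, the WAL size in pages, and the number of frames checkpointed), which is what the three `row.get` calls read. An illustrative check from the `sqlite3` shell (hypothetical output for an idle database; the relay itself only issues this PRAGMA through rusqlite):

```console
$ sqlite3 nostr.db "PRAGMA wal_checkpoint(TRUNCATE);"
0|0|0
```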
src/delegation.rs
@@ -80,7 +80,7 @@ impl FromStr for Operator {

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ConditionQuery {
-   pub(crate) conditions: Vec<Condition>,
+   pub conditions: Vec<Condition>,
}

impl ConditionQuery {
@@ -137,9 +137,9 @@ pub fn validate_delegation(
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Condition {
-   pub(crate) field: Field,
-   pub(crate) operator: Operator,
-   pub(crate) values: Vec<u64>,
+   pub field: Field,
+   pub operator: Operator,
+   pub values: Vec<u64>,
}

impl Condition {
@@ -332,19 +332,6 @@ mod tests {
        assert_eq!(parsed, cq);
        Ok(())
    }
-   fn simple_event() -> Event {
-       Event {
-           id: "0".to_owned(),
-           pubkey: "0".to_owned(),
-           delegated_by: None,
-           created_at: 0,
-           kind: 0,
-           tags: vec![],
-           content: "".to_owned(),
-           sig: "0".to_owned(),
-           tagidx: None,
-       }
-   }
    // Check for condition logic on event w/ empty values
    #[test]
    fn condition_with_empty_values() {
@@ -353,7 +340,7 @@
            operator: Operator::GreaterThan,
            values: vec![],
        };
-       let e = simple_event();
+       let e = Event::simple_event();
        assert!(!c.allows_event(&e));
        c.operator = Operator::LessThan;
        assert!(!c.allows_event(&e));
@@ -373,7 +360,7 @@
            operator: Operator::GreaterThan,
            values: vec![10],
        };
-       let mut e = simple_event();
+       let mut e = Event::simple_event();
        // kind is not greater than 10, not allowed
        e.kind = 1;
        assert!(!c.allows_event(&e));
@@ -392,7 +379,7 @@
            operator: Operator::Equals,
            values: vec![0, 10, 20],
        };
-       let mut e = simple_event();
+       let mut e = Event::simple_event();
        // Allow if event kind is in list for Equals
        e.kind = 10;
        assert!(c.allows_event(&e));
src/event.rs (56 changed lines)
@@ -37,19 +37,19 @@ impl EventCmd {
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Event {
    pub id: String,
-   pub(crate) pubkey: String,
+   pub pubkey: String,
    #[serde(skip)]
-   pub(crate) delegated_by: Option<String>,
-   pub(crate) created_at: u64,
-   pub(crate) kind: u64,
+   pub delegated_by: Option<String>,
+   pub created_at: u64,
+   pub kind: u64,
    #[serde(deserialize_with = "tag_from_string")]
    // NOTE: array-of-arrays may need to be more general than a string container
-   pub(crate) tags: Vec<Vec<String>>,
-   pub(crate) content: String,
-   pub(crate) sig: String,
+   pub tags: Vec<Vec<String>>,
+   pub content: String,
+   pub sig: String,
    // Optimization for tag search, built on demand.
    #[serde(skip)]
-   pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
+   pub tagidx: Option<HashMap<char, HashSet<String>>>,
}

/// Simple tag type for array of array of strings.
@@ -101,6 +101,21 @@ impl From<EventCmd> for Result<Event> {
}

impl Event {
    #[cfg(test)]
    pub fn simple_event() -> Event {
        Event {
            id: "0".to_owned(),
            pubkey: "0".to_owned(),
            delegated_by: None,
            created_at: 0,
            kind: 0,
            tags: vec![],
            content: "".to_owned(),
            sig: "0".to_owned(),
            tagidx: None,
        }
    }

    pub fn is_kind_metadata(&self) -> bool {
        self.kind == 0
    }
@@ -226,7 +241,7 @@ impl Event {
    }

    /// Check if this event has a valid signature.
-   fn validate(&self) -> Result<()> {
+   pub fn validate(&self) -> Result<()> {
        // TODO: return a Result with a reason for invalid events
        // validation is performed by:
        // * parsing JSON string into event fields
@@ -319,31 +334,18 @@
#[cfg(test)]
mod tests {
    use super::*;
-   fn simple_event() -> Event {
-       Event {
-           id: "0".to_owned(),
-           pubkey: "0".to_owned(),
-           delegated_by: None,
-           created_at: 0,
-           kind: 0,
-           tags: vec![],
-           content: "".to_owned(),
-           sig: "0".to_owned(),
-           tagidx: None,
-       }
-   }

    #[test]
    fn event_creation() {
        // create an event
-       let event = simple_event();
+       let event = Event::simple_event();
        assert_eq!(event.id, "0");
    }

    #[test]
    fn event_serialize() -> Result<()> {
        // serialize an event to JSON string
-       let event = simple_event();
+       let event = Event::simple_event();
        let j = serde_json::to_string(&event)?;
        assert_eq!(j, "{\"id\":\"0\",\"pubkey\":\"0\",\"created_at\":0,\"kind\":0,\"tags\":[],\"content\":\"\",\"sig\":\"0\"}");
        Ok(())
@@ -351,14 +353,14 @@

    #[test]
    fn empty_event_tag_match() {
-       let event = simple_event();
+       let event = Event::simple_event();
        assert!(!event
            .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
    }

    #[test]
    fn single_event_tag_match() {
-       let mut event = simple_event();
+       let mut event = Event::simple_event();
        event.tags = vec![vec!["e".to_owned(), "foo".to_owned()]];
        event.build_index();
        assert_eq!(
@@ -373,7 +375,7 @@
    #[test]
    fn event_tags_serialize() -> Result<()> {
        // serialize an event with tags to JSON string
-       let mut event = simple_event();
+       let mut event = Event::simple_event();
        event.tags = vec![
            vec![
                "e".to_owned(),
src/nip05.rs
@@ -517,7 +517,7 @@ impl Verifier {
                Ok(updated) => {
                    if updated != 0 {
                        info!(
-                           "persisted event: {:?} in {:?}",
+                           "persisted event (new verified pubkey): {:?} in {:?}",
                            event.get_event_id_prefix(),
                            start.elapsed()
                        );
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
    earliest: u64,
) -> Result<VerificationRecord> {
    let tx = conn.transaction()?;
-   let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
+   let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
    let mut stmt = tx.prepare_cached(query)?;
    let fields = stmt.query_row(params![earliest, earliest], |r| {
        let rowid: u64 = r.get(0)?;
src/schema.rs
@@ -16,7 +16,7 @@ pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
-pragma mmap_size = 1073741824; -- 1024MB of mmap
+pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;

/// Latest database version
src/server.rs
@@ -22,12 +22,14 @@ use hyper::upgrade::Upgraded;
use hyper::{
    header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use rusqlite::OpenFlags;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver as MpscReceiver;
use std::time::Duration;
use std::time::Instant;
@@ -87,6 +89,7 @@ async fn handle_web_request(
                        Some(config),
                    )
                    .await;
                    let origin = get_header_string("origin", request.headers());
                    let user_agent = get_header_string("user-agent", request.headers());
                    // determine the remote IP from headers if the exist
                    let header_ip = settings
@@ -100,6 +103,7 @@
                    let client_info = ClientInfo {
                        remote_ip,
                        user_agent,
                        origin,
                    };
                    // spawn a nostr server with our websocket
                    tokio::spawn(nostr_server(
@@ -244,14 +248,19 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
    // configure tokio runtime
    let rt = Builder::new_multi_thread()
        .enable_all()
-       .thread_name("tokio-ws")
+       .thread_name_fn(|| {
+           // give each thread a unique numeric name
+           static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
+           let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
+           format!("tokio-ws-{}", id)
+       })
        // limit concurrent SQLite blocking threads
        .max_blocking_threads(settings.limits.max_blocking_threads)
        .on_thread_start(|| {
-           debug!("started new thread");
+           trace!("started new thread: {:?}", std::thread::current().name());
        })
        .on_thread_stop(|| {
-           debug!("stopping thread");
+           trace!("stopped thread: {:?}", std::thread::current().name());
        })
        .build()
        .unwrap();
@@ -310,6 +319,18 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
                }
            }
        }
        // build a connection pool for DB maintenance
        let maintenance_pool = db::build_pool(
            "maintenance writer",
            &settings,
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
            1,
            2,
            false,
        );
        db::db_optimize(maintenance_pool.clone()).await;
        db::db_checkpoint(maintenance_pool).await;

        // listen for (external to tokio) shutdown request
        let controlled_shutdown = invoke_shutdown.clone();
        tokio::spawn(async move {
@@ -339,12 +360,15 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
        let pool = db::build_pool(
            "client query",
            &settings,
-           rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
-               | rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
+           rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
            db_min_conn,
            db_max_conn,
            true,
        );
        // spawn a task to check the pool size.
        let pool_monitor = pool.clone();
        tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});

        // A `Service` is needed for every connection, so this
        // creates one from our `handle_request` function.
        let make_svc = make_service_fn(|conn: &AddrStream| {
@@ -428,6 +452,7 @@ fn make_notice_message(notice: Notice) -> Message {
struct ClientInfo {
    remote_ip: String,
    user_agent: Option<String>,
    origin: Option<String>,
}

/// Handle new client connections. This runs through an event loop
@@ -491,9 +516,14 @@ async fn nostr_server(
    let mut client_published_event_count: usize = 0;
    let mut client_received_event_count: usize = 0;
    debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
-   if let Some(ua) = client_info.user_agent {
-       debug!("cid: {}, user-agent: {:?}", cid, ua);
-   }
+   let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
+   let user_agent = client_info
+       .user_agent
+       .unwrap_or_else(|| "<unspecified>".into());
+   debug!(
+       "cid: {}, origin: {:?}, user-agent: {:?}",
+       cid, origin, user_agent
+   );
    loop {
        tokio::select! {
            _ = shutdown.recv() => {
@@ -539,7 +569,7 @@
                // TODO: serialize at broadcast time, instead of
                // once for each consumer.
                if let Ok(event_str) = serde_json::to_string(&global_event) {
-                   debug!("sub match for client: {}, sub: {:?}, event: {:?}",
+                   trace!("sub match for client: {}, sub: {:?}, event: {:?}",
                        cid, s,
                        global_event.get_event_id_prefix());
                    // create an event response and send it
src/subscription.rs
@@ -65,12 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter {
            tags: None,
            force_no_match: false,
        };
        let empty_string = "".into();
        let mut ts = None;
        // iterate through each key, and assign values that exist
        for (key, val) in filter.into_iter() {
            // ids
            if key == "ids" {
-               rf.ids = Deserialize::deserialize(val).ok();
+               let raw_ids: Option<Vec<String>>= Deserialize::deserialize(val).ok();
+               if let Some(a) = raw_ids.as_ref() {
+                   if a.contains(&empty_string) {
+                       return Err(serde::de::Error::invalid_type(
+                           Unexpected::Other("prefix matches must not be empty strings"),
+                           &"a json object"));
+                   }
+               }
+               rf.ids =raw_ids;
            } else if key == "kinds" {
                rf.kinds = Deserialize::deserialize(val).ok();
            } else if key == "since" {
@@ -80,7 +89,15 @@ impl<'de> Deserialize<'de> for ReqFilter {
            } else if key == "limit" {
                rf.limit = Deserialize::deserialize(val).ok();
            } else if key == "authors" {
-               rf.authors = Deserialize::deserialize(val).ok();
+               let raw_authors: Option<Vec<String>>= Deserialize::deserialize(val).ok();
+               if let Some(a) = raw_authors.as_ref() {
+                   if a.contains(&empty_string) {
+                       return Err(serde::de::Error::invalid_type(
+                           Unexpected::Other("prefix matches must not be empty strings"),
+                           &"a json object"));
+                   }
+               }
+               rf.authors = raw_authors;
            } else if key.starts_with('#') && key.len() > 1 && val.is_array() {
                if let Some(tag_search) = tag_search_char_from_filter(key) {
                    if ts.is_none() {
@@ -294,6 +311,24 @@ mod tests {
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn req_empty_authors_prefix() {
        let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn req_empty_ids_prefix() {
        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn req_empty_ids_prefix_mixed() {
        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn legacy_filter() {
        // legacy field in filter