mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
28 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
95748647f0 | ||
|
25480e837f | ||
|
b80b54cd9d | ||
|
8ea732cbe5 | ||
|
0f68c4e5c2 | ||
|
dab2cd5792 | ||
|
f411aa6fc2 | ||
|
d31bbda087 | ||
|
5917bc53b2 | ||
|
91177c61a1 | ||
|
53c2a8051c | ||
|
168cf513ac | ||
|
ea204761c9 | ||
|
c270ae1434 | ||
|
64bd983cb6 | ||
|
1c153bc784 | ||
|
dc11d9a619 | ||
|
cd1557787b | ||
|
86bb7aeb9a | ||
|
ce37fc1a2d | ||
|
2cfd384339 | ||
|
8c013107f9 | ||
|
64a4466d30 | ||
|
1596c23eb4 | ||
|
129badd4e1 | ||
|
6f7c080180 | ||
|
af92561ef6 | ||
|
d833a3e40d |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.11"
|
||||
version = "0.7.13"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitcoin_hashes",
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.11"
|
||||
version = "0.7.13"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
|
@@ -1 +1,4 @@
|
||||
edition = "2021"
|
||||
#max_width = 140
|
||||
#chain_width = 100
|
||||
#fn_call_width = 100
|
||||
|
10
src/conn.rs
10
src/conn.rs
@@ -5,7 +5,7 @@ use crate::error::Result;
|
||||
|
||||
use crate::subscription::Subscription;
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, info};
|
||||
use tracing::{debug, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// A subscription identifier has a maximum length
|
||||
@@ -74,7 +74,7 @@ impl ClientConn {
|
||||
// prevent arbitrarily long subscription identifiers from
|
||||
// being used.
|
||||
if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
|
||||
info!(
|
||||
debug!(
|
||||
"ignoring sub request with excessive length: ({})",
|
||||
sub_id_len
|
||||
);
|
||||
@@ -84,7 +84,7 @@ impl ClientConn {
|
||||
if self.subscriptions.contains_key(&k) {
|
||||
self.subscriptions.remove(&k);
|
||||
self.subscriptions.insert(k, s.clone());
|
||||
debug!(
|
||||
trace!(
|
||||
"replaced existing subscription (cid: {}, sub: {:?})",
|
||||
self.get_client_prefix(),
|
||||
s.get_id()
|
||||
@@ -98,7 +98,7 @@ impl ClientConn {
|
||||
}
|
||||
// add subscription
|
||||
self.subscriptions.insert(k, s);
|
||||
debug!(
|
||||
trace!(
|
||||
"registered new subscription, currently have {} active subs (cid: {})",
|
||||
self.subscriptions.len(),
|
||||
self.get_client_prefix(),
|
||||
@@ -110,7 +110,7 @@ impl ClientConn {
|
||||
pub fn unsubscribe(&mut self, c: &Close) {
|
||||
// TODO: return notice if subscription did not exist.
|
||||
self.subscriptions.remove(&c.id);
|
||||
debug!(
|
||||
trace!(
|
||||
"removed subscription, currently have {} active subs (cid: {})",
|
||||
self.subscriptions.len(),
|
||||
self.get_client_prefix(),
|
||||
|
191
src/db.rs
191
src/db.rs
@@ -38,8 +38,14 @@ pub struct SubmittedEvent {
|
||||
|
||||
/// Database file
|
||||
pub const DB_FILE: &str = "nostr.db";
|
||||
/// How many persisted events before optimization is triggered
|
||||
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
|
||||
|
||||
/// How frequently to run maintenance
|
||||
/// How many persisted events before DB maintenannce is triggered.
|
||||
pub const EVENT_MAINTENANCE_FREQ_SEC: u64 = 60;
|
||||
|
||||
/// How many persisted events before we pause for backups.
|
||||
/// It isn't clear this is enough to make the online backup API work yet.
|
||||
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
|
||||
|
||||
/// Build a database connection pool.
|
||||
/// # Panics
|
||||
@@ -78,7 +84,7 @@ pub fn build_pool(
|
||||
.test_on_check_out(true) // no noticeable performance hit
|
||||
.min_idle(Some(min_size))
|
||||
.max_size(max_size)
|
||||
.max_lifetime(Some(Duration::from_secs(60)))
|
||||
.max_lifetime(Some(Duration::from_secs(30)))
|
||||
.build(manager)
|
||||
.unwrap();
|
||||
info!(
|
||||
@@ -90,7 +96,41 @@ pub fn build_pool(
|
||||
|
||||
/// Perform normal maintenance
|
||||
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
conn.execute_batch("PRAGMA optimize;")?;
|
||||
info!("optimize ran in {:?}", start.elapsed());
|
||||
Ok(())
|
||||
}
|
||||
#[derive(Debug)]
|
||||
enum SqliteReturnStatus {
|
||||
SqliteOk,
|
||||
SqliteBusy,
|
||||
SqliteError,
|
||||
SqliteOther(u64),
|
||||
}
|
||||
|
||||
/// Checkpoint/Truncate WAL
|
||||
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
|
||||
let start = Instant::now();
|
||||
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
|
||||
let checkpoint_result: u64 = row.get(0)?;
|
||||
let wal_size: u64 = row.get(1)?;
|
||||
let frames_checkpointed: u64 = row.get(2)?;
|
||||
Ok((checkpoint_result, wal_size, frames_checkpointed))
|
||||
})?;
|
||||
let result = match cp_result {
|
||||
0 => SqliteReturnStatus::SqliteOk,
|
||||
1 => SqliteReturnStatus::SqliteBusy,
|
||||
2 => SqliteReturnStatus::SqliteError,
|
||||
x => SqliteReturnStatus::SqliteOther(x),
|
||||
};
|
||||
info!(
|
||||
"checkpoint ran in {:?} (result: {:?}, WAL size: {})",
|
||||
start.elapsed(),
|
||||
result,
|
||||
wal_size
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -116,7 +156,7 @@ pub async fn db_writer(
|
||||
&settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
4,
|
||||
2,
|
||||
false,
|
||||
);
|
||||
if settings.database.in_memory {
|
||||
@@ -133,8 +173,10 @@ pub async fn db_writer(
|
||||
let rps_setting = settings.limits.messages_per_sec;
|
||||
let mut most_recent_rate_limit = Instant::now();
|
||||
let mut lim_opt = None;
|
||||
// Keep rough track of events so we can run optimize eventually.
|
||||
let mut optimize_counter: usize = 0;
|
||||
// Constant writing has interfered with online backups. Keep
|
||||
// track of how long since we've given the backups a chance to
|
||||
// run.
|
||||
let mut backup_pause_counter: usize = 0;
|
||||
let clock = governor::clock::QuantaClock::default();
|
||||
if let Some(rps) = rps_setting {
|
||||
if rps > 0 {
|
||||
@@ -199,9 +241,10 @@ pub async fn db_writer(
|
||||
event.get_author_prefix()
|
||||
);
|
||||
} else {
|
||||
info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
|
||||
uv.name.to_string(),
|
||||
event.get_author_prefix()
|
||||
info!(
|
||||
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
|
||||
uv.name.to_string(),
|
||||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send(Notice::blocked(
|
||||
@@ -243,6 +286,7 @@ pub async fn db_writer(
|
||||
);
|
||||
event_write = true
|
||||
} else {
|
||||
log_pool_stats("writer", &pool);
|
||||
match write_event(&mut pool.get()?, &event) {
|
||||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
@@ -267,12 +311,11 @@ pub async fn db_writer(
|
||||
notice_tx.try_send(Notice::error(event.id, msg)).ok();
|
||||
}
|
||||
}
|
||||
// Use this as a trigger to do optimization
|
||||
optimize_counter += 1;
|
||||
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
|
||||
info!("running database optimizer");
|
||||
optimize_counter = 0;
|
||||
optimize_db(&mut pool.get()?).ok();
|
||||
backup_pause_counter += 1;
|
||||
if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
|
||||
info!("pausing db write thread for a moment...");
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
backup_pause_counter = 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,7 +363,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
)?;
|
||||
if ins_count == 0 {
|
||||
// if the event was a duplicate, no need to insert event or
|
||||
// pubkey references. This will abort the txn.
|
||||
// pubkey references.
|
||||
tx.rollback().ok();
|
||||
return Ok(ins_count);
|
||||
}
|
||||
// remember primary key of the event most recently inserted.
|
||||
@@ -338,9 +382,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
// if tagvalue is lowercase hex;
|
||||
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(tagval).ok()],
|
||||
)?;
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
|
||||
@@ -555,7 +599,10 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
|
||||
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
|
||||
// find evidence of the target tag name/value existing for this event.
|
||||
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
|
||||
let tag_clause = format!(
|
||||
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
|
||||
str_clause, blob_clause
|
||||
);
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_string()));
|
||||
// add all tag values that are plain strings as params
|
||||
@@ -614,13 +661,47 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
(query, params)
|
||||
}
|
||||
|
||||
fn log_pool_stats(pool: &SqlitePool) {
|
||||
/// Check if the pool is fully utilized
|
||||
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
|
||||
let state: r2d2::State = pool.state();
|
||||
state.idle_connections == 0
|
||||
}
|
||||
|
||||
/// Log pool stats
|
||||
fn log_pool_stats(name: &str, pool: &SqlitePool) {
|
||||
let state: r2d2::State = pool.state();
|
||||
let in_use_cxns = state.connections - state.idle_connections;
|
||||
debug!(
|
||||
"DB pool usage (in_use: {}, available: {})",
|
||||
in_use_cxns, state.connections
|
||||
trace!(
|
||||
"DB pool {:?} usage (in_use: {}, available: {})",
|
||||
name,
|
||||
in_use_cxns,
|
||||
state.connections
|
||||
);
|
||||
if state.connections == in_use_cxns {
|
||||
debug!("DB pool {:?} is empty (in_use: {})", name, in_use_cxns);
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform database maintenance on a regular basis
|
||||
pub async fn db_maintenance(pool: SqlitePool) {
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
// the busy timer will block writers, so don't set
|
||||
// this any higher than you want max latency for event
|
||||
// writes.
|
||||
conn.busy_timeout(Duration::from_secs(1)).ok();
|
||||
debug!("running database optimizer");
|
||||
optimize_db(&mut conn).ok();
|
||||
debug!("running wal_checkpoint(TRUNCATE)");
|
||||
checkpoint_db(&mut conn).ok();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Perform a database query using a subscription.
|
||||
@@ -636,20 +717,42 @@ pub async fn db_query(
|
||||
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
let start = Instant::now();
|
||||
let pre_spawn_start = Instant::now();
|
||||
task::spawn_blocking(move || {
|
||||
debug!("moved DB query to thread in {:?}", start.elapsed());
|
||||
let db_queue_time = pre_spawn_start.elapsed();
|
||||
// if the queue time was very long (>5 seconds), spare the DB and abort.
|
||||
if db_queue_time > Duration::from_secs(5) {
|
||||
info!(
|
||||
"shedding DB query load from {:?} (cid: {}, sub: {:?})",
|
||||
db_queue_time, client_id, sub.id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
// otherwise, report queuing time if it is slow
|
||||
else if db_queue_time > Duration::from_secs(1) {
|
||||
debug!(
|
||||
"(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
|
||||
db_queue_time, client_id, sub.id
|
||||
);
|
||||
}
|
||||
let start = Instant::now();
|
||||
let mut row_count: usize = 0;
|
||||
// generate SQL query
|
||||
let (q, p) = query_from_sub(&sub);
|
||||
debug!("SQL generated in {:?}", start.elapsed());
|
||||
let sql_gen_elapsed = start.elapsed();
|
||||
if sql_gen_elapsed > Duration::from_millis(10) {
|
||||
debug!("SQL (slow) generated in {:?}", start.elapsed());
|
||||
}
|
||||
// show pool stats
|
||||
log_pool_stats(&pool);
|
||||
log_pool_stats("reader", &pool);
|
||||
// cutoff for displaying slow queries
|
||||
let slow_cutoff = Duration::from_millis(2000);
|
||||
// any client that doesn't cause us to generate new rows in 5
|
||||
// seconds gets dropped.
|
||||
let abort_cutoff = Duration::from_secs(5);
|
||||
let start = Instant::now();
|
||||
let mut slow_first_event;
|
||||
let mut last_successful_send = Instant::now();
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
@@ -667,12 +770,12 @@ pub async fn db_query(
|
||||
}
|
||||
// logging for slow queries; show sub and SQL.
|
||||
// to reduce logging; only show 1/16th of clients (leading 0)
|
||||
if slow_first_event && client_id.starts_with('0') {
|
||||
info!(
|
||||
if row_count == 0 && slow_first_event && client_id.starts_with("0") {
|
||||
debug!(
|
||||
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
info!(
|
||||
debug!(
|
||||
"query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
@@ -697,12 +800,33 @@ pub async fn db_query(
|
||||
}
|
||||
row_count += 1;
|
||||
let event_json = row.get(0)?;
|
||||
loop {
|
||||
if query_tx.capacity() != 0 {
|
||||
// we have capacity to add another item
|
||||
break;
|
||||
} else {
|
||||
// the queue is full
|
||||
trace!("db reader thread is stalled");
|
||||
if last_successful_send + abort_cutoff < Instant::now() {
|
||||
// the queue has been full for too long, abort
|
||||
info!("aborting database query due to slow client");
|
||||
let ok: Result<()> = Ok(());
|
||||
return ok;
|
||||
}
|
||||
// give the queue a chance to clear before trying again
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
// TODO: we could use try_send, but we'd have to juggle
|
||||
// getting the query result back as part of the error
|
||||
// result.
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
sub_id: sub.get_id(),
|
||||
event: event_json,
|
||||
})
|
||||
.ok();
|
||||
last_successful_send = Instant::now();
|
||||
}
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
@@ -711,10 +835,11 @@ pub async fn db_query(
|
||||
})
|
||||
.ok();
|
||||
debug!(
|
||||
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
|
||||
start.elapsed(),
|
||||
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
|
||||
pre_spawn_start.elapsed(),
|
||||
client_id,
|
||||
sub.id,
|
||||
start.elapsed(),
|
||||
row_count
|
||||
);
|
||||
} else {
|
||||
|
@@ -517,7 +517,7 @@ impl Verifier {
|
||||
Ok(updated) => {
|
||||
if updated != 0 {
|
||||
info!(
|
||||
"persisted event: {:?} in {:?}",
|
||||
"persisted event (new verified pubkey): {:?} in {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
|
||||
earliest: u64,
|
||||
) -> Result<VerificationRecord> {
|
||||
let tx = conn.transaction()?;
|
||||
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
|
||||
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
|
||||
let mut stmt = tx.prepare_cached(query)?;
|
||||
let fields = stmt.query_row(params![earliest, earliest], |r| {
|
||||
let rowid: u64 = r.get(0)?;
|
||||
|
@@ -16,7 +16,7 @@ pub const STARTUP_SQL: &str = r##"
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
pragma mmap_size = 1073741824; -- 1024MB of mmap
|
||||
pragma mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
|
@@ -22,6 +22,7 @@ use hyper::upgrade::Upgraded;
|
||||
use hyper::{
|
||||
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
|
||||
};
|
||||
use rusqlite::OpenFlags;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
@@ -248,10 +249,10 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
// limit concurrent SQLite blocking threads
|
||||
.max_blocking_threads(settings.limits.max_blocking_threads)
|
||||
.on_thread_start(|| {
|
||||
debug!("started new thread");
|
||||
trace!("started new thread");
|
||||
})
|
||||
.on_thread_stop(|| {
|
||||
debug!("stopping thread");
|
||||
trace!("stopping thread");
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
@@ -310,6 +311,17 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
}
|
||||
}
|
||||
}
|
||||
// build a connection pool for DB maintenance
|
||||
let maintenance_pool = db::build_pool(
|
||||
"maintenance writer",
|
||||
&settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
1,
|
||||
false,
|
||||
);
|
||||
db::db_maintenance(maintenance_pool).await;
|
||||
|
||||
// listen for (external to tokio) shutdown request
|
||||
let controlled_shutdown = invoke_shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
@@ -339,8 +351,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let pool = db::build_pool(
|
||||
"client query",
|
||||
&settings,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
|
||||
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
|
||||
db_min_conn,
|
||||
db_max_conn,
|
||||
true,
|
||||
@@ -539,7 +550,7 @@ async fn nostr_server(
|
||||
// TODO: serialize at broadcast time, instead of
|
||||
// once for each consumer.
|
||||
if let Ok(event_str) = serde_json::to_string(&global_event) {
|
||||
debug!("sub match for client: {}, sub: {:?}, event: {:?}",
|
||||
trace!("sub match for client: {}, sub: {:?}, event: {:?}",
|
||||
cid, s,
|
||||
global_event.get_event_id_prefix());
|
||||
// create an event response and send it
|
||||
|
Reference in New Issue
Block a user