Compare commits

...

16 Commits

Author SHA1 Message Date
Greg Heartsfield
fb8375aef2 build: bump version to 0.7.10 2022-12-18 13:46:18 -06:00
Greg Heartsfield
88ac31b549 perf: increase channel size for DB communication 2022-12-18 13:44:28 -06:00
Greg Heartsfield
677b7d39e9 improvement: log slow requests that return zero results 2022-12-18 13:42:31 -06:00
Greg Heartsfield
b24d2f9aaa perf: set default blocking threads to lower value 2022-12-18 12:20:57 -06:00
Greg Heartsfield
7a3899d852 build: bump version to 0.7.9 2022-12-18 09:21:07 -06:00
Greg Heartsfield
818108b793 improvement: upgrade multiple dependencies
Updating anyhow v1.0.66 -> v1.0.67
Updating async-trait v0.1.59 -> v0.1.60
Updating cxx v1.0.83 -> v1.0.84
Updating cxx-build v1.0.83 -> v1.0.84
Updating cxxbridge-flags v1.0.83 -> v1.0.84
Updating cxxbridge-macro v1.0.83 -> v1.0.84
Updating itoa v1.0.4 -> v1.0.5
Updating link-cplusplus v1.0.7 -> v1.0.8
Updating proc-macro2 v1.0.47 -> v1.0.48
Updating quote v1.0.21 -> v1.0.22
Updating rustversion v1.0.9 -> v1.0.11
Updating ryu v1.0.11 -> v1.0.12
Updating scratch v1.0.2 -> v1.0.3
Updating serde v1.0.150 -> v1.0.151
Updating serde_derive v1.0.150 -> v1.0.151
Updating serde_json v1.0.89 -> v1.0.90
Updating syn v1.0.105 -> v1.0.106
Updating thiserror v1.0.37 -> v1.0.38
Updating thiserror-impl v1.0.37 -> v1.0.38
Updating unicode-ident v1.0.5 -> v1.0.6
2022-12-18 09:16:09 -06:00
Greg Heartsfield
d10348f7e1 feat: configurable blocking threads 2022-12-18 09:14:04 -06:00
Greg Heartsfield
8598e443d8 wip: add configuration for future feature (client concurrent db limits) 2022-12-17 23:19:48 -06:00
Greg Heartsfield
43222d44e5 feat: perform optimization after seeing many events 2022-12-17 23:18:54 -06:00
Greg Heartsfield
7c1516c4fb perf: add index for tags 2022-12-17 23:17:53 -06:00
Greg Heartsfield
0c72053a49 perf: increase mmap size to 1GB 2022-12-17 23:17:16 -06:00
Greg Heartsfield
3f32ff67ab improvement: minor logging 2022-12-17 23:11:14 -06:00
Greg Heartsfield
0b9778d6ca refactor: simplify tracking of subscriptions 2022-12-17 20:46:58 -06:00
Greg Heartsfield
9be04120c7 build: bump version to 0.7.8 2022-12-17 12:01:43 -06:00
Greg Heartsfield
cc06167e06 perf: add composite index for tag table 2022-12-17 12:01:20 -06:00
Greg Heartsfield
b6e33f044f improvement: limit db connection max lifetime 2022-12-17 10:47:35 -06:00
8 changed files with 184 additions and 86 deletions

82
Cargo.lock generated
View File

@@ -54,9 +54,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.66"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
checksum = "7724808837b77f4b4de9d283820f9d98bcf496d5692934b857a2399d31ff22e6"
[[package]]
name = "async-stream"
@@ -81,9 +81,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.59"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
dependencies = [
"proc-macro2",
"quote",
@@ -382,9 +382,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf"
checksum = "27874566aca772cb515af4c6e997b5fe2119820bca447689145e39bb734d19a0"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -394,9 +394,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39"
checksum = "e7bb951f2523a49533003656a72121306b225ec16a49a09dc6b0ba0d6f3ec3c0"
dependencies = [
"cc",
"codespan-reporting",
@@ -409,15 +409,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12"
checksum = "be778b6327031c1c7b61dd2e48124eee5361e6aa76b8de93692f011b08870ab4"
[[package]]
name = "cxxbridge-macro"
version = "1.0.83"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6"
checksum = "7b8a2b87662fe5a0a0b38507756ab66aff32638876a0866e5a5fc82ceb07ee49"
dependencies = [
"proc-macro2",
"quote",
@@ -901,9 +901,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "js-sys"
@@ -950,9 +950,9 @@ dependencies = [
[[package]]
name = "link-cplusplus"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.7.7"
version = "0.7.10"
dependencies = [
"anyhow",
"bitcoin_hashes",
@@ -1414,9 +1414,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
checksum = "e9d89e5dba24725ae5678020bf8f1357a9aa7ff10736b551adbcd3f8d17d766f"
dependencies = [
"unicode-ident",
]
@@ -1472,9 +1472,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.21"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
checksum = "556d0f47a940e895261e77dc200d5eadfc6ef644c179c6f5edfc105e3a2292c8"
dependencies = [
"proc-macro2",
]
@@ -1736,15 +1736,15 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.9"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
[[package]]
name = "ryu"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
[[package]]
name = "schannel"
@@ -1773,9 +1773,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "secp256k1"
@@ -1823,18 +1823,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.150"
version = "1.0.151"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.150"
version = "1.0.151"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
dependencies = [
"proc-macro2",
"quote",
@@ -1843,9 +1843,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
checksum = "8778cc0b528968fe72abec38b5db5a20a70d148116cd9325d2bc5f5180ca3faf"
dependencies = [
"indexmap",
"itoa",
@@ -1920,9 +1920,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.105"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908"
checksum = "09ee3a69cd2c7e06684677e5629b3878b253af05e4714964204279c6bc02cf0b"
dependencies = [
"proc-macro2",
"quote",
@@ -1960,18 +1960,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.37"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.37"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
dependencies = [
"proc-macro2",
"quote",
@@ -2330,9 +2330,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.5"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]]
name = "unicode-normalization"

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.7.7"
version = "0.7.10"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"

View File

@@ -62,14 +62,25 @@ reject_future_seconds = 1800
[limits]
# Limit events created per second, averaged over one minute. Must be
# an integer. If not set (or set to 0), defaults to unlimited.
#messages_per_sec = 0
# an integer. If not set (or set to 0), defaults to unlimited. Note:
# this is for the server as a whole, not per-connection.
# messages_per_sec = 0
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to
# unlimited.
#subscriptions_per_min = 0
# UNIMPLEMENTED...
# Limit how many concurrent database connections a client can have.
# This prevents a single client from starting too many expensive
# database queries. Must be an integer. If not set (or set to 0),
# defaults to unlimited (subject to subscription limits).
#db_conns_per_client = 0
# Limit blocking threads used for database connections. Defaults to 16.
#max_blocking_threads = 16
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
# Set to 0 for unlimited.
#max_event_bytes = 131072

View File

@@ -53,7 +53,9 @@ pub struct Retention {
pub struct Limits {
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
pub db_conns_per_client: Option<u32>, // How many concurrent database queries (not subscriptions) may a client have?
pub max_blocking_threads: usize,
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
pub max_ws_message_bytes: Option<usize>,
pub max_ws_frame_bytes: Option<usize>,
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
@@ -216,6 +218,8 @@ impl Default for Settings {
limits: Limits {
messages_per_sec: None,
subscriptions_per_min: None,
db_conns_per_client: None,
max_blocking_threads: 16,
max_event_bytes: Some(2 << 17), // 128K
max_ws_message_bytes: Some(2 << 17), // 128K
max_ws_frame_bytes: Some(2 << 17), // 128K

View File

@@ -46,6 +46,11 @@ impl ClientConn {
&self.subscriptions
}
/// Check if the given subscription already exists
pub fn has_subscription(&self, sub: &Subscription) -> bool {
self.subscriptions.values().any(|x| x == sub)
}
/// Get a short prefix of the client's unique identifier, suitable
/// for logging.
#[must_use]

View File

@@ -38,6 +38,8 @@ pub struct SubmittedEvent {
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How many persisted events before optimization is triggered
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
/// Build a database connection pool.
/// # Panics
@@ -76,6 +78,7 @@ pub fn build_pool(
.test_on_check_out(true) // no noticeable performance hit
.min_idle(Some(min_size))
.max_size(max_size)
.max_lifetime(Some(Duration::from_secs(60)))
.build(manager)
.unwrap();
info!(
@@ -85,6 +88,12 @@ pub fn build_pool(
pool
}
/// Perform normal maintenance
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
conn.execute_batch("PRAGMA optimize;")?;
Ok(())
}
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
settings: Settings,
@@ -124,6 +133,8 @@ pub async fn db_writer(
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
// Keep rough track of events so we can run optimize eventually.
let mut optimize_counter: usize = 0;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
@@ -256,6 +267,13 @@ pub async fn db_writer(
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
// Use this as a trigger to do optimization
optimize_counter += 1;
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
info!("running database optimizer");
optimize_counter = 0;
optimize_db(&mut pool.get()?).ok();
}
}
// use rate limit, if defined, and if an event was actually written.
@@ -618,50 +636,60 @@ pub async fn db_query(
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
let start = Instant::now();
task::spawn_blocking(move || {
let mut row_count: usize = 0;
debug!("moved DB query to thread in {:?}", start.elapsed());
let start = Instant::now();
let mut row_count: usize = 0;
// generate SQL query
let (q, p) = query_from_sub(&sub);
trace!("SQL generated in {:?}", start.elapsed());
debug!("SQL generated in {:?}", start.elapsed());
// show pool stats
log_pool_stats(&pool);
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(1000);
let start = Instant::now();
let mut slow_first_event;
if let Ok(conn) = pool.get() {
// execute the query. Don't cache, since queries vary so much.
let mut stmt = conn.prepare(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
let mut first_result = true;
while let Some(row) = event_rows.next()? {
let first_event_elapsed = start.elapsed();
slow_first_event = first_event_elapsed >= slow_cutoff;
if first_result {
let first_result_elapsed = start.elapsed();
// logging for slow queries; show sub and SQL
if first_result_elapsed >= slow_cutoff {
info!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
"final query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"going to query for: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!("final query string: {}", q);
}
debug!(
"first result in {:?} (cid: {}, sub: {:?})",
first_result_elapsed, client_id, sub.id
first_event_elapsed, client_id, sub.id
);
first_result = false;
}
// logging for slow queries; show sub and SQL
//
if slow_first_event {
info!(
"query req (slow): {:?} (cid: {}, sub: {:?})",
sub, client_id, sub.id
);
info!(
"query string (slow): {} (cid: {}, sub: {:?})",
q, client_id, sub.id
);
} else {
trace!(
"query req: {:?} (cid: {}, sub: {:?})",
sub,
client_id,
sub.id
);
trace!(
"query string: {} (cid: {}, sub: {:?})",
q,
client_id,
sub.id
);
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {

View File

@@ -16,11 +16,11 @@ pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
pragma mmap_size = 536870912; -- 512MB of mmap
pragma mmap_size = 1073741824; -- 1024MB of mmap
"##;
/// Latest database version
pub const DB_VERSION: usize = 9;
pub const DB_VERSION: usize = 11;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -67,6 +67,8 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
-- NIP-05 User Validation
CREATE TABLE IF NOT EXISTS user_verification (
@@ -163,6 +165,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
if curr_version == 8 {
curr_version = mig_8_to_9(conn)?;
}
if curr_version == 9 {
curr_version = mig_9_to_10(conn)?;
}
if curr_version == 10 {
curr_version = mig_10_to_11(conn)?;
}
if curr_version == DB_VERSION {
info!(
@@ -419,5 +427,45 @@ PRAGMA user_version = 9;
panic!("database could not be upgraded");
}
}
Ok(8)
Ok(9)
}
fn mig_9_to_10(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 9->10");
// Those old indexes were actually helpful...
let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
PRAGMA user_version = 10;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v9 -> v10");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(10)
}
fn mig_10_to_11(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 10->11");
// Those old indexes were actually helpful...
let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
reindex;
pragma optimize;
PRAGMA user_version = 11;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v10 -> v11");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(11)
}

View File

@@ -245,6 +245,14 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
let rt = Builder::new_multi_thread()
.enable_all()
.thread_name("tokio-ws")
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
.on_thread_start(|| {
debug!("started new thread");
})
.on_thread_stop(|| {
debug!("stopping thread");
})
.build()
.unwrap();
// start tokio
@@ -456,9 +464,11 @@ async fn nostr_server(
let cid = conn.get_client_prefix();
// Create a channel for receiving query results from the database.
// we will send out the tx handle to any query we generate.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
// this has capacity for some of the larger requests we see, which
// should allow the DB thread to release the handle earlier.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20000);
// Create channel for receiving NOTICEs
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
// last time this client sent data (message, ping, etc.)
let mut last_message_time = Instant::now();
@@ -476,8 +486,6 @@ async fn nostr_server(
// when these subscriptions are cancelled, make a message
// available to the executing query so it knows to stop.
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
// keep track of the subscriptions we have
let mut current_subs: Vec<Subscription> = Vec::new();
// for stats, keep track of how many events the client published,
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
@@ -625,11 +633,10 @@ async fn nostr_server(
// * making a channel to cancel to request later
// * sending a request for a SQL query
// Do nothing if the sub already exists.
if !current_subs.contains(&s) {
if !conn.has_subscription(&s) {
if let Some(ref lim) = sub_lim_opt {
lim.until_ready_with_jitter(jitter).await;
lim.until_ready_with_jitter(jitter).await;
}
current_subs.push(s.clone());
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
match conn.subscribe(s.clone()) {
Ok(()) => {
@@ -637,27 +644,22 @@ async fn nostr_server(
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
previous_query.send(()).ok();
}
// start a database query
// start a database query. this spawns a blocking database query on a worker thread.
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
}
}
} else {
info!("client send duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
}
},
Ok(NostrMessage::CloseMsg(cc)) => {
// closing a request simply removes the subscription.
let parsed : Result<Close> = Result::<Close>::from(cc);
if let Ok(c) = parsed {
// remove from the list of known subs
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
current_subs.remove(pos);
}
// check if a query is currently
// running, and remove it if so.
let stop_tx = running_queries.remove(&c.id);