mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
16 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
fb8375aef2 | ||
|
88ac31b549 | ||
|
677b7d39e9 | ||
|
b24d2f9aaa | ||
|
7a3899d852 | ||
|
818108b793 | ||
|
d10348f7e1 | ||
|
8598e443d8 | ||
|
43222d44e5 | ||
|
7c1516c4fb | ||
|
0c72053a49 | ||
|
3f32ff67ab | ||
|
0b9778d6ca | ||
|
9be04120c7 | ||
|
cc06167e06 | ||
|
b6e33f044f |
82
Cargo.lock
generated
82
Cargo.lock
generated
@@ -54,9 +54,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.66"
|
||||
version = "1.0.67"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
|
||||
checksum = "7724808837b77f4b4de9d283820f9d98bcf496d5692934b857a2399d31ff22e6"
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
@@ -81,9 +81,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.59"
|
||||
version = "0.1.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
|
||||
checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -382,9 +382,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx"
|
||||
version = "1.0.83"
|
||||
version = "1.0.84"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf"
|
||||
checksum = "27874566aca772cb515af4c6e997b5fe2119820bca447689145e39bb734d19a0"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cxxbridge-flags",
|
||||
@@ -394,9 +394,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx-build"
|
||||
version = "1.0.83"
|
||||
version = "1.0.84"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39"
|
||||
checksum = "e7bb951f2523a49533003656a72121306b225ec16a49a09dc6b0ba0d6f3ec3c0"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"codespan-reporting",
|
||||
@@ -409,15 +409,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-flags"
|
||||
version = "1.0.83"
|
||||
version = "1.0.84"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12"
|
||||
checksum = "be778b6327031c1c7b61dd2e48124eee5361e6aa76b8de93692f011b08870ab4"
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-macro"
|
||||
version = "1.0.83"
|
||||
version = "1.0.84"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6"
|
||||
checksum = "7b8a2b87662fe5a0a0b38507756ab66aff32638876a0866e5a5fc82ceb07ee49"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -901,9 +901,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.4"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
|
||||
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
@@ -950,9 +950,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "link-cplusplus"
|
||||
version = "1.0.7"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
|
||||
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.7"
|
||||
version = "0.7.10"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitcoin_hashes",
|
||||
@@ -1414,9 +1414,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.47"
|
||||
version = "1.0.48"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
|
||||
checksum = "e9d89e5dba24725ae5678020bf8f1357a9aa7ff10736b551adbcd3f8d17d766f"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
@@ -1472,9 +1472,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.21"
|
||||
version = "1.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
|
||||
checksum = "556d0f47a940e895261e77dc200d5eadfc6ef644c179c6f5edfc105e3a2292c8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
@@ -1736,15 +1736,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.9"
|
||||
version = "1.0.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
|
||||
checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.11"
|
||||
version = "1.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
|
||||
checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
@@ -1773,9 +1773,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "scratch"
|
||||
version = "1.0.2"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
|
||||
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
|
||||
|
||||
[[package]]
|
||||
name = "secp256k1"
|
||||
@@ -1823,18 +1823,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.150"
|
||||
version = "1.0.151"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
|
||||
checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.150"
|
||||
version = "1.0.151"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
|
||||
checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1843,9 +1843,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.89"
|
||||
version = "1.0.90"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
|
||||
checksum = "8778cc0b528968fe72abec38b5db5a20a70d148116cd9325d2bc5f5180ca3faf"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"itoa",
|
||||
@@ -1920,9 +1920,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.105"
|
||||
version = "1.0.106"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908"
|
||||
checksum = "09ee3a69cd2c7e06684677e5629b3878b253af05e4714964204279c6bc02cf0b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1960,18 +1960,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.37"
|
||||
version = "1.0.38"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
|
||||
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.37"
|
||||
version = "1.0.38"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
|
||||
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -2330,9 +2330,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.5"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
|
||||
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-normalization"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.7"
|
||||
version = "0.7.10"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
|
15
config.toml
15
config.toml
@@ -62,14 +62,25 @@ reject_future_seconds = 1800
|
||||
|
||||
[limits]
|
||||
# Limit events created per second, averaged over one minute. Must be
|
||||
# an integer. If not set (or set to 0), defaults to unlimited.
|
||||
#messages_per_sec = 0
|
||||
# an integer. If not set (or set to 0), defaults to unlimited. Note:
|
||||
# this is for the server as a whole, not per-connection.
|
||||
# messages_per_sec = 0
|
||||
|
||||
# Limit client subscriptions created per second, averaged over one
|
||||
# minute. Must be an integer. If not set (or set to 0), defaults to
|
||||
# unlimited.
|
||||
#subscriptions_per_min = 0
|
||||
|
||||
# UNIMPLEMENTED...
|
||||
# Limit how many concurrent database connections a client can have.
|
||||
# This prevents a single client from starting too many expensive
|
||||
# database queries. Must be an integer. If not set (or set to 0),
|
||||
# defaults to unlimited (subject to subscription limits).
|
||||
#db_conns_per_client = 0
|
||||
|
||||
# Limit blocking threads used for database connections. Defaults to 16.
|
||||
#max_blocking_threads = 16
|
||||
|
||||
# Limit the maximum size of an EVENT message. Defaults to 128 KB.
|
||||
# Set to 0 for unlimited.
|
||||
#max_event_bytes = 131072
|
||||
|
@@ -53,7 +53,9 @@ pub struct Retention {
|
||||
pub struct Limits {
|
||||
pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
|
||||
pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
|
||||
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
|
||||
pub db_conns_per_client: Option<u32>, // How many concurrent database queries (not subscriptions) may a client have?
|
||||
pub max_blocking_threads: usize,
|
||||
pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
|
||||
pub max_ws_message_bytes: Option<usize>,
|
||||
pub max_ws_frame_bytes: Option<usize>,
|
||||
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
|
||||
@@ -216,6 +218,8 @@ impl Default for Settings {
|
||||
limits: Limits {
|
||||
messages_per_sec: None,
|
||||
subscriptions_per_min: None,
|
||||
db_conns_per_client: None,
|
||||
max_blocking_threads: 16,
|
||||
max_event_bytes: Some(2 << 17), // 128K
|
||||
max_ws_message_bytes: Some(2 << 17), // 128K
|
||||
max_ws_frame_bytes: Some(2 << 17), // 128K
|
||||
|
@@ -46,6 +46,11 @@ impl ClientConn {
|
||||
&self.subscriptions
|
||||
}
|
||||
|
||||
/// Check if the given subscription already exists
|
||||
pub fn has_subscription(&self, sub: &Subscription) -> bool {
|
||||
self.subscriptions.values().any(|x| x == sub)
|
||||
}
|
||||
|
||||
/// Get a short prefix of the client's unique identifier, suitable
|
||||
/// for logging.
|
||||
#[must_use]
|
||||
|
74
src/db.rs
74
src/db.rs
@@ -38,6 +38,8 @@ pub struct SubmittedEvent {
|
||||
|
||||
/// Database file
|
||||
pub const DB_FILE: &str = "nostr.db";
|
||||
/// How many persisted events before optimization is triggered
|
||||
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
|
||||
|
||||
/// Build a database connection pool.
|
||||
/// # Panics
|
||||
@@ -76,6 +78,7 @@ pub fn build_pool(
|
||||
.test_on_check_out(true) // no noticeable performance hit
|
||||
.min_idle(Some(min_size))
|
||||
.max_size(max_size)
|
||||
.max_lifetime(Some(Duration::from_secs(60)))
|
||||
.build(manager)
|
||||
.unwrap();
|
||||
info!(
|
||||
@@ -85,6 +88,12 @@ pub fn build_pool(
|
||||
pool
|
||||
}
|
||||
|
||||
/// Perform normal maintenance
|
||||
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
conn.execute_batch("PRAGMA optimize;")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn a database writer that persists events to the SQLite store.
|
||||
pub async fn db_writer(
|
||||
settings: Settings,
|
||||
@@ -124,6 +133,8 @@ pub async fn db_writer(
|
||||
let rps_setting = settings.limits.messages_per_sec;
|
||||
let mut most_recent_rate_limit = Instant::now();
|
||||
let mut lim_opt = None;
|
||||
// Keep rough track of events so we can run optimize eventually.
|
||||
let mut optimize_counter: usize = 0;
|
||||
let clock = governor::clock::QuantaClock::default();
|
||||
if let Some(rps) = rps_setting {
|
||||
if rps > 0 {
|
||||
@@ -256,6 +267,13 @@ pub async fn db_writer(
|
||||
notice_tx.try_send(Notice::error(event.id, msg)).ok();
|
||||
}
|
||||
}
|
||||
// Use this as a trigger to do optimization
|
||||
optimize_counter += 1;
|
||||
if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
|
||||
info!("running database optimizer");
|
||||
optimize_counter = 0;
|
||||
optimize_db(&mut pool.get()?).ok();
|
||||
}
|
||||
}
|
||||
|
||||
// use rate limit, if defined, and if an event was actually written.
|
||||
@@ -618,50 +636,60 @@ pub async fn db_query(
|
||||
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
let start = Instant::now();
|
||||
task::spawn_blocking(move || {
|
||||
let mut row_count: usize = 0;
|
||||
debug!("moved DB query to thread in {:?}", start.elapsed());
|
||||
let start = Instant::now();
|
||||
let mut row_count: usize = 0;
|
||||
// generate SQL query
|
||||
let (q, p) = query_from_sub(&sub);
|
||||
trace!("SQL generated in {:?}", start.elapsed());
|
||||
debug!("SQL generated in {:?}", start.elapsed());
|
||||
// show pool stats
|
||||
log_pool_stats(&pool);
|
||||
// cutoff for displaying slow queries
|
||||
let slow_cutoff = Duration::from_millis(1000);
|
||||
let start = Instant::now();
|
||||
let mut slow_first_event;
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
||||
let mut first_result = true;
|
||||
while let Some(row) = event_rows.next()? {
|
||||
let first_event_elapsed = start.elapsed();
|
||||
slow_first_event = first_event_elapsed >= slow_cutoff;
|
||||
if first_result {
|
||||
let first_result_elapsed = start.elapsed();
|
||||
// logging for slow queries; show sub and SQL
|
||||
if first_result_elapsed >= slow_cutoff {
|
||||
info!(
|
||||
"going to query for: {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
info!(
|
||||
"final query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
"going to query for: {:?} (cid: {}, sub: {:?})",
|
||||
sub,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
trace!("final query string: {}", q);
|
||||
}
|
||||
debug!(
|
||||
"first result in {:?} (cid: {}, sub: {:?})",
|
||||
first_result_elapsed, client_id, sub.id
|
||||
first_event_elapsed, client_id, sub.id
|
||||
);
|
||||
first_result = false;
|
||||
}
|
||||
// logging for slow queries; show sub and SQL
|
||||
//
|
||||
if slow_first_event {
|
||||
info!(
|
||||
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
info!(
|
||||
"query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
"query req: {:?} (cid: {}, sub: {:?})",
|
||||
sub,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
trace!(
|
||||
"query string: {} (cid: {}, sub: {:?})",
|
||||
q,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
}
|
||||
// check if this is still active
|
||||
// TODO: check every N rows
|
||||
if abandon_query_rx.try_recv().is_ok() {
|
||||
|
@@ -16,11 +16,11 @@ pub const STARTUP_SQL: &str = r##"
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
pragma mmap_size = 536870912; -- 512MB of mmap
|
||||
pragma mmap_size = 1073741824; -- 1024MB of mmap
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
pub const DB_VERSION: usize = 9;
|
||||
pub const DB_VERSION: usize = 11;
|
||||
|
||||
/// Schema definition
|
||||
const INIT_SQL: &str = formatcp!(
|
||||
@@ -67,6 +67,8 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
|
||||
|
||||
-- NIP-05 User Validation
|
||||
CREATE TABLE IF NOT EXISTS user_verification (
|
||||
@@ -163,6 +165,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
if curr_version == 8 {
|
||||
curr_version = mig_8_to_9(conn)?;
|
||||
}
|
||||
if curr_version == 9 {
|
||||
curr_version = mig_9_to_10(conn)?;
|
||||
}
|
||||
if curr_version == 10 {
|
||||
curr_version = mig_10_to_11(conn)?;
|
||||
}
|
||||
|
||||
if curr_version == DB_VERSION {
|
||||
info!(
|
||||
@@ -419,5 +427,45 @@ PRAGMA user_version = 9;
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(8)
|
||||
Ok(9)
|
||||
}
|
||||
|
||||
fn mig_9_to_10(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 9->10");
|
||||
// Those old indexes were actually helpful...
|
||||
let upgrade_sql = r##"
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
|
||||
PRAGMA user_version = 10;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v9 -> v10");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(10)
|
||||
}
|
||||
|
||||
fn mig_10_to_11(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 10->11");
|
||||
// Those old indexes were actually helpful...
|
||||
let upgrade_sql = r##"
|
||||
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
|
||||
reindex;
|
||||
pragma optimize;
|
||||
PRAGMA user_version = 11;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v10 -> v11");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(11)
|
||||
}
|
||||
|
@@ -245,6 +245,14 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let rt = Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("tokio-ws")
|
||||
// limit concurrent SQLite blocking threads
|
||||
.max_blocking_threads(settings.limits.max_blocking_threads)
|
||||
.on_thread_start(|| {
|
||||
debug!("started new thread");
|
||||
})
|
||||
.on_thread_stop(|| {
|
||||
debug!("stopping thread");
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
// start tokio
|
||||
@@ -456,9 +464,11 @@ async fn nostr_server(
|
||||
let cid = conn.get_client_prefix();
|
||||
// Create a channel for receiving query results from the database.
|
||||
// we will send out the tx handle to any query we generate.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
|
||||
// this has capacity for some of the larger requests we see, which
|
||||
// should allow the DB thread to release the handle earlier.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20000);
|
||||
// Create channel for receiving NOTICEs
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
|
||||
|
||||
// last time this client sent data (message, ping, etc.)
|
||||
let mut last_message_time = Instant::now();
|
||||
@@ -476,8 +486,6 @@ async fn nostr_server(
|
||||
// when these subscriptions are cancelled, make a message
|
||||
// available to the executing query so it knows to stop.
|
||||
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
|
||||
// keep track of the subscriptions we have
|
||||
let mut current_subs: Vec<Subscription> = Vec::new();
|
||||
// for stats, keep track of how many events the client published,
|
||||
// and how many it received from queries.
|
||||
let mut client_published_event_count: usize = 0;
|
||||
@@ -625,11 +633,10 @@ async fn nostr_server(
|
||||
// * making a channel to cancel to request later
|
||||
// * sending a request for a SQL query
|
||||
// Do nothing if the sub already exists.
|
||||
if !current_subs.contains(&s) {
|
||||
if !conn.has_subscription(&s) {
|
||||
if let Some(ref lim) = sub_lim_opt {
|
||||
lim.until_ready_with_jitter(jitter).await;
|
||||
lim.until_ready_with_jitter(jitter).await;
|
||||
}
|
||||
current_subs.push(s.clone());
|
||||
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
|
||||
match conn.subscribe(s.clone()) {
|
||||
Ok(()) => {
|
||||
@@ -637,27 +644,22 @@ async fn nostr_server(
|
||||
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
// start a database query
|
||||
// start a database query. this spawns a blocking database query on a worker thread.
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {}", e);
|
||||
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
|
||||
ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("client send duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
|
||||
info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
|
||||
}
|
||||
},
|
||||
Ok(NostrMessage::CloseMsg(cc)) => {
|
||||
// closing a request simply removes the subscription.
|
||||
let parsed : Result<Close> = Result::<Close>::from(cc);
|
||||
if let Ok(c) = parsed {
|
||||
// remove from the list of known subs
|
||||
if let Some(pos) = current_subs.iter().position(|s| *s.id == c.id) {
|
||||
current_subs.remove(pos);
|
||||
}
|
||||
|
||||
// check if a query is currently
|
||||
// running, and remove it if so.
|
||||
let stop_tx = running_queries.remove(&c.id);
|
||||
|
Reference in New Issue
Block a user