mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ce37fc1a2d | ||
|
2cfd384339 | ||
|
8c013107f9 | ||
|
64a4466d30 | ||
|
1596c23eb4 | ||
|
129badd4e1 | ||
|
6f7c080180 | ||
|
af92561ef6 | ||
|
d833a3e40d |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.11"
|
||||
version = "0.7.12"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitcoin_hashes",
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.11"
|
||||
version = "0.7.12"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
|
49
src/db.rs
49
src/db.rs
@@ -116,7 +116,7 @@ pub async fn db_writer(
|
||||
&settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
4,
|
||||
2,
|
||||
false,
|
||||
);
|
||||
if settings.database.in_memory {
|
||||
@@ -636,9 +636,16 @@ pub async fn db_query(
|
||||
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
let start = Instant::now();
|
||||
let pre_spawn_start = Instant::now();
|
||||
task::spawn_blocking(move || {
|
||||
debug!("moved DB query to thread in {:?}", start.elapsed());
|
||||
let db_queue_time = pre_spawn_start.elapsed();
|
||||
// report queuing time if it is slow
|
||||
if db_queue_time > Duration::from_secs(1) {
|
||||
debug!(
|
||||
"(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
|
||||
db_queue_time, client_id, sub.id
|
||||
);
|
||||
}
|
||||
let start = Instant::now();
|
||||
let mut row_count: usize = 0;
|
||||
// generate SQL query
|
||||
@@ -648,8 +655,12 @@ pub async fn db_query(
|
||||
log_pool_stats(&pool);
|
||||
// cutoff for displaying slow queries
|
||||
let slow_cutoff = Duration::from_millis(2000);
|
||||
// any client that doesn't cause us to generate new rows in 5
|
||||
// seconds gets dropped.
|
||||
let abort_cutoff = Duration::from_secs(5);
|
||||
let start = Instant::now();
|
||||
let mut slow_first_event;
|
||||
let mut last_successful_send = Instant::now();
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
@@ -667,12 +678,12 @@ pub async fn db_query(
|
||||
}
|
||||
// logging for slow queries; show sub and SQL.
|
||||
// to reduce logging; only show 1/16th of clients (leading 0)
|
||||
if slow_first_event && client_id.starts_with('0') {
|
||||
info!(
|
||||
if slow_first_event && client_id.starts_with("00") {
|
||||
debug!(
|
||||
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
info!(
|
||||
debug!(
|
||||
"query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
@@ -697,12 +708,33 @@ pub async fn db_query(
|
||||
}
|
||||
row_count += 1;
|
||||
let event_json = row.get(0)?;
|
||||
loop {
|
||||
if query_tx.capacity() != 0 {
|
||||
// we have capacity to add another item
|
||||
break;
|
||||
} else {
|
||||
// the queue is full
|
||||
trace!("db reader thread is stalled");
|
||||
if last_successful_send + abort_cutoff < Instant::now() {
|
||||
// the queue has been full for too long, abort
|
||||
info!("aborting database query due to slow client");
|
||||
let ok: Result<()> = Ok(());
|
||||
return ok;
|
||||
}
|
||||
// give the queue a chance to clear before trying again
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
// TODO: we could use try_send, but we'd have to juggle
|
||||
// getting the query result back as part of the error
|
||||
// result.
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
sub_id: sub.get_id(),
|
||||
event: event_json,
|
||||
})
|
||||
.ok();
|
||||
last_successful_send = Instant::now();
|
||||
}
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
@@ -711,10 +743,11 @@ pub async fn db_query(
|
||||
})
|
||||
.ok();
|
||||
debug!(
|
||||
"query completed in {:?} (cid: {}, sub: {:?}, rows: {})",
|
||||
start.elapsed(),
|
||||
"query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
|
||||
pre_spawn_start.elapsed(),
|
||||
client_id,
|
||||
sub.id,
|
||||
start.elapsed(),
|
||||
row_count
|
||||
);
|
||||
} else {
|
||||
|
@@ -16,7 +16,7 @@ pub const STARTUP_SQL: &str = r##"
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
pragma mmap_size = 1073741824; -- 1024MB of mmap
|
||||
pragma mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
|
@@ -339,8 +339,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let pool = db::build_pool(
|
||||
"client query",
|
||||
&settings,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
|
||||
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
|
||||
db_min_conn,
|
||||
db_max_conn,
|
||||
true,
|
||||
|
Reference in New Issue
Block a user