mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-12 22:29:07 -05:00
64bd983cb6
I have observed backups running for a very long time under heavy load; this introduces some artificial delay to give the online backup enough time to make progress.
787 lines
32 KiB
Rust
//! Event persistence and querying
use crate::config::Settings;
use crate::error::{Error, Result};
use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05;
use crate::notice::Notice;
use crate::schema::{upgrade_db, STARTUP_SQL};
use crate::subscription::ReqFilter;
use crate::subscription::Subscription;
use crate::utils::{is_hex, is_lower_hex};
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use hex;
use r2d2;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::params;
use rusqlite::types::ToSql;
use rusqlite::OpenFlags;
use std::fmt::Write as _;
use std::path::Path;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tokio::task;
use tracing::{debug, info, trace, warn};

pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;

/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
    pub event: Event,
    pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
}

/// Database file
pub const DB_FILE: &str = "nostr.db";
/// How many persisted events before optimization is triggered
pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
/// How many persisted events before we pause for backups
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 5000;

/// Build a database connection pool.
///
/// # Panics
///
/// Will panic if the pool could not be created.
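///
/// # Example
///
/// A sketch of building a read-only pool for queries; `settings` is assumed
/// to be a loaded [`Settings`].  Marked `ignore` so it is not run as a
/// doc-test:
///
/// ```ignore
/// let pool = build_pool(
///     "reader",
///     &settings,
///     rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
///     1,    // keep at least one idle connection
///     8,    // cap the pool at eight connections
///     true, // wait for the writer thread to create the DB file
/// );
/// let conn = pool.get().expect("connection from pool");
/// ```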
#[must_use]
pub fn build_pool(
    name: &str,
    settings: &Settings,
    flags: OpenFlags,
    min_size: u32,
    max_size: u32,
    wait_for_db: bool,
) -> SqlitePool {
    let db_dir = &settings.database.data_directory;
    let full_path = Path::new(db_dir).join(DB_FILE);
    // small hack; if the database doesn't exist yet, that means the
    // writer thread hasn't finished.  Give it a chance to work.  This
    // is only an issue the first time we run.
    if !settings.database.in_memory {
        while !full_path.exists() && wait_for_db {
            debug!("Database reader pool is waiting on the database to be created...");
            thread::sleep(Duration::from_millis(500));
        }
    }
    let manager = if settings.database.in_memory {
        SqliteConnectionManager::memory()
            .with_flags(flags)
            .with_init(|c| c.execute_batch(STARTUP_SQL))
    } else {
        SqliteConnectionManager::file(&full_path)
            .with_flags(flags)
            .with_init(|c| c.execute_batch(STARTUP_SQL))
    };
    let pool: SqlitePool = r2d2::Pool::builder()
        .test_on_check_out(true) // no noticeable performance hit
        .min_idle(Some(min_size))
        .max_size(max_size)
        .max_lifetime(Some(Duration::from_secs(60)))
        .build(manager)
        .unwrap();
    info!(
        "Built a connection pool {:?} (min={}, max={})",
        name, min_size, max_size
    );
    pool
}

/// Perform normal maintenance
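///
/// Typically invoked from the writer loop (sketch):
///
/// ```ignore
/// optimize_db(&mut pool.get()?)?;
/// ```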
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
    conn.execute_batch("PRAGMA optimize;")?;
    Ok(())
}

/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
    settings: Settings,
    mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
    bcast_tx: tokio::sync::broadcast::Sender<Event>,
    metadata_tx: tokio::sync::broadcast::Sender<Event>,
    mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
    // are we performing NIP-05 checking?
    let nip05_active = settings.verified_users.is_active();
    // are we requiring NIP-05 user verification?
    let nip05_enabled = settings.verified_users.is_enabled();

    task::spawn_blocking(move || {
        let db_dir = &settings.database.data_directory;
        let full_path = Path::new(db_dir).join(DB_FILE);
        // create a connection pool
        let pool = build_pool(
            "event writer",
            &settings,
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
            1,
            2,
            false,
        );
        if settings.database.in_memory {
            info!("using in-memory database, this will not persist across restarts!");
        } else {
            info!("opened database {:?} for writing", full_path);
        }
        upgrade_db(&mut pool.get()?)?;

        // Make a copy of the whitelist
        let whitelist = &settings.authorization.pubkey_whitelist.clone();

        // get rate limit settings
        let rps_setting = settings.limits.messages_per_sec;
        let mut most_recent_rate_limit = Instant::now();
        let mut lim_opt = None;
        // Keep rough track of events so we can run optimize eventually.
        let mut optimize_counter: usize = 0;
        // Constant writing has interfered with online backups.  Keep track of
        // how many events we have written since we last gave the backups a
        // chance to run.
        let mut backup_pause_counter: usize = 0;
        let clock = governor::clock::QuantaClock::default();
        if let Some(rps) = rps_setting {
            if rps > 0 {
                info!("Enabling rate limits for event creation ({}/sec)", rps);
                // the quota is expressed per-minute (rps * 60), which allows a
                // sustained rate of `rps` events/sec with room for short bursts.
                let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
                lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
            }
        }
        loop {
            if shutdown.try_recv().is_ok() {
                info!("shutting down database writer");
                break;
            }
            // call blocking read on channel
            let next_event = event_rx.blocking_recv();
            // if the channel has closed, we will never get work
            if next_event.is_none() {
                break;
            }
            // track if an event write occurred; this is used to
            // update the rate limiter
            let mut event_write = false;
            let subm_event = next_event.unwrap();
            let event = subm_event.event;
            let notice_tx = subm_event.notice_tx;
            // check if this event is authorized.
            if let Some(allowed_addrs) = whitelist {
                // TODO: incorporate delegated pubkeys
                // if the event address is not in allowed_addrs.
                if !allowed_addrs.contains(&event.pubkey) {
                    info!(
                        "Rejecting event {}, unauthorized author",
                        event.get_event_id_prefix()
                    );
                    notice_tx
                        .try_send(Notice::blocked(
                            event.id,
                            "pubkey is not allowed to publish to this relay",
                        ))
                        .ok();
                    continue;
                }
            }

            // send any metadata events to the NIP-05 verifier
            if nip05_active && event.is_kind_metadata() {
                // we are sending this prior to even deciding if we
                // persist it.  this allows the nip05 module to
                // inspect it, update if necessary, or persist a new
                // event and broadcast it itself.
                metadata_tx.send(event.clone()).ok();
            }

            // check for NIP-05 verification
            if nip05_enabled {
                match nip05::query_latest_user_verification(pool.get()?, event.pubkey.to_owned()) {
                    Ok(uv) => {
                        if uv.is_valid(&settings.verified_users) {
                            info!(
                                "new event from verified author ({:?},{:?})",
                                uv.name.to_string(),
                                event.get_author_prefix()
                            );
                        } else {
                            info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
                                  uv.name.to_string(),
                                  event.get_author_prefix()
                            );
                            notice_tx
                                .try_send(Notice::blocked(
                                    event.id,
                                    "NIP-05 verification is no longer valid (expired/wrong domain)",
                                ))
                                .ok();
                            continue;
                        }
                    }
                    Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
                        debug!(
                            "no verification records found for pubkey: {:?}",
                            event.get_author_prefix()
                        );
                        notice_tx
                            .try_send(Notice::blocked(
                                event.id,
                                "NIP-05 verification needed to publish events",
                            ))
                            .ok();
                        continue;
                    }
                    Err(e) => {
                        warn!("checking nip05 verification status failed: {:?}", e);
                        continue;
                    }
                }
            }
            // TODO: cache recent list of authors to remove a DB call.
            let start = Instant::now();
            if event.kind >= 20000 && event.kind < 30000 {
                // ephemeral events are broadcast to subscribers without being persisted
                bcast_tx.send(event.clone()).ok();
                info!(
                    "published ephemeral event: {:?} from: {:?} in: {:?}",
                    event.get_event_id_prefix(),
                    event.get_author_prefix(),
                    start.elapsed()
                );
                event_write = true;
            } else {
                log_pool_stats("writer", &pool);
                match write_event(&mut pool.get()?, &event) {
                    Ok(updated) => {
                        if updated == 0 {
                            trace!("ignoring duplicate or deleted event");
                            notice_tx.try_send(Notice::duplicate(event.id)).ok();
                        } else {
                            info!(
                                "persisted event: {:?} from: {:?} in: {:?}",
                                event.get_event_id_prefix(),
                                event.get_author_prefix(),
                                start.elapsed()
                            );
                            event_write = true;
                            // send this out to all clients
                            bcast_tx.send(event.clone()).ok();
                            notice_tx.try_send(Notice::saved(event.id)).ok();
                        }
                    }
                    Err(err) => {
                        warn!("event insert failed: {:?}", err);
                        let msg = "relay experienced an error trying to publish the latest event";
                        notice_tx.try_send(Notice::error(event.id, msg)).ok();
                    }
                }
                backup_pause_counter += 1;
                if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
                    info!("pausing db write thread for a moment...");
                    thread::sleep(Duration::from_millis(500));
                    backup_pause_counter = 0;
                }

                // Use this as a trigger to do optimization
                optimize_counter += 1;
                if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
                    info!("running database optimizer");
                    optimize_counter = 0;
                    optimize_db(&mut pool.get()?).ok();
                }
            }

            // use rate limit, if defined, and if an event was actually written.
            if event_write {
                if let Some(ref lim) = lim_opt {
                    if let Err(n) = lim.check() {
                        let wait_for = n.wait_time_from(clock.now());
                        // check if we have recently logged rate
                        // limits, but print out a message only once
                        // every 10 seconds.
                        if most_recent_rate_limit.elapsed().as_secs() > 10 {
                            warn!(
                                "rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)",
                                wait_for
                            );
                            // reset last rate limit message
                            most_recent_rate_limit = Instant::now();
                        }
                        // block event writes, allowing them to queue up
                        thread::sleep(wait_for);
                        continue;
                    }
                }
            }
        }
        info!("database connection closed");
        Ok(())
    })
}

/// Persist an event to the database, returning rows added.
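///
/// Usage sketch, assuming a writable pooled connection and a parsed
/// [`Event`] (marked `ignore` so it is not run as a doc-test):
///
/// ```ignore
/// let mut conn = pool.get()?;
/// match write_event(&mut conn, &event) {
///     Ok(0) => println!("duplicate or already-deleted event; nothing stored"),
///     Ok(n) => println!("stored event ({} row)", n),
///     Err(e) => eprintln!("write failed: {:?}", e),
/// }
/// ```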
pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
    // start transaction
    let tx = conn.transaction()?;
    // get relevant fields from event and convert to blobs.
    let id_blob = hex::decode(&e.id).ok();
    let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
    let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
    let event_str = serde_json::to_string(&e).ok();
    // ignore if the event hash is a duplicate.
    let mut ins_count = tx.execute(
        "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
        params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
    )?;
    if ins_count == 0 {
        // if the event was a duplicate, no need to insert event or
        // pubkey references.
        tx.rollback().ok();
        return Ok(ins_count);
    }
    // remember primary key of the event most recently inserted.
    let ev_id = tx.last_insert_rowid();
    // add all tags to the tag table
    for tag in e.tags.iter() {
        // ensure we have 2 values.
        if tag.len() >= 2 {
            let tagname = &tag[0];
            let tagval = &tag[1];
            // only single-char tags are searchable
            let tagchar_opt = single_char_tagname(tagname);
            match &tagchar_opt {
                Some(_) => {
                    // if the tag value is lowercase hex with an even number
                    // of digits, store it as a binary blob
                    if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
                        tx.execute(
                            "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
                            params![ev_id, &tagname, hex::decode(tagval).ok()],
                        )?;
                    } else {
                        tx.execute(
                            "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
                            params![ev_id, &tagname, &tagval],
                        )?;
                    }
                }
                None => {}
            }
        }
    }
    // if this event is a replaceable update, hide every other replaceable
    // event with the same kind from the same author that was issued
    // earlier than this.
    if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
        let update_count = tx.execute(
            "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
            params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
        )?;
        if update_count > 0 {
            info!(
                "hid {} older replaceable kind {} events for author: {:?}",
                update_count,
                e.kind,
                e.get_author_prefix()
            );
        }
    }
    // if this event is a deletion, hide the referenced events from the same author.
    if e.kind == 5 {
        let event_candidates = e.tag_values_by_name("e");
        // first parameter will be author
        let mut params: Vec<Box<dyn ToSql>> = vec![Box::new(hex::decode(&e.pubkey)?)];
        event_candidates
            .iter()
            .filter(|x| is_hex(x) && x.len() == 64)
            .filter_map(|x| hex::decode(x).ok())
            .for_each(|x| params.push(Box::new(x)));
        let query = format!(
            "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})",
            repeat_vars(params.len() - 1)
        );
        let mut stmt = tx.prepare(&query)?;
        let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
        info!(
            "hid {} deleted events for author {:?}",
            update_count,
            e.get_author_prefix()
        );
    } else {
        // check if a deletion has already been recorded for this event.
        // Only relevant for non-deletion events
        let del_count = tx.query_row(
            "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
            params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
        // check if the query returned a result, meaning we should
        // hide the current event
        if del_count.ok().is_some() {
            // a deletion already existed, mark original event as hidden.
            info!(
                "hid event: {:?} due to existing deletion by author: {:?}",
                e.get_event_id_prefix(),
                e.get_author_prefix()
            );
            let _update_count =
                tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?;
            // event was deleted, so let caller know nothing new
            // arrived, preventing this from being sent to active
            // subscriptions
            ins_count = 0;
        }
    }
    tx.commit()?;
    Ok(ins_count)
}

/// Serialized event associated with a specific subscription request.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct QueryResult {
    /// Subscription identifier
    pub sub_id: String,
    /// Serialized event
    pub event: String,
}

/// Produce an arbitrary-length list of '?' SQL parameter placeholders.
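///
/// For example, `repeat_vars(3)` yields `"?,?,?"`, and `repeat_vars(0)`
/// yields an empty string (`ignore`d because this helper is private and
/// cannot be doc-tested):
///
/// ```ignore
/// assert_eq!(repeat_vars(3), "?,?,?");
/// assert_eq!(repeat_vars(0), "");
/// ```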
fn repeat_vars(count: usize) -> String {
    if count == 0 {
        return "".to_owned();
    }
    let mut s = "?,".repeat(count);
    // Remove trailing comma
    s.pop();
    s
}

/// Create a dynamic SQL subquery and params from a subscription filter.
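///
/// As an illustration, a filter containing only `kinds: [1]` produces a
/// subquery shaped like:
///
/// ```text
/// SELECT e.content, e.created_at FROM event e
///     WHERE hidden!=TRUE AND kind IN (1) ORDER BY e.created_at ASC
/// ```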
fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
    // build a dynamic SQL query.  All user input is either an integer
    // (SQLi-safe), or a string that is filtered to only contain
    // hexadecimal characters.  Strings that require escaping (tag
    // names/values) use parameters.

    // if the filter is malformed, don't return anything.
    if f.force_no_match {
        let empty_query = "SELECT e.content, e.created_at FROM event e WHERE 1=0".to_owned();
        // query parameters for SQLite
        let empty_params: Vec<Box<dyn ToSql>> = vec![];
        return (empty_query, empty_params);
    }

    let mut query = "SELECT e.content, e.created_at FROM event e".to_owned();
    // query parameters for SQLite
    let mut params: Vec<Box<dyn ToSql>> = vec![];

    // individual filter components (single conditions such as an author or event ID)
    let mut filter_components: Vec<String> = Vec::new();
    // Query for "authors", allowing prefix matches
    if let Some(authvec) = &f.authors {
        // take each author and convert to a hexsearch
        let mut auth_searches: Vec<String> = vec![];
        for auth in authvec {
            match hex_range(auth) {
                Some(HexSearch::Exact(ex)) => {
                    auth_searches.push("author=? OR delegated_by=?".to_owned());
                    params.push(Box::new(ex.clone()));
                    params.push(Box::new(ex));
                }
                Some(HexSearch::Range(lower, upper)) => {
                    auth_searches.push(
                        "(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
                    );
                    params.push(Box::new(lower.clone()));
                    params.push(Box::new(upper.clone()));
                    params.push(Box::new(lower));
                    params.push(Box::new(upper));
                }
                Some(HexSearch::LowerOnly(lower)) => {
                    auth_searches.push("author>? OR delegated_by>?".to_owned());
                    params.push(Box::new(lower.clone()));
                    params.push(Box::new(lower));
                }
                None => {
                    info!("Could not parse hex range from author {:?}", auth);
                }
            }
        }
        if !authvec.is_empty() {
            let authors_clause = format!("({})", auth_searches.join(" OR "));
            filter_components.push(authors_clause);
        } else {
            // if the authors list was empty, we should never return
            // any results.
            filter_components.push("false".to_owned());
        }
    }
    // Query for Kind
    if let Some(ks) = &f.kinds {
        // kind is a number, no escaping needed
        let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
        let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
        filter_components.push(kind_clause);
    }
    // Query for event IDs, allowing prefix matches
    if let Some(idvec) = &f.ids {
        // take each event ID and convert to a hexsearch
        let mut id_searches: Vec<String> = vec![];
        for id in idvec {
            match hex_range(id) {
                Some(HexSearch::Exact(ex)) => {
                    id_searches.push("event_hash=?".to_owned());
                    params.push(Box::new(ex));
                }
                Some(HexSearch::Range(lower, upper)) => {
                    id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
                    params.push(Box::new(lower));
                    params.push(Box::new(upper));
                }
                Some(HexSearch::LowerOnly(lower)) => {
                    id_searches.push("event_hash>?".to_owned());
                    params.push(Box::new(lower));
                }
                None => {
                    info!("Could not parse hex range from id {:?}", id);
                }
            }
        }
        if !idvec.is_empty() {
            let id_clause = format!("({})", id_searches.join(" OR "));
            filter_components.push(id_clause);
        } else {
            // if the ids list was empty, we should never return
            // any results.
            filter_components.push("false".to_owned());
        }
    }
    // Query for tags
    if let Some(map) = &f.tags {
        for (key, val) in map.iter() {
            let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
            let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
            for v in val {
                if (v.len() % 2 == 0) && is_lower_hex(v) {
                    if let Ok(h) = hex::decode(v) {
                        blob_vals.push(Box::new(h));
                    }
                } else {
                    str_vals.push(Box::new(v.to_owned()));
                }
            }
            // create clauses with "?" params for each tag value being searched
            let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
            let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
            // find evidence of the target tag name/value existing for this event.
            let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
            // add the tag name as the first parameter
            params.push(Box::new(key.to_string()));
            // add all tag values that are plain strings as params
            params.append(&mut str_vals);
            // add all tag values that are blobs as params
            params.append(&mut blob_vals);
            filter_components.push(tag_clause);
        }
    }
    // Query for timestamp (since)
    if f.since.is_some() {
        let created_clause = format!("created_at > {}", f.since.unwrap());
        filter_components.push(created_clause);
    }
    // Query for timestamp (until)
    if f.until.is_some() {
        let until_clause = format!("created_at < {}", f.until.unwrap());
        filter_components.push(until_clause);
    }
    // never display hidden events
    query.push_str(" WHERE hidden!=TRUE");
    // build filter component conditions
    if !filter_components.is_empty() {
        query.push_str(" AND ");
        query.push_str(&filter_components.join(" AND "));
    }
    // Apply per-filter limit to this subquery.
    // The use of a LIMIT implies a DESC order, to capture only the most recent events.
    if let Some(lim) = f.limit {
        let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim);
    } else {
        query.push_str(" ORDER BY e.created_at ASC");
    }
    (query, params)
}

/// Create a dynamic SQL query string and params from a subscription.
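///
/// Each filter in the subscription becomes its own subquery (see
/// [`query_from_filter`]); results are combined with `UNION`, so a
/// two-filter subscription yields a query shaped like:
///
/// ```text
/// SELECT distinct content, created_at FROM (<filter-1 subquery>)
/// UNION
/// SELECT distinct content, created_at FROM (<filter-2 subquery>)
/// ```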
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
    // build a dynamic SQL query for an entire subscription, based on
    // SQL subqueries for filters.
    let mut subqueries: Vec<String> = Vec::new();
    // subquery params
    let mut params: Vec<Box<dyn ToSql>> = vec![];
    // for every filter in the subscription, generate a subquery
    for f in sub.filters.iter() {
        let (f_subquery, mut f_params) = query_from_filter(f);
        subqueries.push(f_subquery);
        params.append(&mut f_params);
    }
    // encapsulate subqueries into select statements
    let subqueries_selects: Vec<String> = subqueries
        .iter()
        .map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
        .collect();
    let query: String = subqueries_selects.join(" UNION ");
    (query, params)
}

/// Check if the pool is fully utilized
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
    let state: r2d2::State = pool.state();
    state.idle_connections == 0
}

/// Log the connection pool's current utilization.
fn log_pool_stats(name: &str, pool: &SqlitePool) {
    let state: r2d2::State = pool.state();
    let in_use_cxns = state.connections - state.idle_connections;
    debug!(
        "DB pool {:?} usage (in_use: {}, total: {})",
        name, in_use_cxns, state.connections
    );
}

/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query.  Each result
/// is published on the `query_tx` channel as it is returned.  If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
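///
/// Invocation sketch; the channel names and buffer size here are
/// illustrative, not part of this module:
///
/// ```ignore
/// let (query_tx, mut query_rx) = tokio::sync::mpsc::channel::<QueryResult>(256);
/// let (abandon_tx, abandon_rx) = tokio::sync::oneshot::channel::<()>();
/// db_query(sub, client_id, pool.clone(), query_tx, abandon_rx).await;
/// while let Some(res) = query_rx.recv().await {
///     // forward res.event to the client for res.sub_id
/// }
/// ```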
pub async fn db_query(
    sub: Subscription,
    client_id: String,
    pool: SqlitePool,
    query_tx: tokio::sync::mpsc::Sender<QueryResult>,
    mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
    let pre_spawn_start = Instant::now();
    task::spawn_blocking(move || {
        let db_queue_time = pre_spawn_start.elapsed();
        // if the queue time was very long (>5 seconds), spare the DB and abort.
        if db_queue_time > Duration::from_secs(5) {
            info!(
                "shedding DB query load from {:?} (cid: {}, sub: {:?})",
                db_queue_time, client_id, sub.id
            );
            return Ok(());
        }
        // otherwise, report queuing time if it is slow
        else if db_queue_time > Duration::from_secs(1) {
            debug!(
                "(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
                db_queue_time, client_id, sub.id
            );
        }
        let start = Instant::now();
        let mut row_count: usize = 0;
        // generate SQL query
        let (q, p) = query_from_sub(&sub);
        debug!("SQL generated in {:?}", start.elapsed());
        // show pool stats
        log_pool_stats("reader", &pool);
        // cutoff for displaying slow queries
        let slow_cutoff = Duration::from_millis(2000);
        // any client that doesn't cause us to generate new rows in 5
        // seconds gets dropped.
        let abort_cutoff = Duration::from_secs(5);
        let start = Instant::now();
        let mut slow_first_event;
        let mut last_successful_send = Instant::now();
        if let Ok(conn) = pool.get() {
            // execute the query. Don't cache, since queries vary so much.
            let mut stmt = conn.prepare(&q)?;
            let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
            let mut first_result = true;
            while let Some(row) = event_rows.next()? {
                let first_event_elapsed = start.elapsed();
                slow_first_event = first_event_elapsed >= slow_cutoff;
                if first_result {
                    debug!(
                        "first result in {:?} (cid: {}, sub: {:?})",
                        first_event_elapsed, client_id, sub.id
                    );
                    first_result = false;
                }
                // logging for slow queries; show sub and SQL.
                // to reduce logging, only show ~1/256th of clients (cid starting with "00")
                if slow_first_event && client_id.starts_with("00") {
                    debug!(
                        "query req (slow): {:?} (cid: {}, sub: {:?})",
                        sub, client_id, sub.id
                    );
                    debug!(
                        "query string (slow): {} (cid: {}, sub: {:?})",
                        q, client_id, sub.id
                    );
                } else {
                    trace!(
                        "query req: {:?} (cid: {}, sub: {:?})",
                        sub,
                        client_id,
                        sub.id
                    );
                    trace!(
                        "query string: {} (cid: {}, sub: {:?})",
                        q,
                        client_id,
                        sub.id
                    );
                }
                // check if this subscription is still active; checked every 100 rows
                if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
                    debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
                    return Ok(());
                }
                row_count += 1;
                let event_json = row.get(0)?;
                loop {
                    if query_tx.capacity() != 0 {
                        // we have capacity to add another item
                        break;
                    } else {
                        // the queue is full
                        trace!("db reader thread is stalled");
                        if last_successful_send + abort_cutoff < Instant::now() {
                            // the queue has been full for too long, abort
                            info!("aborting database query due to slow client");
                            let ok: Result<()> = Ok(());
                            return ok;
                        }
                        // give the queue a chance to clear before trying again
                        thread::sleep(Duration::from_millis(100));
                    }
                }
                // TODO: we could use try_send, but we'd have to juggle
                // getting the query result back as part of the error
                // result.
                query_tx
                    .blocking_send(QueryResult {
                        sub_id: sub.get_id(),
                        event: event_json,
                    })
                    .ok();
                last_successful_send = Instant::now();
            }
            query_tx
                .blocking_send(QueryResult {
                    sub_id: sub.get_id(),
                    event: "EOSE".to_string(),
                })
                .ok();
            debug!(
                "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
                pre_spawn_start.elapsed(),
                client_id,
                sub.id,
                start.elapsed(),
                row_count
            );
        } else {
            warn!("Could not get a database connection for querying");
        }
        let ok: Result<()> = Ok(());
        ok
    });
}