2021-12-11 22:43:41 -05:00
|
|
|
//! Event persistence and querying
|
2021-12-11 16:48:59 -05:00
|
|
|
use crate::error::Result;
|
|
|
|
use crate::event::Event;
|
|
|
|
use crate::subscription::Subscription;
|
2021-12-30 22:07:21 -05:00
|
|
|
use governor::clock::Clock;
|
|
|
|
use governor::{Quota, RateLimiter};
|
2021-12-11 16:48:59 -05:00
|
|
|
use hex;
|
|
|
|
use log::*;
|
|
|
|
use rusqlite::params;
|
|
|
|
use rusqlite::Connection;
|
|
|
|
use rusqlite::OpenFlags;
|
2021-12-30 22:07:21 -05:00
|
|
|
//use std::num::NonZeroU32;
|
|
|
|
use crate::config::SETTINGS;
|
2021-12-11 16:48:59 -05:00
|
|
|
use std::path::Path;
|
2021-12-30 22:07:21 -05:00
|
|
|
use std::thread;
|
2022-01-01 20:25:09 -05:00
|
|
|
use std::time::Instant;
|
2021-12-11 16:48:59 -05:00
|
|
|
use tokio::task;
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Database file name, joined onto the configured data directory
/// (`config.database.data_directory`) when opening connections.
const DB_FILE: &str = "nostr.db";
|
|
|
|
|
2021-12-30 14:55:05 -05:00
|
|
|
/// Startup DB Pragmas
///
/// Executed by [`upgrade_db`] after any schema initialization or
/// migration, on every writer startup.  These tune the connection
/// (durability level, foreign keys, mmap size) without altering the
/// schema.
const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
pragma mmap_size = 536870912; -- 512MB of mmap
"##;
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Schema definition
///
/// Full v2 schema, applied in one batch by [`upgrade_db`] when the
/// database reports `user_version` 0 (a brand-new file).  Any change
/// here must be mirrored by a migration branch in [`upgrade_db`] and
/// a bump of the `user_version` pragma below.
const INIT_SQL: &str = r##"
-- Database settings
PRAGMA encoding = "UTF-8";
PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 2;

-- Event Table
CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
);

-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);

-- Event References Table
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #e tag.
referenced_event BLOB NOT NULL, -- the event that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);

-- Event References Index
CREATE INDEX IF NOT EXISTS event_ref_index ON event_ref(referenced_event);

-- Pubkey References Table
CREATE TABLE IF NOT EXISTS pubkey_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #p tag.
referenced_pubkey BLOB NOT NULL, -- the pubkey that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
);

-- Pubkey References Index
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
"##;
|
|
|
|
|
2021-12-30 14:55:05 -05:00
|
|
|
/// Upgrade DB to latest version, and execute pragma settings
|
|
|
|
pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
|
|
|
|
// check the version.
|
|
|
|
let curr_version = db_version(conn)?;
|
|
|
|
info!("DB version = {:?}", curr_version);
|
|
|
|
|
|
|
|
// initialize from scratch
|
|
|
|
if curr_version == 0 {
|
|
|
|
match conn.execute_batch(INIT_SQL) {
|
|
|
|
Ok(()) => info!("database pragma/schema initialized to v2, and ready"),
|
|
|
|
Err(err) => {
|
|
|
|
error!("update failed: {}", err);
|
|
|
|
panic!("database could not be initialized");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if curr_version == 1 {
|
|
|
|
// only change is adding a hidden column to events.
|
|
|
|
let upgrade_sql = r##"
|
|
|
|
ALTER TABLE event ADD hidden INTEGER;
|
|
|
|
UPDATE event SET hidden=FALSE;
|
|
|
|
PRAGMA user_version = 2;
|
|
|
|
"##;
|
|
|
|
match conn.execute_batch(upgrade_sql) {
|
|
|
|
Ok(()) => info!("database schema upgraded v1 -> v2"),
|
|
|
|
Err(err) => {
|
|
|
|
error!("update failed: {}", err);
|
|
|
|
panic!("database could not be upgraded");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if curr_version == 2 {
|
|
|
|
debug!("Database version was already current");
|
|
|
|
} else if curr_version > 2 {
|
|
|
|
panic!("Database version is newer than supported by this executable");
|
|
|
|
}
|
|
|
|
// Setup PRAGMA
|
|
|
|
conn.execute_batch(STARTUP_SQL)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-12-11 16:48:59 -05:00
|
|
|
/// Spawn a database writer that persists events to the SQLite store.
///
/// Runs the write loop on a dedicated blocking thread (the rusqlite
/// connection is synchronous).  Each event received on `event_rx` is
/// persisted via [`write_event`]; newly stored (non-duplicate) events
/// are re-broadcast to all subscribers on `bcast_tx`.  The loop exits
/// when `shutdown` fires or when `event_rx` closes.
pub async fn db_writer(
    mut event_rx: tokio::sync::mpsc::Receiver<Event>,
    bcast_tx: tokio::sync::broadcast::Sender<Event>,
    mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
    task::spawn_blocking(move || {
        // get database configuration settings
        let config = SETTINGS.read().unwrap();
        let db_dir = &config.database.data_directory;
        let full_path = Path::new(db_dir).join(DB_FILE);
        // create a connection (CREATE flag makes the file if missing)
        let mut conn = Connection::open_with_flags(
            &full_path,
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
        )?;
        info!("opened database {:?} for writing", full_path);
        // ensure schema is current before accepting writes
        upgrade_db(&mut conn)?;
        // get rate limit settings
        let rps_setting = config.limits.messages_per_sec;
        let mut most_recent_rate_limit = Instant::now();
        let mut lim_opt = None;
        let clock = governor::clock::QuantaClock::default();
        if let Some(rps) = rps_setting {
            if rps > 0 {
                info!("Enabling rate limits for event creation ({}/sec)", rps);
                // quota is expressed per-minute (rps * 60) — presumably to
                // permit short bursts above the per-second rate; TODO confirm
                let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
                lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
            }
        }
        loop {
            // NOTE(review): this non-blocking check is only reached after
            // blocking_recv below returns, so a shutdown signal is only
            // observed once another event arrives or the channel closes.
            if shutdown.try_recv().is_ok() {
                info!("shutting down database writer");
                break;
            }
            // call blocking read on channel
            let next_event = event_rx.blocking_recv();
            // if the channel has closed, we will never get work
            if next_event.is_none() {
                break;
            }
            // tracks whether this iteration actually inserted a row
            let mut event_write = false;
            let event = next_event.unwrap();
            let start = Instant::now();
            match write_event(&mut conn, &event) {
                Ok(updated) => {
                    if updated == 0 {
                        // 0 rows => INSERT OR IGNORE hit a duplicate hash
                        debug!("ignoring duplicate event");
                    } else {
                        info!(
                            "persisted event: {} in {:?}",
                            event.get_event_id_prefix(),
                            start.elapsed()
                        );
                        event_write = true;
                        // send this out to all clients
                        bcast_tx.send(event.clone()).ok();
                    }
                }
                Err(err) => {
                    // a failed insert is logged but does not stop the writer
                    warn!("event insert failed: {}", err);
                }
            }
            // use rate limit, if defined, and if an event was actually written.
            if event_write {
                if let Some(ref lim) = lim_opt {
                    if let Err(n) = lim.check() {
                        let wait_for = n.wait_time_from(clock.now());
                        // check if we have recently logged rate
                        // limits, but print out a message only once
                        // per second.
                        if most_recent_rate_limit.elapsed().as_secs() > 1 {
                            warn!(
                                "rate limit reached for event creation (sleep for {:?})",
                                wait_for
                            );
                            // reset last rate limit message
                            most_recent_rate_limit = Instant::now();
                        }
                        // block event writes, allowing them to queue up
                        thread::sleep(wait_for);
                        continue;
                    }
                }
            }
        }
        conn.close().ok();
        info!("database connection closed");
        Ok(())
    })
}
|
|
|
|
|
2021-12-30 14:55:05 -05:00
|
|
|
pub fn db_version(conn: &mut Connection) -> Result<usize> {
|
|
|
|
let query = "PRAGMA user_version;";
|
|
|
|
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
|
|
|
|
Ok(curr_version)
|
|
|
|
}
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Persist an event to the database.
|
2021-12-11 16:48:59 -05:00
|
|
|
pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
|
|
|
|
// start transaction
|
|
|
|
let tx = conn.transaction()?;
|
|
|
|
// get relevant fields from event and convert to blobs.
|
|
|
|
let id_blob = hex::decode(&e.id).ok();
|
|
|
|
let pubkey_blob = hex::decode(&e.pubkey).ok();
|
|
|
|
let event_str = serde_json::to_string(&e).ok();
|
2021-12-11 22:43:41 -05:00
|
|
|
// ignore if the event hash is a duplicate.
|
2021-12-11 16:48:59 -05:00
|
|
|
let ins_count = tx.execute(
|
2021-12-30 14:55:05 -05:00
|
|
|
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);",
|
2021-12-11 16:48:59 -05:00
|
|
|
params![id_blob, e.created_at, e.kind, pubkey_blob, event_str]
|
|
|
|
)?;
|
2021-12-11 22:43:41 -05:00
|
|
|
if ins_count == 0 {
|
|
|
|
// if the event was a duplicate, no need to insert event or
|
|
|
|
// pubkey references.
|
|
|
|
return Ok(ins_count);
|
|
|
|
}
|
|
|
|
// remember primary key of the event most recently inserted.
|
2021-12-11 16:48:59 -05:00
|
|
|
let ev_id = tx.last_insert_rowid();
|
2021-12-11 22:43:41 -05:00
|
|
|
// add all event tags into the event_ref table
|
2021-12-11 16:48:59 -05:00
|
|
|
let etags = e.get_event_tags();
|
2021-12-11 22:56:52 -05:00
|
|
|
if !etags.is_empty() {
|
2021-12-11 16:48:59 -05:00
|
|
|
for etag in etags.iter() {
|
|
|
|
tx.execute(
|
|
|
|
"INSERT OR IGNORE INTO event_ref (event_id, referenced_event) VALUES (?1, ?2)",
|
|
|
|
params![ev_id, hex::decode(&etag).ok()],
|
|
|
|
)?;
|
|
|
|
}
|
|
|
|
}
|
2021-12-11 22:43:41 -05:00
|
|
|
// add all event tags into the pubkey_ref table
|
2021-12-11 16:48:59 -05:00
|
|
|
let ptags = e.get_pubkey_tags();
|
2021-12-11 22:56:52 -05:00
|
|
|
if !ptags.is_empty() {
|
2021-12-11 16:48:59 -05:00
|
|
|
for ptag in ptags.iter() {
|
|
|
|
tx.execute(
|
2021-12-12 15:34:52 -05:00
|
|
|
"INSERT OR IGNORE INTO pubkey_ref (event_id, referenced_pubkey) VALUES (?1, ?2)",
|
2021-12-11 16:48:59 -05:00
|
|
|
params![ev_id, hex::decode(&ptag).ok()],
|
|
|
|
)?;
|
|
|
|
}
|
|
|
|
}
|
2021-12-30 14:55:05 -05:00
|
|
|
// if this event is for a metadata update, hide every other kind=0
|
2021-12-30 16:45:03 -05:00
|
|
|
// event from the same author that was issued earlier than this.
|
2021-12-30 14:55:05 -05:00
|
|
|
if e.kind == 0 {
|
|
|
|
let update_count = tx.execute(
|
2021-12-30 16:45:03 -05:00
|
|
|
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
|
2021-12-30 14:55:05 -05:00
|
|
|
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
|
|
|
|
)?;
|
|
|
|
if update_count > 0 {
|
|
|
|
info!("hid {} older metadata events", update_count);
|
|
|
|
}
|
|
|
|
}
|
2021-12-30 16:45:03 -05:00
|
|
|
// if this event is for a contact update, hide every other kind=3
|
|
|
|
// event from the same author that was issued earlier than this.
|
|
|
|
if e.kind == 3 {
|
|
|
|
let update_count = tx.execute(
|
|
|
|
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
|
|
|
|
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
|
|
|
|
)?;
|
|
|
|
if update_count > 0 {
|
|
|
|
info!("hid {} older contact events", update_count);
|
|
|
|
}
|
|
|
|
}
|
2021-12-11 16:48:59 -05:00
|
|
|
tx.commit()?;
|
|
|
|
Ok(ins_count)
|
|
|
|
}
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Event resulting from a specific subscription request
///
/// Produced by [`db_query`] and delivered to the requesting client on
/// its `query_tx` channel.
#[derive(PartialEq, Debug, Clone)]
pub struct QueryResult {
    /// Subscription identifier
    pub sub_id: String,
    /// Serialized event
    pub event: String,
}
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Check if a string contains only hex characters.
///
/// Note: the empty string vacuously passes, and no length constraint
/// is applied.
fn is_hex(s: &str) -> bool {
    // any non-ASCII char encodes to bytes >= 0x80, which are never
    // ASCII hex digits, so a byte-wise scan matches the char-wise one.
    s.bytes().all(|b| b.is_ascii_hexdigit())
}
|
|
|
|
|
2021-12-11 22:43:41 -05:00
|
|
|
/// Create a dynamic SQL query string from a subscription.
|
2021-12-11 16:48:59 -05:00
|
|
|
fn query_from_sub(sub: &Subscription) -> String {
|
|
|
|
// build a dynamic SQL query. all user-input is either an integer
|
|
|
|
// (sqli-safe), or a string that is filtered to only contain
|
|
|
|
// hexadecimal characters.
|
|
|
|
let mut query =
|
|
|
|
"SELECT DISTINCT(e.content) FROM event e LEFT JOIN event_ref er ON e.id=er.event_id LEFT JOIN pubkey_ref pr ON e.id=pr.event_id "
|
|
|
|
.to_owned();
|
|
|
|
// for every filter in the subscription, generate a where clause
|
|
|
|
let mut filter_clauses: Vec<String> = Vec::new();
|
|
|
|
for f in sub.filters.iter() {
|
|
|
|
// individual filter components
|
|
|
|
let mut filter_components: Vec<String> = Vec::new();
|
|
|
|
// Query for "authors"
|
|
|
|
if f.authors.is_some() {
|
|
|
|
let authors_escaped: Vec<String> = f
|
|
|
|
.authors
|
|
|
|
.as_ref()
|
|
|
|
.unwrap()
|
|
|
|
.iter()
|
2021-12-11 22:43:41 -05:00
|
|
|
.filter(|&x| is_hex(x))
|
2021-12-11 16:48:59 -05:00
|
|
|
.map(|x| format!("x'{}'", x))
|
|
|
|
.collect();
|
|
|
|
let authors_clause = format!("author IN ({})", authors_escaped.join(", "));
|
|
|
|
filter_components.push(authors_clause);
|
|
|
|
}
|
|
|
|
// Query for Kind
|
2022-01-01 19:38:52 -05:00
|
|
|
if let Some(ks) = &f.kinds {
|
2021-12-11 16:48:59 -05:00
|
|
|
// kind is number, no escaping needed
|
2022-01-01 19:38:52 -05:00
|
|
|
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
|
|
|
|
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
|
2021-12-11 16:48:59 -05:00
|
|
|
filter_components.push(kind_clause);
|
|
|
|
}
|
|
|
|
// Query for event
|
2022-01-01 19:38:52 -05:00
|
|
|
if f.ids.is_some() {
|
|
|
|
let ids_escaped: Vec<String> = f
|
|
|
|
.ids
|
|
|
|
.as_ref()
|
|
|
|
.unwrap()
|
|
|
|
.iter()
|
|
|
|
.filter(|&x| is_hex(x))
|
|
|
|
.map(|x| format!("x'{}'", x))
|
|
|
|
.collect();
|
|
|
|
let id_clause = format!("event_hash IN ({})", ids_escaped.join(", "));
|
|
|
|
filter_components.push(id_clause);
|
2021-12-11 16:48:59 -05:00
|
|
|
}
|
|
|
|
// Query for referenced event
|
2022-01-01 19:38:52 -05:00
|
|
|
if f.events.is_some() {
|
|
|
|
let events_escaped: Vec<String> = f
|
|
|
|
.events
|
|
|
|
.as_ref()
|
|
|
|
.unwrap()
|
|
|
|
.iter()
|
|
|
|
.filter(|&x| is_hex(x))
|
|
|
|
.map(|x| format!("x'{}'", x))
|
|
|
|
.collect();
|
|
|
|
let events_clause = format!("referenced_event IN ({})", events_escaped.join(", "));
|
|
|
|
filter_components.push(events_clause);
|
2021-12-11 16:48:59 -05:00
|
|
|
}
|
2022-01-01 19:38:52 -05:00
|
|
|
// Query for referenced pubkey
|
|
|
|
if f.pubkeys.is_some() {
|
|
|
|
let pubkeys_escaped: Vec<String> = f
|
|
|
|
.pubkeys
|
|
|
|
.as_ref()
|
|
|
|
.unwrap()
|
|
|
|
.iter()
|
|
|
|
.filter(|&x| is_hex(x))
|
|
|
|
.map(|x| format!("x'{}'", x))
|
|
|
|
.collect();
|
|
|
|
let pubkeys_clause = format!("referenced_pubkey IN ({})", pubkeys_escaped.join(", "));
|
|
|
|
filter_components.push(pubkeys_clause);
|
2021-12-11 16:48:59 -05:00
|
|
|
}
|
2022-01-01 19:38:52 -05:00
|
|
|
|
2021-12-11 16:48:59 -05:00
|
|
|
// Query for timestamp
|
|
|
|
if f.since.is_some() {
|
|
|
|
let created_clause = format!("created_at > {}", f.since.unwrap());
|
|
|
|
filter_components.push(created_clause);
|
|
|
|
}
|
2021-12-23 22:38:32 -05:00
|
|
|
// Query for timestamp
|
|
|
|
if f.until.is_some() {
|
|
|
|
let until_clause = format!("created_at < {}", f.until.unwrap());
|
|
|
|
filter_components.push(until_clause);
|
|
|
|
}
|
|
|
|
|
2021-12-11 16:48:59 -05:00
|
|
|
// combine all clauses, and add to filter_clauses
|
2021-12-11 22:56:52 -05:00
|
|
|
if !filter_components.is_empty() {
|
2021-12-11 16:48:59 -05:00
|
|
|
let mut fc = "( ".to_owned();
|
|
|
|
fc.push_str(&filter_components.join(" AND "));
|
|
|
|
fc.push_str(" )");
|
|
|
|
filter_clauses.push(fc);
|
2021-12-12 15:52:55 -05:00
|
|
|
} else {
|
2021-12-30 14:55:05 -05:00
|
|
|
// never display hidden events
|
2021-12-31 17:34:10 -05:00
|
|
|
filter_clauses.push("hidden!=TRUE".to_owned());
|
2021-12-11 16:48:59 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// combine all filters with OR clauses, if any exist
|
2021-12-11 22:56:52 -05:00
|
|
|
if !filter_clauses.is_empty() {
|
2021-12-11 16:48:59 -05:00
|
|
|
query.push_str(" WHERE ");
|
|
|
|
query.push_str(&filter_clauses.join(" OR "));
|
|
|
|
}
|
2021-12-23 22:36:46 -05:00
|
|
|
// add order clause
|
|
|
|
query.push_str(" ORDER BY created_at ASC");
|
2021-12-12 11:03:28 -05:00
|
|
|
debug!("query string: {}", query);
|
2021-12-11 22:43:41 -05:00
|
|
|
query
|
2021-12-11 16:48:59 -05:00
|
|
|
}
|
|
|
|
|
/// Perform a database query using a subscription.
///
/// The [`Subscription`] is converted into a SQL query.  Each result
/// is published on the `query_tx` channel as it is returned.  If a
/// message becomes available on the `abandon_query_rx` channel, the
/// query is immediately aborted.
///
/// Runs on a blocking thread; opens its own read-only connection so
/// it never contends with the single writer connection.
pub async fn db_query(
    sub: Subscription,
    query_tx: tokio::sync::mpsc::Sender<QueryResult>,
    mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {
    task::spawn_blocking(move || {
        // locate the database file from configuration
        let config = SETTINGS.read().unwrap();
        let db_dir = &config.database.data_directory;
        let full_path = Path::new(db_dir).join(DB_FILE);

        // NOTE(review): unwrap panics this blocking task if the DB
        // cannot be opened (e.g. file missing); caller is not notified.
        let conn =
            Connection::open_with_flags(&full_path, OpenFlags::SQLITE_OPEN_READ_ONLY).unwrap();
        debug!("opened database for reading");
        debug!("going to query for: {:?}", sub);
        let mut row_count: usize = 0;
        let start = Instant::now();
        // generate SQL query
        let q = query_from_sub(&sub);
        // execute the query; unwrap panics on SQL errors in the
        // generated statement
        let mut stmt = conn.prepare(&q).unwrap();
        let mut event_rows = stmt.query([]).unwrap();
        while let Some(row) = event_rows.next().unwrap() {
            // check if this is still active (we could do this every N rows)
            if abandon_query_rx.try_recv().is_ok() {
                debug!("query aborted");
                return;
            }
            row_count += 1;
            // TODO: check before unwrapping
            let event_json = row.get(0).unwrap();
            // errors sending (receiver dropped) are deliberately ignored
            query_tx
                .blocking_send(QueryResult {
                    sub_id: sub.get_id(),
                    event: event_json,
                })
                .ok();
        }
        debug!(
            "query completed ({} rows) in {:?}",
            row_count,
            start.elapsed()
        );
    });
}
|