mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-25 01:59:08 -05:00
perf(sqlite): index tags with their kind/created_at fields
This updates the DB schema to remove the distinction between hex and non-hex tag values, for simplicity. The space savings did not seem to be worth the extra complexity. The SQLite tags table is denormalized to duplicate kind/created_at to improve the ability of tag indexes to filter data.
This commit is contained in:
parent
81f8256c37
commit
abc356c17d
|
@ -6,7 +6,7 @@ use crate::event::{single_char_tagname, Event};
|
|||
use crate::hexrange::hex_range;
|
||||
use crate::hexrange::HexSearch;
|
||||
use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
|
||||
use crate::utils::{is_hex, is_lower_hex};
|
||||
use crate::utils::{is_hex};
|
||||
use crate::nip05::{Nip05Name, VerificationRecord};
|
||||
use crate::subscription::{ReqFilter, Subscription};
|
||||
use crate::server::NostrMetrics;
|
||||
|
@ -123,15 +123,9 @@ impl SqliteRepo {
|
|||
}
|
||||
// check for parameterized replaceable events that would be hidden; don't insert these either.
|
||||
if let Some(d_tag) = e.distinct_param() {
|
||||
let repl_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
|
||||
tx.query_row(
|
||||
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value_hex=? AND e.created_at >= ? LIMIT 1;",
|
||||
params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::<usize, usize>(0))
|
||||
} else {
|
||||
tx.query_row(
|
||||
let repl_count = tx.query_row(
|
||||
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
|
||||
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0))
|
||||
};
|
||||
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
|
||||
// if any rows were returned, then some newer event with
|
||||
// the same author/kind/tag value exist, and we can ignore
|
||||
// this event.
|
||||
|
@ -162,18 +156,10 @@ impl SqliteRepo {
|
|||
let tagchar_opt = single_char_tagname(tagname);
|
||||
match &tagchar_opt {
|
||||
Some(_) => {
|
||||
// if tagvalue is lowercase hex;
|
||||
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(tagval).ok()],
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
|
||||
params![ev_id, &tagname, &tagval, e.kind, e.created_at],
|
||||
)?;
|
||||
} else {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, &tagval],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
|
@ -200,15 +186,9 @@ impl SqliteRepo {
|
|||
}
|
||||
// if this event is parameterized replaceable, remove other events.
|
||||
if let Some(d_tag) = e.distinct_param() {
|
||||
let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
|
||||
tx.execute(
|
||||
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value_hex=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
|
||||
params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?
|
||||
} else {
|
||||
tx.execute(
|
||||
let update_count = tx.execute(
|
||||
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
|
||||
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?
|
||||
};
|
||||
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
|
||||
if update_count > 0 {
|
||||
info!(
|
||||
"removed {} older parameterized replaceable kind {} events for author: {:?}",
|
||||
|
@ -243,8 +223,8 @@ impl SqliteRepo {
|
|||
// check if a deletion has already been recorded for this event.
|
||||
// Only relevant for non-deletion events
|
||||
let del_count = tx.query_row(
|
||||
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
|
||||
params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
|
||||
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value=? LIMIT 1;",
|
||||
params![pubkey_blob, e.id], |row| row.get::<usize, usize>(0));
|
||||
// check if a the query returned a result, meaning we should
|
||||
// hid the current event
|
||||
if del_count.ok().is_some() {
|
||||
|
@ -801,60 +781,46 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
if let Some(map) = &f.tags {
|
||||
for (key, val) in map.iter() {
|
||||
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
for v in val {
|
||||
if (v.len() % 2 == 0) && is_lower_hex(v) {
|
||||
if let Ok(h) = hex::decode(v) {
|
||||
blob_vals.push(Box::new(h));
|
||||
}
|
||||
} else {
|
||||
str_vals.push(Box::new(v.clone()));
|
||||
}
|
||||
}
|
||||
// do not mix value and value_hex; this is a temporary special case.
|
||||
if str_vals.is_empty() {
|
||||
// create clauses with "?" params for each tag value being searched
|
||||
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
|
||||
let str_clause = format!("AND value IN ({})", repeat_vars(str_vals.len()));
|
||||
// find evidence of the target tag name/value existing for this event.
|
||||
let tag_clause = format!(
|
||||
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {blob_clause}))",
|
||||
);
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_string()));
|
||||
// add all tag values that are blobs as params
|
||||
params.append(&mut blob_vals);
|
||||
filter_components.push(tag_clause);
|
||||
} else if blob_vals.is_empty() {
|
||||
// create clauses with "?" params for each tag value being searched
|
||||
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
|
||||
// find evidence of the target tag name/value existing for this event.
|
||||
let tag_clause = format!(
|
||||
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {str_clause}))",
|
||||
);
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_string()));
|
||||
// add all tag values that are blobs as params
|
||||
params.append(&mut str_vals);
|
||||
filter_components.push(tag_clause);
|
||||
// Query for Kind/Since/Until additionally, to reduce the number of tags that come back.
|
||||
let kind_clause;
|
||||
let since_clause;
|
||||
let until_clause;
|
||||
if let Some(ks) = &f.kinds {
|
||||
// kind is number, no escaping needed
|
||||
let str_kinds: Vec<String> = ks.iter().map(std::string::ToString::to_string).collect();
|
||||
kind_clause = format!("AND kind IN ({})", str_kinds.join(", "));
|
||||
} else {
|
||||
debug!("mixed string/blob query");
|
||||
// create clauses with "?" params for each tag value being searched
|
||||
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
|
||||
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
|
||||
// find evidence of the target tag name/value existing for this event.
|
||||
kind_clause = format!("");
|
||||
};
|
||||
if f.since.is_some() {
|
||||
since_clause = format!("AND created_at > {}", f.since.unwrap());
|
||||
} else {
|
||||
since_clause = format!("");
|
||||
};
|
||||
// Query for timestamp
|
||||
if f.until.is_some() {
|
||||
until_clause = format!("AND created_at < {}", f.until.unwrap());
|
||||
} else {
|
||||
until_clause = format!("");
|
||||
};
|
||||
|
||||
let tag_clause = format!(
|
||||
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND ({str_clause} OR {blob_clause})))",
|
||||
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? {str_clause} {kind_clause} {since_clause} {until_clause}))"
|
||||
);
|
||||
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_string()));
|
||||
// add all tag values that are plain strings as params
|
||||
params.append(&mut str_vals);
|
||||
// add all tag values that are blobs as params
|
||||
params.append(&mut blob_vals);
|
||||
params.append(&mut str_vals);
|
||||
filter_components.push(tag_clause);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Query for timestamp
|
||||
if f.since.is_some() {
|
||||
let created_clause = format!("created_at > {}", f.since.unwrap());
|
||||
|
|
|
@ -10,6 +10,7 @@ use rusqlite::Connection;
|
|||
use std::cmp::Ordering;
|
||||
use std::time::Instant;
|
||||
use tracing::{debug, error, info};
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
|
||||
/// Startup DB Pragmas
|
||||
pub const STARTUP_SQL: &str = r##"
|
||||
|
@ -22,7 +23,7 @@ pragma mmap_size = 17179869184; -- cap mmap at 16GB
|
|||
"##;
|
||||
|
||||
/// Latest database version
|
||||
pub const DB_VERSION: usize = 15;
|
||||
pub const DB_VERSION: usize = 16;
|
||||
|
||||
/// Schema definition
|
||||
const INIT_SQL: &str = formatcp!(
|
||||
|
@ -65,18 +66,21 @@ CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
|
|||
-- Tag values are stored as either a BLOB (if they come in as a
|
||||
-- hex-string), or TEXT otherwise.
|
||||
-- This means that searches need to select the appropriate column.
|
||||
-- We duplicate the kind/created_at to make indexes much more efficient.
|
||||
CREATE TABLE IF NOT EXISTS tag (
|
||||
id INTEGER PRIMARY KEY,
|
||||
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
|
||||
name TEXT, -- the tag name ("p", "e", whatever)
|
||||
value TEXT, -- the tag value, if not hex.
|
||||
value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
|
||||
created_at INTEGER NOT NULL, -- when the event was authored
|
||||
kind INTEGER NOT NULL, -- event kind
|
||||
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
|
||||
|
||||
-- NIP-05 User Validation
|
||||
CREATE TABLE IF NOT EXISTS user_verification (
|
||||
|
@ -201,6 +205,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
|
|||
if curr_version == 14 {
|
||||
curr_version = mig_14_to_15(conn)?;
|
||||
}
|
||||
if curr_version == 15 {
|
||||
curr_version = mig_15_to_16(conn)?;
|
||||
}
|
||||
|
||||
if curr_version == DB_VERSION {
|
||||
info!(
|
||||
|
@ -652,3 +659,73 @@ PRAGMA user_version = 15;
|
|||
}
|
||||
Ok(15)
|
||||
}
|
||||
|
||||
fn mig_15_to_16(conn: &mut PooledConnection) -> Result<usize> {
|
||||
let count = db_event_count(conn)?;
|
||||
info!("database schema needs update from 15->16");
|
||||
let upgrade_sql = r##"
|
||||
DROP TABLE tag;
|
||||
CREATE TABLE tag (
|
||||
id INTEGER PRIMARY KEY,
|
||||
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
|
||||
name TEXT, -- the tag name ("p", "e", whatever)
|
||||
value TEXT, -- the tag value, if not hex.
|
||||
created_at INTEGER NOT NULL, -- when the event was authored
|
||||
kind INTEGER NOT NULL, -- event kind
|
||||
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
|
||||
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
|
||||
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
|
||||
"##;
|
||||
|
||||
let start = Instant::now();
|
||||
let tx = conn.transaction()?;
|
||||
|
||||
let bar = ProgressBar::new(count.try_into().unwrap())
|
||||
.with_message("rebuilding tags table");
|
||||
bar.set_style(
|
||||
ProgressStyle::with_template(
|
||||
"[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}",
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
{
|
||||
tx.execute_batch(upgrade_sql)?;
|
||||
let mut stmt = tx.prepare("select id, kind, created_at, content from event order by id;")?;
|
||||
let mut tag_rows = stmt.query([])?;
|
||||
let mut count = 0;
|
||||
while let Some(row) = tag_rows.next()? {
|
||||
count += 1;
|
||||
if count%10==0 {
|
||||
bar.inc(10);
|
||||
}
|
||||
let event_id: u64 = row.get(0)?;
|
||||
let kind: u64 = row.get(1)?;
|
||||
let created_at: u64 = row.get(2)?;
|
||||
let event_json: String = row.get(3)?;
|
||||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
}
|
||||
// safe because len was > 1
|
||||
let tagval = t.get(1).unwrap();
|
||||
// otherwise, insert as text
|
||||
tx.execute(
|
||||
"INSERT INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5);",
|
||||
params![event_id, tagname, &tagval, kind, created_at],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
tx.execute("PRAGMA user_version = 16;", [])?;
|
||||
}
|
||||
bar.finish();
|
||||
tx.commit()?;
|
||||
info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed());
|
||||
Ok(16)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user