diff --git a/src/db.rs b/src/db.rs index 1a026ff..cd74df2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -13,6 +13,13 @@ use tokio::task; /// Database file const DB_FILE: &str = "nostr.db"; +/// Startup DB Pragmas +const STARTUP_SQL: &str = r##" +PRAGMA main.synchronous=NORMAL; +PRAGMA foreign_keys = ON; +pragma mmap_size = 536870912; -- 512MB of mmap +"##; + /// Schema definition const INIT_SQL: &str = r##" -- Database settings @@ -21,8 +28,7 @@ PRAGMA journal_mode=WAL; PRAGMA main.synchronous=NORMAL; PRAGMA foreign_keys = ON; PRAGMA application_id = 1654008667; -PRAGMA user_version = 1; -pragma mmap_size = 536870912; -- 512MB of mmap +PRAGMA user_version = 2; -- Event Table CREATE TABLE IF NOT EXISTS event ( @@ -32,6 +38,7 @@ first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (s created_at INTEGER NOT NULL, -- when the event was authored author BLOB NOT NULL, -- author pubkey kind INTEGER NOT NULL, -- event kind +hidden INTEGER, -- relevant for queries content TEXT NOT NULL -- serialized json of event object ); @@ -64,6 +71,45 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey); "##; +/// Upgrade DB to latest version, and execute pragma settings +pub fn upgrade_db(conn: &mut Connection) -> Result<()> { + // check the version. + let curr_version = db_version(conn)?; + info!("DB version = {:?}", curr_version); + + // initialize from scratch + if curr_version == 0 { + match conn.execute_batch(INIT_SQL) { + Ok(()) => info!("database pragma/schema initialized to v2, and ready"), + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be initialized"); + } + } + } else if curr_version == 1 { + // only change is adding a hidden column to events. + let upgrade_sql = r##" +ALTER TABLE event ADD hidden INTEGER; +UPDATE event SET hidden=FALSE; +PRAGMA user_version = 2; +"##; + match conn.execute_batch(upgrade_sql) { + Ok(()) => info!("database schema upgraded v1 -> v2"), + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } + } else if curr_version == 2 { + debug!("Database version was already current"); + } else if curr_version > 2 { + panic!("Database version is newer than supported by this executable"); + } + // Setup PRAGMA + conn.execute_batch(STARTUP_SQL)?; + Ok(()) +} + /// Spawn a database writer that persists events to the SQLite store. pub async fn db_writer( mut event_rx: tokio::sync::mpsc::Receiver, @@ -75,12 +121,13 @@ pub async fn db_writer( OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, )?; info!("opened database for writing"); + upgrade_db(&mut conn)?; + // if version is zero, then we need to initialize from scratch. + // if version is one, we need to upgrade. + // if version is two, we are at the latest! + // TODO: determine if we need to execute the init script. // TODO: check database app id / version before proceeding. - match conn.execute_batch(INIT_SQL) { - Ok(()) => info!("database pragma/schema initialized and ready"), - Err(err) => error!("update failed: {}", err), - } loop { // call blocking read on channel let next_event = event_rx.blocking_recv(); @@ -110,6 +157,12 @@ pub async fn db_writer( }) } +pub fn db_version(conn: &mut Connection) -> Result { + let query = "PRAGMA user_version;"; + let curr_version = conn.query_row(query, [], |row| row.get(0))?; + Ok(curr_version) +} + /// Persist an event to the database. pub fn write_event(conn: &mut Connection, e: &Event) -> Result { // start transaction @@ -120,7 +173,7 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result { let event_str = serde_json::to_string(&e).ok(); // ignore if the event hash is a duplicate. let ins_count = tx.execute( - "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'));", + "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), FALSE);", params![id_blob, e.created_at, e.kind, pubkey_blob, event_str] )?; if ins_count == 0 { @@ -150,6 +203,17 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result { )?; } } + // if this event is for a metadata update, hide every other kind=0 + // event from the same author that was issued earlier + if e.kind == 0 { + let update_count = tx.execute( + "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ?", + params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at], + )?; + if update_count > 0 { + info!("hid {} older metadata events", update_count); + } + } tx.commit()?; Ok(ins_count) } @@ -242,8 +306,8 @@ fn query_from_sub(sub: &Subscription) -> String { fc.push_str(" )"); filter_clauses.push(fc); } else { - // if there are no filter clauses, we should return everything - filter_clauses.push(" 1=1 ".to_owned()); + // never display hidden events + filter_clauses.push("hidden!=FALSE".to_owned()); } }