diff --git a/src/bin/bulkloader.rs b/src/bin/bulkloader.rs new file mode 100644 index 0000000..854ca52 --- /dev/null +++ b/src/bin/bulkloader.rs @@ -0,0 +1,176 @@ +use std::io; +use std::path::Path; +use nostr_rs_relay::utils::is_lower_hex; +use tracing::*; +use nostr_rs_relay::config; +use nostr_rs_relay::event::{Event,single_char_tagname}; +use nostr_rs_relay::error::{Error, Result}; +use nostr_rs_relay::db::build_pool; +use nostr_rs_relay::schema::{curr_db_version, DB_VERSION}; +use rusqlite::{OpenFlags, Transaction}; +use nostr_rs_relay::db::PooledConnection; +use std::sync::mpsc; +use std::thread; +use rusqlite::params; + +/// Bulk load JSONL data from STDIN to the database specified in config.toml (or ./nostr.db as a default). +/// The database must already exist, this will not create a new one. +/// Tested against schema v13. + +pub fn main() -> Result<()> { + let _trace_sub = tracing_subscriber::fmt::try_init(); + println!("Nostr-rs-relay Bulk Loader"); + // check for a database file, or create one. + let settings = config::Settings::new(); + if !Path::new(&settings.database.data_directory).is_dir() { + info!("Database directory does not exist"); + return Err(Error::DatabaseDirError); + } + // Get a database pool + let pool = build_pool("bulk-loader", &settings, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, 1,4,false); + { + // check for database schema version + let mut conn: PooledConnection = pool.get()?; + let version = curr_db_version(&mut conn)?; + info!("current version is: {:?}", version); + // ensure the schema version is current. + if version != DB_VERSION { + info!("version is not current, exiting"); + panic!("cannot write to schema other than v{}", DB_VERSION); + } + } + // this channel will contain parsed events ready to be inserted + let (event_tx, event_rx) = mpsc::sync_channel(100_000); + // Thread for reading events + let _stdin_reader_handler = thread::spawn(move || { + let stdin = io::stdin(); + for readline in stdin.lines() { + if let Ok(line) = readline { + // try to parse a nostr event + let eres: Result = serde_json::from_str(&line); + if let Ok(mut e) = eres { + if let Ok(()) = e.validate() { + e.build_index(); + //debug!("Event: {:?}", e); + event_tx.send(Some(e)).ok(); + } else { + info!("could not validate event"); + } + } else { + info!("error reading event: {:?}", eres); + } + } else { + // error reading + info!("error reading: {:?}", readline); + } + } + info!("finished parsing events"); + event_tx.send(None).ok(); + let ok: Result<()> = Ok(()); + return ok; + }); + let mut conn: PooledConnection = pool.get()?; + let mut events_read = 0; + let event_batch_size =50_000; + let mut new_events = 0; + let mut has_more_events = true; + while has_more_events { + // begin a transaction + let tx = conn.transaction()?; + // read in batch_size events and commit + for _ in 0..event_batch_size { + match event_rx.recv() { + Ok(Some(e)) => { + events_read += 1; + // ignore ephemeral events + if !(e.kind >= 20000 && e.kind < 30000) { + match write_event(&tx, e) { + Ok(c) => { + new_events += c; + }, + Err(e) => { + info!("error inserting event: {:?}", e); + } + } + } + }, + Ok(None) => { + // signal that the sender will never produce more + // events + has_more_events=false; + break; + }, + Err(_) => { + info!("sender is closed"); + // sender is done + } + } + } + info!("committed {} events...", new_events); + tx.commit()?; + conn.execute_batch("pragma wal_checkpoint(truncate)")?; + + } + info!("processed {} events", events_read); + info!("stored {} new events", new_events); + // get a connection for writing events + // read standard in. + info!("finished reading input"); + Ok(()) +} + +/// Write an event and update the tag table. +/// Assumes the event has its index built. +fn write_event(tx: &Transaction, e: Event) -> Result { + let id_blob = hex::decode(&e.id).ok(); + let pubkey_blob: Option> = hex::decode(&e.pubkey).ok(); + let delegator_blob: Option> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); + let event_str = serde_json::to_string(&e).ok(); + // ignore if the event hash is a duplicate. + let ins_count = tx.execute( + "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);", + params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str] + )?; + if ins_count == 0 { + return Ok(0); + } + // we want to capture the event_id that had the tag, the tag name, and the tag hex value. + let event_id = tx.last_insert_rowid(); + // look at each event, and each tag, creating new tag entries if appropriate. + for t in e.tags.iter().filter(|x| x.len() > 1) { + let tagname = t.get(0).unwrap(); + let tagnamechar_opt = single_char_tagname(tagname); + if tagnamechar_opt.is_none() { + continue; + } + // safe because len was > 1 + let tagval = t.get(1).unwrap(); + // insert as BLOB if we can restore it losslessly. + // this means it needs to be even length and lowercase. + if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { + tx.execute( + "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);", + params![event_id, tagname, hex::decode(tagval).ok()], + )?; + } else { + // otherwise, insert as text + tx.execute( + "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);", + params![event_id, tagname, &tagval], + )?; + } + } + if e.kind == 0 || e.kind == 3 || e.kind == 41 || (e.kind >= 10000 && e.kind < 20000) { + //let query = "SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1;"; + //let count: usize = tx.query_row(query, params![e.kind, pubkey_blob], |row| row.get(0))?; + //info!("found {} rows that /would/ be preserved", count); + match tx.execute( + "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1);", + params![e.kind, pubkey_blob, e.kind, pubkey_blob], + ) { + Ok(_) => {}, + Err(x) => {info!("error deleting replaceable event: {:?}",x);} + } + } + Ok(ins_count) +}