mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-09 21:29:06 -05:00
refactor: move db migrations into isolated functions
This commit is contained in:
parent
2af5f9fbe8
commit
3e8adf978f
369
src/schema.rs
369
src/schema.rs
|
@ -83,16 +83,32 @@ CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(me
|
||||||
);
|
);
|
||||||
|
|
||||||
/// Determine the current application database schema version.
|
/// Determine the current application database schema version.
|
||||||
pub fn db_version(conn: &mut Connection) -> Result<usize> {
|
pub fn curr_db_version(conn: &mut Connection) -> Result<usize> {
|
||||||
let query = "PRAGMA user_version;";
|
let query = "PRAGMA user_version;";
|
||||||
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
|
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
|
||||||
Ok(curr_version)
|
Ok(curr_version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mig_init(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
match conn.execute_batch(INIT_SQL) {
|
||||||
|
Ok(()) => {
|
||||||
|
info!(
|
||||||
|
"database pragma/schema initialized to v{}, and ready",
|
||||||
|
DB_VERSION
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("update failed: {}", err);
|
||||||
|
panic!("database could not be initialized");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(DB_VERSION)
|
||||||
|
}
|
||||||
|
|
||||||
/// Upgrade DB to latest version, and execute pragma settings
|
/// Upgrade DB to latest version, and execute pragma settings
|
||||||
pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||||
// check the version.
|
// check the version.
|
||||||
let mut curr_version = db_version(conn)?;
|
let mut curr_version = curr_db_version(conn)?;
|
||||||
info!("DB version = {:?}", curr_version);
|
info!("DB version = {:?}", curr_version);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -113,184 +129,34 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||||
Ordering::Less => {
|
Ordering::Less => {
|
||||||
// initialize from scratch
|
// initialize from scratch
|
||||||
if curr_version == 0 {
|
if curr_version == 0 {
|
||||||
match conn.execute_batch(INIT_SQL) {
|
curr_version = mig_init(conn)?;
|
||||||
Ok(()) => {
|
|
||||||
info!("database pragma/schema initialized to v6, and ready");
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!("update failed: {}", err);
|
|
||||||
panic!("database could not be initialized");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// for initialized but out-of-date schemas, proceed to
|
||||||
|
// upgrade sequentially until we are current.
|
||||||
if curr_version == 1 {
|
if curr_version == 1 {
|
||||||
// only change is adding a hidden column to events.
|
curr_version = mig_1_to_2(conn)?;
|
||||||
let upgrade_sql = r##"
|
|
||||||
ALTER TABLE event ADD hidden INTEGER;
|
|
||||||
UPDATE event SET hidden=FALSE;
|
|
||||||
PRAGMA user_version = 2;
|
|
||||||
"##;
|
|
||||||
match conn.execute_batch(upgrade_sql) {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("database schema upgraded v1 -> v2");
|
|
||||||
curr_version = 2;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!("update failed: {}", err);
|
|
||||||
panic!("database could not be upgraded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if curr_version == 2 {
|
if curr_version == 2 {
|
||||||
// this version lacks the tag column
|
curr_version = mig_2_to_3(conn)?;
|
||||||
info!("database schema needs update from 2->3");
|
|
||||||
let upgrade_sql = r##"
|
|
||||||
CREATE TABLE IF NOT EXISTS tag (
|
|
||||||
id INTEGER PRIMARY KEY,
|
|
||||||
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
|
|
||||||
name TEXT, -- the tag name ("p", "e", whatever)
|
|
||||||
value TEXT, -- the tag value, if not hex.
|
|
||||||
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
|
|
||||||
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
|
||||||
);
|
|
||||||
PRAGMA user_version = 3;
|
|
||||||
"##;
|
|
||||||
// TODO: load existing refs into tag table
|
|
||||||
match conn.execute_batch(upgrade_sql) {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("database schema upgraded v2 -> v3");
|
|
||||||
curr_version = 3;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!("update failed: {}", err);
|
|
||||||
panic!("database could not be upgraded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// iterate over every event/pubkey tag
|
|
||||||
let tx = conn.transaction()?;
|
|
||||||
{
|
|
||||||
let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
|
|
||||||
let mut tag_rows = stmt.query([])?;
|
|
||||||
while let Some(row) = tag_rows.next()? {
|
|
||||||
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
|
||||||
let event_id: u64 = row.get(0)?;
|
|
||||||
let tag_name: String = row.get(1)?;
|
|
||||||
let tag_value: String = row.get(2)?;
|
|
||||||
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
|
|
||||||
if is_lower_hex(&tag_value) {
|
|
||||||
tx.execute(
|
|
||||||
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
|
||||||
params![event_id, tag_name, hex::decode(&tag_value).ok()],
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tx.commit()?;
|
|
||||||
info!("Upgrade complete");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if curr_version == 3 {
|
if curr_version == 3 {
|
||||||
info!("database schema needs update from 3->4");
|
curr_version = mig_3_to_4(conn)?;
|
||||||
let upgrade_sql = r##"
|
|
||||||
-- incoming metadata events with nip05
|
|
||||||
CREATE TABLE IF NOT EXISTS user_verification (
|
|
||||||
id INTEGER PRIMARY KEY,
|
|
||||||
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
|
|
||||||
name TEXT NOT NULL, -- the nip05 field value (user@domain).
|
|
||||||
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
|
|
||||||
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
|
|
||||||
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
|
|
||||||
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name);
|
|
||||||
CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event);
|
|
||||||
PRAGMA user_version = 4;
|
|
||||||
"##;
|
|
||||||
match conn.execute_batch(upgrade_sql) {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("database schema upgraded v3 -> v4");
|
|
||||||
curr_version = 4;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!("update failed: {}", err);
|
|
||||||
panic!("database could not be upgraded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if curr_version == 4 {
|
if curr_version == 4 {
|
||||||
info!("database schema needs update from 4->5");
|
curr_version = mig_4_to_5(conn)?;
|
||||||
let upgrade_sql = r##"
|
|
||||||
DROP TABLE IF EXISTS event_ref;
|
|
||||||
DROP TABLE IF EXISTS pubkey_ref;
|
|
||||||
PRAGMA user_version=5;
|
|
||||||
"##;
|
|
||||||
match conn.execute_batch(upgrade_sql) {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("database schema upgraded v4 -> v5");
|
|
||||||
// uncomment if we have a newer version
|
|
||||||
//curr_version = 5;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
error!("update failed: {}", err);
|
|
||||||
panic!("database could not be upgraded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if curr_version == 5 {
|
if curr_version == 5 {
|
||||||
info!("database schema needs update from 5->6");
|
curr_version = mig_5_to_6(conn)?;
|
||||||
// We need to rebuild the tags table. iterate through the
|
}
|
||||||
// event table. build event from json, insert tags into a
|
if curr_version == DB_VERSION {
|
||||||
// fresh tag table. This was needed due to a logic error in
|
info!(
|
||||||
// how hex-like tags got indexed.
|
"All migration scripts completed successfully. Welcome to v{}.",
|
||||||
let start = Instant::now();
|
DB_VERSION
|
||||||
let tx = conn.transaction()?;
|
);
|
||||||
{
|
|
||||||
// Clear out table
|
|
||||||
tx.execute("DELETE FROM tag;", [])?;
|
|
||||||
let mut stmt = tx.prepare("select id, content from event order by id;")?;
|
|
||||||
let mut tag_rows = stmt.query([])?;
|
|
||||||
while let Some(row) = tag_rows.next()? {
|
|
||||||
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
|
||||||
let event_id: u64 = row.get(0)?;
|
|
||||||
let event_json: String = row.get(1)?;
|
|
||||||
let event: Event = serde_json::from_str(&event_json)?;
|
|
||||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
|
||||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
|
||||||
let tagname = t.get(0).unwrap();
|
|
||||||
let tagnamechar_opt = single_char_tagname(tagname);
|
|
||||||
if tagnamechar_opt.is_none() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// safe because len was > 1
|
|
||||||
let tagval = t.get(1).unwrap();
|
|
||||||
// insert as BLOB if we can restore it losslessly.
|
|
||||||
// this means it needs to be even length and lowercase.
|
|
||||||
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
|
|
||||||
tx.execute(
|
|
||||||
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
|
||||||
params![event_id, tagname, hex::decode(&tagval).ok()],
|
|
||||||
)?;
|
|
||||||
} else {
|
|
||||||
// otherwise, insert as text
|
|
||||||
tx.execute(
|
|
||||||
"INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
|
|
||||||
params![event_id, tagname, &tagval],
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tx.execute("PRAGMA user_version = 6;", [])?;
|
|
||||||
}
|
|
||||||
tx.commit()?;
|
|
||||||
info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed());
|
|
||||||
// vacuum after large table modification
|
|
||||||
let start = Instant::now();
|
|
||||||
conn.execute("VACUUM;", [])?;
|
|
||||||
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Database is current, all is good
|
// Database is current, all is good
|
||||||
|
@ -311,3 +177,174 @@ PRAGMA user_version=5;
|
||||||
debug!("SQLite PRAGMA startup completed");
|
debug!("SQLite PRAGMA startup completed");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//// Migration Scripts
|
||||||
|
|
||||||
|
fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
// only change is adding a hidden column to events.
|
||||||
|
let upgrade_sql = r##"
|
||||||
|
ALTER TABLE event ADD hidden INTEGER;
|
||||||
|
UPDATE event SET hidden=FALSE;
|
||||||
|
PRAGMA user_version = 2;
|
||||||
|
"##;
|
||||||
|
match conn.execute_batch(upgrade_sql) {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("database schema upgraded v1 -> v2");
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("update failed: {}", err);
|
||||||
|
panic!("database could not be upgraded");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mig_2_to_3(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
// this version lacks the tag column
|
||||||
|
info!("database schema needs update from 2->3");
|
||||||
|
let upgrade_sql = r##"
|
||||||
|
CREATE TABLE IF NOT EXISTS tag (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
|
||||||
|
name TEXT, -- the tag name ("p", "e", whatever)
|
||||||
|
value TEXT, -- the tag value, if not hex.
|
||||||
|
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
|
||||||
|
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||||
|
);
|
||||||
|
PRAGMA user_version = 3;
|
||||||
|
"##;
|
||||||
|
// TODO: load existing refs into tag table
|
||||||
|
match conn.execute_batch(upgrade_sql) {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("database schema upgraded v2 -> v3");
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("update failed: {}", err);
|
||||||
|
panic!("database could not be upgraded");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// iterate over every event/pubkey tag
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
{
|
||||||
|
let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
|
||||||
|
let mut tag_rows = stmt.query([])?;
|
||||||
|
while let Some(row) = tag_rows.next()? {
|
||||||
|
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
||||||
|
let event_id: u64 = row.get(0)?;
|
||||||
|
let tag_name: String = row.get(1)?;
|
||||||
|
let tag_value: String = row.get(2)?;
|
||||||
|
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
|
||||||
|
if is_lower_hex(&tag_value) {
|
||||||
|
tx.execute(
|
||||||
|
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
||||||
|
params![event_id, tag_name, hex::decode(&tag_value).ok()],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("Updated tag values");
|
||||||
|
tx.commit()?;
|
||||||
|
Ok(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mig_3_to_4(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
info!("database schema needs update from 3->4");
|
||||||
|
let upgrade_sql = r##"
|
||||||
|
-- incoming metadata events with nip05
|
||||||
|
CREATE TABLE IF NOT EXISTS user_verification (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
|
||||||
|
name TEXT NOT NULL, -- the nip05 field value (user@domain).
|
||||||
|
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
|
||||||
|
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
|
||||||
|
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
|
||||||
|
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name);
|
||||||
|
CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event);
|
||||||
|
PRAGMA user_version = 4;
|
||||||
|
"##;
|
||||||
|
match conn.execute_batch(upgrade_sql) {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("database schema upgraded v3 -> v4");
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("update failed: {}", err);
|
||||||
|
panic!("database could not be upgraded");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(4)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mig_4_to_5(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
info!("database schema needs update from 4->5");
|
||||||
|
let upgrade_sql = r##"
|
||||||
|
DROP TABLE IF EXISTS event_ref;
|
||||||
|
DROP TABLE IF EXISTS pubkey_ref;
|
||||||
|
PRAGMA user_version=5;
|
||||||
|
"##;
|
||||||
|
match conn.execute_batch(upgrade_sql) {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("database schema upgraded v4 -> v5");
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("update failed: {}", err);
|
||||||
|
panic!("database could not be upgraded");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(5)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
|
info!("database schema needs update from 5->6");
|
||||||
|
// We need to rebuild the tags table. iterate through the
|
||||||
|
// event table. build event from json, insert tags into a
|
||||||
|
// fresh tag table. This was needed due to a logic error in
|
||||||
|
// how hex-like tags got indexed.
|
||||||
|
let start = Instant::now();
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
{
|
||||||
|
// Clear out table
|
||||||
|
tx.execute("DELETE FROM tag;", [])?;
|
||||||
|
let mut stmt = tx.prepare("select id, content from event order by id;")?;
|
||||||
|
let mut tag_rows = stmt.query([])?;
|
||||||
|
while let Some(row) = tag_rows.next()? {
|
||||||
|
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
||||||
|
let event_id: u64 = row.get(0)?;
|
||||||
|
let event_json: String = row.get(1)?;
|
||||||
|
let event: Event = serde_json::from_str(&event_json)?;
|
||||||
|
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||||
|
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||||
|
let tagname = t.get(0).unwrap();
|
||||||
|
let tagnamechar_opt = single_char_tagname(tagname);
|
||||||
|
if tagnamechar_opt.is_none() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// safe because len was > 1
|
||||||
|
let tagval = t.get(1).unwrap();
|
||||||
|
// insert as BLOB if we can restore it losslessly.
|
||||||
|
// this means it needs to be even length and lowercase.
|
||||||
|
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
|
||||||
|
tx.execute(
|
||||||
|
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
||||||
|
params![event_id, tagname, hex::decode(&tagval).ok()],
|
||||||
|
)?;
|
||||||
|
} else {
|
||||||
|
// otherwise, insert as text
|
||||||
|
tx.execute(
|
||||||
|
"INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
|
||||||
|
params![event_id, tagname, &tagval],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.execute("PRAGMA user_version = 6;", [])?;
|
||||||
|
}
|
||||||
|
tx.commit()?;
|
||||||
|
info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed());
|
||||||
|
// vacuum after large table modification
|
||||||
|
let start = Instant::now();
|
||||||
|
conn.execute("VACUUM;", [])?;
|
||||||
|
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
|
||||||
|
Ok(6)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user