refactor: create schema module

This commit is contained in:
Greg Heartsfield 2022-02-12 09:58:42 -06:00
parent 753df47443
commit 7056aae227
3 changed files with 252 additions and 243 deletions

244
src/db.rs
View File

@ -6,6 +6,7 @@ use crate::event::Event;
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05;
use crate::schema::{upgrade_db, STARTUP_SQL};
use crate::subscription::Subscription;
use crate::utils::is_hex;
use governor::clock::Clock;
@ -14,7 +15,6 @@ use hex;
use log::*;
use r2d2;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::limits::Limit;
use rusqlite::params;
use rusqlite::types::ToSql;
use rusqlite::Connection;
@ -30,105 +30,6 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// Startup DB Pragmas
const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
pragma mmap_size = 536870912; -- 512MB of mmap
"##;
/// Schema definition
const INIT_SQL: &str = r##"
-- Database settings
PRAGMA encoding = "UTF-8";
PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 4;
-- Event Table
CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
);
-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
-- hex-string), or TEXT otherwise.
-- This means that searches need to select the appropriate column.
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
-- Event References Table
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #e tag.
referenced_event BLOB NOT NULL, -- the event that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
-- Event References Index
CREATE INDEX IF NOT EXISTS event_ref_index ON event_ref(referenced_event);
-- Pubkey References Table
CREATE TABLE IF NOT EXISTS pubkey_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #p tag.
referenced_pubkey BLOB NOT NULL, -- the pubkey that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
);
-- Pubkey References Index
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
-- NIP-05 User Validation.
-- This represents the validation of a user.
-- cases;
-- we query, and find a valid result. update verified_at, and proceed.
-- we query, and get a 404/503/host down. update failed_at, and we are done.
-- we query, and get a 200, but the local part is not present with the given address. wipe out verified_at, update failed_at.
-- we need to know how often to query failing validations.
-- two cases, either we get a NIP-05 metadata event regularly that we can use to restart validation.
-- or, we simply get lots of non-metadata events, but the user fixed their NIP-05 host.
-- what should trigger a new attempt? what should trigger cleaning?
-- we will never write anything to the table if it is not valid at least once.
-- we will keep trying at frequency X to re-validate the already-valid nip05s.
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
"##;
// TODO: drop the pubkey_ref and event_ref tables
/// Build a database connection pool.
pub fn build_pool(
name: &str,
@ -173,142 +74,6 @@ pub fn build_conn(flags: OpenFlags) -> Result<Connection> {
Ok(Connection::open_with_flags(&full_path, flags)?)
}
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
// check the version.
let mut curr_version = db_version(conn)?;
info!("DB version = {:?}", curr_version);
debug!(
"SQLite max query parameters: {}",
conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER)
);
debug!(
"SQLite max table/blob/text length: {} MB",
(conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor()
);
debug!(
"SQLite max SQL length: {} MB",
(conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor()
);
// initialize from scratch
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
Ok(()) => {
info!("database pragma/schema initialized to v4, and ready");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be initialized");
}
}
}
if curr_version == 1 {
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD hidden INTEGER;
UPDATE event SET hidden=FALSE;
PRAGMA user_version = 2;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v1 -> v2");
curr_version = 2;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
}
if curr_version == 2 {
// this version lacks the tag column
debug!("database schema needs update from 2->3");
let upgrade_sql = r##"
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
PRAGMA user_version = 3;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v2 -> v3");
curr_version = 3;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
info!("Starting transaction");
// iterate over every event/pubkey tag
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
let mut tag_rows = stmt.query([])?;
while let Some(row) = tag_rows.next()? {
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
let event_id: u64 = row.get(0)?;
let tag_name: String = row.get(1)?;
let tag_value: String = row.get(2)?;
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
if is_hex(&tag_value) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tag_name, hex::decode(&tag_value).ok()],
)?;
}
}
}
tx.commit()?;
info!("Upgrade complete");
}
if curr_version == 3 {
debug!("database schema needs update from 3->4");
let upgrade_sql = r##"
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
PRAGMA user_version = 4;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v3 -> v4");
//curr_version = 4;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
} else if curr_version == 4 {
debug!("Database version was already current");
} else if curr_version > 3 {
panic!("Database version is newer than supported by this executable");
}
// Setup PRAGMA
conn.execute_batch(STARTUP_SQL)?;
debug!("SQLite PRAGMA startup completed");
Ok(())
}
/// Spawn a database writer that persists events to the SQLite store.
pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
@ -473,13 +238,6 @@ pub async fn db_writer(
})
}
/// Determine the current application database schema version.
pub fn db_version(conn: &mut Connection) -> Result<usize> {
let query = "PRAGMA user_version;";
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
Ok(curr_version)
}
/// Persist an event to the database, returning rows added.
pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// start transaction

View File

@ -2,6 +2,7 @@ pub mod close;
pub mod config;
pub mod conn;
pub mod db;
pub mod schema;
pub mod error;
pub mod event;
pub mod hexrange;

250
src/schema.rs Normal file
View File

@ -0,0 +1,250 @@
//! Database schema and migrations
use crate::db::PooledConnection;
use crate::error::Result;
use crate::utils::is_hex;
use log::*;
use rusqlite::limits::Limit;
use rusqlite::params;
use rusqlite::Connection;
// TODO: drop the pubkey_ref and event_ref tables
/// Startup DB Pragmas
pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
pragma mmap_size = 536870912; -- 512MB of mmap
"##;
/// Schema definition
const INIT_SQL: &str = r##"
-- Database settings
PRAGMA encoding = "UTF-8";
PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 4;
-- Event Table
CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
author BLOB NOT NULL, -- author pubkey
kind INTEGER NOT NULL, -- event kind
hidden INTEGER, -- relevant for queries
content TEXT NOT NULL -- serialized json of event object
);
-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
-- hex-string), or TEXT otherwise.
-- This means that searches need to select the appropriate column.
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
-- Event References Table
CREATE TABLE IF NOT EXISTS event_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #e tag.
referenced_event BLOB NOT NULL, -- the event that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
-- Event References Index
CREATE INDEX IF NOT EXISTS event_ref_index ON event_ref(referenced_event);
-- Pubkey References Table
CREATE TABLE IF NOT EXISTS pubkey_ref (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains an #p tag.
referenced_pubkey BLOB NOT NULL, -- the pubkey that is referenced.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
);
-- Pubkey References Index
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
-- NIP-05 User Validation.
-- This represents the validation of a user.
-- cases;
-- we query, and find a valid result. update verified_at, and proceed.
-- we query, and get a 404/503/host down. update failed_at, and we are done.
-- we query, and get a 200, but the local part is not present with the given address. wipe out verified_at, update failed_at.
-- we need to know how often to query failing validations.
-- two cases, either we get a NIP-05 metadata event regularly that we can use to restart validation.
-- or, we simply get lots of non-metadata events, but the user fixed their NIP-05 host.
-- what should trigger a new attempt? what should trigger cleaning?
-- we will never write anything to the table if it is not valid at least once.
-- we will keep trying at frequency X to re-validate the already-valid nip05s.
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
"##;
/// Determine the current application database schema version.
pub fn db_version(conn: &mut Connection) -> Result<usize> {
let query = "PRAGMA user_version;";
let curr_version = conn.query_row(query, [], |row| row.get(0))?;
Ok(curr_version)
}
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
// check the version.
let mut curr_version = db_version(conn)?;
info!("DB version = {:?}", curr_version);
debug!(
"SQLite max query parameters: {}",
conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER)
);
debug!(
"SQLite max table/blob/text length: {} MB",
(conn.limit(Limit::SQLITE_LIMIT_LENGTH) as f64 / (1024 * 1024) as f64).floor()
);
debug!(
"SQLite max SQL length: {} MB",
(conn.limit(Limit::SQLITE_LIMIT_SQL_LENGTH) as f64 / (1024 * 1024) as f64).floor()
);
// initialize from scratch
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
Ok(()) => {
info!("database pragma/schema initialized to v4, and ready");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be initialized");
}
}
}
if curr_version == 1 {
// only change is adding a hidden column to events.
let upgrade_sql = r##"
ALTER TABLE event ADD hidden INTEGER;
UPDATE event SET hidden=FALSE;
PRAGMA user_version = 2;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v1 -> v2");
curr_version = 2;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
}
if curr_version == 2 {
// this version lacks the tag column
debug!("database schema needs update from 2->3");
let upgrade_sql = r##"
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
PRAGMA user_version = 3;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v2 -> v3");
curr_version = 3;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
info!("Starting transaction");
// iterate over every event/pubkey tag
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare("select event_id, \"e\", lower(hex(referenced_event)) from event_ref union select event_id, \"p\", lower(hex(referenced_pubkey)) from pubkey_ref;")?;
let mut tag_rows = stmt.query([])?;
while let Some(row) = tag_rows.next()? {
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
let event_id: u64 = row.get(0)?;
let tag_name: String = row.get(1)?;
let tag_value: String = row.get(2)?;
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
if is_hex(&tag_value) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tag_name, hex::decode(&tag_value).ok()],
)?;
}
}
}
tx.commit()?;
info!("Upgrade complete");
}
if curr_version == 3 {
debug!("database schema needs update from 3->4");
let upgrade_sql = r##"
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
PRAGMA user_version = 4;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v3 -> v4");
//curr_version = 4;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
} else if curr_version == 4 {
debug!("Database version was already current");
} else if curr_version > 3 {
panic!("Database version is newer than supported by this executable");
}
// Setup PRAGMA
conn.execute_batch(STARTUP_SQL)?;
debug!("SQLite PRAGMA startup completed");
Ok(())
}