From 57e1b53c13cba97d410e2700efabfdfed5db2f90 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Fri, 27 Jan 2023 19:02:19 -0600 Subject: [PATCH] feat: postgres migration schema v2 This primarily deals with correctly handling tag values. --- Cargo.lock | 44 +++++++++++++ Cargo.toml | 1 + config.toml | 2 +- src/db.rs | 6 +- src/repo/postgres.rs | 2 +- src/repo/postgres_migration.rs | 110 +++++++++++++++++++++++++++++++-- 6 files changed, 155 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 514de36..3a21c24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,6 +447,19 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "console" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys", +] + [[package]] name = "console-api" version = "0.4.0" @@ -710,6 +723,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "errno" version = "0.2.8" @@ -1233,6 +1252,18 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "indicatif" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width", +] + [[package]] name = "instant" version = "0.1.12" @@ -1513,6 +1544,7 @@ dependencies = [ "http", "hyper", "hyper-tls", + "indicatif", "lazy_static", "nonzero_ext", "parse_duration", @@ -1622,6 +1654,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "once_cell" version = "1.17.0" @@ -1868,6 +1906,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "portable-atomic" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/Cargo.toml b/Cargo.toml index 1a249e0..b67aa2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ async-std = "1.12.0" sqlx = { version ="0.6.2", features=["runtime-tokio-rustls", "postgres", "chrono"]} chrono = "0.4.23" prometheus = "0.13.3" +indicatif = "0.17.3" [dev-dependencies] anyhow = "1" diff --git a/config.toml b/config.toml index f6ff68f..30cc5b1 100644 --- a/config.toml +++ b/config.toml @@ -77,7 +77,7 @@ reject_future_seconds = 1800 # Limiting event creation is highly recommended if your relay is # public! # -messages_per_sec = 5 +#messages_per_sec = 5 # Limit client subscriptions created per second, averaged over one # minute. Must be an integer. If not set (or set to 0), defaults to diff --git a/src/db.rs b/src/db.rs index bc3ff7b..f5e7895 100644 --- a/src/db.rs +++ b/src/db.rs @@ -63,9 +63,9 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post .await .unwrap(); let repo = PostgresRepo::new(pool, metrics); - if let Ok(version) = repo.migrate_up().await { - info!("Postgres migration completed, at v{}", version); - } + // Panic on migration failure + let version = repo.migrate_up().await.unwrap(); + info!("Postgres migration completed, at v{}", version); repo } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 9ee6a36..de2a4fd 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -47,7 +47,7 @@ impl NostrRepo for PostgresRepo { } async fn migrate_up(&self) -> Result { - run_migrations(&self.conn).await + Ok(run_migrations(&self.conn).await?) } async fn write_event(&self, e: &Event) -> Result { diff --git a/src/repo/postgres_migration.rs b/src/repo/postgres_migration.rs index 5640396..211a322 100644 --- a/src/repo/postgres_migration.rs +++ b/src/repo/postgres_migration.rs @@ -30,7 +30,15 @@ impl Migration for SimpleSqlMigration { pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result { prepare_migrations_table(db).await; run_migration(m001::migration(), db).await; - Ok(m001::migration().serial_number() as usize) + let m002_result = run_migration(m002::migration(), db).await; + if m002_result == MigrationResult::Upgraded { + m002::rebuild_tags(db).await?; + } + Ok(current_version(db).await as usize) +} + +async fn current_version(db: &PostgresPool) -> i64 { + sqlx::query_scalar("SELECT max(serial_number) FROM migrations;").fetch_one(db).await.unwrap() } async fn prepare_migrations_table(db: &PostgresPool) { @@ -40,7 +48,14 @@ async fn prepare_migrations_table(db: &PostgresPool) { .unwrap(); } -async fn run_migration(migration: impl Migration, db: &PostgresPool) { +// Running a migration was either unnecessary, or completed +#[derive(PartialEq, Eq, Debug, Clone)] +enum MigrationResult { + Upgraded, + NotNeeded, +} + +async fn run_migration(migration: impl Migration, db: &PostgresPool) -> MigrationResult { let row: i64 = sqlx::query_scalar("SELECT COUNT(*) AS count FROM migrations WHERE serial_number = $1") .bind(migration.serial_number()) @@ -49,7 +64,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) { .unwrap(); if row > 0 { - return; + return MigrationResult::NotNeeded; } let mut transaction = db.begin().await.unwrap(); @@ -62,14 +77,17 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) { .unwrap(); transaction.commit().await.unwrap(); + return MigrationResult::Upgraded; } mod m001 { use crate::repo::postgres_migration::{Migration, SimpleSqlMigration}; + pub const VERSION: i64 = 1; + pub fn migration() -> impl Migration { SimpleSqlMigration { - serial_number: 1, + serial_number: VERSION, sql: vec![ r#" -- Events table @@ -97,8 +115,8 @@ CREATE TABLE "tag" ( CONSTRAINT tag_fk FOREIGN KEY (event_id) REFERENCES "event"(id) ON DELETE CASCADE ); CREATE INDEX tag_event_id_idx ON tag USING btree (event_id, name); -CREATE UNIQUE INDEX tag_event_id_value_idx ON tag (event_id,name,value); CREATE INDEX tag_value_idx ON tag USING btree (value); +CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex); -- NIP-05 Verfication table CREATE TABLE "user_verification" ( @@ -118,3 +136,85 @@ CREATE INDEX user_verification_name_idx ON user_verification USING btree (name); } } } + +mod m002 { + use std::time::Instant; + use tracing::info; + use async_std::stream::StreamExt; + use sqlx::Row; + use indicatif::{ProgressBar, ProgressStyle}; + + use crate::repo::postgres_migration::{Migration, SimpleSqlMigration}; + use crate::repo::postgres::PostgresPool; + use crate::event::{Event, single_char_tagname}; + use crate::utils::is_lower_hex; + + pub const VERSION: i64 = 2; + + pub fn migration() -> impl Migration { + SimpleSqlMigration { + serial_number: VERSION, + sql: vec![ + r#" +-- Add tag value column +ALTER TABLE tag ADD COLUMN value_hex bytea; +-- Remove not-null constraint +ALTER TABLE tag ALTER COLUMN value DROP NOT NULL; +-- Add value index +CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex); + "#, + ], + } + } + + pub async fn rebuild_tags(db: &PostgresPool) -> crate::error::Result<()> { + // Check how many events we have to process + let start = Instant::now(); + let mut tx = db.begin().await.unwrap(); + let mut update_tx = db.begin().await.unwrap(); + // Clear out table + sqlx::query("DELETE FROM tag;").execute(&mut update_tx).await?; + { + let event_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) from event;") + .fetch_one(&mut tx) + .await + .unwrap(); + let bar = ProgressBar::new(event_count.try_into().unwrap()).with_message("rebuilding tags table"); + bar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}").unwrap()); + let mut events = sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx); + while let Some(row) = events.next().await { + bar.inc(1); + // get the row id and content + let row = row.unwrap(); + let event_id: Vec = row.get(0); + let event_bytes: Vec = row.get(1); + let event:Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?; + + for t in event.tags.iter().filter(|x| x.len() > 1) { + let tagname = t.get(0).unwrap(); + let tagnamechar_opt = single_char_tagname(tagname); + if tagnamechar_opt.is_none() { + continue; + } + // safe because len was > 1 + let tagval = t.get(1).unwrap(); + // insert as BLOB if we can restore it losslessly. + // this means it needs to be even length and lowercase. + if (tagval.len() % 2 == 0) && is_lower_hex(tagval) { + let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;"; + sqlx::query(q).bind(&event_id).bind(&tagname).bind(hex::decode(tagval).ok()).execute(&mut update_tx).await?; + } else { + let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;"; + sqlx::query(q).bind(&event_id).bind(&tagname).bind(tagval.as_bytes()).execute(&mut update_tx).await?; + } + + } + } + update_tx.commit().await?; + bar.finish(); + } + info!("rebuilt tags in {:?}", start.elapsed()); + Ok(()) + } +}