feat: postgres migration schema v2

This primarily deals with correctly handling tag values.
This commit is contained in:
Greg Heartsfield 2023-01-27 19:02:19 -06:00
parent 53f83aa923
commit 57e1b53c13
6 changed files with 155 additions and 10 deletions

44
Cargo.lock generated
View File

@ -447,6 +447,19 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "console"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60"
dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"unicode-width",
"windows-sys",
]
[[package]]
name = "console-api"
version = "0.4.0"
@ -710,6 +723,12 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "errno"
version = "0.2.8"
@ -1233,6 +1252,18 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "indicatif"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"unicode-width",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -1513,6 +1544,7 @@ dependencies = [
"http",
"hyper",
"hyper-tls",
"indicatif",
"lazy_static",
"nonzero_ext",
"parse_duration",
@ -1622,6 +1654,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "once_cell"
version = "1.17.0"
@ -1868,6 +1906,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "portable-atomic"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b"
[[package]]
name = "ppv-lite86"
version = "0.2.17"

View File

@ -47,6 +47,7 @@ async-std = "1.12.0"
sqlx = { version ="0.6.2", features=["runtime-tokio-rustls", "postgres", "chrono"]}
chrono = "0.4.23"
prometheus = "0.13.3"
indicatif = "0.17.3"
[dev-dependencies]
anyhow = "1"

View File

@ -77,7 +77,7 @@ reject_future_seconds = 1800
# Limiting event creation is highly recommended if your relay is
# public!
#
messages_per_sec = 5
#messages_per_sec = 5
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to

View File

@ -63,9 +63,9 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
.await
.unwrap();
let repo = PostgresRepo::new(pool, metrics);
if let Ok(version) = repo.migrate_up().await {
info!("Postgres migration completed, at v{}", version);
}
// Panic on migration failure
let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version);
repo
}

View File

@ -47,7 +47,7 @@ impl NostrRepo for PostgresRepo {
}
async fn migrate_up(&self) -> Result<usize> {
run_migrations(&self.conn).await
Ok(run_migrations(&self.conn).await?)
}
async fn write_event(&self, e: &Event) -> Result<u64> {

View File

@ -30,7 +30,15 @@ impl Migration for SimpleSqlMigration {
pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
prepare_migrations_table(db).await;
run_migration(m001::migration(), db).await;
Ok(m001::migration().serial_number() as usize)
let m002_result = run_migration(m002::migration(), db).await;
if m002_result == MigrationResult::Upgraded {
m002::rebuild_tags(db).await?;
}
Ok(current_version(db).await as usize)
}
async fn current_version(db: &PostgresPool) -> i64 {
sqlx::query_scalar("SELECT max(serial_number) FROM migrations;").fetch_one(db).await.unwrap()
}
async fn prepare_migrations_table(db: &PostgresPool) {
@ -40,7 +48,14 @@ async fn prepare_migrations_table(db: &PostgresPool) {
.unwrap();
}
async fn run_migration(migration: impl Migration, db: &PostgresPool) {
// Running a migration was either unnecessary, or completed
#[derive(PartialEq, Eq, Debug, Clone)]
enum MigrationResult {
Upgraded,
NotNeeded,
}
async fn run_migration(migration: impl Migration, db: &PostgresPool) -> MigrationResult {
let row: i64 =
sqlx::query_scalar("SELECT COUNT(*) AS count FROM migrations WHERE serial_number = $1")
.bind(migration.serial_number())
@ -49,7 +64,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) {
.unwrap();
if row > 0 {
return;
return MigrationResult::NotNeeded;
}
let mut transaction = db.begin().await.unwrap();
@ -62,14 +77,17 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) {
.unwrap();
transaction.commit().await.unwrap();
return MigrationResult::Upgraded;
}
mod m001 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 1;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: 1,
serial_number: VERSION,
sql: vec![
r#"
-- Events table
@ -97,8 +115,8 @@ CREATE TABLE "tag" (
CONSTRAINT tag_fk FOREIGN KEY (event_id) REFERENCES "event"(id) ON DELETE CASCADE
);
CREATE INDEX tag_event_id_idx ON tag USING btree (event_id, name);
CREATE UNIQUE INDEX tag_event_id_value_idx ON tag (event_id,name,value);
CREATE INDEX tag_value_idx ON tag USING btree (value);
CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
-- NIP-05 Verfication table
CREATE TABLE "user_verification" (
@ -118,3 +136,85 @@ CREATE INDEX user_verification_name_idx ON user_verification USING btree (name);
}
}
}
mod m002 {
use std::time::Instant;
use tracing::info;
use async_std::stream::StreamExt;
use sqlx::Row;
use indicatif::{ProgressBar, ProgressStyle};
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
use crate::repo::postgres::PostgresPool;
use crate::event::{Event, single_char_tagname};
use crate::utils::is_lower_hex;
pub const VERSION: i64 = 2;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add tag value column
ALTER TABLE tag ADD COLUMN value_hex bytea;
-- Remove not-null constraint
ALTER TABLE tag ALTER COLUMN value DROP NOT NULL;
-- Add value index
CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
"#,
],
}
}
pub async fn rebuild_tags(db: &PostgresPool) -> crate::error::Result<()> {
// Check how many events we have to process
let start = Instant::now();
let mut tx = db.begin().await.unwrap();
let mut update_tx = db.begin().await.unwrap();
// Clear out table
sqlx::query("DELETE FROM tag;").execute(&mut update_tx).await?;
{
let event_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) from event;")
.fetch_one(&mut tx)
.await
.unwrap();
let bar = ProgressBar::new(event_count.try_into().unwrap()).with_message("rebuilding tags table");
bar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}").unwrap());
let mut events = sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx);
while let Some(row) = events.next().await {
bar.inc(1);
// get the row id and content
let row = row.unwrap();
let event_id: Vec<u8> = row.get(0);
let event_bytes: Vec<u8> = row.get(1);
let event:Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
let tagnamechar_opt = single_char_tagname(tagname);
if tagnamechar_opt.is_none() {
continue;
}
// safe because len was > 1
let tagval = t.get(1).unwrap();
// insert as BLOB if we can restore it losslessly.
// this means it needs to be even length and lowercase.
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(hex::decode(tagval).ok()).execute(&mut update_tx).await?;
} else {
let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(tagval.as_bytes()).execute(&mut update_tx).await?;
}
}
}
update_tx.commit().await?;
bar.finish();
}
info!("rebuilt tags in {:?}", start.elapsed());
Ok(())
}
}