From beffeb4d869fd0078cc52528938fee965e91a328 Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 20 Apr 2023 12:01:28 +0100 Subject: [PATCH] improvement: add a configurable postgres write conn string This adds a new configurable connection string for postgres writes. --- src/config.rs | 4 +++- src/db.rs | 25 ++++++++++++++++++++++--- src/repo/postgres.rs | 38 ++++++++++++++++++++------------------ 3 files changed, 45 insertions(+), 22 deletions(-) diff --git a/src/config.rs b/src/config.rs index 9ac0398..32ac2d3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,6 +26,7 @@ pub struct Database { pub min_conn: u32, pub max_conn: u32, pub connection: String, + pub connection_write: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -80,7 +81,7 @@ pub struct Limits { pub struct Authorization { pub pubkey_whitelist: Option>, // If present, only allow these pubkeys to publish events pub nip42_auth: bool, // if true enables NIP-42 authentication - pub nip42_dms: bool, // if true send DMs only to their authenticated recipients + pub nip42_dms: bool, // if true send DMs only to their authenticated recipients } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -260,6 +261,7 @@ impl Default for Settings { min_conn: 4, max_conn: 8, connection: "".to_owned(), + connection_write: None, }, grpc: Grpc { event_admission_server: None, diff --git a/src/db.rs b/src/db.rs index a4f648e..27aafaf 100644 --- a/src/db.rs +++ b/src/db.rs @@ -45,8 +45,8 @@ pub const DB_FILE: &str = "nostr.db"; /// Will panic if the pool could not be created. pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc { match settings.database.engine.as_str() { - "sqlite" => Arc::new(build_sqlite_pool(settings, metrics).await), - "postgres" => Arc::new(build_postgres_pool(settings, metrics).await), + "sqlite" => Arc::new(build_sqlite_pool(&settings, metrics).await), + "postgres" => Arc::new(build_postgres_pool(&settings, metrics).await), _ => panic!("Unknown database engine"), } } @@ -70,7 +70,26 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post .connect_with(options) .await .unwrap(); - let repo = PostgresRepo::new(pool, metrics); + + let write_pool: PostgresPool = match &settings.database.connection_write { + Some(cfg_write) => { + let mut options_write: PgConnectOptions = cfg_write.as_str().parse().unwrap(); + options_write.log_statements(LevelFilter::Debug); + options_write.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60)); + + PoolOptions::new() + .max_connections(settings.database.max_conn) + .min_connections(settings.database.min_conn) + .idle_timeout(Duration::from_secs(60)) + .connect_with(options_write) + .await + .unwrap() + } + None => pool.clone(), + }; + + let repo = PostgresRepo::new(pool, write_pool, metrics); + // Panic on migration failure let version = repo.migrate_up().await.unwrap(); info!("Postgres migration completed, at v{}", version); diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 6e8fea6..8fc46eb 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -28,13 +28,15 @@ pub type PostgresPool = sqlx::pool::Pool; pub struct PostgresRepo { conn: PostgresPool, + conn_write: PostgresPool, metrics: NostrMetrics, } impl PostgresRepo { - pub fn new(c: PostgresPool, m: NostrMetrics) -> PostgresRepo { + pub fn new(c: PostgresPool, cw: PostgresPool, m: NostrMetrics) -> PostgresRepo { PostgresRepo { conn: c, + conn_write: cw, metrics: m, } } @@ -81,17 +83,17 @@ async fn delete_expired(conn: PostgresPool) -> Result { impl NostrRepo for PostgresRepo { async fn start(&self) -> Result<()> { // begin a cleanup task for expired events. - cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?; + cleanup_expired(self.conn_write.clone(), Duration::from_secs(600)).await?; Ok(()) } async fn migrate_up(&self) -> Result { - Ok(run_migrations(&self.conn).await?) + Ok(run_migrations(&self.conn_write).await?) } async fn write_event(&self, e: &Event) -> Result { // start transaction - let mut tx = self.conn.begin().await?; + let mut tx = self.conn_write.begin().await?; let start = Instant::now(); // get relevant fields from event and convert to blobs. @@ -455,7 +457,7 @@ ON CONFLICT (id) DO NOTHING"#, } async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> { - let mut tx = self.conn.begin().await?; + let mut tx = self.conn_write.begin().await?; sqlx::query("DELETE FROM user_verification WHERE \"name\" = $1") .bind(name) @@ -481,7 +483,7 @@ ON CONFLICT (id) DO NOTHING"#, sqlx::query("UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2") .bind(Utc.timestamp_opt(verify_time as i64, 0).unwrap()) .bind(id as i64) - .execute(&self.conn) + .execute(&self.conn_write) .await?; info!("verification updated for {}", id); @@ -491,7 +493,7 @@ ON CONFLICT (id) DO NOTHING"#, async fn fail_verification(&self, id: u64) -> Result<()> { sqlx::query("UPDATE user_verification SET failed_at = now(), fail_count = fail_count + 1 WHERE id = $1") .bind(id as i64) - .execute(&self.conn) + .execute(&self.conn_write) .await?; Ok(()) } @@ -499,7 +501,7 @@ ON CONFLICT (id) DO NOTHING"#, async fn delete_verification(&self, id: u64) -> Result<()> { sqlx::query("DELETE FROM user_verification WHERE id = $1") .bind(id as i64) - .execute(&self.conn) + .execute(&self.conn_write) .await?; Ok(()) } @@ -551,7 +553,7 @@ ON CONFLICT (id) DO NOTHING"#, async fn create_account(&self, pub_key: &Keys) -> Result { let pub_key = pub_key.public_key().to_string(); - let mut tx = self.conn.begin().await?; + let mut tx = self.conn_write.begin().await?; let result = sqlx::query("INSERT INTO account (pubkey, balance) VALUES ($1, 0);") .bind(pub_key) @@ -577,7 +579,7 @@ ON CONFLICT (id) DO NOTHING"#, ) .bind(admission_cost as i64) .bind(pub_key) - .execute(&self.conn) + .execute(&self.conn_write) .await?; Ok(()) } @@ -594,7 +596,7 @@ ON CONFLICT (id) DO NOTHING"#, let result = sqlx::query_as::<_, (bool, i64)>(query) .bind(pub_key) - .fetch_optional(&self.conn) + .fetch_optional(&self.conn_write) .await? .ok_or(error::Error::SqlxError(RowNotFound))?; @@ -614,14 +616,14 @@ ON CONFLICT (id) DO NOTHING"#, sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2") .bind(new_balance as i64) .bind(pub_key) - .execute(&self.conn) + .execute(&self.conn_write) .await? } false => { sqlx::query("UPDATE account SET balance = balance - $1 WHERE pubkey = $2") .bind(new_balance as i64) .bind(pub_key) - .execute(&self.conn) + .execute(&self.conn_write) .await? } }; @@ -631,7 +633,7 @@ ON CONFLICT (id) DO NOTHING"#, /// Create invoice record async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> { let pub_key = pub_key.public_key().to_string(); - let mut tx = self.conn.begin().await?; + let mut tx = self.conn_write.begin().await?; sqlx::query( "INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES ($1, $2, $3, $4, $5, now(), $6)", @@ -658,7 +660,7 @@ ON CONFLICT (id) DO NOTHING"#, let (pubkey, prev_invoice_status, amount) = sqlx::query_as::<_, (String, InvoiceStatus, i64)>(query) .bind(payment_hash) - .fetch_optional(&self.conn) + .fetch_optional(&self.conn_write) .await? .ok_or(error::Error::SqlxError(RowNotFound))?; @@ -672,14 +674,14 @@ ON CONFLICT (id) DO NOTHING"#, sqlx::query(query) .bind(&status) .bind(payment_hash) - .execute(&self.conn) + .execute(&self.conn_write) .await?; if prev_invoice_status.eq(&InvoiceStatus::Unpaid) && status.eq(&InvoiceStatus::Paid) { sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2") .bind(amount) .bind(&pubkey) - .execute(&self.conn) + .execute(&self.conn_write) .await?; } @@ -698,7 +700,7 @@ LIMIT 1; "#; match sqlx::query_as::<_, (i64, String, String, String)>(query) .bind(pubkey.public_key().to_string()) - .fetch_optional(&self.conn) + .fetch_optional(&self.conn_write) .await .unwrap() {