improvement: add a configurable postgres write conn string

This adds a new configurable connection string for postgres writes.
This commit is contained in:
Kieran 2023-04-20 12:01:28 +01:00 committed by Greg Heartsfield
parent 5135f3b007
commit beffeb4d86
3 changed files with 45 additions and 22 deletions

View File

@ -26,6 +26,7 @@ pub struct Database {
pub min_conn: u32,
pub max_conn: u32,
pub connection: String,
pub connection_write: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -80,7 +81,7 @@ pub struct Limits {
pub struct Authorization {
pub pubkey_whitelist: Option<Vec<String>>, // If present, only allow these pubkeys to publish events
pub nip42_auth: bool, // if true enables NIP-42 authentication
pub nip42_dms: bool, // if true send DMs only to their authenticated recipients
pub nip42_dms: bool, // if true send DMs only to their authenticated recipients
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -260,6 +261,7 @@ impl Default for Settings {
min_conn: 4,
max_conn: 8,
connection: "".to_owned(),
connection_write: None,
},
grpc: Grpc {
event_admission_server: None,

View File

@ -45,8 +45,8 @@ pub const DB_FILE: &str = "nostr.db";
/// Will panic if the pool could not be created.
pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc<dyn NostrRepo> {
match settings.database.engine.as_str() {
"sqlite" => Arc::new(build_sqlite_pool(settings, metrics).await),
"postgres" => Arc::new(build_postgres_pool(settings, metrics).await),
"sqlite" => Arc::new(build_sqlite_pool(&settings, metrics).await),
"postgres" => Arc::new(build_postgres_pool(&settings, metrics).await),
_ => panic!("Unknown database engine"),
}
}
@ -70,7 +70,26 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
.connect_with(options)
.await
.unwrap();
let repo = PostgresRepo::new(pool, metrics);
let write_pool: PostgresPool = match &settings.database.connection_write {
Some(cfg_write) => {
let mut options_write: PgConnectOptions = cfg_write.as_str().parse().unwrap();
options_write.log_statements(LevelFilter::Debug);
options_write.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60));
PoolOptions::new()
.max_connections(settings.database.max_conn)
.min_connections(settings.database.min_conn)
.idle_timeout(Duration::from_secs(60))
.connect_with(options_write)
.await
.unwrap()
}
None => pool.clone(),
};
let repo = PostgresRepo::new(pool, write_pool, metrics);
// Panic on migration failure
let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version);

View File

@ -28,13 +28,15 @@ pub type PostgresPool = sqlx::pool::Pool<Postgres>;
pub struct PostgresRepo {
conn: PostgresPool,
conn_write: PostgresPool,
metrics: NostrMetrics,
}
impl PostgresRepo {
pub fn new(c: PostgresPool, m: NostrMetrics) -> PostgresRepo {
pub fn new(c: PostgresPool, cw: PostgresPool, m: NostrMetrics) -> PostgresRepo {
PostgresRepo {
conn: c,
conn_write: cw,
metrics: m,
}
}
@ -81,17 +83,17 @@ async fn delete_expired(conn: PostgresPool) -> Result<u64> {
impl NostrRepo for PostgresRepo {
async fn start(&self) -> Result<()> {
// begin a cleanup task for expired events.
cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
cleanup_expired(self.conn_write.clone(), Duration::from_secs(600)).await?;
Ok(())
}
async fn migrate_up(&self) -> Result<usize> {
Ok(run_migrations(&self.conn).await?)
Ok(run_migrations(&self.conn_write).await?)
}
async fn write_event(&self, e: &Event) -> Result<u64> {
// start transaction
let mut tx = self.conn.begin().await?;
let mut tx = self.conn_write.begin().await?;
let start = Instant::now();
// get relevant fields from event and convert to blobs.
@ -455,7 +457,7 @@ ON CONFLICT (id) DO NOTHING"#,
}
async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> {
let mut tx = self.conn.begin().await?;
let mut tx = self.conn_write.begin().await?;
sqlx::query("DELETE FROM user_verification WHERE \"name\" = $1")
.bind(name)
@ -481,7 +483,7 @@ ON CONFLICT (id) DO NOTHING"#,
sqlx::query("UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2")
.bind(Utc.timestamp_opt(verify_time as i64, 0).unwrap())
.bind(id as i64)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
info!("verification updated for {}", id);
@ -491,7 +493,7 @@ ON CONFLICT (id) DO NOTHING"#,
async fn fail_verification(&self, id: u64) -> Result<()> {
sqlx::query("UPDATE user_verification SET failed_at = now(), fail_count = fail_count + 1 WHERE id = $1")
.bind(id as i64)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
Ok(())
}
@ -499,7 +501,7 @@ ON CONFLICT (id) DO NOTHING"#,
async fn delete_verification(&self, id: u64) -> Result<()> {
sqlx::query("DELETE FROM user_verification WHERE id = $1")
.bind(id as i64)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
Ok(())
}
@ -551,7 +553,7 @@ ON CONFLICT (id) DO NOTHING"#,
async fn create_account(&self, pub_key: &Keys) -> Result<bool> {
let pub_key = pub_key.public_key().to_string();
let mut tx = self.conn.begin().await?;
let mut tx = self.conn_write.begin().await?;
let result = sqlx::query("INSERT INTO account (pubkey, balance) VALUES ($1, 0);")
.bind(pub_key)
@ -577,7 +579,7 @@ ON CONFLICT (id) DO NOTHING"#,
)
.bind(admission_cost as i64)
.bind(pub_key)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
Ok(())
}
@ -594,7 +596,7 @@ ON CONFLICT (id) DO NOTHING"#,
let result = sqlx::query_as::<_, (bool, i64)>(query)
.bind(pub_key)
.fetch_optional(&self.conn)
.fetch_optional(&self.conn_write)
.await?
.ok_or(error::Error::SqlxError(RowNotFound))?;
@ -614,14 +616,14 @@ ON CONFLICT (id) DO NOTHING"#,
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
.bind(new_balance as i64)
.bind(pub_key)
.execute(&self.conn)
.execute(&self.conn_write)
.await?
}
false => {
sqlx::query("UPDATE account SET balance = balance - $1 WHERE pubkey = $2")
.bind(new_balance as i64)
.bind(pub_key)
.execute(&self.conn)
.execute(&self.conn_write)
.await?
}
};
@ -631,7 +633,7 @@ ON CONFLICT (id) DO NOTHING"#,
/// Create invoice record
async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
let mut tx = self.conn.begin().await?;
let mut tx = self.conn_write.begin().await?;
sqlx::query(
"INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES ($1, $2, $3, $4, $5, now(), $6)",
@ -658,7 +660,7 @@ ON CONFLICT (id) DO NOTHING"#,
let (pubkey, prev_invoice_status, amount) =
sqlx::query_as::<_, (String, InvoiceStatus, i64)>(query)
.bind(payment_hash)
.fetch_optional(&self.conn)
.fetch_optional(&self.conn_write)
.await?
.ok_or(error::Error::SqlxError(RowNotFound))?;
@ -672,14 +674,14 @@ ON CONFLICT (id) DO NOTHING"#,
sqlx::query(query)
.bind(&status)
.bind(payment_hash)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
if prev_invoice_status.eq(&InvoiceStatus::Unpaid) && status.eq(&InvoiceStatus::Paid) {
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
.bind(amount)
.bind(&pubkey)
.execute(&self.conn)
.execute(&self.conn_write)
.await?;
}
@ -698,7 +700,7 @@ LIMIT 1;
"#;
match sqlx::query_as::<_, (i64, String, String, String)>(query)
.bind(pubkey.public_key().to_string())
.fetch_optional(&self.conn)
.fetch_optional(&self.conn_write)
.await
.unwrap()
{