diff --git a/src/db.rs b/src/db.rs index 7df8935..e0121d5 100644 --- a/src/db.rs +++ b/src/db.rs @@ -71,6 +71,8 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post // Panic on migration failure let version = repo.migrate_up().await.unwrap(); info!("Postgres migration completed, at v{}", version); + // startup scheduled tasks + repo.start().await.ok(); repo } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 251a188..be2ec3d 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -15,11 +15,11 @@ use sqlx::Error::RowNotFound; use crate::hexrange::{hex_range, HexSearch}; use crate::repo::postgres_migration::run_migrations; use crate::server::NostrMetrics; -use crate::utils::{is_hex, is_lower_hex}; +use crate::utils::{is_hex, is_lower_hex, self}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Receiver; use tracing::log::trace; -use tracing::{debug, error, info}; +use tracing::{debug, error, warn, info}; use crate::error; pub type PostgresPool = sqlx::pool::Pool; @@ -36,13 +36,52 @@ impl PostgresRepo { metrics: m, } } + +} + + +/// Cleanup expired events on a regular basis +async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> { + tokio::task::spawn(async move { + loop { + tokio::select! { + _ = tokio::time::sleep(frequency) => { + let start = Instant::now(); + let exp_res = delete_expired(conn.clone()).await; + match exp_res { + Ok(exp_count) => { + if exp_count > 0 { + info!("removed {} expired events in: {:?}", exp_count, start.elapsed()); + } + }, + Err(e) => { + warn!("could not remove expired events due to error: {:?}", e); + } + } + } + }; + } + }); + Ok(()) +} + +/// One-time deletion of all expired events +async fn delete_expired(conn:PostgresPool) -> Result { + let mut tx = conn.begin().await?; + let update_count = sqlx::query("DELETE FROM \"event\" WHERE expires_at <= $1;") + .bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()) + .execute(&mut tx) + .await?.rows_affected(); + tx.commit().await?; + Ok(update_count) } #[async_trait] impl NostrRepo for PostgresRepo { async fn start(&self) -> Result<()> { - info!("not implemented"); + // begin a cleanup task for expired events. + cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?; Ok(()) } @@ -106,13 +145,14 @@ impl NostrRepo for PostgresRepo { // ignore if the event hash is a duplicate. let mut ins_count = sqlx::query( r#"INSERT INTO "event" -(id, pub_key, created_at, kind, "content", delegated_by) -VALUES($1, $2, $3, $4, $5, $6) +(id, pub_key, created_at, expires_at, kind, "content", delegated_by) +VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING"#, ) .bind(&id_blob) .bind(&pubkey_blob) .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) + .bind(e.expiration().and_then(|x| Utc.timestamp_opt(x as i64, 0).latest())) .bind(e.kind as i64) .bind(event_str.into_bytes()) .bind(delegator_blob) @@ -708,6 +748,14 @@ fn query_from_filter(f: &ReqFilter) -> Option> { query.push("e.hidden != 1::bit(1)"); } + // never display expired events + if push_and { + query.push(" AND "); + } + query + .push("(e.expires_at IS NULL OR e.expires_at > ") + .push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")"); + // Apply per-filter limit to this query. // The use of a LIMIT implies a DESC order, to capture only the most recent events. if let Some(lim) = f.limit { diff --git a/src/repo/postgres_migration.rs b/src/repo/postgres_migration.rs index 9347054..2c16d13 100644 --- a/src/repo/postgres_migration.rs +++ b/src/repo/postgres_migration.rs @@ -35,6 +35,7 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result { m002::rebuild_tags(db).await?; } run_migration(m003::migration(), db).await; + run_migration(m004::migration(), db).await; Ok(current_version(db).await as usize) } @@ -256,3 +257,23 @@ ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name", } } } + +mod m004 { + use crate::repo::postgres_migration::{Migration, SimpleSqlMigration}; + + pub const VERSION: i64 = 4; + + pub fn migration() -> impl Migration { + SimpleSqlMigration { + serial_number: VERSION, + sql: vec![ + r#" +-- Add expiration time for events +ALTER TABLE event ADD COLUMN expires_at timestamp(0) with time zone; +-- Index expiration time +CREATE INDEX event_expires_at_idx ON "event" (expires_at); + "#, + ], + } + } +}