feat(NIP-40): postgres support for event expiration

This commit is contained in:
Greg Heartsfield 2023-02-17 13:25:56 -06:00
parent e5ca8c2a86
commit bf06bea808
3 changed files with 76 additions and 5 deletions

View File

@ -71,6 +71,8 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
// Panic on migration failure // Panic on migration failure
let version = repo.migrate_up().await.unwrap(); let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version); info!("Postgres migration completed, at v{}", version);
// startup scheduled tasks
repo.start().await.ok();
repo repo
} }

View File

@ -15,11 +15,11 @@ use sqlx::Error::RowNotFound;
use crate::hexrange::{hex_range, HexSearch}; use crate::hexrange::{hex_range, HexSearch};
use crate::repo::postgres_migration::run_migrations; use crate::repo::postgres_migration::run_migrations;
use crate::server::NostrMetrics; use crate::server::NostrMetrics;
use crate::utils::{is_hex, is_lower_hex}; use crate::utils::{is_hex, is_lower_hex, self};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver; use tokio::sync::oneshot::Receiver;
use tracing::log::trace; use tracing::log::trace;
use tracing::{debug, error, info}; use tracing::{debug, error, warn, info};
use crate::error; use crate::error;
pub type PostgresPool = sqlx::pool::Pool<Postgres>; pub type PostgresPool = sqlx::pool::Pool<Postgres>;
@ -36,13 +36,52 @@ impl PostgresRepo {
metrics: m, metrics: m,
} }
} }
}
/// Cleanup expired events on a regular basis
async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
let start = Instant::now();
let exp_res = delete_expired(conn.clone()).await;
match exp_res {
Ok(exp_count) => {
if exp_count > 0 {
info!("removed {} expired events in: {:?}", exp_count, start.elapsed());
}
},
Err(e) => {
warn!("could not remove expired events due to error: {:?}", e);
}
}
}
};
}
});
Ok(())
}
/// One-time deletion of all expired events
async fn delete_expired(conn:PostgresPool) -> Result<u64> {
let mut tx = conn.begin().await?;
let update_count = sqlx::query("DELETE FROM \"event\" WHERE expires_at <= $1;")
.bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap())
.execute(&mut tx)
.await?.rows_affected();
tx.commit().await?;
Ok(update_count)
} }
#[async_trait] #[async_trait]
impl NostrRepo for PostgresRepo { impl NostrRepo for PostgresRepo {
async fn start(&self) -> Result<()> { async fn start(&self) -> Result<()> {
info!("not implemented"); // begin a cleanup task for expired events.
cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
Ok(()) Ok(())
} }
@ -106,13 +145,14 @@ impl NostrRepo for PostgresRepo {
// ignore if the event hash is a duplicate. // ignore if the event hash is a duplicate.
let mut ins_count = sqlx::query( let mut ins_count = sqlx::query(
r#"INSERT INTO "event" r#"INSERT INTO "event"
(id, pub_key, created_at, kind, "content", delegated_by) (id, pub_key, created_at, expires_at, kind, "content", delegated_by)
VALUES($1, $2, $3, $4, $5, $6) VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING"#, ON CONFLICT (id) DO NOTHING"#,
) )
.bind(&id_blob) .bind(&id_blob)
.bind(&pubkey_blob) .bind(&pubkey_blob)
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.bind(e.expiration().and_then(|x| Utc.timestamp_opt(x as i64, 0).latest()))
.bind(e.kind as i64) .bind(e.kind as i64)
.bind(event_str.into_bytes()) .bind(event_str.into_bytes())
.bind(delegator_blob) .bind(delegator_blob)
@ -708,6 +748,14 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
query.push("e.hidden != 1::bit(1)"); query.push("e.hidden != 1::bit(1)");
} }
// never display expired events
if push_and {
query.push(" AND ");
}
query
.push("(e.expires_at IS NULL OR e.expires_at > ")
.push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")");
// Apply per-filter limit to this query. // Apply per-filter limit to this query.
// The use of a LIMIT implies a DESC order, to capture only the most recent events. // The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some(lim) = f.limit { if let Some(lim) = f.limit {

View File

@ -35,6 +35,7 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
m002::rebuild_tags(db).await?; m002::rebuild_tags(db).await?;
} }
run_migration(m003::migration(), db).await; run_migration(m003::migration(), db).await;
run_migration(m004::migration(), db).await;
Ok(current_version(db).await as usize) Ok(current_version(db).await as usize)
} }
@ -256,3 +257,23 @@ ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name",
} }
} }
} }
mod m004 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 4;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add expiration time for events
ALTER TABLE event ADD COLUMN expires_at timestamp(0) with time zone;
-- Index expiration time
CREATE INDEX event_expires_at_idx ON "event" (expires_at);
"#,
],
}
}
}