feat(NIP-40): sqlite support for event expiration

This commit is contained in:
Greg Heartsfield 2023-02-17 11:15:06 -06:00
parent 3229e4192f
commit 8ea63f0b27
5 changed files with 109 additions and 14 deletions

View File

@ -137,6 +137,15 @@ impl Event {
self.kind >= 20000 && self.kind < 30000 self.kind >= 20000 && self.kind < 30000
} }
/// Is this event currently expired?
pub fn is_expired(&self) -> bool {
if let Some(exp) = self.expiration() {
exp <= unix_time()
} else {
false
}
}
/// Determine the time at which this event should expire /// Determine the time at which this event should expire
pub fn expiration(&self) -> Option<u64> { pub fn expiration(&self) -> Option<u64> {
let default = "".to_string(); let default = "".to_string();

View File

@ -29,7 +29,7 @@ pub struct RelayInfo {
/// Convert an Info configuration into public Relay Info /// Convert an Info configuration into public Relay Info
impl From<Settings> for RelayInfo { impl From<Settings> for RelayInfo {
fn from(c: Settings) -> Self { fn from(c: Settings) -> Self {
let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33]; let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40];
if c.authorization.nip42_auth { if c.authorization.nip42_auth {
supported_nips.push(42); supported_nips.push(42);

View File

@ -6,7 +6,7 @@ use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range; use crate::hexrange::hex_range;
use crate::hexrange::HexSearch; use crate::hexrange::HexSearch;
use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db}; use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
use crate::utils::{is_hex}; use crate::utils::{is_hex,unix_time};
use crate::nip05::{Nip05Name, VerificationRecord}; use crate::nip05::{Nip05Name, VerificationRecord};
use crate::subscription::{ReqFilter, Subscription}; use crate::subscription::{ReqFilter, Subscription};
use crate::server::NostrMetrics; use crate::server::NostrMetrics;
@ -135,8 +135,8 @@ impl SqliteRepo {
} }
// ignore if the event hash is a duplicate. // ignore if the event hash is a duplicate.
let mut ins_count = tx.execute( let mut ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);", "INSERT OR IGNORE INTO event (event_hash, created_at, expires_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str] params![id_blob, e.created_at, e.expiration(), e.kind, pubkey_blob, delegator_blob, event_str]
)? as u64; )? as u64;
if ins_count == 0 { if ins_count == 0 {
// if the event was a duplicate, no need to insert event or // if the event was a duplicate, no need to insert event or
@ -251,7 +251,8 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo { impl NostrRepo for SqliteRepo {
async fn start(&self) -> Result<()> { async fn start(&self) -> Result<()> {
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(5), self.write_in_progress.clone()).await
} }
async fn migrate_up(&self) -> Result<usize> { async fn migrate_up(&self) -> Result<usize> {
@ -280,10 +281,10 @@ impl NostrRepo for SqliteRepo {
let wr = SqliteRepo::persist_event(&mut conn, &e); let wr = SqliteRepo::persist_event(&mut conn, &e);
match wr { match wr {
Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => { Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => {
// this basically means that NIP-05 was // this basically means that NIP-05 or another
// writing to the database between us reading // writer was using the database between us
// and promoting the connection to a write // reading and promoting the connection to a
// lock. // write lock.
info!("event write failed, DB locked (attempt: {}); sqlite err: {}", info!("event write failed, DB locked (attempt: {}); sqlite err: {}",
attempts, e.extended_code); attempts, e.extended_code);
}, },
@ -375,7 +376,7 @@ impl NostrRepo for SqliteRepo {
let mut last_successful_send = Instant::now(); let mut last_successful_send = Instant::now();
// execute the query. // execute the query.
// make the actual SQL query (with parameters inserted) available // make the actual SQL query (with parameters inserted) available
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)})); conn.trace(Some(|x| {info!("SQL trace: {:?}", x)}));
let mut stmt = conn.prepare_cached(&q)?; let mut stmt = conn.prepare_cached(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
@ -854,6 +855,9 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
} }
// never display hidden events // never display hidden events
query.push_str(" WHERE hidden!=TRUE"); query.push_str(" WHERE hidden!=TRUE");
// never display hidden events
filter_components.push("(expires_at IS NULL OR expires_at > ?)".to_string());
params.push(Box::new(unix_time()));
// build filter component conditions // build filter component conditions
if !filter_components.is_empty() { if !filter_components.is_empty() {
query.push_str(" AND "); query.push_str(" AND ");
@ -943,6 +947,54 @@ pub fn build_pool(
pool pool
} }
/// Cleanup expired events on a regular basis
async fn cleanup_expired(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() {
let mut _guard:Option<MutexGuard<u64>> = None;
// take a write lock to prevent event writes
// from proceeding while we are deleting
// events. This isn't necessary, but
// minimizes the chances of forcing event
// persistence to be retried.
_guard = Some(write_in_progress.lock().await);
let start = Instant::now();
let exp_res = tokio::task::spawn_blocking(move || {
delete_expired(&mut conn)
}).await;
match exp_res {
Ok(Ok(count)) => {
if count > 0 {
info!("removed {} expired events in: {:?}", count, start.elapsed());
}
},
_ => {
// either the task or underlying query failed
info!("there was an error cleaning up expired events: {:?}", exp_res);
}
}
}
}
};
}
});
Ok(())
}
/// Execute a query to delete all expired events
pub fn delete_expired(conn: &mut PooledConnection) -> Result<usize> {
let tx = conn.transaction()?;
let update_count = tx.execute(
"DELETE FROM event WHERE expires_at <= ?",
params![unix_time()],
)?;
tx.commit()?;
Ok(update_count)
}
/// Perform database WAL checkpoint on a regular basis /// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> { pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this. // TODO; use acquire_many on the reader semaphore to stop them from interrupting this.

View File

@ -23,7 +23,7 @@ pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##; "##;
/// Latest database version /// Latest database version
pub const DB_VERSION: usize = 16; pub const DB_VERSION: usize = 17;
/// Schema definition /// Schema definition
const INIT_SQL: &str = formatcp!( const INIT_SQL: &str = formatcp!(
@ -43,6 +43,7 @@ id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970) first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored created_at INTEGER NOT NULL, -- when the event was authored
expires_at INTEGER, -- when the event expires and may be deleted
author BLOB NOT NULL, -- author pubkey author BLOB NOT NULL, -- author pubkey
delegated_by BLOB, -- delegator pubkey (NIP-26) delegated_by BLOB, -- delegator pubkey (NIP-26)
kind INTEGER NOT NULL, -- event kind kind INTEGER NOT NULL, -- event kind
@ -61,6 +62,7 @@ CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at); CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at);
CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at); CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at);
CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind); CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
-- Tag Table -- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a -- Tag values are stored as either a BLOB (if they come in as a
@ -208,6 +210,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
if curr_version == 15 { if curr_version == 15 {
curr_version = mig_15_to_16(conn)?; curr_version = mig_15_to_16(conn)?;
} }
if curr_version == 16 {
curr_version = mig_16_to_17(conn)?;
}
if curr_version == DB_VERSION { if curr_version == DB_VERSION {
info!( info!(
@ -729,3 +734,22 @@ CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,
info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed()); info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed());
Ok(16) Ok(16)
} }
fn mig_16_to_17(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 16->17");
let upgrade_sql = r##"
ALTER TABLE event ADD COLUMN expires_at INTEGER;
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
PRAGMA user_version = 17;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v16 -> v17");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(17)
}

View File

@ -791,11 +791,21 @@ async fn nostr_server(
metrics.cmd_event.inc(); metrics.cmd_event.inc();
let id_prefix:String = e.id.chars().take(8).collect(); let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind); debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
// check if event is expired
if e.is_expired() {
let notice = Notice::invalid(e.id, "The event has already expired");
ws_stream.send(make_notice_message(&notice)).await.ok();
// check if the event is too far in the future. // check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) { } else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database. // Write this to the database.
let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(&pubkey).ok()); let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(&pubkey).ok());
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone(), source_ip: conn.ip().to_string(), origin: client_info.origin.clone(), user_agent: client_info.user_agent.clone(), auth_pubkey }; let submit_event = SubmittedEvent {
event: e.clone(),
notice_tx: notice_tx.clone(),
source_ip: conn.ip().to_string(),
origin: client_info.origin.clone(),
user_agent: client_info.user_agent.clone(),
auth_pubkey };
event_tx.send(submit_event).await.ok(); event_tx.send(submit_event).await.ok();
client_published_event_count += 1; client_published_event_count += 1;
} else { } else {