feat: perform full checkpoints and truncate WAL every 2k events

Greg Heartsfield 2022-12-22 10:11:05 -06:00
parent ea204761c9
commit 168cf513ac


@@ -38,9 +38,10 @@ pub struct SubmittedEvent {
 /// Database file
 pub const DB_FILE: &str = "nostr.db";
-/// How many persisted events before optimization is triggered
-pub const EVENT_COUNT_OPTIMIZE_TRIGGER: usize = 500;
-/// How many persisted events before we pause for backups
+/// How many persisted events before DB maintenance is triggered.
+pub const EVENT_COUNT_MAINTENANCE_TRIGGER: usize = 2000;
+/// How many persisted events before we pause for backups.
 /// It isn't clear this is enough to make the online backup API work yet.
 pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
 /// Build a database connection pool.
@@ -95,6 +96,38 @@ pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
     conn.execute_batch("PRAGMA optimize;")?;
     Ok(())
 }
+
+#[derive(Debug)]
+enum SqliteReturnStatus {
+    SqliteOk,
+    SqliteBusy,
+    SqliteError,
+    SqliteOther(u64),
+}
+/// Checkpoint/Truncate WAL
+pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
+    let query = "PRAGMA wal_checkpoint(TRUNCATE);";
+    let start = Instant::now();
+    let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
+        let checkpoint_result: u64 = row.get(0)?;
+        let wal_size: u64 = row.get(1)?;
+        let frames_checkpointed: u64 = row.get(2)?;
+        Ok((checkpoint_result, wal_size, frames_checkpointed))
+    })?;
+    let result = match cp_result {
+        0 => SqliteReturnStatus::SqliteOk,
+        1 => SqliteReturnStatus::SqliteBusy,
+        2 => SqliteReturnStatus::SqliteError,
+        x => SqliteReturnStatus::SqliteOther(x),
+    };
+    info!(
+        "checkpoint ran in {:?} (result: {:?}, WAL size: {})",
+        start.elapsed(),
+        result,
+        wal_size
+    );
+    Ok(())
+}
 
 /// Spawn a database writer that persists events to the SQLite store.
 pub async fn db_writer(
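For context on what checkpoint_db unpacks: PRAGMA wal_checkpoint(TRUNCATE) returns a single row of three integers (a result/busy code, the size of the WAL in frames, and the number of frames checkpointed), and the TRUNCATE mode also resets the WAL file to zero bytes once the checkpoint completes. Below is a minimal illustrative sketch of the same call, assuming plain rusqlite and a database already in WAL mode; it is not part of this commit and skips the relay's r2d2 connection pool.

// Illustrative sketch only (not part of this commit): issue the same
// checkpoint pragma with rusqlite directly and read back its row.
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    // Assumes "nostr.db" exists and is already in WAL journal mode.
    let conn = Connection::open("nostr.db")?;
    let (result, wal_size, checkpointed): (u64, u64, u64) =
        conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?))
        })?;
    // result: 0 = ok, 1 = blocked; wal_size/checkpointed are frame counts.
    println!("result={result} wal_size={wal_size} checkpointed={checkpointed}");
    Ok(())
}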
@@ -135,9 +168,12 @@ pub async fn db_writer(
     let rps_setting = settings.limits.messages_per_sec;
     let mut most_recent_rate_limit = Instant::now();
     let mut lim_opt = None;
-    // Keep rough track of events so we can run optimize eventually.
-    let mut optimize_counter: usize = 0;
-    // Constant writing has interfered with online backups. Keep track of how long since we've given the backups a chance to run.
+    // Keep rough track of events so we can run maintenance
+    // eventually.
+    let mut maintenance_counter: usize = 0;
+    // Constant writing has interfered with online backups. Keep
+    // track of how long since we've given the backups a chance to
+    // run.
     let mut backup_pause_counter: usize = 0;
     let clock = governor::clock::QuantaClock::default();
     if let Some(rps) = rps_setting {
@@ -279,12 +315,14 @@ pub async fn db_writer(
                 backup_pause_counter = 0
             }
-            // Use this as a trigger to do optimization
-            optimize_counter += 1;
-            if optimize_counter > EVENT_COUNT_OPTIMIZE_TRIGGER {
-                info!("running database optimizer");
-                optimize_counter = 0;
+            // Use this as a trigger to do optimization & checkpointing
+            maintenance_counter += 1;
+            if maintenance_counter > EVENT_COUNT_MAINTENANCE_TRIGGER {
+                debug!("running database optimizer");
+                maintenance_counter = 0;
                 optimize_db(&mut pool.get()?).ok();
+                debug!("running wal_checkpoint(TRUNCATE)");
+                checkpoint_db(&mut pool.get()?).ok();
             }
         }
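Taken together, the writer now counts persisted events and, every EVENT_COUNT_MAINTENANCE_TRIGGER (2000) events, runs PRAGMA optimize followed by the WAL checkpoint/truncate, which keeps the WAL from growing without bound between backups. A hypothetical condensation of that trigger pattern follows; the relay keeps these steps inline in db_writer rather than in a helper like this.

// Hypothetical helper, not part of the commit: bump the event counter and
// report when a maintenance pass (optimize + checkpoint) is due.
const EVENT_COUNT_MAINTENANCE_TRIGGER: usize = 2000;

fn maintenance_due(maintenance_counter: &mut usize) -> bool {
    *maintenance_counter += 1;
    if *maintenance_counter > EVENT_COUNT_MAINTENANCE_TRIGGER {
        // Reset and tell the caller to run optimize_db() and checkpoint_db(),
        // each on a freshly acquired pooled connection.
        *maintenance_counter = 0;
        return true;
    }
    false
}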