mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-22 09:09:07 -05:00
feat: increase wal_checkpoint time when WAL is large
This commit is contained in:
parent
0b51675b38
commit
65fd0ed08b
54
src/db.rs
54
src/db.rs
|
@ -39,9 +39,8 @@ pub struct SubmittedEvent {
|
||||||
/// Database file
|
/// Database file
|
||||||
pub const DB_FILE: &str = "nostr.db";
|
pub const DB_FILE: &str = "nostr.db";
|
||||||
|
|
||||||
/// How frequently to run maintenance
|
/// How frequently to attempt checkpointing
|
||||||
/// How many persisted events before DB maintenannce is triggered.
|
pub const CHECKPOINT_FREQ_SEC: u64 = 60;
|
||||||
pub const EVENT_MAINTENANCE_FREQ_SEC: u64 = 60;
|
|
||||||
|
|
||||||
/// How many persisted events before we pause for backups.
|
/// How many persisted events before we pause for backups.
|
||||||
/// It isn't clear this is enough to make the online backup API work yet.
|
/// It isn't clear this is enough to make the online backup API work yet.
|
||||||
|
@ -119,8 +118,8 @@ enum SqliteStatus {
|
||||||
Other(u64),
|
Other(u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checkpoint/Truncate WAL
|
/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
|
||||||
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
|
||||||
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
|
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
|
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
|
||||||
|
@ -141,7 +140,7 @@ pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
||||||
result,
|
result,
|
||||||
wal_size
|
wal_size
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(wal_size as usize)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn a database writer that persists events to the SQLite store.
|
/// Spawn a database writer that persists events to the SQLite store.
|
||||||
|
@ -690,21 +689,54 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Perform database maintenance on a regular basis
|
/// Perform database maintenance on a regular basis
|
||||||
pub async fn db_maintenance(pool: SqlitePool) {
|
pub async fn db_optimize(pool: SqlitePool) {
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
|
_ = tokio::time::sleep(Duration::from_secs(60*60)) => {
|
||||||
if let Ok(mut conn) = pool.get() {
|
if let Ok(mut conn) = pool.get() {
|
||||||
// the busy timer will block writers, so don't set
|
// the busy timer will block writers, so don't set
|
||||||
// this any higher than you want max latency for event
|
// this any higher than you want max latency for event
|
||||||
// writes.
|
// writes.
|
||||||
conn.busy_timeout(Duration::from_secs(1)).ok();
|
info!("running database optimizer");
|
||||||
debug!("running database optimizer");
|
|
||||||
optimize_db(&mut conn).ok();
|
optimize_db(&mut conn).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform database WAL checkpoint on a regular basis
|
||||||
|
pub async fn db_checkpoint(pool: SqlitePool) {
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
// WAL size in pages.
|
||||||
|
let mut current_wal_size = 0;
|
||||||
|
// WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
|
||||||
|
let wal_threshold = 1000*10;
|
||||||
|
// default threshold for the busy timer
|
||||||
|
let busy_wait_default = Duration::from_secs(1);
|
||||||
|
// if the WAL file is getting too big, switch to this
|
||||||
|
let busy_wait_default_long = Duration::from_secs(5);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => {
|
||||||
|
if let Ok(mut conn) = pool.get() {
|
||||||
|
// the busy timer will block writers, so don't set
|
||||||
|
// this any higher than you want max latency for event
|
||||||
|
// writes.
|
||||||
|
if current_wal_size <= wal_threshold {
|
||||||
|
conn.busy_timeout(busy_wait_default).ok();
|
||||||
|
} else {
|
||||||
|
// if the wal size has exceeded a threshold, increase the busy timeout.
|
||||||
|
conn.busy_timeout(busy_wait_default_long).ok();
|
||||||
|
}
|
||||||
debug!("running wal_checkpoint(TRUNCATE)");
|
debug!("running wal_checkpoint(TRUNCATE)");
|
||||||
checkpoint_db(&mut conn).ok();
|
if let Ok(new_size) = checkpoint_db(&mut conn) {
|
||||||
|
current_wal_size = new_size;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -325,10 +325,11 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||||
&settings,
|
&settings,
|
||||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||||
1,
|
1,
|
||||||
1,
|
2,
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
db::db_maintenance(maintenance_pool).await;
|
db::db_optimize(maintenance_pool.clone()).await;
|
||||||
|
db::db_checkpoint(maintenance_pool).await;
|
||||||
|
|
||||||
// listen for (external to tokio) shutdown request
|
// listen for (external to tokio) shutdown request
|
||||||
let controlled_shutdown = invoke_shutdown.clone();
|
let controlled_shutdown = invoke_shutdown.clone();
|
||||||
|
|
Loading…
Reference in New Issue
Block a user