fix: block other writers during checkpoint to eliminate DB lock errors

This commit is contained in:
Greg Heartsfield 2023-02-20 16:50:44 -06:00
parent 39f9984c4f
commit df411c24fb

View File

@ -251,8 +251,11 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo { impl NostrRepo for SqliteRepo {
async fn start(&self) -> Result<()> { async fn start(&self) -> Result<()> {
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await?; db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60),
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600), self.write_in_progress.clone()).await self.write_in_progress.clone(),
self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600),
self.write_in_progress.clone()).await
} }
async fn migrate_up(&self) -> Result<usize> { async fn migrate_up(&self) -> Result<usize> {
@ -510,6 +513,7 @@ impl NostrRepo for SqliteRepo {
let e = hex::decode(event_id).ok(); let e = hex::decode(event_id).ok();
let n = name.to_owned(); let n = name.to_owned();
let mut conn = self.write_pool.get()?; let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?; let tx = conn.transaction()?;
{ {
@ -537,6 +541,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification timestamp /// Update verification timestamp
async fn update_verification_timestamp(&self, id: u64) -> Result<()> { async fn update_verification_timestamp(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?; let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together. // add some jitter to the verification to prevent everything from stacking up together.
let verif_time = now_jitter(600); let verif_time = now_jitter(600);
@ -559,6 +564,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification record as failed /// Update verification record as failed
async fn fail_verification(&self, id: u64) -> Result<()> { async fn fail_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?; let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together. // add some jitter to the verification to prevent everything from stacking up together.
let fail_time = now_jitter(600); let fail_time = now_jitter(600);
@ -578,6 +584,7 @@ impl NostrRepo for SqliteRepo {
/// Delete verification record /// Delete verification record
async fn delete_verification(&self, id: u64) -> Result<()> { async fn delete_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?; let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?; let tx = conn.transaction()?;
{ {
@ -996,7 +1003,7 @@ pub fn delete_expired(conn: &mut PooledConnection) -> Result<usize> {
} }
/// Perform database WAL checkpoint on a regular basis /// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> { pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this. // TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
tokio::task::spawn(async move { tokio::task::spawn(async move {
// WAL size in pages. // WAL size in pages.
@ -1011,6 +1018,8 @@ pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoin
tokio::select! { tokio::select! {
_ = tokio::time::sleep(frequency) => { _ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() { if let Ok(mut conn) = pool.get() {
// block all other writers
let _write_guard = write_in_progress.lock().await;
let mut _guard:Option<MutexGuard<u64>> = None; let mut _guard:Option<MutexGuard<u64>> = None;
// the busy timer will block writers, so don't set // the busy timer will block writers, so don't set
// this any higher than you want max latency for event // this any higher than you want max latency for event