Compare commits

...

6 Commits
0.8.6 ... 0.8.8

Author SHA1 Message Date
Greg Heartsfield
2be75e18fb build: bump version to 0.8.8 2023-02-21 08:16:40 -06:00
Greg Heartsfield
5f6ff4c2b7 fix: in-memory SQLite DB correctly shares memory between connections
fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/73#event-227131
2023-02-21 08:14:19 -06:00
Greg Heartsfield
df411c24fb fix: block other writers during checkpoint to eliminate DB lock errors 2023-02-20 16:50:44 -06:00
Greg Heartsfield
39f9984c4f build: bump version to 0.8.7 2023-02-17 21:05:36 -06:00
Greg Heartsfield
9d55731073 fix: Postgres SQL generation for expiring events 2023-02-17 21:04:30 -06:00
Greg Heartsfield
5638f70d66 fix: set SQL tracing back to appropriate level 2023-02-17 20:50:19 -06:00
4 changed files with 18 additions and 12 deletions

2
Cargo.lock generated
View File

@@ -1550,7 +1550,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.8.6"
version = "0.8.8"
dependencies = [
"anyhow",
"async-std",

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.8.6"
version = "0.8.8"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"

View File

@@ -550,6 +550,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
let mut query = QueryBuilder::new("SELECT e.\"content\", e.created_at FROM \"event\" e WHERE ");
// This tracks whether we need to push a prefix AND before adding another clause
let mut push_and = false;
// Query for "authors", allowing prefix matches
if let Some(auth_vec) = &f.authors {
@@ -747,13 +748,9 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
} else {
query.push("e.hidden != 1::bit(1)");
}
// never display expired events
if push_and {
query.push(" AND ");
}
query
.push("(e.expires_at IS NULL OR e.expires_at > ")
.push(" AND (e.expires_at IS NULL OR e.expires_at > ")
.push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")");
// Apply per-filter limit to this query.

View File

@@ -251,8 +251,11 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo {
async fn start(&self) -> Result<()> {
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600), self.write_in_progress.clone()).await
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60),
self.write_in_progress.clone(),
self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600),
self.write_in_progress.clone()).await
}
async fn migrate_up(&self) -> Result<usize> {
@@ -376,7 +379,7 @@ impl NostrRepo for SqliteRepo {
let mut last_successful_send = Instant::now();
// execute the query.
// make the actual SQL query (with parameters inserted) available
conn.trace(Some(|x| {info!("SQL trace: {:?}", x)}));
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
let mut stmt = conn.prepare_cached(&q)?;
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
@@ -510,6 +513,7 @@ impl NostrRepo for SqliteRepo {
let e = hex::decode(event_id).ok();
let n = name.to_owned();
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
@@ -537,6 +541,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification timestamp
async fn update_verification_timestamp(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let verif_time = now_jitter(600);
@@ -559,6 +564,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification record as failed
async fn fail_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let fail_time = now_jitter(600);
@@ -578,6 +584,7 @@ impl NostrRepo for SqliteRepo {
/// Delete verification record
async fn delete_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
@@ -925,7 +932,7 @@ pub fn build_pool(
}
}
let manager = if settings.database.in_memory {
SqliteConnectionManager::memory()
SqliteConnectionManager::file("file::memory:?cache=shared")
.with_flags(flags)
.with_init(|c| c.execute_batch(STARTUP_SQL))
} else {
@@ -996,7 +1003,7 @@ pub fn delete_expired(conn: &mut PooledConnection) -> Result<usize> {
}
/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
tokio::task::spawn(async move {
// WAL size in pages.
@@ -1011,6 +1018,8 @@ pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoin
tokio::select! {
_ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() {
// block all other writers
let _write_guard = write_in_progress.lock().await;
let mut _guard:Option<MutexGuard<u64>> = None;
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event