mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-09 21:29:06 -05:00
fix: use database to publish all events
This fixes a race condition where a publisher might send an event, and immediately after issue a subscription for the same event ID. Prior to this change, that event would have been published on the broadcast channel (and ignored by our publisher, because they had not yet issued the subscription), but not yet committed to the database. Their subscription would trigger a database query which would return zero results. Therefore, they would never see the event they published. The noscl tool is one client that would suffer from this. Now, all events are broadcast only after they exist in the database, so a late subscription will always return the event.
This commit is contained in:
parent
56c40f2be9
commit
1589268eba
|
@ -67,6 +67,7 @@ CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
|
||||||
/// Spawn a database writer that persists events to the SQLite store.
|
/// Spawn a database writer that persists events to the SQLite store.
|
||||||
pub async fn db_writer(
|
pub async fn db_writer(
|
||||||
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
|
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
|
||||||
|
bcast_tx: tokio::sync::broadcast::Sender<Event>,
|
||||||
) -> tokio::task::JoinHandle<Result<()>> {
|
) -> tokio::task::JoinHandle<Result<()>> {
|
||||||
task::spawn_blocking(move || {
|
task::spawn_blocking(move || {
|
||||||
let mut conn = Connection::open_with_flags(
|
let mut conn = Connection::open_with_flags(
|
||||||
|
@ -94,6 +95,8 @@ pub async fn db_writer(
|
||||||
info!("nothing inserted (dupe?)");
|
info!("nothing inserted (dupe?)");
|
||||||
} else {
|
} else {
|
||||||
info!("persisted event: {}", event.get_event_id_prefix());
|
info!("persisted event: {}", event.get_event_id_prefix());
|
||||||
|
// send this out to all clients
|
||||||
|
bcast_tx.send(event.clone()).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|
13
src/main.rs
13
src/main.rs
|
@ -44,8 +44,10 @@ fn main() -> Result<(), Error> {
|
||||||
// validated events that need to be persisted are sent to the
|
// validated events that need to be persisted are sent to the
|
||||||
// database on via this channel.
|
// database on via this channel.
|
||||||
let (event_tx, event_rx) = mpsc::channel::<Event>(16);
|
let (event_tx, event_rx) = mpsc::channel::<Event>(16);
|
||||||
// start the database writer thread.
|
// start the database writer thread. Give it a channel for
|
||||||
db::db_writer(event_rx).await;
|
// writing events, and for publishing events that have been
|
||||||
|
// written (to all connected clients).
|
||||||
|
db::db_writer(event_rx, bcast_tx.clone()).await;
|
||||||
// establish a channel for letting all threads now about a
|
// establish a channel for letting all threads now about a
|
||||||
// requested server shutdown.
|
// requested server shutdown.
|
||||||
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
|
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
|
||||||
|
@ -86,7 +88,6 @@ async fn nostr_server(
|
||||||
mut shutdown: Receiver<()>,
|
mut shutdown: Receiver<()>,
|
||||||
) {
|
) {
|
||||||
// get a broadcast channel for clients to communicate on
|
// get a broadcast channel for clients to communicate on
|
||||||
// wrap the TCP stream in a websocket.
|
|
||||||
let mut bcast_rx = broadcast.subscribe();
|
let mut bcast_rx = broadcast.subscribe();
|
||||||
// upgrade the TCP connection to WebSocket
|
// upgrade the TCP connection to WebSocket
|
||||||
let conn = tokio_tungstenite::accept_async(stream).await;
|
let conn = tokio_tungstenite::accept_async(stream).await;
|
||||||
|
@ -154,18 +155,12 @@ async fn nostr_server(
|
||||||
// Write this to the database
|
// Write this to the database
|
||||||
event_tx.send(e.clone()).await.ok();
|
event_tx.send(e.clone()).await.ok();
|
||||||
client_published_event_count += 1;
|
client_published_event_count += 1;
|
||||||
// send this event to everyone listening.
|
|
||||||
let bcast_res = broadcast.send(e);
|
|
||||||
if bcast_res.is_err() {
|
|
||||||
warn!("could not send broadcast message: {:?}", bcast_res);
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(_) => {info!("client {} sent an invalid event", cid)}
|
Err(_) => {info!("client {} sent an invalid event", cid)}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Some(Ok(SubMsg(s))) => {
|
Some(Ok(SubMsg(s))) => {
|
||||||
debug!("client {} requesting a subscription", cid);
|
debug!("client {} requesting a subscription", cid);
|
||||||
|
|
||||||
// subscription handling consists of:
|
// subscription handling consists of:
|
||||||
// * registering the subscription so future events can be matched
|
// * registering the subscription so future events can be matched
|
||||||
// * making a channel to cancel to request later
|
// * making a channel to cancel to request later
|
||||||
|
|
Loading…
Reference in New Issue
Block a user