diff --git a/src/db.rs b/src/db.rs index 62065a2..6fbaa32 100644 --- a/src/db.rs +++ b/src/db.rs @@ -67,6 +67,7 @@ CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey); /// Spawn a database writer that persists events to the SQLite store. pub async fn db_writer( mut event_rx: tokio::sync::mpsc::Receiver, + bcast_tx: tokio::sync::broadcast::Sender, ) -> tokio::task::JoinHandle> { task::spawn_blocking(move || { let mut conn = Connection::open_with_flags( @@ -94,6 +95,8 @@ pub async fn db_writer( info!("nothing inserted (dupe?)"); } else { info!("persisted event: {}", event.get_event_id_prefix()); + // send this out to all clients + bcast_tx.send(event.clone()).ok(); } } Err(err) => { diff --git a/src/main.rs b/src/main.rs index a8ff114..1563c70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,8 +44,10 @@ fn main() -> Result<(), Error> { // validated events that need to be persisted are sent to the // database on via this channel. let (event_tx, event_rx) = mpsc::channel::(16); - // start the database writer thread. - db::db_writer(event_rx).await; + // start the database writer thread. Give it a channel for + // writing events, and for publishing events that have been + // written (to all connected clients). + db::db_writer(event_rx, bcast_tx.clone()).await; // establish a channel for letting all threads now about a // requested server shutdown. let (invoke_shutdown, _) = broadcast::channel::<()>(1); @@ -86,7 +88,6 @@ async fn nostr_server( mut shutdown: Receiver<()>, ) { // get a broadcast channel for clients to communicate on - // wrap the TCP stream in a websocket. let mut bcast_rx = broadcast.subscribe(); // upgrade the TCP connection to WebSocket let conn = tokio_tungstenite::accept_async(stream).await; @@ -154,18 +155,12 @@ async fn nostr_server( // Write this to the database event_tx.send(e.clone()).await.ok(); client_published_event_count += 1; - // send this event to everyone listening. - let bcast_res = broadcast.send(e); - if bcast_res.is_err() { - warn!("could not send broadcast message: {:?}", bcast_res); - } }, Err(_) => {info!("client {} sent an invalid event", cid)} } }, Some(Ok(SubMsg(s))) => { debug!("client {} requesting a subscription", cid); - // subscription handling consists of: // * registering the subscription so future events can be matched // * making a channel to cancel to request later