From 620e22769977200dc19586310bea30a3a3089608 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Sat, 1 Jan 2022 08:08:54 -0600 Subject: [PATCH] fix: connection issues with Firefox This adds Hyper, and a 200 response code. Prior to this, Firefox would fail to connect. There is also a text document displayed at the root URL to indicate this is a Nostr relay. Fixes https://todo.sr.ht/~gheartsfield/nostr-rs-relay/15 --- Cargo.lock | 137 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 152 +++++++++++++++++++++++++++++++++++++-------- src/protostream.rs | 6 +- 4 files changed, 266 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cab72d..acf153d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -355,6 +355,25 @@ dependencies = [ "smallvec", ] +[[package]] +name = "h2" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -399,18 +418,59 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -422,6 +482,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +dependencies = [ + "autocfg 1.0.1", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.12" @@ -582,6 +652,7 @@ dependencies = [ "futures-util", "governor", "hex", + "hyper", "lazy_static", "log", "nonzero_ext", @@ -1061,6 +1132,16 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +[[package]] +name = "socket2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -1165,6 +1246,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -1174,6 +1269,38 @@ dependencies = [ "serde 1.0.131", ] +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "tungstenite" version = "0.16.0" @@ -1259,6 +1386,16 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index da2cdd6..48694bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ rusqlite = "^0.26" lazy_static = "^1.4" governor = "^0.4" nonzero_ext = "^0.3" +hyper={ version="0.14", features=["server","http1","http2","tcp"] } diff --git a/src/main.rs b/src/main.rs index 1da4f21..ac4241a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,11 @@ //! Server process use futures::SinkExt; use futures::StreamExt; +use hyper::service::{make_service_fn, service_fn}; +use hyper::upgrade::Upgraded; +use hyper::{ + header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode, +}; use log::*; use nostr_rs_relay::close::Close; use nostr_rs_relay::config; @@ -12,14 +17,17 @@ use nostr_rs_relay::protostream; use nostr_rs_relay::protostream::NostrMessage::*; use nostr_rs_relay::protostream::NostrResponse::*; use std::collections::HashMap; +use std::convert::Infallible; use std::env; +use std::net::SocketAddr; use std::path::Path; -use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::Builder; use tokio::sync::broadcast; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tokio_tungstenite::WebSocketStream; +use tungstenite::handshake; use tungstenite::protocol::WebSocketConfig; fn db_from_args(args: Vec) -> Option { @@ -28,6 +36,88 @@ fn db_from_args(args: Vec) -> Option { } None } +async fn handle_web_request( + mut request: Request, + remote_addr: SocketAddr, + broadcast: Sender, + event_tx: tokio::sync::mpsc::Sender, + shutdown: Receiver<()>, +) -> Result, Infallible> { + match ( + request.uri().path(), + request.headers().contains_key(header::UPGRADE), + ) { + //if the request is ws_echo and the request headers contains an Upgrade key + ("/", true) => { + debug!("websocket with upgrade request"); + //assume request is a handshake, so create the handshake response + let response = match handshake::server::create_response_with_body(&request, || { + Body::empty() + }) { + Ok(response) => { + //in case the handshake response creation succeeds, + //spawn a task to handle the websocket connection + tokio::spawn(async move { + //using the hyper feature of upgrading a connection + match upgrade::on(&mut request).await { + //if successfully upgraded + Ok(upgraded) => { + //create a websocket stream from the upgraded object + let ws_stream = WebSocketStream::from_raw_socket( + //pass the upgraded object + //as the base layer stream of the Websocket + upgraded, + tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + tokio::spawn(nostr_server( + ws_stream, broadcast, event_tx, shutdown, + )); + } + Err(e) => println!( + "error when trying to upgrade connection \ + from address {} to websocket connection. \ + Error is: {}", + remote_addr, e + ), + } + }); + //return the response to the handshake request + response + } + Err(error) => { + warn!("websocket response failed"); + let mut res = + Response::new(Body::from(format!("Failed to create websocket: {}", error))); + *res.status_mut() = StatusCode::BAD_REQUEST; + return Ok(res); + } + }; + Ok::<_, Infallible>(response) + } + ("/", false) => { + // handle request at root with no upgrade header + Ok(Response::new(Body::from(format!( + "This is a Nostr relay.\n" + )))) + } + (_, _) => { + //handle any other url + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Nothing here.")) + .unwrap()) + } + } +} + +async fn shutdown_signal() { + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); +} /// Start running a Nostr relay server. fn main() -> Result<(), Error> { @@ -46,6 +136,7 @@ fn main() -> Result<(), Error> { } *settings = c; } + let config = config::SETTINGS.read().unwrap(); // do some config validation. if !Path::new(&config.database.data_directory).is_dir() { @@ -54,6 +145,7 @@ fn main() -> Result<(), Error> { } debug!("config: {:?}", config); let addr = format!("{}:{}", config.network.address.trim(), config.network.port); + let socket_addr = addr.parse().expect("listening address not valid"); // configure tokio runtime let rt = Builder::new_multi_thread() .enable_all() @@ -63,8 +155,7 @@ fn main() -> Result<(), Error> { // start tokio rt.block_on(async { let settings = config::SETTINGS.read().unwrap(); - let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); - info!("listening on: {}", addr); + info!("listening on: {}", socket_addr); // all client-submitted valid events are broadcast to every // other client on this channel. This should be large enough // to accomodate slower readers (messages are dropped if @@ -77,7 +168,7 @@ fn main() -> Result<(), Error> { // requested server shutdown. let (invoke_shutdown, _) = broadcast::channel::<()>(1); let ctrl_c_shutdown = invoke_shutdown.clone(); - // listen for ctrl-c interruupts + // // listen for ctrl-c interruupts tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); info!("shutting down due to SIGINT"); @@ -87,28 +178,35 @@ fn main() -> Result<(), Error> { // writing events, and for publishing events that have been // written (to all connected clients). db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await; - - // track unique client connection count - let mut client_accept_count: usize = 0; - let mut stop_listening = invoke_shutdown.subscribe(); - // handle new client connection requests, or SIGINT signals. - loop { - tokio::select! { - _ = stop_listening.recv() => { - break; - } - Ok((stream, _)) = listener.accept() => { - client_accept_count += 1; - info!("creating new connection for client #{}",client_accept_count); - tokio::spawn(nostr_server( - stream, - bcast_tx.clone(), - event_tx.clone(), - invoke_shutdown.subscribe(), - )); - } + info!("db writer created"); + // A `Service` is needed for every connection, so this + // creates one from our `handle_request` function. + let make_svc = make_service_fn(|conn: &AddrStream| { + let remote_addr = conn.remote_addr(); + let bcast = bcast_tx.clone(); + let event = event_tx.clone(); + let stop = invoke_shutdown.clone(); + async move { + // service_fn converts our function into a `Service` + Ok::<_, Infallible>(service_fn(move |request: Request| { + handle_web_request( + request, + remote_addr, + bcast.clone(), + event.clone(), + stop.subscribe(), + ) + })) } + }); + let server = Server::bind(&socket_addr) + .serve(make_svc) + .with_graceful_shutdown(shutdown_signal()); + // run hyper + if let Err(e) = server.await { + eprintln!("server error: {}", e); } + // our code }); Ok(()) } @@ -116,7 +214,7 @@ fn main() -> Result<(), Error> { /// Handle new client connections. This runs through an event loop /// for all client communication. async fn nostr_server( - stream: TcpStream, + ws_stream: WebSocketStream, broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, mut shutdown: Receiver<()>, @@ -130,8 +228,8 @@ async fn nostr_server( config.max_frame_size = settings.limits.max_ws_frame_bytes; } // upgrade the TCP connection to WebSocket - let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await; - let ws_stream = conn.expect("websocket handshake error"); + //let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await; + //let ws_stream = conn.expect("websocket handshake error"); // wrap websocket into a stream & sink of Nostr protocol messages let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream); // Track internal client state diff --git a/src/protostream.rs b/src/protostream.rs index 670c3ac..4d02e32 100644 --- a/src/protostream.rs +++ b/src/protostream.rs @@ -9,9 +9,9 @@ use futures::sink::Sink; use futures::stream::Stream; use futures::task::Context; use futures::task::Poll; +use hyper::upgrade::Upgraded; use log::*; use serde::{Deserialize, Serialize}; -use tokio::net::TcpStream; use tokio_tungstenite::WebSocketStream; use tungstenite::error::Error as WsError; use tungstenite::protocol::Message; @@ -40,11 +40,11 @@ pub enum NostrResponse { /// A Nostr protocol stream is layered on top of a Websocket stream. pub struct NostrStream { - ws_stream: WebSocketStream, + ws_stream: WebSocketStream, } /// Given a websocket, return a protocol stream wrapper. -pub fn wrap_ws_in_nostr(ws: WebSocketStream) -> NostrStream { +pub fn wrap_ws_in_nostr(ws: WebSocketStream) -> NostrStream { NostrStream { ws_stream: ws } }