diff --git a/Cargo.lock b/Cargo.lock index ef38322..5cecb39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,12 +156,54 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +[[package]] +name = "futures" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" +[[package]] +name = "futures-executor" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" + [[package]] name = "futures-macro" version = "0.3.18" @@ -191,10 +233,13 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -344,6 +389,7 @@ dependencies = [ "base64", "bitcoin_hashes 0.10.0", "env_logger", + "futures", "futures-util", "log", "secp256k1", diff --git a/Cargo.toml b/Cargo.toml index 3a954c5..fdfc53f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" log = "0.4.14" env_logger = "0.9.0" tokio = { version = "1.14.0", features = ["full"] } +futures = "0.3.18" futures-util = "0.3.18" tokio-tungstenite = "0.16.0" tungstenite = "0.16.0" diff --git a/src/main.rs b/src/main.rs index 9eeb858..a38e308 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,9 @@ -use std::{env, io::Error}; - -//use futures::stream::SplitSink; -use futures_util::{SinkExt, StreamExt}; -use log::{debug, info, warn}; +use log::info; use nostr_rs_relay::proto::Proto; -//use tokio::io::{ReadHalf, WriteHalf}; +use std::{env, io::Error}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::Builder; -use tokio_tungstenite::WebSocketStream; -use tungstenite::error::Error::*; -use tungstenite::protocol::frame::coding::CloseCode; -use tungstenite::protocol::frame::CloseFrame; -use tungstenite::protocol::{Message, WebSocketConfig}; +use tungstenite::protocol::WebSocketConfig; fn main() -> Result<(), Error> { let _ = env_logger::try_init(); @@ -46,7 +38,7 @@ fn main() -> Result<(), Error> { // Todo: Implement sending messages to all other clients; example: // https://github.com/snapview/tokio-tungstenite/blob/master/examples/server.rs -// Handles new TCP connections +// Wrap/Upgrade TCP connections in WebSocket Streams, and hand off to Nostr protocol handler. async fn nostr_server(stream: TcpStream) { let addr = stream .peer_addr() @@ -62,7 +54,9 @@ async fn nostr_server(stream: TcpStream) { match conn { Ok(ws_stream) => { info!("New WebSocket connection: {}", addr); - process_client(ws_stream).await; + // create a nostr protocol handler, and give it the stream + let mut proto = Proto::new(ws_stream); + proto.process_client().await } Err(_) => { println!("Error"); @@ -70,93 +64,3 @@ async fn nostr_server(stream: TcpStream) { } }; } - -// Handles valid clients who have upgraded to WebSockets -async fn process_client(mut stream: WebSocketStream) { - // get a protocol helper; - let mut proto = Proto::new(); - // TODO: select on a timeout to kill non-responsive clients - - while let Some(mes_res) = stream.next().await { - // as long as connection is not closed... - match mes_res { - Ok(Message::Text(cmd)) => { - info!("Message received"); - let length = cmd.len(); - debug!("Message: {}", cmd); - stream - .send(Message::Text(format!( - "got your message of length {}", - length - ))) - .await - .expect("send failed"); - // Handle this request. Everything else below is basically websocket error handling. - let proto_error = proto.process_message(cmd); - match proto_error { - Err(_) => { - stream - .send(Message::Text( - "[\"NOTICE\", \"Failed to process message.\"]".to_owned(), - )) - .await - .expect("send failed"); - } - Ok(_) => { - info!("Message processed successfully"); - } - } - } - Ok(Message::Binary(_)) => { - info!("Ignoring Binary message"); - stream - .send(Message::Text( - "[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]" - .to_owned(), - )) - .await - .expect("send failed"); - } - Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"), - Ok(Message::Close(_)) => { - info!("Got request to close connection"); - return; - } - Err(tungstenite::error::Error::Capacity( - tungstenite::error::CapacityError::MessageTooLong { size, max_size }, - )) => { - info!( - "Message size too large, disconnecting this client. ({} > {})", - size, max_size - ); - stream.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send notice failed"); - stream - .close(Some(CloseFrame { - code: CloseCode::Size, - reason: "Exceeded max message size".into(), - })) - .await - .expect("failed to send close frame"); - return; - } - Err(AlreadyClosed) => { - warn!("this connection was already closed, and we tried to operate on it"); - return; - } - Err(ConnectionClosed) | Err(Io(_)) => { - debug!("Closing this connection normally"); - return; - } - Err(Tls(_)) | Err(Protocol(_)) | Err(Utf8) | Err(Url(_)) | Err(HttpFormat(_)) - | Err(Http(_)) => { - info!("websocket/tls/enc protocol error, dropping connection"); - return; - } - Err(e) => { - warn!("Some new kind of error, bailing: {:?}", e); - return; - } - } - } - println!("Client is exiting, no longer reading websocket messages"); -} diff --git a/src/proto.rs b/src/proto.rs index 00892c0..927d5d7 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -2,8 +2,17 @@ use crate::close::Close; use crate::error::{Error, Result}; use crate::event::Event; use crate::subscription::Subscription; -use log::{debug, info}; +use futures::SinkExt; +use tungstenite::error::Error::*; +use tungstenite::protocol::frame::coding::CloseCode; +use tungstenite::protocol::frame::CloseFrame; +use tungstenite::protocol::Message; + +use futures::StreamExt; +use log::{debug, info, warn}; use std::collections::HashMap; +use tokio::net::TcpStream; +use tokio_tungstenite::WebSocketStream; use uuid::Uuid; // A protocol handler/helper. Use one per client. @@ -11,22 +20,114 @@ pub struct Proto { client_id: Uuid, // current set of subscriptions subscriptions: HashMap, + // websocket + stream: WebSocketStream, max_subs: usize, } const MAX_SUBSCRIPTION_ID_LEN: usize = 256; impl Proto { - pub fn new() -> Self { + pub fn new(stream: WebSocketStream) -> Self { let p = Proto { client_id: Uuid::new_v4(), subscriptions: HashMap::new(), max_subs: 128, + stream: stream, }; debug!("New client: {:?}", p.client_id); p } + pub async fn process_client(&mut self) { + while let Some(mes_res) = self.stream.next().await { + self.send_notice().await; + + match mes_res { + Ok(Message::Text(cmd)) => { + info!("Message received"); + let length = cmd.len(); + debug!("Message: {}", cmd); + self.stream + .send(Message::Text(format!( + "got your message of length {}", + length + ))) + .await + .expect("send failed"); + // Handle this request. Everything else below is basically websocket error handling. + let proto_error = self.process_message(cmd); + match proto_error { + Err(_) => { + self.stream + .send(Message::Text( + "[\"NOTICE\", \"Failed to process message.\"]".to_owned(), + )) + .await + .expect("send failed"); + } + Ok(_) => { + info!("Message processed successfully"); + } + } + } + Ok(Message::Binary(_)) => { + info!("Ignoring Binary message"); + self.stream + .send(Message::Text( + "[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]" + .to_owned(), + )) + .await + .expect("send failed"); + } + Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"), + Ok(Message::Close(_)) => { + info!("Got request to close connection"); + return; + } + Err(tungstenite::error::Error::Capacity( + tungstenite::error::CapacityError::MessageTooLong { size, max_size }, + )) => { + info!( + "Message size too large, disconnecting this client. ({} > {})", + size, max_size + ); + self.stream.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send notice failed"); + self.stream + .close(Some(CloseFrame { + code: CloseCode::Size, + reason: "Exceeded max message size".into(), + })) + .await + .expect("failed to send close frame"); + return; + } + Err(AlreadyClosed) => { + warn!("this connection was already closed, and we tried to operate on it"); + return; + } + Err(ConnectionClosed) | Err(Io(_)) => { + debug!("Closing this connection normally"); + return; + } + Err(Tls(_)) | Err(Protocol(_)) | Err(Utf8) | Err(Url(_)) | Err(HttpFormat(_)) + | Err(Http(_)) => { + info!("websocket/tls/enc protocol error, dropping connection"); + return; + } + Err(e) => { + warn!("Some new kind of error, bailing: {:?}", e); + return; + } + } + } + } + + pub async fn send_notice(&mut self) { + self.stream.send(Message::Text(format!("foo"))).await; + } + // Error results will be transformed into client NOTICEs pub fn process_message(&mut self, cmd: String) -> Result<()> { info!(