From cf6171aadc33eef3cf7b8accf4f54c63851b0588 Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Sun, 21 Nov 2021 16:03:03 -0600 Subject: [PATCH] Handle client messages that exceed max size --- src/main.rs | 71 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index c87ba7b..6e07485 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ use std::{env, io::Error}; -use futures_util::StreamExt; -use log::info; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, info}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::Builder; use tokio_tungstenite::WebSocketStream; -//use tungstenite::protocol::Message; +use tungstenite::protocol::frame::coding::CloseCode; +use tungstenite::protocol::frame::CloseFrame; +use tungstenite::protocol::{Message, WebSocketConfig}; fn main() -> Result<(), Error> { let _ = env_logger::try_init(); @@ -37,12 +39,19 @@ fn main() -> Result<(), Error> { Ok(()) } +// Handles new TCP connections async fn nostr_server(stream: TcpStream) { let addr = stream .peer_addr() .expect("connected streams should have a peer address"); info!("Peer address: {}", addr); - let conn = tokio_tungstenite::accept_async(stream).await; + let config = WebSocketConfig { + max_send_queue: None, + max_message_size: Some(2 << 16), // 64K + max_frame_size: Some(2 << 16), // 64k + accept_unmasked_frames: false, // follow the spec + }; + let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await; match conn { Ok(ws_stream) => { info!("New WebSocket connection: {}", addr); @@ -54,11 +63,61 @@ async fn nostr_server(stream: TcpStream) { } }; } + +// Handles valid clients who have upgraded to WebSockets async fn process_client(stream: WebSocketStream) { - let (_write, mut read) = stream.split(); + let (mut write, mut read) = stream.split(); // TODO: error on binary messages // TODO: error on text messages > MAX_EVENT_SIZE + // TODO: select on a timeout to kill non-responsive clients + while let Some(mes_res) = read.next().await { - println!("got {:?}", mes_res); + // as long as connection is not closed... + match mes_res { + Ok(Message::Text(cmd)) => { + let length = cmd.len(); + write + .send(Message::Text(format!( + "got your message of length {}", + length + ))) + .await + .expect("send failed"); + } + Ok(Message::Binary(_)) => { + info!("Ignoring Binary message"); + write + .send(Message::Text( + "[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]" + .to_owned(), + )) + .await + .expect("send failed"); + } + Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"), + Ok(Message::Close(_)) => { + info!("Got request to close connection"); + return; + } + Err(e) => { + // TODO: check for specific error: (Capacity(MessageTooLong { size: xxxxx, max_size: 131072 })) + info!( + "Message size too large, disconnecting this client: ({:?})", + e + ); + write.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send failed"); + write + .reunite(read) + .expect("reunite failed") + .close(Some(CloseFrame { + code: CloseCode::Size, + reason: "Exceeded max message size".into(), + })) + .await + .expect("failed to send close frame"); + return; + } + } } + println!("Client is exiting, no longer reading websocket messages"); }