Handle client messages that exceed max size

This commit is contained in:
Greg Heartsfield 2021-11-21 16:03:03 -06:00
parent dbb535eaaa
commit cf6171aadc

View File

@ -1,11 +1,13 @@
use std::{env, io::Error}; use std::{env, io::Error};
use futures_util::StreamExt; use futures_util::{SinkExt, StreamExt};
use log::info; use log::{debug, info};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder; use tokio::runtime::Builder;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
//use tungstenite::protocol::Message; use tungstenite::protocol::frame::coding::CloseCode;
use tungstenite::protocol::frame::CloseFrame;
use tungstenite::protocol::{Message, WebSocketConfig};
fn main() -> Result<(), Error> { fn main() -> Result<(), Error> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@ -37,12 +39,19 @@ fn main() -> Result<(), Error> {
Ok(()) Ok(())
} }
// Handles new TCP connections
async fn nostr_server(stream: TcpStream) { async fn nostr_server(stream: TcpStream) {
let addr = stream let addr = stream
.peer_addr() .peer_addr()
.expect("connected streams should have a peer address"); .expect("connected streams should have a peer address");
info!("Peer address: {}", addr); info!("Peer address: {}", addr);
let conn = tokio_tungstenite::accept_async(stream).await; let config = WebSocketConfig {
max_send_queue: None,
max_message_size: Some(2 << 16), // 64K
max_frame_size: Some(2 << 16), // 64k
accept_unmasked_frames: false, // follow the spec
};
let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
match conn { match conn {
Ok(ws_stream) => { Ok(ws_stream) => {
info!("New WebSocket connection: {}", addr); info!("New WebSocket connection: {}", addr);
@ -54,11 +63,61 @@ async fn nostr_server(stream: TcpStream) {
} }
}; };
} }
// Handles valid clients who have upgraded to WebSockets
async fn process_client(stream: WebSocketStream<TcpStream>) { async fn process_client(stream: WebSocketStream<TcpStream>) {
let (_write, mut read) = stream.split(); let (mut write, mut read) = stream.split();
// TODO: error on binary messages // TODO: error on binary messages
// TODO: error on text messages > MAX_EVENT_SIZE // TODO: error on text messages > MAX_EVENT_SIZE
// TODO: select on a timeout to kill non-responsive clients
while let Some(mes_res) = read.next().await { while let Some(mes_res) = read.next().await {
println!("got {:?}", mes_res); // as long as connection is not closed...
match mes_res {
Ok(Message::Text(cmd)) => {
let length = cmd.len();
write
.send(Message::Text(format!(
"got your message of length {}",
length
)))
.await
.expect("send failed");
} }
Ok(Message::Binary(_)) => {
info!("Ignoring Binary message");
write
.send(Message::Text(
"[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]"
.to_owned(),
))
.await
.expect("send failed");
}
Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"),
Ok(Message::Close(_)) => {
info!("Got request to close connection");
return;
}
Err(e) => {
// TODO: check for specific error: (Capacity(MessageTooLong { size: xxxxx, max_size: 131072 }))
info!(
"Message size too large, disconnecting this client: ({:?})",
e
);
write.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send failed");
write
.reunite(read)
.expect("reunite failed")
.close(Some(CloseFrame {
code: CloseCode::Size,
reason: "Exceeded max message size".into(),
}))
.await
.expect("failed to send close frame");
return;
}
}
}
println!("Client is exiting, no longer reading websocket messages");
} }