diff --git a/Cargo.lock b/Cargo.lock index e47473e..3869390 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,7 @@ dependencies = [ "log", "tokio", "tokio-tungstenite", + "tungstenite", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b53c3fb..572e84e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ env_logger = "0.9.0" tokio = { version = "1.14.0", features = ["full"] } futures-util = "0.3.17" tokio-tungstenite = "0.16.0" +tungstenite = "0.16.0" diff --git a/src/main.rs b/src/main.rs index 5b4c3ad..18250a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,39 @@ use std::{env, io::Error}; -use futures_util::{future, StreamExt, TryStreamExt}; -use log::{info, warn}; +use futures_util::StreamExt; +use log::info; +use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::Builder; +use tungstenite::protocol::Message; -#[tokio::main] -async fn main() -> Result<(), Error> { +fn main() -> Result<(), Error> { let _ = env_logger::try_init(); let addr = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:8080".to_string()); - - // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket.expect("Failed to bind"); - info!("Listening on: {}", addr); - - while let Ok((stream, _)) = listener.accept().await { - tokio::spawn(accept_connection(stream)); - } - + // configure tokio runtime + let rt = Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .thread_name("tokio-ws") + .on_thread_stop(|| { + info!("thread stopping"); + }) + .on_thread_start(|| { + info!("thread starting"); + }) + .build() + .unwrap(); + // start tokio + rt.block_on(async { + // Create the event loop and TCP listener we'll accept connections on. + let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); + info!("Listening on: {}", addr); + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(accept_connection(stream)); + } + }); Ok(()) } @@ -34,13 +48,12 @@ async fn accept_connection(stream: TcpStream) { match ws_stream_res { Ok(ws_stream) => { info!("New WebSocket connection: {}", addr); - - let (write, read) = ws_stream.split(); - // We should not forward messages other than text or binary. - read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) - .forward(write) - .await - .unwrap_or_else(|_| warn!("Failed to forward message")); + let (_write, mut read) = ws_stream.split(); + // TODO: error on binary messages + // TODO: error on text messages > MAX_EVENT_SIZE + while let Some(mes_res) = read.next().await { + println!("got {:?}", mes_res); + } } Err(_) => { println!("Error");