Configure tokio runtime manually

This commit is contained in:
Greg Heartsfield 2021-11-21 13:31:23 -06:00
parent efa14418fc
commit 259c115d67
3 changed files with 36 additions and 21 deletions

1
Cargo.lock generated
View File

@ -309,6 +309,7 @@ dependencies = [
"log", "log",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tungstenite",
] ]
[[package]] [[package]]

View File

@ -11,3 +11,4 @@ env_logger = "0.9.0"
tokio = { version = "1.14.0", features = ["full"] } tokio = { version = "1.14.0", features = ["full"] }
futures-util = "0.3.17" futures-util = "0.3.17"
tokio-tungstenite = "0.16.0" tokio-tungstenite = "0.16.0"
tungstenite = "0.16.0"

View File

@ -1,25 +1,39 @@
use std::{env, io::Error}; use std::{env, io::Error};
use futures_util::{future, StreamExt, TryStreamExt}; use futures_util::StreamExt;
use log::{info, warn}; use log::info;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder;
use tungstenite::protocol::Message;
#[tokio::main] fn main() -> Result<(), Error> {
async fn main() -> Result<(), Error> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let addr = env::args() let addr = env::args()
.nth(1) .nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string()); .unwrap_or_else(|| "127.0.0.1:8080".to_string());
// configure tokio runtime
// Create the event loop and TCP listener we'll accept connections on. let rt = Builder::new_multi_thread()
let try_socket = TcpListener::bind(&addr).await; .worker_threads(2)
let listener = try_socket.expect("Failed to bind"); .enable_io()
info!("Listening on: {}", addr); .thread_name("tokio-ws")
.on_thread_stop(|| {
while let Ok((stream, _)) = listener.accept().await { info!("thread stopping");
tokio::spawn(accept_connection(stream)); })
} .on_thread_start(|| {
info!("thread starting");
})
.build()
.unwrap();
// start tokio
rt.block_on(async {
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(accept_connection(stream));
}
});
Ok(()) Ok(())
} }
@ -34,13 +48,12 @@ async fn accept_connection(stream: TcpStream) {
match ws_stream_res { match ws_stream_res {
Ok(ws_stream) => { Ok(ws_stream) => {
info!("New WebSocket connection: {}", addr); info!("New WebSocket connection: {}", addr);
let (_write, mut read) = ws_stream.split();
let (write, read) = ws_stream.split(); // TODO: error on binary messages
// We should not forward messages other than text or binary. // TODO: error on text messages > MAX_EVENT_SIZE
read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) while let Some(mes_res) = read.next().await {
.forward(write) println!("got {:?}", mes_res);
.await }
.unwrap_or_else(|_| warn!("Failed to forward message"));
} }
Err(_) => { Err(_) => {
println!("Error"); println!("Error");