Moved nostr protocol handling to proto module

This commit is contained in:
Greg Heartsfield 2021-11-26 10:17:20 -06:00
parent 31057b6fd3
commit 010c96b05f
4 changed files with 157 additions and 105 deletions

46
Cargo.lock generated
View File

@ -156,12 +156,54 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "futures"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445"
[[package]]
name = "futures-executor"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
[[package]]
name = "futures-macro"
version = "0.3.18"
@ -191,10 +233,13 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@ -344,6 +389,7 @@ dependencies = [
"base64",
"bitcoin_hashes 0.10.0",
"env_logger",
"futures",
"futures-util",
"log",
"secp256k1",

View File

@ -9,6 +9,7 @@ edition = "2018"
log = "0.4.14"
env_logger = "0.9.0"
tokio = { version = "1.14.0", features = ["full"] }
futures = "0.3.18"
futures-util = "0.3.18"
tokio-tungstenite = "0.16.0"
tungstenite = "0.16.0"

View File

@ -1,17 +1,9 @@
use std::{env, io::Error};
//use futures::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
use log::{debug, info, warn};
use log::info;
use nostr_rs_relay::proto::Proto;
//use tokio::io::{ReadHalf, WriteHalf};
use std::{env, io::Error};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Builder;
use tokio_tungstenite::WebSocketStream;
use tungstenite::error::Error::*;
use tungstenite::protocol::frame::coding::CloseCode;
use tungstenite::protocol::frame::CloseFrame;
use tungstenite::protocol::{Message, WebSocketConfig};
use tungstenite::protocol::WebSocketConfig;
fn main() -> Result<(), Error> {
let _ = env_logger::try_init();
@ -46,7 +38,7 @@ fn main() -> Result<(), Error> {
// Todo: Implement sending messages to all other clients; example:
// https://github.com/snapview/tokio-tungstenite/blob/master/examples/server.rs
// Handles new TCP connections
// Wrap/Upgrade TCP connections in WebSocket Streams, and hand off to Nostr protocol handler.
async fn nostr_server(stream: TcpStream) {
let addr = stream
.peer_addr()
@ -62,7 +54,9 @@ async fn nostr_server(stream: TcpStream) {
match conn {
Ok(ws_stream) => {
info!("New WebSocket connection: {}", addr);
process_client(ws_stream).await;
// create a nostr protocol handler, and give it the stream
let mut proto = Proto::new(ws_stream);
proto.process_client().await
}
Err(_) => {
println!("Error");
@ -70,93 +64,3 @@ async fn nostr_server(stream: TcpStream) {
}
};
}
// Handles valid clients who have upgraded to WebSockets
async fn process_client(mut stream: WebSocketStream<TcpStream>) {
// get a protocol helper;
let mut proto = Proto::new();
// TODO: select on a timeout to kill non-responsive clients
while let Some(mes_res) = stream.next().await {
// as long as connection is not closed...
match mes_res {
Ok(Message::Text(cmd)) => {
info!("Message received");
let length = cmd.len();
debug!("Message: {}", cmd);
stream
.send(Message::Text(format!(
"got your message of length {}",
length
)))
.await
.expect("send failed");
// Handle this request. Everything else below is basically websocket error handling.
let proto_error = proto.process_message(cmd);
match proto_error {
Err(_) => {
stream
.send(Message::Text(
"[\"NOTICE\", \"Failed to process message.\"]".to_owned(),
))
.await
.expect("send failed");
}
Ok(_) => {
info!("Message processed successfully");
}
}
}
Ok(Message::Binary(_)) => {
info!("Ignoring Binary message");
stream
.send(Message::Text(
"[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]"
.to_owned(),
))
.await
.expect("send failed");
}
Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"),
Ok(Message::Close(_)) => {
info!("Got request to close connection");
return;
}
Err(tungstenite::error::Error::Capacity(
tungstenite::error::CapacityError::MessageTooLong { size, max_size },
)) => {
info!(
"Message size too large, disconnecting this client. ({} > {})",
size, max_size
);
stream.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send notice failed");
stream
.close(Some(CloseFrame {
code: CloseCode::Size,
reason: "Exceeded max message size".into(),
}))
.await
.expect("failed to send close frame");
return;
}
Err(AlreadyClosed) => {
warn!("this connection was already closed, and we tried to operate on it");
return;
}
Err(ConnectionClosed) | Err(Io(_)) => {
debug!("Closing this connection normally");
return;
}
Err(Tls(_)) | Err(Protocol(_)) | Err(Utf8) | Err(Url(_)) | Err(HttpFormat(_))
| Err(Http(_)) => {
info!("websocket/tls/enc protocol error, dropping connection");
return;
}
Err(e) => {
warn!("Some new kind of error, bailing: {:?}", e);
return;
}
}
}
println!("Client is exiting, no longer reading websocket messages");
}

View File

@ -2,8 +2,17 @@ use crate::close::Close;
use crate::error::{Error, Result};
use crate::event::Event;
use crate::subscription::Subscription;
use log::{debug, info};
use futures::SinkExt;
use tungstenite::error::Error::*;
use tungstenite::protocol::frame::coding::CloseCode;
use tungstenite::protocol::frame::CloseFrame;
use tungstenite::protocol::Message;
use futures::StreamExt;
use log::{debug, info, warn};
use std::collections::HashMap;
use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use uuid::Uuid;
// A protocol handler/helper. Use one per client.
@ -11,22 +20,114 @@ pub struct Proto {
client_id: Uuid,
// current set of subscriptions
subscriptions: HashMap<String, Subscription>,
// websocket
stream: WebSocketStream<TcpStream>,
max_subs: usize,
}
const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
impl Proto {
pub fn new() -> Self {
pub fn new(stream: WebSocketStream<TcpStream>) -> Self {
let p = Proto {
client_id: Uuid::new_v4(),
subscriptions: HashMap::new(),
max_subs: 128,
stream: stream,
};
debug!("New client: {:?}", p.client_id);
p
}
pub async fn process_client(&mut self) {
while let Some(mes_res) = self.stream.next().await {
self.send_notice().await;
match mes_res {
Ok(Message::Text(cmd)) => {
info!("Message received");
let length = cmd.len();
debug!("Message: {}", cmd);
self.stream
.send(Message::Text(format!(
"got your message of length {}",
length
)))
.await
.expect("send failed");
// Handle this request. Everything else below is basically websocket error handling.
let proto_error = self.process_message(cmd);
match proto_error {
Err(_) => {
self.stream
.send(Message::Text(
"[\"NOTICE\", \"Failed to process message.\"]".to_owned(),
))
.await
.expect("send failed");
}
Ok(_) => {
info!("Message processed successfully");
}
}
}
Ok(Message::Binary(_)) => {
info!("Ignoring Binary message");
self.stream
.send(Message::Text(
"[\"NOTICE\", \"BINARY_INVALID: Binary messages are not supported.\"]"
.to_owned(),
))
.await
.expect("send failed");
}
Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => debug!("Ping/Pong"),
Ok(Message::Close(_)) => {
info!("Got request to close connection");
return;
}
Err(tungstenite::error::Error::Capacity(
tungstenite::error::CapacityError::MessageTooLong { size, max_size },
)) => {
info!(
"Message size too large, disconnecting this client. ({} > {})",
size, max_size
);
self.stream.send(Message::Text("[\"NOTICE\", \"MAX_EVENT_SIZE_EXCEEDED: Exceeded maximum event size for this relay. Closing Connection.\"]".to_owned())).await.expect("send notice failed");
self.stream
.close(Some(CloseFrame {
code: CloseCode::Size,
reason: "Exceeded max message size".into(),
}))
.await
.expect("failed to send close frame");
return;
}
Err(AlreadyClosed) => {
warn!("this connection was already closed, and we tried to operate on it");
return;
}
Err(ConnectionClosed) | Err(Io(_)) => {
debug!("Closing this connection normally");
return;
}
Err(Tls(_)) | Err(Protocol(_)) | Err(Utf8) | Err(Url(_)) | Err(HttpFormat(_))
| Err(Http(_)) => {
info!("websocket/tls/enc protocol error, dropping connection");
return;
}
Err(e) => {
warn!("Some new kind of error, bailing: {:?}", e);
return;
}
}
}
}
pub async fn send_notice(&mut self) {
self.stream.send(Message::Text(format!("foo"))).await;
}
// Error results will be transformed into client NOTICEs
pub fn process_message(&mut self, cmd: String) -> Result<()> {
info!(