//! Server process
use crate::close::Close;
use crate::close::CloseCmd;
use crate::config::{Settings, VerifiedUsersMode};
use crate::conn;
use crate::db;
use crate::db::SubmittedEvent;
use crate::error::{Error, Result};
use crate::event::Event;
use crate::event::EventCmd;
use crate::info::RelayInfo;
use crate::nip05;
use crate::subscription::Subscription;
use futures::SinkExt;
use futures::StreamExt;
use http::header::HeaderMap;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
use hyper::{
    header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::mpsc::Receiver as MpscReceiver;
use std::time::Duration;
use std::time::Instant;
use tokio::runtime::Builder;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_tungstenite::WebSocketStream;
use tracing::*;
use tungstenite::error::CapacityError::MessageTooLong;
use tungstenite::error::Error as WsError;
use tungstenite::handshake;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;

/// Handle arbitrary HTTP requests, including for WebSocket upgrades.
async fn handle_web_request(
    mut request: Request<Body>,
    pool: db::SqlitePool,
    settings: Settings,
    remote_addr: SocketAddr,
    broadcast: Sender<Event>,
    event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
    shutdown: Receiver<()>,
) -> Result<Response<Body>, Infallible> {
    match (
        request.uri().path(),
        request.headers().contains_key(header::UPGRADE),
    ) {
        // Request for / as websocket
        ("/", true) => {
            trace!("websocket with upgrade request");
            // assume the request is a handshake, so create the handshake response
            let response = match handshake::server::create_response_with_body(&request, || {
                Body::empty()
            }) {
                Ok(response) => {
                    // if the handshake response was created successfully,
                    // spawn a task to handle the websocket connection
                    tokio::spawn(async move {
                        // use hyper's connection upgrade feature
                        match upgrade::on(&mut request).await {
                            // if successfully upgraded
                            Ok(upgraded) => {
                                // set WebSocket configuration options
                                let config = WebSocketConfig {
                                    max_message_size: settings.limits.max_ws_message_bytes,
                                    max_frame_size: settings.limits.max_ws_frame_bytes,
                                    ..Default::default()
                                };
                                // create a websocket stream from the upgraded object
                                let ws_stream = WebSocketStream::from_raw_socket(
                                    // pass the upgraded object
                                    // as the base layer stream of the WebSocket
                                    upgraded,
                                    tokio_tungstenite::tungstenite::protocol::Role::Server,
                                    Some(config),
                                )
                                .await;
                                // determine the remote IP from headers if they exist
                                let header_ip = settings
                                    .network
                                    .remote_ip_header
                                    .as_ref()
                                    .and_then(|x| get_header_remote_ip(x, request.headers()));
                                // use the socket addr as a backup
                                let remote_ip =
                                    header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
                                // spawn a nostr server with our websocket
                                tokio::spawn(nostr_server(
                                    pool, remote_ip, settings, ws_stream, broadcast, event_tx,
                                    shutdown,
                                ));
                            }
                            // todo: trace, don't print...
                            Err(e) => println!(
                                "error when trying to upgrade connection \
                                 from address {} to websocket connection. \
                                 Error is: {}",
                                remote_addr, e
                            ),
                        }
                    });
                    // return the response to the handshake request
                    response
                }
                Err(error) => {
                    warn!("websocket response failed");
                    let mut res =
                        Response::new(Body::from(format!("Failed to create websocket: {}", error)));
                    *res.status_mut() = StatusCode::BAD_REQUEST;
                    return Ok(res);
                }
            };
            Ok::<_, Infallible>(response)
        }
        // Request for relay info
        ("/", false) => {
            // handle request at root with no upgrade header
            // Check if this is a nostr server info request
            let accept_header = &request.headers().get(ACCEPT);
            // check if application/nostr+json is included
            if let Some(media_types) = accept_header {
                if let Ok(mt_str) = media_types.to_str() {
                    if mt_str.contains("application/nostr+json") {
                        // build a relay info response
                        debug!("Responding to server info request");
                        let rinfo = RelayInfo::from(settings.info);
                        let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
                        return Ok(Response::builder()
                            .status(200)
                            .header("Content-Type", "application/nostr+json")
                            .header("Access-Control-Allow-Origin", "*")
                            .body(b)
                            .unwrap());
                    }
                }
            }
            Ok(Response::builder()
                .status(200)
                .header("Content-Type", "text/plain")
                .body(Body::from("Please use a Nostr client to connect."))
                .unwrap())
        }
        (_, _) => {
            // handle any other url
            Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Body::from("Nothing here."))
                .unwrap())
        }
    }
}

fn get_header_remote_ip(header: &str, headers: &HeaderMap) -> Option<String> {
    headers
        .get(header)
        .and_then(|x| x.to_str().ok().map(|x| x.to_string()))
}

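// Note: whether a header is consulted at all depends on the optional
// `network.remote_ip_header` setting used above; for example, a deployment
// behind a reverse proxy might set it to "x-forwarded-for" (an illustrative
// value, not a default) so the proxied client address is used instead of the
// socket address.
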
// return on a control-c or internally requested shutdown signal
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
    let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
        .expect("could not define signal");
    loop {
        tokio::select! {
            _ = shutdown_signal.recv() => {
                info!("Shutting down webserver as requested");
                // server shutting down, exit loop
                break;
            },
            _ = tokio::signal::ctrl_c() => {
                info!("Shutting down webserver due to SIGINT");
                break;
            },
            _ = term_signal.recv() => {
                info!("Shutting down webserver due to SIGTERM");
                break;
            },
        }
    }
}

/// Start running a Nostr relay server.
pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
    trace!("Config: {:?}", settings);
    // do some config validation.
    if !Path::new(&settings.database.data_directory).is_dir() {
        error!("Database directory does not exist");
        return Err(Error::DatabaseDirError);
    }
    let addr = format!(
        "{}:{}",
        settings.network.address.trim(),
        settings.network.port
    );
    let socket_addr = addr.parse().expect("listening address not valid");
    // address whitelisting settings
    if let Some(addr_whitelist) = &settings.authorization.pubkey_whitelist {
        info!(
            "Event publishing restricted to {} pubkey(s)",
            addr_whitelist.len()
        );
    }
    // check if NIP-05 enforced user verification is on
    if settings.verified_users.is_active() {
        info!(
            "NIP-05 user verification mode: {:?}",
            settings.verified_users.mode
        );
        if let Some(d) = settings.verified_users.verify_update_duration() {
            info!("NIP-05 check user verification every: {:?}", d);
        }
        if let Some(d) = settings.verified_users.verify_expiration_duration() {
            info!("NIP-05 user verification expires after: {:?}", d);
        }
        if let Some(wl) = &settings.verified_users.domain_whitelist {
            info!("NIP-05 domain whitelist: {:?}", wl);
        }
        if let Some(bl) = &settings.verified_users.domain_blacklist {
            info!("NIP-05 domain blacklist: {:?}", bl);
        }
    }
    // configure tokio runtime
    let rt = Builder::new_multi_thread()
        .enable_all()
        .thread_name("tokio-ws")
        .build()
        .unwrap();
    // start tokio
    rt.block_on(async {
        let broadcast_buffer_limit = settings.limits.broadcast_buffer;
        let persist_buffer_limit = settings.limits.event_persist_buffer;
        let verified_users_active = settings.verified_users.is_active();
        let db_min_conn = settings.database.min_conn;
        let db_max_conn = settings.database.max_conn;
        let settings = settings.clone();
        info!("listening on: {}", socket_addr);
        // all client-submitted valid events are broadcast to every
        // other client on this channel. This should be large enough
        // to accommodate slower readers (messages are dropped if
        // clients can not keep up).
        let (bcast_tx, _) = broadcast::channel::<Event>(broadcast_buffer_limit);
        // validated events that need to be persisted are sent to the
        // database via this channel.
        let (event_tx, event_rx) = mpsc::channel::<SubmittedEvent>(persist_buffer_limit);
        // establish a channel for letting all threads know about a
        // requested server shutdown.
        let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
        // create a channel for sending any new metadata event. These
        // will get processed relatively slowly (a potentially
        // multi-second blocking HTTP call) on a single thread, so we
        // buffer requests on the channel. No harm in dropping events
        // here, since we are protecting against DoS. This can make
        // it difficult to set up initial metadata in bulk, since
        // overwhelming this channel will drop events and won't register
        // metadata events.
        let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
        // start the database writer thread. Give it a channel for
        // writing events, and for publishing events that have been
        // written (to all connected clients).
        db::db_writer(
            settings.clone(),
            event_rx,
            bcast_tx.clone(),
            metadata_tx.clone(),
            shutdown_listen,
        )
        .await;
        info!("db writer created");
        // create a nip-05 verifier thread, if enabled.
        if settings.verified_users.mode != VerifiedUsersMode::Disabled {
            let verifier_opt =
                nip05::Verifier::new(metadata_rx, bcast_tx.clone(), settings.clone());
            if let Ok(mut v) = verifier_opt {
                if verified_users_active {
                    tokio::task::spawn(async move {
                        info!("starting up NIP-05 verifier...");
                        v.run().await;
                    });
                }
            }
        }
        // listen for (external to tokio) shutdown request
        let controlled_shutdown = invoke_shutdown.clone();
        tokio::spawn(async move {
            info!("control message listener started");
            match shutdown_rx.recv() {
                Ok(()) => {
                    info!("control message requesting shutdown");
                    controlled_shutdown.send(()).ok();
                }
                Err(std::sync::mpsc::RecvError) => {
                    debug!("shutdown requestor is disconnected");
                }
            };
        });
        // listen for ctrl-c interrupts
        let ctrl_c_shutdown = invoke_shutdown.clone();
        // listener for webserver shutdown
        let webserver_shutdown_listen = invoke_shutdown.subscribe();
        tokio::spawn(async move {
            tokio::signal::ctrl_c().await.unwrap();
            info!("shutting down due to SIGINT (main)");
            ctrl_c_shutdown.send(()).ok();
        });
        // build a connection pool for sqlite connections
        let pool = db::build_pool(
            "client query",
            &settings,
            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
                | rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
            db_min_conn,
            db_max_conn,
            true,
        );
        // A `Service` is needed for every connection, so this
        // creates one from our `handle_web_request` function.
        let make_svc = make_service_fn(|conn: &AddrStream| {
            let svc_pool = pool.clone();
            let remote_addr = conn.remote_addr();
            let bcast = bcast_tx.clone();
            let event = event_tx.clone();
            let stop = invoke_shutdown.clone();
            let settings = settings.clone();
            async move {
                // service_fn converts our function into a `Service`
                Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
                    handle_web_request(
                        request,
                        svc_pool.clone(),
                        settings.clone(),
                        remote_addr,
                        bcast.clone(),
                        event.clone(),
                        stop.subscribe(),
                    )
                }))
            }
        });
        let server = Server::bind(&socket_addr)
            .serve(make_svc)
            .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
        // run hyper in this thread. This is why the thread does not return.
        if let Err(e) = server.await {
            eprintln!("server error: {}", e);
        }
    });
    Ok(())
}

/// Nostr protocol messages from a client
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
    /// An `EVENT` message
    EventMsg(EventCmd),
    /// A `REQ` message
    SubMsg(Subscription),
    /// A `CLOSE` message
    CloseMsg(CloseCmd),
}
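
// For reference, these untagged variants correspond to the client-to-relay
// message shapes defined in NIP-01 (a sketch, not an exhaustive list):
//   ["EVENT", {"id": "...", "pubkey": "...", "kind": 1, ...}]
//   ["REQ", "<subscription id>", {"kinds": [1], "limit": 10}, ...]
//   ["CLOSE", "<subscription id>"]
// With `#[serde(untagged)]`, serde_json selects the variant by trying each
// shape in order during deserialization.
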
/// Convert Message to NostrMessage
fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage> {
    let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
    match parsed_res {
        Ok(m) => {
            if let NostrMessage::EventMsg(_) = m {
                if let Some(max_size) = max_bytes {
                    // check length, ensure that some max size is set.
                    if msg.len() > max_size && max_size > 0 {
                        return Err(Error::EventMaxLengthError(msg.len()));
                    }
                }
            }
            Ok(m)
        }
        Err(e) => {
            debug!("proto parse error: {:?}", e);
            debug!("parse error on message: {}", msg.trim());
            Err(Error::ProtoParseError)
        }
    }
}

/// Turn a string into a NOTICE message ready to send over a WebSocket
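///
/// For example, `make_notice_message("too large")` produces the JSON text
/// `["NOTICE","too large"]`, matching the NIP-01 `NOTICE` message shape.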
fn make_notice_message(msg: &str) -> Message {
    Message::text(json!(["NOTICE", msg]).to_string())
}

/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
    pool: db::SqlitePool,
    remote_ip: String,
    settings: Settings,
    mut ws_stream: WebSocketStream<Upgraded>,
    broadcast: Sender<Event>,
    event_tx: mpsc::Sender<SubmittedEvent>,
    mut shutdown: Receiver<()>,
) {
    // get a broadcast channel for clients to communicate on
    let mut bcast_rx = broadcast.subscribe();
    // Track internal client state
    let mut conn = conn::ClientConn::new(remote_ip);
    // Use the remote IP as the client identifier
    let cid = conn.get_client_prefix();
    // Create a channel for receiving query results from the database.
    // we will send out the tx handle to any query we generate.
    let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
    // Create channel for receiving NOTICEs
    let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
    // last time this client sent data (message, ping, etc.)
    let mut last_message_time = Instant::now();
    // ping interval (every 5 minutes)
    let default_ping_dur = Duration::from_secs(300);
    // disconnect after 20 minutes without a ping response or event.
    let max_quiet_time = Duration::from_secs(60 * 20);
    let start = tokio::time::Instant::now() + default_ping_dur;
    let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
    // maintain a hashmap of oneshot channels for active subscriptions.
    // when these subscriptions are cancelled, make a message
    // available to the executing query so it knows to stop.
    let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
    // for stats, keep track of how many events the client published,
    // and how many it received from queries.
    let mut client_published_event_count: usize = 0;
    let mut client_received_event_count: usize = 0;
    debug!("new connection for client: {:?}, ip: {:?}", cid, conn.ip());
    loop {
        tokio::select! {
            _ = shutdown.recv() => {
                info!("Shutting client connection down due to shutdown: {:?}, ip: {:?}", cid, conn.ip());
                // server shutting down, exit loop
                break;
            },
            _ = ping_interval.tick() => {
                // check how long since we talked to client
                // if it has been too long, disconnect
                if last_message_time.elapsed() > max_quiet_time {
                    debug!("ending connection due to lack of client ping response");
                    break;
                }
                // Send a ping
                ws_stream.send(Message::Ping(Vec::new())).await.ok();
            },
            Some(notice_msg) = notice_rx.recv() => {
                ws_stream.send(make_notice_message(&notice_msg)).await.ok();
            },
            Some(query_result) = query_rx.recv() => {
                // database informed us of a query result we asked for
                let subesc = query_result.sub_id.replace('"', "");
                if query_result.event == "EOSE" {
                    let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
                    ws_stream.send(Message::Text(send_str)).await.ok();
                } else {
                    client_received_event_count += 1;
                    // send a result
                    let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
                    ws_stream.send(Message::Text(send_str)).await.ok();
                }
            },
            // TODO: consider logging the LaggedRecv error
            Ok(global_event) = bcast_rx.recv() => {
                // an event has been broadcast to all clients
                // first check if there is a subscription for this event.
                let matching_subs = conn.get_matching_subscriptions(&global_event);
                for s in matching_subs {
                    // TODO: serialize at broadcast time, instead of
                    // once for each consumer.
                    if let Ok(event_str) = serde_json::to_string(&global_event) {
                        debug!("sub match for client: {:?}, sub: {:?}, event: {:?}",
                               cid, s,
                               global_event.get_event_id_prefix());
                        // create an event response and send it
                        let subesc = s.replace('"', "");
                        ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
                    } else {
                        warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
                    }
                }
            },
            ws_next = ws_stream.next() => {
                // update most recent message time for client
                last_message_time = Instant::now();
                // Consume text messages from the client, parse into Nostr messages.
                let nostr_msg = match ws_next {
                    Some(Ok(Message::Text(m))) => {
                        convert_to_msg(m, settings.limits.max_event_bytes)
                    },
                    Some(Ok(Message::Binary(_))) => {
                        ws_stream.send(
                            make_notice_message("binary messages are not accepted")).await.ok();
                        continue;
                    },
                    Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
                        // get a ping/pong, ignore.  tungstenite will
                        // send responses automatically.
                        continue;
                    },
                    Some(Err(WsError::Capacity(MessageTooLong { size, max_size }))) => {
                        ws_stream.send(
                            make_notice_message(
                                &format!("message too large ({} > {})", size, max_size))).await.ok();
                        continue;
                    },
                    None |
                    Some(Ok(Message::Close(_)) |
                         Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
                             WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
                        => {
                        debug!("websocket close from client: {:?}, ip: {:?}", cid, conn.ip());
                        break;
                    },
                    Some(Err(WsError::Io(e))) => {
                        // IO errors are considered fatal
                        warn!("IO error (client: {:?}, ip: {:?}): {:?}", cid, conn.ip(), e);
                        break;
                    }
                    x => {
                        // default condition on error is to close the client connection
                        info!("unknown error (client: {:?}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
                        break;
                    }
                };
                // convert ws_next into proto_next
                match nostr_msg {
                    Ok(NostrMessage::EventMsg(ec)) => {
                        // An EventCmd needs to be validated to be converted into an Event
                        // handle each type of message
                        let parsed: Result<Event> = Result::<Event>::from(ec);
                        match parsed {
                            Ok(e) => {
                                let id_prefix: String = e.id.chars().take(8).collect();
                                debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
                                // check if the event is too far in the future.
                                if e.is_valid_timestamp(settings.options.reject_future_seconds) {
                                    // Write this to the database.
                                    let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
                                    event_tx.send(submit_event).await.ok();
                                    client_published_event_count += 1;
                                } else {
                                    info!("client {:?} sent a far future-dated event", cid);
                                    if let Some(fut_sec) = settings.options.reject_future_seconds {
                                        ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{} sec) for this relay and was not stored.", fut_sec))).await.ok();
                                    }
                                }
                            },
                            Err(_) => {
                                info!("client: {:?} sent an invalid event", cid);
                                ws_stream.send(make_notice_message("event was invalid")).await.ok();
                            }
                        }
                    },
                    Ok(NostrMessage::SubMsg(s)) => {
                        debug!("client {} requesting a subscription", cid);
                        // subscription handling consists of:
                        // * registering the subscription so future events can be matched
                        // * making a channel to cancel the request later
                        // * sending a request for a SQL query
                        let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
                        match conn.subscribe(s.clone()) {
                            Ok(()) => {
                                // when we insert, if there was a previous query running with the same name, cancel it.
                                if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
                                    previous_query.send(()).ok();
                                }
                                // start a database query
                                db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
                            },
                            Err(e) => {
                                info!("Subscription error: {}", e);
                                ws_stream.send(make_notice_message(&e.to_string())).await.ok();
                            }
                        }
                    },
                    Ok(NostrMessage::CloseMsg(cc)) => {
                        // closing a request simply removes the subscription.
                        let parsed: Result<Close> = Result::<Close>::from(cc);
                        if let Ok(c) = parsed {
                            // check if a query is currently
                            // running, and remove it if so.
                            let stop_tx = running_queries.remove(&c.id);
                            if let Some(tx) = stop_tx {
                                tx.send(()).ok();
                            }
                            // stop checking new events against
                            // the subscription
                            conn.unsubscribe(&c);
                        } else {
                            info!("invalid command ignored");
                            ws_stream.send(make_notice_message("could not parse command")).await.ok();
                        }
                    },
                    Err(Error::ConnError) => {
                        debug!("got connection close/error, disconnecting client: {:?}, ip: {:?}", cid, conn.ip());
                        break;
                    }
                    Err(Error::EventMaxLengthError(s)) => {
                        info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
                        ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
                    },
                    Err(Error::ProtoParseError) => {
                        info!("client {:?} sent event that could not be parsed", cid);
                        ws_stream.send(make_notice_message("could not parse command")).await.ok();
                    },
                    Err(e) => {
                        info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
                    },
                }
            },
        }
    }
    // connection cleanup - ensure any still running queries are terminated.
    for (_, stop_tx) in running_queries {
        stop_tx.send(()).ok();
    }
    info!(
        "stopping connection for client: {:?}, ip: {:?} (client sent {} event(s), received {})",
        cid,
        conn.ip(),
        client_published_event_count,
        client_received_event_count
    );
}