feat: gRPC authorization for events

closes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/46
This commit is contained in:
Greg Heartsfield 2023-02-11 13:26:08 -06:00
parent e63d179424
commit a16c4e698a
17 changed files with 1522 additions and 8 deletions

81
Cargo.lock generated
View File

@ -783,6 +783,12 @@ dependencies = [
"instant", "instant",
] ]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.25" version = "1.0.25"
@ -1490,6 +1496,12 @@ dependencies = [
"windows-sys 0.42.0", "windows-sys 0.42.0",
] ]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.11" version = "0.2.11"
@ -1556,6 +1568,7 @@ dependencies = [
"nonzero_ext", "nonzero_ext",
"parse_duration", "parse_duration",
"prometheus", "prometheus",
"prost",
"r2d2", "r2d2",
"r2d2_sqlite", "r2d2_sqlite",
"rand 0.8.5", "rand 0.8.5",
@ -1568,6 +1581,8 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tonic",
"tonic-build",
"tracing", "tracing",
"tracing-subscriber 0.2.25", "tracing-subscriber 0.2.25",
"tungstenite", "tungstenite",
@ -1861,6 +1876,16 @@ dependencies = [
"sha2", "sha2",
] ]
[[package]]
name = "petgraph"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.12" version = "1.0.12"
@ -1925,6 +1950,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97e3215779627f01ee256d2fad52f3d95e8e1c11e9fc6fd08f7cd455d5d5c78"
dependencies = [
"proc-macro2",
"syn",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -1983,6 +2018,28 @@ dependencies = [
"prost-derive", "prost-derive",
] ]
[[package]]
name = "prost-build"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
"which",
]
[[package]] [[package]]
name = "prost-derive" name = "prost-derive"
version = "0.11.6" version = "0.11.6"
@ -2926,6 +2983,19 @@ dependencies = [
"tracing-futures", "tracing-futures",
] ]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@ -3353,6 +3423,17 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "which"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269"
dependencies = [
"either",
"libc",
"once_cell",
]
[[package]] [[package]]
name = "whoami" name = "whoami"
version = "1.3.0" version = "1.3.0"

View File

@ -16,6 +16,8 @@ clap = { version = "4.0.32", features = ["env", "default", "derive"]}
tracing = "0.1.36" tracing = "0.1.36"
tracing-subscriber = "0.2.0" tracing-subscriber = "0.2.0"
tokio = { version = "1", features = ["full", "tracing", "signal"] } tokio = { version = "1", features = ["full", "tracing", "signal"] }
prost = "0.11"
tonic = "0.8.3"
console-subscriber = "0.1.8" console-subscriber = "0.1.8"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
@ -52,3 +54,6 @@ bech32 = "0.9.1"
[dev-dependencies] [dev-dependencies]
anyhow = "1" anyhow = "1"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

4
build.rs Normal file
View File

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/nauthz.proto")?;
Ok(())
}

View File

@ -48,6 +48,16 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# sqlite. # sqlite.
#connection = "postgresql://postgres:nostr@localhost:7500/nostr" #connection = "postgresql://postgres:nostr@localhost:7500/nostr"
[grpc]
# gRPC interfaces for externalized decisions and other extensions to
# functionality.
#
# Events can be authorized through an external service, by providing
# the URL below. In the event the server is not accessible, events
# will be permitted. The protobuf3 schema used is available in
# `proto/nauthz.proto`.
# event_authorization_server = "http://[::1]:50051"
[network] [network]
# Bind to this network address # Bind to this network address
address = "0.0.0.0" address = "0.0.0.0"

79
docs/grpc-extensions.md Normal file
View File

@ -0,0 +1,79 @@
# gRPC Extensions Design Document
The relay will be extensible through gRPC endpoints, definable in the
main configuration file. These will allow external programs to host
logic for deciding things such as, should this event be persisted,
should this connection be allowed, and should this subscription
request be registered. The primary goal is allow for relay operator
specific functionality that allows them to serve smaller communities
and reduce spam and abuse.
This will likely evolve substantially, the first goal is to get a
basic one-way service that lets an externalized program decide on
event persistance. This does not represent the final state of gRPC
extensibility in `nostr-rs-relay`.
## Considerations
Write event latency must not be significantly affected. However, the
primary reason we are implementing this is spam/abuse protection, so
we are willing to tolerate some increase in latency if that protects
us against outages!
The interface should provide enough information to make simple
decisions, without burdening the relay to do extra queries. The
decision endpoint will be mostly responsible for maintaining state and
gathering additional details.
## Design Overview
A gRPC server may be defined in the `config.toml` file. If it exists,
the relay will attempt to connect to it and send a message for each
`EVENT` command submitted by clients. If a successful response is
returned indicating the event is permitted, the relay continues
processing the event as normal. All existing whitelist, blacklist,
and `NIP-05` validation checks are still performed and MAY still
result in the event being rejected. If a successful response is
returned indicated the decision is anything other than permit, then
the relay MUST reject the event, and return a command result to the
user (using `NIP-20`) indicating the event was blocked (optionally
providing a message).
In the event there is an error in the gRPC interface, event processing
proceeds as if gRPC was disabled (fail open). This allows gRPC
servers to be deployed with minimal chance of causing a full relay
outage.
## Design Details
Currently one procedure call is supported, `EventAdmit`, in the
`Authorization` service. It accepts the following data in order to
support authorization decisions:
- The event itself
- The client IP that submitted the event
- The client's HTTP origin header, if one exists
- The client's HTTP user agent header, if one exists
- The public key of the client, if `NIP-42` authentication was
performed (not supported in the relay yet!)
- The `NIP-05` associated with the event's public key, if it is known
to the relay
A server providing authorization decisions will return the following:
- A decision to permit or deny the event
- An optional message that explains why the event was denied, to be
transmitted to the client
## Security Issues
There is little attempt to secure this interface, since it is intended
for use processes running on the same host. It is recommended to
ensure that the gRPC server providing the API is not exposed to the
public Internet. Authorization server implementations should have
their own security reviews performed.
A slow gRPC server could cause availability issues for event
processing, since this is performed on a single thread. Avoid any
expensive or long-running processes that could result from submitted
events, since any client can initiate a gRPC call to the service.

1010
nauthz_server_example/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,13 @@
[package]
name = "nauthz-server"
version = "0.1.0"
edition = "2021"
[dependencies]
# Common dependencies
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
prost = "0.11"
tonic = "0.8.3"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

View File

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/nauthz.proto")?;
Ok(())
}

View File

@ -0,0 +1,61 @@
use tonic::{transport::Server, Request, Response, Status};
use nauthz_grpc::authorization_server::{Authorization, AuthorizationServer};
use nauthz_grpc::{EventReply, EventRequest, Decision};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
#[derive(Default)]
pub struct EventAuthz {
allowed_kinds: Vec<u64>,
}
#[tonic::async_trait]
impl Authorization for EventAuthz {
async fn event_admit(
&self,
request: Request<EventRequest>,
) -> Result<Response<EventReply>, Status> {
let reply;
let req = request.into_inner();
let event = req.event.unwrap();
let content_prefix:String = event.content.chars().take(40).collect();
println!("recvd event, [kind={}, origin={:?}, nip05_domain={:?}, tag_count={}, content_sample={:?}]",
event.kind, req.origin, req.nip05.map(|x| x.domain), event.tags.len(), content_prefix);
// Permit any event with a whitelisted kind
if self.allowed_kinds.contains(&event.kind) {
println!("This looks fine! (kind={})",event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Permit as i32,
message: None
};
} else {
println!("Blocked! (kind={})",event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Deny as i32,
message: Some(format!("kind {} not permitted", event.kind)),
};
}
Ok(Response::new(reply))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
// A simple authorization engine that allows kinds 1, 2, and 3.
let checker = EventAuthz {
allowed_kinds: vec![0,1,2,3],
};
println!("EventAuthz Server listening on {}", addr);
// Start serving
Server::builder()
.add_service(AuthorizationServer::new(checker))
.serve(addr)
.await?;
Ok(())
}

60
proto/nauthz.proto Normal file
View File

@ -0,0 +1,60 @@
syntax = "proto3";
// Nostr Authorization Services
package nauthz;
// Authorization for actions against a relay
service Authorization {
// Determine if an event should be admitted to the relay
rpc EventAdmit(EventRequest) returns (EventReply) {}
}
message Event {
bytes id = 1; // 32-byte SHA256 hash of serialized event
bytes pubkey = 2; // 32-byte public key of event creator
fixed64 created_at = 3; // UNIX timestamp provided by event creator
uint64 kind = 4; // event kind
string content = 5; // arbitrary event contents
repeated TagEntry tags = 6; // event tag array
bytes sig = 7; // 32-byte signature of the event id
// Individual values for a single tag
message TagEntry {
repeated string values = 1;
}
}
// Event data and metadata for authorization decisions
message EventRequest {
Event event =
1; // the event to be admitted for further relay processing
optional string ip_addr =
2; // IP address of the client that submitted the event
optional string origin =
3; // HTTP origin header from the client, if one exists
optional string user_agent =
4; // HTTP user-agent header from the client, if one exists
optional bytes auth_pubkey =
5; // the public key associated with a NIP-42 AUTH'd session, if
// authentication occurred
optional Nip05Name nip05 =
6; // NIP-05 address associated with the event pubkey, if it is
// known and has been validated by the relay
// A NIP_05 verification record
message Nip05Name {
string local = 1;
string domain = 2;
}
}
// A permit or deny decision
enum Decision {
DECISION_UNSPECIFIED = 0;
DECISION_PERMIT = 1; // Admit this event for further processing
DECISION_DENY = 2; // Deny persisting or propagating this event
}
// Response to a event authorization request
message EventReply {
Decision decision = 1; // decision to enforce
optional string message = 2; // informative message for the client
}

View File

@ -25,6 +25,12 @@ pub struct Database {
pub connection: String, pub connection: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Grpc {
pub event_admission_server: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)] #[allow(unused)]
pub struct Network { pub struct Network {
@ -145,6 +151,7 @@ pub struct Settings {
pub info: Info, pub info: Info,
pub diagnostics: Diagnostics, pub diagnostics: Diagnostics,
pub database: Database, pub database: Database,
pub grpc: Grpc,
pub network: Network, pub network: Network,
pub limits: Limits, pub limits: Limits,
pub authorization: Authorization, pub authorization: Authorization,
@ -220,6 +227,9 @@ impl Default for Settings {
max_conn: 8, max_conn: 8,
connection: "".to_owned(), connection: "".to_owned(),
}, },
grpc: Grpc {
event_admission_server: None,
},
network: Network { network: Network {
port: 8080, port: 8080,
ping_interval_seconds: 300, ping_interval_seconds: 300,

View File

@ -4,6 +4,7 @@ use crate::error::{Error, Result};
use crate::event::Event; use crate::event::Event;
use crate::notice::Notice; use crate::notice::Notice;
use crate::server::NostrMetrics; use crate::server::NostrMetrics;
use crate::nauthz;
use governor::clock::Clock; use governor::clock::Clock;
use governor::{Quota, RateLimiter}; use governor::{Quota, RateLimiter};
use r2d2; use r2d2;
@ -27,6 +28,8 @@ pub struct SubmittedEvent {
pub event: Event, pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<Notice>, pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
pub source_ip: String, pub source_ip: String,
pub origin: Option<String>,
pub user_agent: Option<String>,
} }
/// Database file /// Database file
@ -101,6 +104,18 @@ pub async fn db_writer(
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota))); lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
} }
} }
// create a client if GRPC is enabled.
// Check with externalized event admitter service, if one is defined.
let mut grpc_client = if let Some(svr) = settings.grpc.event_admission_server {
Some(nauthz::EventAuthzService::connect(&svr).await)
} else {
None
};
//let gprc_client = settings.grpc.event_admission_server.map(|s| {
// event_admitter_connect(&s);
// });
loop { loop {
if shutdown.try_recv().is_ok() { if shutdown.try_recv().is_ok() {
info!("shutting down database writer"); info!("shutting down database writer");
@ -165,9 +180,17 @@ pub async fn db_writer(
metadata_tx.send(event.clone()).ok(); metadata_tx.send(event.clone()).ok();
} }
// get a validation result for use in verification and GPRC
let validation = if nip05_active {
Some(repo.get_latest_user_verification(&event.pubkey).await)
} else {
None
};
// check for NIP-05 verification // check for NIP-05 verification
if nip05_enabled { if nip05_enabled && validation.is_some() {
match repo.get_latest_user_verification(&event.pubkey).await { match validation.as_ref().unwrap() {
Ok(uv) => { Ok(uv) => {
if uv.is_valid(&settings.verified_users) { if uv.is_valid(&settings.verified_users) {
info!( info!(
@ -175,6 +198,7 @@ pub async fn db_writer(
uv.name.to_string(), uv.name.to_string(),
event.get_author_prefix() event.get_author_prefix()
); );
} else { } else {
info!( info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)", "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
@ -209,6 +233,35 @@ pub async fn db_writer(
} }
} }
} }
// nip05 address
let nip05_address : Option<crate::nip05::Nip05Name> = validation.and_then(|x| x.ok().map(|y| y.name));
// GRPC check
if let Some(ref mut c) = grpc_client {
trace!("checking if grpc permits");
let grpc_start = Instant::now();
let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address).await;
match decision_res {
Ok(decision) => {
if !decision.permitted() {
// GPRC returned a decision to reject this event
info!("GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
grpc_start.elapsed(),
subm_event.source_ip);
notice_tx.try_send(Notice::blocked(event.id, &decision.message().unwrap_or_else(|| "".to_string()))).ok();
continue;
}
},
Err(e) => {
warn!("GRPC server error: {:?}", e);
}
}
}
// TODO: cache recent list of authors to remove a DB call. // TODO: cache recent list of authors to remove a DB call.
let start = Instant::now(); let start = Instant::now();
if event.is_ephemeral() { if event.is_ephemeral() {

View File

@ -64,6 +64,10 @@ pub enum Error {
DelegationParseError, DelegationParseError,
#[error("Channel closed error")] #[error("Channel closed error")]
ChannelClosed, ChannelClosed,
#[error("Authz error")]
AuthzError,
#[error("Tonic GRPC error")]
TonicError(tonic::Status),
#[error("Unknown/Undocumented")] #[error("Unknown/Undocumented")]
UnknownError, UnknownError,
} }
@ -132,3 +136,10 @@ impl From<config::ConfigError> for Error {
Error::ConfigError(r) Error::ConfigError(r)
} }
} }
impl From<tonic::Status> for Error {
/// Wrap Config error
fn from(r: tonic::Status) -> Self {
Error::TonicError(r)
}
}

View File

@ -9,6 +9,7 @@ pub mod event;
pub mod hexrange; pub mod hexrange;
pub mod info; pub mod info;
pub mod nip05; pub mod nip05;
pub mod nauthz;
pub mod notice; pub mod notice;
pub mod repo; pub mod repo;
pub mod subscription; pub mod subscription;

110
src/nauthz.rs Normal file
View File

@ -0,0 +1,110 @@
use crate::error::{Error, Result};
use crate::{event::Event, nip05::Nip05Name};
use nauthz_grpc::authorization_client::AuthorizationClient;
use nauthz_grpc::event::TagEntry;
use nauthz_grpc::{Decision, Event as GrpcEvent, EventReply, EventRequest};
use tracing::{info, warn};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
// A decision for the DB to act upon
pub trait AuthzDecision: Send + Sync {
fn permitted(&self) -> bool;
fn message(&self) -> Option<String>;
}
impl AuthzDecision for EventReply {
fn permitted(&self) -> bool {
self.decision == Decision::Permit as i32
}
fn message(&self) -> Option<String> {
self.message.clone()
}
}
// A connection to an event admission GRPC server
pub struct EventAuthzService {
server_addr: String,
conn: Option<AuthorizationClient<tonic::transport::Channel>>,
}
// conversion of Nip05Names into GRPC type
impl std::convert::From<Nip05Name> for nauthz_grpc::event_request::Nip05Name {
fn from(value: Nip05Name) -> Self {
nauthz_grpc::event_request::Nip05Name {
local: value.local.clone(),
domain: value.domain.clone(),
}
}
}
// conversion of event tags into gprc struct
fn tags_to_protobuf(tags: &Vec<Vec<String>>) -> Vec<TagEntry> {
tags.iter()
.map(|x| TagEntry { values: x.clone() })
.collect()
}
impl EventAuthzService {
pub async fn connect(server_addr: &str) -> EventAuthzService {
let mut eas = EventAuthzService {
server_addr: server_addr.to_string(),
conn: None,
};
eas.ready_connection().await;
eas
}
pub async fn ready_connection(self: &mut Self) {
if self.conn.is_none() {
let client = AuthorizationClient::connect(self.server_addr.to_string()).await;
if let Err(ref msg) = client {
warn!("could not connect to nostr authz GRPC server: {:?}", msg);
} else {
info!("connected to nostr authorization GRPC server");
}
self.conn = client.ok();
}
}
pub async fn admit_event(
self: &mut Self,
event: &Event,
ip: &str,
origin: Option<String>,
user_agent: Option<String>,
nip05: Option<Nip05Name>,
) -> Result<Box<dyn AuthzDecision>> {
self.ready_connection().await;
let id_blob = hex::decode(&event.id)?;
let pubkey_blob = hex::decode(&event.pubkey)?;
let sig_blob = hex::decode(&event.sig)?;
if let Some(ref mut c) = self.conn {
let gevent = GrpcEvent {
id: id_blob,
pubkey: pubkey_blob,
sig: sig_blob,
created_at: event.created_at,
kind: event.kind,
content: event.content.clone(),
tags: tags_to_protobuf(&event.tags),
};
let svr_res = c
.event_admit(EventRequest {
event: Some(gevent),
ip_addr: Some(ip.to_string()),
origin,
user_agent,
auth_pubkey: None,
nip05: nip05.map(|x| nauthz_grpc::event_request::Nip05Name::from(x)),
})
.await?;
let reply = svr_res.into_inner();
return Ok(Box::new(reply));
} else {
return Err(Error::AuthzError);
}
}
}

View File

@ -42,8 +42,8 @@ pub struct Verifier {
/// A NIP-05 identifier is a local part and domain. /// A NIP-05 identifier is a local part and domain.
#[derive(PartialEq, Eq, Debug, Clone)] #[derive(PartialEq, Eq, Debug, Clone)]
pub struct Nip05Name { pub struct Nip05Name {
local: String, pub local: String,
domain: String, pub domain: String,
} }
impl Nip05Name { impl Nip05Name {

View File

@ -601,11 +601,13 @@ async fn nostr_server(
// and how many it received from queries. // and how many it received from queries.
let mut client_published_event_count: usize = 0; let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0; let mut client_received_event_count: usize = 0;
let unspec = "<unspecified>".to_string();
info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip()); info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into()); let origin = client_info.origin.as_ref().unwrap_or_else(|| &unspec);
let user_agent = client_info let user_agent = client_info
.user_agent .user_agent.as_ref()
.unwrap_or_else(|| "<unspecified>".into()); .unwrap_or_else(|| &unspec);
info!( info!(
"cid: {}, origin: {:?}, user-agent: {:?}", "cid: {}, origin: {:?}, user-agent: {:?}",
cid, origin, user_agent cid, origin, user_agent
@ -736,7 +738,7 @@ async fn nostr_server(
// check if the event is too far in the future. // check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) { if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database. // Write this to the database.
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone(), source_ip: conn.ip().to_string()}; let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone(), source_ip: conn.ip().to_string(), origin: client_info.origin.clone(), user_agent: client_info.user_agent.clone()};
event_tx.send(submit_event).await.ok(); event_tx.send(submit_event).await.ok();
client_published_event_count += 1; client_published_event_count += 1;
} else { } else {