Compare commits

...

25 Commits
0.8.3 ... 0.8.8

Author SHA1 Message Date
Greg Heartsfield
2be75e18fb build: bump version to 0.8.8 2023-02-21 08:16:40 -06:00
Greg Heartsfield
5f6ff4c2b7 fix: in-memory SQLite DB correctly shares memory between connections
fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/73#event-227131
2023-02-21 08:14:19 -06:00
Greg Heartsfield
df411c24fb fix: block other writers during checkpoint to eliminate DB lock errors 2023-02-20 16:50:44 -06:00
Greg Heartsfield
39f9984c4f build: bump version to 0.8.7 2023-02-17 21:05:36 -06:00
Greg Heartsfield
9d55731073 fix: Postgres SQL generation for expiring events 2023-02-17 21:04:30 -06:00
Greg Heartsfield
5638f70d66 fix: set SQL tracing back to appropriate level 2023-02-17 20:50:19 -06:00
Greg Heartsfield
98a08d054a improvement: advertise support for NIP-42 in relay info 2023-02-17 14:02:49 -06:00
Greg Heartsfield
0ef7d618a8 build: bump version to 0.8.6 2023-02-17 13:59:07 -06:00
Greg Heartsfield
bf06bea808 feat(NIP-40): postgres support for event expiration 2023-02-17 13:25:56 -06:00
Greg Heartsfield
e5ca8c2a86 improvement: run expired event cleanup every 10 minutes 2023-02-17 11:22:00 -06:00
Greg Heartsfield
8ea63f0b27 feat(NIP-40): sqlite support for event expiration 2023-02-17 11:15:06 -06:00
Greg Heartsfield
3229e4192f feat: publish favicon.ico 2023-02-16 18:03:28 -06:00
0xtr
7fd9b55e70 fix: typo in sqlite_migration.rs 2023-02-15 18:52:49 -06:00
rorp
5cecfba319 feat(NIP-42): pubkey authentication
Configurable in `config.toml`.  Limited functionality, but this does
send metadata to gRPC for event authorization.

fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/66
2023-02-15 18:51:40 -06:00
Greg Heartsfield
d0f57aea21 improvement(NIP-40): functions for checking event expiration 2023-02-15 18:47:27 -06:00
Yuval Adam
40abd6858e docs: cleanup location of documentation 2023-02-15 18:43:22 -06:00
Greg Heartsfield
136e41d234 fix: retry event writes if DB is busy 2023-02-15 18:38:34 -06:00
Yuval Adam
35a1973a46 fix: allow older versions of protobuf-compiler to work
Add --experimental_allow_proto3_optional protoc arg in build configs

fixes https://github.com/scsibug/nostr-rs-relay/issues/77
2023-02-14 16:59:41 -06:00
Kieran
1daa25600d fix: postgres tag inserts 2023-02-14 06:33:01 -06:00
Greg Heartsfield
692925942a build: bump version to 0.8.5 2023-02-13 17:53:33 -06:00
Greg Heartsfield
84afd4b64e refactor: whitespace 2023-02-13 17:52:00 -06:00
Greg Heartsfield
46160bb1f9 fix: correct name of gRPC configuration in toml 2023-02-13 17:30:26 -06:00
Greg Heartsfield
2fc9168a38 fix: SQL error with parameterized replaceable events 2023-02-13 17:10:42 -06:00
Greg Heartsfield
01d0d44868 build: bump version to 0.8.4 2023-02-13 09:34:30 -06:00
Greg Heartsfield
93f6337fda fix: upgrade docker image to include OpenSSL 3 2023-02-13 09:33:14 -06:00
28 changed files with 971 additions and 71 deletions

2
.gitignore vendored
View File

@@ -1,2 +1,2 @@
/target
**/target/
nostr.db

3
Cargo.lock generated
View File

@@ -1550,7 +1550,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.8.3"
version = "0.8.8"
dependencies = [
"anyhow",
"async-std",
@@ -1592,6 +1592,7 @@ dependencies = [
"tracing",
"tracing-subscriber 0.2.25",
"tungstenite",
"url",
"uuid",
]

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.8.3"
version = "0.8.8"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
@@ -51,6 +51,7 @@ chrono = "0.4.23"
prometheus = "0.13.3"
indicatif = "0.17.3"
bech32 = "0.9.1"
url = "2.3.1"
[dev-dependencies]
anyhow = "1"

View File

@@ -21,7 +21,7 @@ COPY ./build.rs ./build.rs
RUN rm ./target/release/deps/nostr*relay*
RUN cargo auditable build --release --locked
FROM docker.io/library/debian:bullseye-slim
FROM docker.io/library/debian:bookworm-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db

View File

@@ -35,6 +35,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [ ] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md) (_implemented, but currently disabled_)
- [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)
- [x] NIP-33: [Parameterized Replaceable Events](https://github.com/nostr-protocol/nips/blob/master/33.md)
- [x] NIP-42: [Authentication of clients to relays](https://github.com/nostr-protocol/nips/blob/master/42.md)
## Quick Start
@@ -139,7 +140,7 @@ settings.
For examples of putting the relay behind a reverse proxy (for TLS
termination, load balancing, and other features), see [Reverse
Proxy](reverse-proxy.md).
Proxy](docs/reverse-proxy.md).
## Dev Channel

View File

@@ -1,4 +1,10 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/nauthz.proto")?;
tonic_build::configure()
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile(
&["proto/nauthz.proto"],
&["proto"],
)?;
Ok(())
}

View File

@@ -16,6 +16,10 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# Administrative contact URI
#contact = "mailto:contact@example.com"
# Favicon location. Relative to the current directory. Assumes an
# ICO format.
#favicon = "favicon.ico"
[diagnostics]
# Enable tokio tracing (for use with tokio-console)
#tracing = false
@@ -56,7 +60,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# the URL below. In the event the server is not accessible, events
# will be permitted. The protobuf3 schema used is available in
# `proto/nauthz.proto`.
# event_authorization_server = "http://[::1]:50051"
# event_admission_server = "http://[::1]:50051"
[network]
# Bind to this network address
@@ -136,6 +140,8 @@ reject_future_seconds = 1800
# "35d26e4690cbe1a898af61cc3515661eb5fa763b57bd0b42e45099c8b32fd50f",
# "887645fef0ce0c3c1218d2f5d8e6132a19304cdc57cd20281d082f38cfea0072",
#]
# Enable NIP-42 authentication
#nip42_auth = false
[verified_users]
# NIP-05 verification of users. Can be "enabled" to require NIP-05

View File

@@ -104,7 +104,7 @@ http {
### Nginx Notes
The above configuration was tested on `nginx` `1.18.0` was tested on `Ubuntu 20.04`.
The above configuration was tested on `nginx` `1.18.0` on `Ubuntu` `20.04` and `22.04`
For help installing `nginx` on `Ubuntu`, see [this guide](https://www.digitalocean.com/community/tutorials/how-to-install-nginx-on-ubuntu-20-04).

7
examples/nauthz/build.rs Normal file
View File

@@ -0,0 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["../../proto/nauthz.proto"], &["../../proto"])?;
Ok(())
}

View File

@@ -1,7 +1,7 @@
use tonic::{transport::Server, Request, Response, Status};
use nauthz_grpc::authorization_server::{Authorization, AuthorizationServer};
use nauthz_grpc::{EventReply, EventRequest, Decision};
use nauthz_grpc::{Decision, EventReply, EventRequest};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
@@ -14,7 +14,6 @@ pub struct EventAuthz {
#[tonic::async_trait]
impl Authorization for EventAuthz {
async fn event_admit(
&self,
request: Request<EventRequest>,
@@ -22,18 +21,18 @@ impl Authorization for EventAuthz {
let reply;
let req = request.into_inner();
let event = req.event.unwrap();
let content_prefix:String = event.content.chars().take(40).collect();
let content_prefix: String = event.content.chars().take(40).collect();
println!("recvd event, [kind={}, origin={:?}, nip05_domain={:?}, tag_count={}, content_sample={:?}]",
event.kind, req.origin, req.nip05.map(|x| x.domain), event.tags.len(), content_prefix);
// Permit any event with a whitelisted kind
if self.allowed_kinds.contains(&event.kind) {
println!("This looks fine! (kind={})",event.kind);
println!("This looks fine! (kind={})", event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Permit as i32,
message: None
message: None,
};
} else {
println!("Blocked! (kind={})",event.kind);
println!("Blocked! (kind={})", event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Deny as i32,
message: Some(format!("kind {} not permitted", event.kind)),
@@ -49,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// A simple authorization engine that allows kinds 0-3
let checker = EventAuthz {
allowed_kinds: vec![0,1,2,3],
allowed_kinds: vec![0, 1, 2, 3],
};
println!("EventAuthz Server listening on {}", addr);
// Start serving

View File

@@ -1,4 +0,0 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/nauthz.proto")?;
Ok(())
}

View File

@@ -12,6 +12,7 @@ pub struct Info {
pub description: Option<String>,
pub pubkey: Option<String>,
pub contact: Option<String>,
pub favicon: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -75,6 +76,7 @@ pub struct Limits {
#[allow(unused)]
pub struct Authorization {
pub pubkey_whitelist: Option<Vec<String>>, // If present, only allow these pubkeys to publish events
pub nip42_auth: bool, // if true enables NIP-42 authentication
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -217,6 +219,7 @@ impl Default for Settings {
description: None,
pubkey: None,
contact: None,
favicon: None,
},
diagnostics: Diagnostics { tracing: false },
database: Database {
@@ -250,6 +253,7 @@ impl Default for Settings {
},
authorization: Authorization {
pubkey_whitelist: None, // Allow any address to publish
nip42_auth: false, // Disable NIP-42 authentication
},
verified_users: VerifiedUsers {
mode: VerifiedUsersMode::Disabled,

View File

@@ -1,16 +1,30 @@
//! Client connection state
use crate::close::Close;
use crate::error::Error;
use crate::error::Result;
use crate::subscription::Subscription;
use std::collections::HashMap;
use tracing::{debug, trace};
use uuid::Uuid;
use crate::close::Close;
use crate::conn::Nip42AuthState::{AuthPubkey, Challenge, NoAuth};
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription;
use crate::utils::{host_str, unix_time};
/// A subscription identifier has a maximum length
const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
/// NIP-42 authentication state
pub enum Nip42AuthState {
/// The client is not authenticated yet
NoAuth,
/// The AUTH challenge sent
Challenge(String),
/// The client is authenticated
AuthPubkey(String),
}
/// State for a client connection
pub struct ClientConn {
/// Client IP (either from socket, or configured proxy header
@@ -21,6 +35,8 @@ pub struct ClientConn {
subscriptions: HashMap<String, Subscription>,
/// Per-connection maximum concurrent subscriptions
max_subs: usize,
/// NIP-42 AUTH
auth: Nip42AuthState,
}
impl Default for ClientConn {
@@ -39,15 +55,18 @@ impl ClientConn {
client_id,
subscriptions: HashMap::new(),
max_subs: 32,
auth: NoAuth,
}
}
#[must_use] pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
#[must_use]
pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
&self.subscriptions
}
/// Check if the given subscription already exists
#[must_use] pub fn has_subscription(&self, sub: &Subscription) -> bool {
#[must_use]
pub fn has_subscription(&self, sub: &Subscription) -> bool {
self.subscriptions.values().any(|x| x == sub)
}
@@ -63,6 +82,22 @@ impl ClientConn {
&self.client_ip_addr
}
#[must_use]
pub fn auth_pubkey(&self) -> Option<&String> {
match &self.auth {
AuthPubkey(pubkey) => Some(pubkey),
_ => None,
}
}
#[must_use]
pub fn auth_challenge(&self) -> Option<&String> {
match &self.auth {
Challenge(pubkey) => Some(pubkey),
_ => None,
}
}
/// Add a new subscription for this connection.
/// # Errors
///
@@ -116,4 +151,79 @@ impl ClientConn {
self.get_client_prefix(),
);
}
pub fn generate_auth_challenge(&mut self) {
self.auth = Challenge(Uuid::new_v4().to_string());
}
pub fn authenticate(&mut self, event: &Event, relay_url: &String) -> Result<()> {
match &self.auth {
Challenge(_) => (),
AuthPubkey(_) => {
// already authenticated
return Ok(())
},
NoAuth => {
// unexpected AUTH request
return Err(Error::AuthFailure);
},
}
match event.validate() {
Ok(_) => {
if event.kind != 22242 {
return Err(Error::AuthFailure);
}
let curr_time = unix_time();
let past_cutoff = curr_time - 600; // 10 minutes
let future_cutoff = curr_time + 600; // 10 minutes
if event.created_at < past_cutoff || event.created_at > future_cutoff {
return Err(Error::AuthFailure);
}
let mut challenge: Option<&String> = None;
let mut relay: Option<&String> = None;
for tag in &event.tags {
if tag.len() == 2 && tag.get(0) == Some(&"challenge".into()) {
challenge = tag.get(1);
}
if tag.len() == 2 && tag.get(0) == Some(&"relay".into()) {
relay = tag.get(1);
}
}
match (challenge, &self.auth) {
(Some(received_challenge), Challenge(sent_challenge)) => {
if received_challenge != sent_challenge {
return Err(Error::AuthFailure);
}
}
(_, _) => {
return Err(Error::AuthFailure);
}
}
match (relay.and_then(|url| host_str(url)), host_str(relay_url)) {
(Some(received_relay), Some(our_relay)) => {
if received_relay != our_relay {
return Err(Error::AuthFailure);
}
}
(_, _) => {
return Err(Error::AuthFailure);
}
}
self.auth = AuthPubkey(event.pubkey.clone());
trace!(
"authenticated pubkey {} (cid: {})",
event.pubkey.chars().take(8).collect::<String>(),
self.get_client_prefix()
);
Ok(())
}
Err(_) => Err(Error::AuthFailure),
}
}
}

View File

@@ -30,6 +30,7 @@ pub struct SubmittedEvent {
pub source_ip: String,
pub origin: Option<String>,
pub user_agent: Option<String>,
pub auth_pubkey: Option<Vec<u8>>,
}
/// Database file
@@ -70,6 +71,8 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
// Panic on migration failure
let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version);
// startup scheduled tasks
repo.start().await.ok();
repo
}
@@ -187,7 +190,6 @@ pub async fn db_writer(
None
};
// check for NIP-05 verification
if nip05_enabled && validation.is_some() {
match validation.as_ref().unwrap() {
@@ -241,7 +243,7 @@ pub async fn db_writer(
if let Some(ref mut c) = grpc_client {
trace!("checking if grpc permits");
let grpc_start = Instant::now();
let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address).await;
let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address, subm_event.auth_pubkey).await;
match decision_res {
Ok(decision) => {
if !decision.permitted() {

View File

@@ -68,6 +68,10 @@ pub enum Error {
AuthzError,
#[error("Tonic GRPC error")]
TonicError(tonic::Status),
#[error("Invalid AUTH message")]
AuthFailure,
#[error("I/O Error")]
IoError(std::io::Error),
#[error("Unknown/Undocumented")]
UnknownError,
}
@@ -143,3 +147,9 @@ impl From<tonic::Status> for Error {
Error::TonicError(r)
}
}
impl From<std::io::Error> for Error {
fn from(r: std::io::Error) -> Self {
Error::IoError(r)
}
}

View File

@@ -14,6 +14,8 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use tracing::{debug, info};
use crate::event::EventWrapper::WrappedEvent;
use crate::event::EventWrapper::WrappedAuth;
lazy_static! {
/// Secp256k1 verification instance.
@@ -83,17 +85,27 @@ where
}
}
pub enum EventWrapper {
WrappedEvent(Event),
WrappedAuth(Event)
}
/// Convert network event to parsed/validated event.
impl From<EventCmd> for Result<Event> {
fn from(ec: EventCmd) -> Result<Event> {
impl From<EventCmd> for Result<EventWrapper> {
fn from(ec: EventCmd) -> Result<EventWrapper> {
// ensure command is correct
if ec.cmd == "EVENT" {
ec.event.validate().map(|_| {
let mut e = ec.event;
e.build_index();
e.update_delegation();
e
WrappedEvent(e)
})
} else if ec.cmd == "AUTH" {
// we don't want to validate the event here, because NIP-42 can be disabled
// it will be validated later during the authentication process
Ok(WrappedAuth(ec.event))
} else {
Err(CommandUnknownError)
}
@@ -125,6 +137,28 @@ impl Event {
self.kind >= 20000 && self.kind < 30000
}
/// Is this event currently expired?
pub fn is_expired(&self) -> bool {
if let Some(exp) = self.expiration() {
exp <= unix_time()
} else {
false
}
}
/// Determine the time at which this event should expire
pub fn expiration(&self) -> Option<u64> {
let default = "".to_string();
let dvals:Vec<&String> = self.tags
.iter()
.filter(|x| !x.is_empty())
.filter(|x| x.get(0).unwrap() == "expiration")
.map(|x| x.get(1).unwrap_or(&default)).take(1)
.collect();
let val_first = dvals.get(0);
val_first.and_then(|t| t.parse::<u64>().ok())
}
/// Should this event be replaced with newer timestamps from same author?
#[must_use] pub fn is_replaceable(&self) -> bool {
self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000)
@@ -135,8 +169,6 @@ impl Event {
self.kind >= 30000 && self.kind < 40000
}
/// What is the replaceable `d` tag value?
/// Should this event be replaced with newer timestamps from same author, for distinct `d` tag values?
#[must_use] pub fn distinct_param(&self) -> Option<String> {
if self.is_param_replaceable() {
@@ -315,7 +347,7 @@ impl Event {
}
/// Convert event to canonical representation for signing.
fn to_canonical(&self) -> Option<String> {
pub fn to_canonical(&self) -> Option<String> {
// create a JsonValue for each event element
let mut c: Vec<Value> = vec![];
// id must be set to 0
@@ -654,4 +686,85 @@ mod tests {
assert_eq!(event.distinct_param(), Some("".to_string()));
}
#[test]
fn expiring_event_none() {
// regular events do not expire
let mut event = Event::simple_event();
event.kind = 7;
event.tags = vec![
vec!["test".to_string(), "foo".to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_empty() {
// regular events do not expire
let mut event = Event::simple_event();
event.kind = 7;
event.tags = vec![
vec!["expiration".to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_future() {
// a normal expiring event
let exp:u64 = 1676264138;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), Some(exp));
}
#[test]
fn expiring_event_negative() {
// expiration set to a negative value (invalid)
let exp:i64 = -90;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_zero() {
// a normal expiring event set to zero
let exp:i64 = 0;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), Some(0));
}
#[test]
fn expiring_event_fraction() {
// expiration is fractional (invalid)
let exp:f64 = 23.334;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_multiple() {
// multiple values, we just take the first
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), (10).to_string()],
vec!["expiration".to_string(), (20).to_string()],
];
assert_eq!(event.expiration(), Some(10));
}
}

View File

@@ -1,6 +1,6 @@
//! Relay metadata using NIP-11
/// Relay Info
use crate::config;
use crate::config::Settings;
use serde::{Deserialize, Serialize};
pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
@@ -27,15 +27,24 @@ pub struct RelayInfo {
}
/// Convert an Info configuration into public Relay Info
impl From<config::Info> for RelayInfo {
fn from(i: config::Info) -> Self {
impl From<Settings> for RelayInfo {
fn from(c: Settings) -> Self {
let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40, 42];
if c.authorization.nip42_auth {
supported_nips.push(42);
supported_nips.sort();
}
let i = c.info;
RelayInfo {
id: i.relay_url,
name: i.name,
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33]),
supported_nips: Some(supported_nips),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(std::borrow::ToOwned::to_owned),
}

View File

@@ -76,6 +76,7 @@ impl EventAuthzService {
origin: Option<String>,
user_agent: Option<String>,
nip05: Option<Nip05Name>,
auth_pubkey: Option<Vec<u8>>
) -> Result<Box<dyn AuthzDecision>> {
self.ready_connection().await;
let id_blob = hex::decode(&event.id)?;
@@ -97,7 +98,7 @@ impl EventAuthzService {
ip_addr: Some(ip.to_string()),
origin,
user_agent,
auth_pubkey: None,
auth_pubkey,
nip05: nip05.map(|x| nauthz_grpc::event_request::Nip05Name::from(x)),
})
.await?;

View File

@@ -16,6 +16,7 @@ pub struct EventResult {
pub enum Notice {
Message(String),
EventResult(EventResult),
AuthChallenge(String)
}
impl EventResultStatus {

View File

@@ -15,11 +15,11 @@ use sqlx::Error::RowNotFound;
use crate::hexrange::{hex_range, HexSearch};
use crate::repo::postgres_migration::run_migrations;
use crate::server::NostrMetrics;
use crate::utils::{is_hex, is_lower_hex};
use crate::utils::{is_hex, is_lower_hex, self};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tracing::log::trace;
use tracing::{debug, error, info};
use tracing::{debug, error, warn, info};
use crate::error;
pub type PostgresPool = sqlx::pool::Pool<Postgres>;
@@ -36,13 +36,52 @@ impl PostgresRepo {
metrics: m,
}
}
}
/// Cleanup expired events on a regular basis
async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
let start = Instant::now();
let exp_res = delete_expired(conn.clone()).await;
match exp_res {
Ok(exp_count) => {
if exp_count > 0 {
info!("removed {} expired events in: {:?}", exp_count, start.elapsed());
}
},
Err(e) => {
warn!("could not remove expired events due to error: {:?}", e);
}
}
}
};
}
});
Ok(())
}
/// One-time deletion of all expired events
async fn delete_expired(conn:PostgresPool) -> Result<u64> {
let mut tx = conn.begin().await?;
let update_count = sqlx::query("DELETE FROM \"event\" WHERE expires_at <= $1;")
.bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap())
.execute(&mut tx)
.await?.rows_affected();
tx.commit().await?;
Ok(update_count)
}
#[async_trait]
impl NostrRepo for PostgresRepo {
async fn start(&self) -> Result<()> {
info!("not implemented");
// begin a cleanup task for expired events.
cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
Ok(())
}
@@ -66,7 +105,7 @@ impl NostrRepo for PostgresRepo {
// replaceable event or parameterized replaceable event.
if e.is_replaceable() {
let repl_count = sqlx::query(
"SELECT e.id FROM event e WHERE e.pub_key=? AND e.kind=? AND e.created_at >= ? LIMIT 1;")
"SELECT e.id FROM event e WHERE e.pub_key=$1 AND e.kind=$2 AND e.created_at >= $3 LIMIT 1;")
.bind(&pubkey_blob)
.bind(e.kind as i64)
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
@@ -106,13 +145,14 @@ impl NostrRepo for PostgresRepo {
// ignore if the event hash is a duplicate.
let mut ins_count = sqlx::query(
r#"INSERT INTO "event"
(id, pub_key, created_at, kind, "content", delegated_by)
VALUES($1, $2, $3, $4, $5, $6)
(id, pub_key, created_at, expires_at, kind, "content", delegated_by)
VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING"#,
)
.bind(&id_blob)
.bind(&pubkey_blob)
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.bind(e.expiration().and_then(|x| Utc.timestamp_opt(x as i64, 0).latest()))
.bind(e.kind as i64)
.bind(event_str.into_bytes())
.bind(delegator_blob)
@@ -134,20 +174,20 @@ ON CONFLICT (id) DO NOTHING"#,
let tag_val = &tag[1];
// only single-char tags are searchable
let tag_char_opt = single_char_tagname(tag_name);
let query = "INSERT INTO tag (event_id, \"name\", value) VALUES($1, $2, $3) \
ON CONFLICT (event_id, \"name\", value) DO NOTHING";
match &tag_char_opt {
Some(_) => {
// if tag value is lowercase hex;
if is_lower_hex(tag_val) && (tag_val.len() % 2 == 0) {
sqlx::query(query)
sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, NULL, $3) \
ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING")
.bind(&id_blob)
.bind(tag_name)
.bind(hex::decode(tag_val).ok())
.execute(&mut tx)
.await?;
} else {
sqlx::query(query)
sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, $3, NULL) \
ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING")
.bind(&id_blob)
.bind(tag_name)
.bind(tag_val.as_bytes())
@@ -510,6 +550,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
let mut query = QueryBuilder::new("SELECT e.\"content\", e.created_at FROM \"event\" e WHERE ");
// This tracks whether we need to push a prefix AND before adding another clause
let mut push_and = false;
// Query for "authors", allowing prefix matches
if let Some(auth_vec) = &f.authors {
@@ -707,6 +748,10 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
} else {
query.push("e.hidden != 1::bit(1)");
}
// never display expired events
query
.push(" AND (e.expires_at IS NULL OR e.expires_at > ")
.push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")");
// Apply per-filter limit to this query.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.

View File

@@ -35,6 +35,7 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
m002::rebuild_tags(db).await?;
}
run_migration(m003::migration(), db).await;
run_migration(m004::migration(), db).await;
Ok(current_version(db).await as usize)
}
@@ -213,7 +214,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
// insert as BLOB if we can restore it losslessly.
// this means it needs to be even length and lowercase.
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
let q = "INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES ($1, $2, NULL, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
@@ -221,7 +222,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
.execute(&mut update_tx)
.await?;
} else {
let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
let q = "INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES ($1, $2, $3, NULL) ON CONFLICT DO NOTHING;";
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
@@ -250,7 +251,27 @@ mod m003 {
sql: vec![
r#"
-- Add unique constraint on tag
ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name", value);
ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name", value, value_hex);
"#,
],
}
}
}
mod m004 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 4;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add expiration time for events
ALTER TABLE event ADD COLUMN expires_at timestamp(0) with time zone;
-- Index expiration time
CREATE INDEX event_expires_at_idx ON "event" (expires_at);
"#,
],
}

View File

@@ -1,12 +1,12 @@
//! Event persistence and querying
//use crate::config::SETTINGS;
use crate::config::Settings;
use crate::error::Result;
use crate::error::{Result,Error::SqlError};
use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
use crate::utils::{is_hex};
use crate::utils::{is_hex,unix_time};
use crate::nip05::{Nip05Name, VerificationRecord};
use crate::subscription::{ReqFilter, Subscription};
use crate::server::NostrMetrics;
@@ -135,8 +135,8 @@ impl SqliteRepo {
}
// ignore if the event hash is a duplicate.
let mut ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
"INSERT OR IGNORE INTO event (event_hash, created_at, expires_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.expiration(), e.kind, pubkey_blob, delegator_blob, event_str]
)? as u64;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
@@ -187,7 +187,7 @@ impl SqliteRepo {
// if this event is parameterized replaceable, remove other events.
if let Some(d_tag) = e.distinct_param() {
let update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY t.created_at DESC LIMIT -1 OFFSET 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
if update_count > 0 {
info!(
@@ -251,7 +251,11 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo {
async fn start(&self) -> Result<()> {
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60),
self.write_in_progress.clone(),
self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600),
self.write_in_progress.clone()).await
}
async fn migrate_up(&self) -> Result<usize> {
@@ -264,6 +268,8 @@ impl NostrRepo for SqliteRepo {
/// Persist event to database
async fn write_event(&self, e: &Event) -> Result<u64> {
let start = Instant::now();
let max_write_attempts = 10;
let mut attempts = 0;
let _write_guard = self.write_in_progress.lock().await;
// spawn a blocking thread
//let mut conn = self.write_pool.get()?;
@@ -271,7 +277,26 @@ impl NostrRepo for SqliteRepo {
let e = e.clone();
let event_count = task::spawn_blocking(move || {
let mut conn = pool.get()?;
SqliteRepo::persist_event(&mut conn, &e)
// this could fail because the database was busy; try
// multiple times before giving up.
loop {
attempts+=1;
let wr = SqliteRepo::persist_event(&mut conn, &e);
match wr {
Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => {
// this basically means that NIP-05 or another
// writer was using the database between us
// reading and promoting the connection to a
// write lock.
info!("event write failed, DB locked (attempt: {}); sqlite err: {}",
attempts, e.extended_code);
},
_ => {return wr;},
}
if attempts >= max_write_attempts {
return wr;
}
}
}).await?;
self.metrics
.write_events
@@ -488,6 +513,7 @@ impl NostrRepo for SqliteRepo {
let e = hex::decode(event_id).ok();
let n = name.to_owned();
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
@@ -515,6 +541,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification timestamp
async fn update_verification_timestamp(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let verif_time = now_jitter(600);
@@ -537,6 +564,7 @@ impl NostrRepo for SqliteRepo {
/// Update verification record as failed
async fn fail_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let fail_time = now_jitter(600);
@@ -556,6 +584,7 @@ impl NostrRepo for SqliteRepo {
/// Delete verification record
async fn delete_verification(&self, id: u64) -> Result<()> {
let mut conn = self.write_pool.get()?;
let _write_guard = self.write_in_progress.lock().await;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
@@ -833,6 +862,9 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
}
// never display hidden events
query.push_str(" WHERE hidden!=TRUE");
// never display hidden events
filter_components.push("(expires_at IS NULL OR expires_at > ?)".to_string());
params.push(Box::new(unix_time()));
// build filter component conditions
if !filter_components.is_empty() {
query.push_str(" AND ");
@@ -900,7 +932,7 @@ pub fn build_pool(
}
}
let manager = if settings.database.in_memory {
SqliteConnectionManager::memory()
SqliteConnectionManager::file("file::memory:?cache=shared")
.with_flags(flags)
.with_init(|c| c.execute_batch(STARTUP_SQL))
} else {
@@ -922,8 +954,56 @@ pub fn build_pool(
pool
}
/// Cleanup expired events on a regular basis
async fn cleanup_expired(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() {
let mut _guard:Option<MutexGuard<u64>> = None;
// take a write lock to prevent event writes
// from proceeding while we are deleting
// events. This isn't necessary, but
// minimizes the chances of forcing event
// persistence to be retried.
_guard = Some(write_in_progress.lock().await);
let start = Instant::now();
let exp_res = tokio::task::spawn_blocking(move || {
delete_expired(&mut conn)
}).await;
match exp_res {
Ok(Ok(count)) => {
if count > 0 {
info!("removed {} expired events in: {:?}", count, start.elapsed());
}
},
_ => {
// either the task or underlying query failed
info!("there was an error cleaning up expired events: {:?}", exp_res);
}
}
}
}
};
}
});
Ok(())
}
/// Execute a query to delete all expired events
pub fn delete_expired(conn: &mut PooledConnection) -> Result<usize> {
let tx = conn.transaction()?;
let update_count = tx.execute(
"DELETE FROM event WHERE expires_at <= ?",
params![unix_time()],
)?;
tx.commit()?;
Ok(update_count)
}
/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO; use acquire_many on the reader semaphore to stop them from interrupting this.
tokio::task::spawn(async move {
// WAL size in pages.
@@ -938,6 +1018,8 @@ pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoin
tokio::select! {
_ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() {
// block all other writers
let _write_guard = write_in_progress.lock().await;
let mut _guard:Option<MutexGuard<u64>> = None;
// the busy timer will block writers, so don't set
// this any higher than you want max latency for event

View File

@@ -23,7 +23,7 @@ pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version
pub const DB_VERSION: usize = 16;
pub const DB_VERSION: usize = 17;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -43,6 +43,7 @@ id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
expires_at INTEGER, -- when the event expires and may be deleted
author BLOB NOT NULL, -- author pubkey
delegated_by BLOB, -- delegator pubkey (NIP-26)
kind INTEGER NOT NULL, -- event kind
@@ -61,6 +62,7 @@ CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at);
CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at);
CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
@@ -208,6 +210,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
if curr_version == 15 {
curr_version = mig_15_to_16(conn)?;
}
if curr_version == 16 {
curr_version = mig_16_to_17(conn)?;
}
if curr_version == DB_VERSION {
info!(
@@ -662,7 +667,7 @@ PRAGMA user_version = 15;
fn mig_15_to_16(conn: &mut PooledConnection) -> Result<usize> {
let count = db_event_count(conn)?;
info!("database schema needs update from 15->16 (this make take a few minutes)");
info!("database schema needs update from 15->16 (this may take a few minutes)");
let upgrade_sql = r##"
DROP TABLE tag;
CREATE TABLE tag (
@@ -729,3 +734,22 @@ CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,
info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed());
Ok(16)
}
fn mig_16_to_17(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 16->17");
let upgrade_sql = r##"
ALTER TABLE event ADD COLUMN expires_at INTEGER;
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
PRAGMA user_version = 17;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v16 -> v17");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(17)
}

View File

@@ -7,6 +7,8 @@ use crate::repo::NostrRepo;
use crate::db;
use crate::db::SubmittedEvent;
use crate::error::{Error, Result};
use crate::event::EventWrapper;
use crate::server::EventWrapper::{WrappedAuth, WrappedEvent};
use crate::event::Event;
use crate::event::EventCmd;
use crate::info::RelayInfo;
@@ -30,6 +32,9 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fs::File;
use std::io::BufReader;
use std::io::Read;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -48,6 +53,7 @@ use tungstenite::error::Error as WsError;
use tungstenite::handshake;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;
use crate::server::Error::CommandUnknownError;
/// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
#[allow(clippy::too_many_arguments)]
@@ -59,6 +65,7 @@ async fn handle_web_request(
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
shutdown: Receiver<()>,
favicon: Option<Vec<u8>>,
registry: Registry,
metrics: NostrMetrics,
) -> Result<Response<Body>, Infallible> {
@@ -157,7 +164,7 @@ async fn handle_web_request(
if mt_str.contains("application/nostr+json") {
// build a relay info response
debug!("Responding to server info request");
let rinfo = RelayInfo::from(settings.info);
let rinfo = RelayInfo::from(settings);
let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
return Ok(Response::builder()
.status(200)
@@ -186,6 +193,23 @@ async fn handle_web_request(
.body(Body::from(buffer))
.unwrap())
}
("/favicon.ico", false) => {
if let Some(favicon_bytes) = favicon {
info!("returning favicon");
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "image/x-icon")
// 1 month cache
.header("Cache-Control", "public, max-age=2419200")
.body(Body::from(favicon_bytes))
.unwrap())
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(""))
.unwrap())
}
}
(_, _) => {
//handle any other url
Ok(Response::builder()
@@ -268,6 +292,10 @@ fn create_metrics() -> (Registry, NostrMetrics) {
"nostr_cmd_close_total",
"CLOSE commands",
)).unwrap();
let cmd_auth = IntCounter::with_opts(Opts::new(
"nostr_cmd_auth_total",
"AUTH commands",
)).unwrap();
let disconnects = IntCounterVec::new(
Opts::new("nostr_disconnects_total", "Client disconnects"),
vec!["reason"].as_slice(),
@@ -282,6 +310,7 @@ fn create_metrics() -> (Registry, NostrMetrics) {
registry.register(Box::new(cmd_req.clone())).unwrap();
registry.register(Box::new(cmd_event.clone())).unwrap();
registry.register(Box::new(cmd_close.clone())).unwrap();
registry.register(Box::new(cmd_auth.clone())).unwrap();
registry.register(Box::new(disconnects.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
@@ -295,10 +324,20 @@ fn create_metrics() -> (Registry, NostrMetrics) {
cmd_req,
cmd_event,
cmd_close,
cmd_auth,
};
(registry,metrics)
}
fn file_bytes(path: &str) -> Result<Vec<u8>> {
let f = File::open(path)?;
let mut reader = BufReader::new(f);
let mut buffer = Vec::new();
// Read file into vector.
reader.read_to_end(&mut buffer)?;
Ok(buffer)
}
/// Start running a Nostr relay server.
pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
trace!("Config: {:?}", settings);
@@ -445,6 +484,12 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
//let pool_monitor = pool.clone();
//tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});
// Read in the favicon if it exists
let favicon = settings.info.favicon.as_ref().and_then(|x| {
info!("reading favicon...");
file_bytes(x).ok()
});
// A `Service` is needed for every connection, so this
// creates one from our `handle_request` function.
let make_svc = make_service_fn(|conn: &AddrStream| {
@@ -454,6 +499,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
let event = event_tx.clone();
let stop = invoke_shutdown.clone();
let settings = settings.clone();
let favicon = favicon.clone();
let registry = registry.clone();
let metrics = metrics.clone();
async move {
@@ -467,6 +513,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
bcast.clone(),
event.clone(),
stop.subscribe(),
favicon.clone(),
registry.clone(),
metrics.clone(),
)
@@ -488,7 +535,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
/// An `EVENT` message
/// `EVENT` and `AUTH` messages
EventMsg(EventCmd),
/// A `REQ` message
SubMsg(Subscription),
@@ -528,6 +575,7 @@ fn make_notice_message(notice: &Notice) -> Message {
let json = match notice {
Notice::Message(ref msg) => json!(["NOTICE", msg]),
Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
Notice::AuthChallenge(ref challenge) => json!(["AUTH", challenge]),
};
Message::text(json.to_string())
@@ -616,6 +664,14 @@ async fn nostr_server(
// Measure connections
metrics.connections.inc();
if settings.authorization.nip42_auth {
conn.generate_auth_challenge();
if let Some(challenge) = conn.auth_challenge() {
ws_stream.send(
make_notice_message(&Notice::AuthChallenge(challenge.to_string()))).await.ok();
}
}
loop {
tokio::select! {
_ = shutdown.recv() => {
@@ -729,16 +785,27 @@ async fn nostr_server(
// An EventCmd needs to be validated to be converted into an Event
// handle each type of message
let evid = ec.event_id().to_owned();
let parsed : Result<Event> = Result::<Event>::from(ec);
metrics.cmd_event.inc();
let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
match parsed {
Ok(e) => {
Ok(WrappedEvent(e)) => {
metrics.cmd_event.inc();
let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
// check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// check if event is expired
if e.is_expired() {
let notice = Notice::invalid(e.id, "The event has already expired");
ws_stream.send(make_notice_message(&notice)).await.ok();
// check if the event is too far in the future.
} else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database.
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone(), source_ip: conn.ip().to_string(), origin: client_info.origin.clone(), user_agent: client_info.user_agent.clone()};
let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(&pubkey).ok());
let submit_event = SubmittedEvent {
event: e.clone(),
notice_tx: notice_tx.clone(),
source_ip: conn.ip().to_string(),
origin: client_info.origin.clone(),
user_agent: client_info.user_agent.clone(),
auth_pubkey };
event_tx.send(submit_event).await.ok();
client_published_event_count += 1;
} else {
@@ -750,7 +817,39 @@ async fn nostr_server(
}
}
},
Ok(WrappedAuth(event)) => {
metrics.cmd_auth.inc();
if settings.authorization.nip42_auth {
let id_prefix:String = event.id.chars().take(8).collect();
debug!("successfully parsed auth: {:?} (cid: {})", id_prefix, cid);
match &settings.info.relay_url {
None => {
error!("AUTH command received, but relay_url is not set in the config file (cid: {})", cid);
},
Some(relay) => {
match conn.authenticate(&event, &relay) {
Ok(_) => {
let pubkey = match conn.auth_pubkey() {
Some(k) => k.chars().take(8).collect(),
None => "<unspecified>".to_string(),
};
info!("client is authenticated: (cid: {}, pubkey: {:?})", cid, pubkey);
},
Err(e) => {
info!("authentication error: {} (cid: {})", e, cid);
ws_stream.send(make_notice_message(&Notice::message(format!("Authentication error: {e}")))).await.ok();
},
}
}
}
} else {
let e = CommandUnknownError;
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
}
},
Err(e) => {
metrics.cmd_event.inc();
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
}
@@ -855,5 +954,6 @@ pub struct NostrMetrics {
pub cmd_req: IntCounter, // count of REQ commands received
pub cmd_event: IntCounter, // count of EVENT commands received
pub cmd_close: IntCounter, // count of CLOSE commands received
pub cmd_auth: IntCounter, // count of AUTH commands received
}

View File

@@ -1,6 +1,7 @@
//! Common utility functions
use bech32::FromBase32;
use std::time::SystemTime;
use url::Url;
/// Seconds since 1970.
#[must_use] pub fn unix_time() -> u64 {
@@ -33,6 +34,10 @@ pub fn nip19_to_hex(s: &str) -> Result<String, bech32::Error> {
})
}
pub fn host_str(url: &String) -> Option<String> {
Url::parse(url).ok().and_then(|u| u.host_str().map(|s| s.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;

356
tests/conn.rs Normal file
View File

@@ -0,0 +1,356 @@
#[cfg(test)]
mod tests {
use bitcoin_hashes::hex::ToHex;
use bitcoin_hashes::sha256;
use bitcoin_hashes::Hash;
use secp256k1::rand;
use secp256k1::{KeyPair, Secp256k1, XOnlyPublicKey};
use nostr_rs_relay::conn::ClientConn;
use nostr_rs_relay::error::Error;
use nostr_rs_relay::event::Event;
use nostr_rs_relay::utils::unix_time;
const RELAY: &str = "wss://nostr.example.com/";
#[test]
fn test_generate_auth_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let last_auth_challenge = client_conn.auth_challenge().cloned();
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_ne!(
client_conn.auth_challenge().unwrap(),
&last_auth_challenge.unwrap()
);
assert_eq!(client_conn.auth_pubkey(), None);
}
#[test]
fn test_authenticate_with_valid_event() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event(challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
}
#[test]
fn test_fail_to_authenticate_in_invalid_state() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event(&"challenge".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_authenticate_when_already_authenticated() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap().clone();
let event = auth_event(&challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
let event1 = auth_event(&challenge);
let result1 = client_conn.authenticate(&event1, &RELAY.into());
assert!(matches!(result1, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
assert_ne!(client_conn.auth_pubkey(), Some(&event1.pubkey));
}
#[test]
fn test_fail_to_authenticate_with_invalid_event() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let mut event = auth_event(challenge);
event.sig = event.sig.chars().rev().collect::<String>();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_event_kind() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_kind(challenge, 9999999999999999);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_expired_timestamp() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_created_at(challenge, unix_time() - 1200); // 20 minutes
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_future_timestamp() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_created_at(challenge, unix_time() + 1200); // 20 minutes
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_tags() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event_without_tags();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event_without_challenge();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_relay() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_without_relay(challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event(&"invalid challenge".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_relay() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_relay(challenge, &"xyz".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
fn auth_event(challenge: &String) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), 22242, unix_time())
}
fn auth_event_with_kind(challenge: &String, kind: u64) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), kind, unix_time())
}
fn auth_event_with_created_at(challenge: &String, created_at: u64) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), 22242, created_at)
}
fn auth_event_without_challenge() -> Event {
create_auth_event(None, Some(&RELAY.into()), 22242, unix_time())
}
fn auth_event_without_relay(challenge: &String) -> Event {
create_auth_event(Some(challenge), None, 22242, unix_time())
}
fn auth_event_without_tags() -> Event {
create_auth_event(None, None, 22242, unix_time())
}
fn auth_event_with_relay(challenge: &String, relay: &String) -> Event {
create_auth_event(Some(challenge), Some(relay), 22242, unix_time())
}
fn create_auth_event(
challenge: Option<&String>,
relay: Option<&String>,
kind: u64,
created_at: u64,
) -> Event {
let secp = Secp256k1::new();
let key_pair = KeyPair::new(&secp, &mut rand::thread_rng());
let public_key = XOnlyPublicKey::from_keypair(&key_pair);
let mut tags: Vec<Vec<String>> = vec![];
if let Some(c) = challenge {
let tag = vec!["challenge".into(), c.into()];
tags.push(tag);
}
if let Some(r) = relay {
let tag = vec!["relay".into(), r.into()];
tags.push(tag);
}
let mut event = Event {
id: "0".to_owned(),
pubkey: public_key.to_hex(),
delegated_by: None,
created_at: created_at,
kind: kind,
tags: tags,
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
};
let c = event.to_canonical().unwrap();
let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
let msg = secp256k1::Message::from_slice(digest.as_ref()).unwrap();
let sig = secp.sign_schnorr(&msg, &key_pair);
event.id = format!("{digest:x}");
event.sig = sig.to_hex();
event
}
}