nostr-rs-relay/src/db.rs
thesimplekid 4ff77ab537 improvement: config to disable dm/invoice creation
While sending dms to users who are not signed up
but have attempted to publish events, it is now
disabled by default. This stops the creation of
extra invoices for pubkeys that may have no
intention of signing up for the relay. It also
reduced the number of dms that are created.

Signed-off-by: Greg Heartsfield <scsibug@imap.cc>
2023-11-23 13:22:41 -06:00

482 lines
19 KiB
Rust

//! Event persistence and querying
use crate::config::Settings;
use crate::error::{Error, Result};
use crate::event::Event;
use crate::nauthz;
use crate::notice::Notice;
use crate::payment::PaymentMessage;
use crate::repo::postgres::{PostgresPool, PostgresRepo};
use crate::repo::sqlite::SqliteRepo;
use crate::repo::NostrRepo;
use crate::server::NostrMetrics;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use log::LevelFilter;
use nostr::key::FromPkStr;
use nostr::key::Keys;
use r2d2;
use sqlx::pool::PoolOptions;
use sqlx::postgres::PgConnectOptions;
use sqlx::ConnectOptions;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace, warn};
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
/// Events submitted from a client, with a return channel for notices
pub struct SubmittedEvent {
pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
pub source_ip: String,
pub origin: Option<String>,
pub user_agent: Option<String>,
pub auth_pubkey: Option<Vec<u8>>,
}
/// Database file
pub const DB_FILE: &str = "nostr.db";
/// Build repo
/// # Panics
///
/// Will panic if the pool could not be created.
pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc<dyn NostrRepo> {
match settings.database.engine.as_str() {
"sqlite" => Arc::new(build_sqlite_pool(settings, metrics).await),
"postgres" => Arc::new(build_postgres_pool(settings, metrics).await),
_ => panic!("Unknown database engine"),
}
}
async fn build_sqlite_pool(settings: &Settings, metrics: NostrMetrics) -> SqliteRepo {
let repo = SqliteRepo::new(settings, metrics);
repo.start().await.ok();
repo.migrate_up().await.ok();
repo
}
async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> PostgresRepo {
let mut options: PgConnectOptions = settings.database.connection.as_str().parse().unwrap();
options.log_statements(LevelFilter::Debug);
options.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60));
let pool: PostgresPool = PoolOptions::new()
.max_connections(settings.database.max_conn)
.min_connections(settings.database.min_conn)
.idle_timeout(Duration::from_secs(60))
.connect_with(options)
.await
.unwrap();
let write_pool: PostgresPool = match &settings.database.connection_write {
Some(cfg_write) => {
let mut options_write: PgConnectOptions = cfg_write.as_str().parse().unwrap();
options_write.log_statements(LevelFilter::Debug);
options_write.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60));
PoolOptions::new()
.max_connections(settings.database.max_conn)
.min_connections(settings.database.min_conn)
.idle_timeout(Duration::from_secs(60))
.connect_with(options_write)
.await
.unwrap()
}
None => pool.clone(),
};
let repo = PostgresRepo::new(pool, write_pool, metrics);
// Panic on migration failure
let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version);
// startup scheduled tasks
repo.start().await.ok();
repo
}
/// Spawn a database writer that persists events to the `SQLite` store.
pub async fn db_writer(
repo: Arc<dyn NostrRepo>,
settings: Settings,
mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
metadata_tx: tokio::sync::broadcast::Sender<Event>,
payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
// are we performing NIP-05 checking?
let nip05_active = settings.verified_users.is_active();
// are we requriing NIP-05 user verification?
let nip05_enabled = settings.verified_users.is_enabled();
let pay_to_relay_enabled = settings.pay_to_relay.enabled;
let cost_per_event = settings.pay_to_relay.cost_per_event;
debug!("Pay to relay: {}", pay_to_relay_enabled);
//upgrade_db(&mut pool.get()?)?;
// Make a copy of the whitelist
let whitelist = &settings.authorization.pubkey_whitelist.clone();
// get rate limit settings
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
let clock = governor::clock::QuantaClock::default();
if let Some(rps) = rps_setting {
if rps > 0 {
info!("Enabling rate limits for event creation ({}/sec)", rps);
let quota = core::num::NonZeroU32::new(rps * 60).unwrap();
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
}
}
// create a client if GRPC is enabled.
// Check with externalized event admitter service, if one is defined.
let mut grpc_client = if let Some(svr) = settings.grpc.event_admission_server {
Some(nauthz::EventAuthzService::connect(&svr).await)
} else {
None
};
//let gprc_client = settings.grpc.event_admission_server.map(|s| {
// event_admitter_connect(&s);
// });
loop {
if shutdown.try_recv().is_ok() {
info!("shutting down database writer");
break;
}
// call blocking read on channel
let next_event = event_rx.recv().await;
// if the channel has closed, we will never get work
if next_event.is_none() {
break;
}
// track if an event write occurred; this is used to
// update the rate limiter
let mut event_write = false;
let subm_event = next_event.unwrap();
let event = subm_event.event;
let notice_tx = subm_event.notice_tx;
// Check that event kind isn't blacklisted
let kinds_blacklist = &settings.limits.event_kind_blacklist.clone();
if let Some(event_kind_blacklist) = kinds_blacklist {
if event_kind_blacklist.contains(&event.kind) {
debug!(
"rejecting event: {}, blacklisted kind: {}",
&event.get_event_id_prefix(),
&event.kind
);
notice_tx
.try_send(Notice::blocked(event.id, "event kind is blocked by relay"))
.ok();
continue;
}
}
// Check that event kind isn't allowlisted
let kinds_allowlist = &settings.limits.event_kind_allowlist.clone();
if let Some(event_kind_allowlist) = kinds_allowlist {
if !event_kind_allowlist.contains(&event.kind) {
debug!(
"rejecting event: {}, allowlist kind: {}",
&event.get_event_id_prefix(),
&event.kind
);
notice_tx
.try_send(Notice::blocked(event.id, "event kind is blocked by relay"))
.ok();
continue;
}
}
// Set to none until balance is got from db
// Will stay none if user in whitelisted and does not have to pay to post
// When pay to relay is enabled the whitelist is not a list of who can post
// It is a list of who can post for free
let mut user_balance: Option<u64> = None;
if !pay_to_relay_enabled {
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
// TODO: incorporate delegated pubkeys
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
debug!(
"rejecting event: {}, unauthorized author",
event.get_event_id_prefix()
);
notice_tx
.try_send(Notice::blocked(
event.id,
"pubkey is not allowed to publish to this relay",
))
.ok();
continue;
}
}
} else {
// If the user is on whitelist there is no need to check if the user is admitted or has balance to post
if whitelist.is_none()
|| (whitelist.is_some() && !whitelist.as_ref().unwrap().contains(&event.pubkey))
{
let key = Keys::from_pk_str(&event.pubkey).unwrap();
match repo.get_account_balance(&key).await {
Ok((user_admitted, balance)) => {
// Checks to make sure user is admitted
if !user_admitted {
debug!("user: {}, is not admitted", &event.pubkey);
// If the user is in DB but not admitted
// Send meeage to payment thread to check if outstanding invoice has been paid
payment_tx
.send(PaymentMessage::CheckAccount(event.pubkey))
.ok();
notice_tx
.try_send(Notice::blocked(event.id, "User is not admitted"))
.ok();
continue;
}
// Checks that user has enough balance to post
// TODO: this should send an invoice to user to top up
if balance < cost_per_event {
debug!("user: {}, does not have a balance", &event.pubkey,);
notice_tx
.try_send(Notice::blocked(event.id, "Insufficient balance"))
.ok();
continue;
}
user_balance = Some(balance);
debug!("User balance: {:?}", user_balance);
}
Err(
Error::SqlError(rusqlite::Error::QueryReturnedNoRows)
| Error::SqlxError(sqlx::Error::RowNotFound),
) => {
// User does not exist
info!("Unregistered user");
if settings.pay_to_relay.sign_ups && settings.pay_to_relay.direct_message {
payment_tx
.send(PaymentMessage::NewAccount(event.pubkey))
.ok();
}
let msg = "Pubkey not registered";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
continue;
}
Err(err) => {
warn!("Error checking admission status: {:?}", err);
let msg = "relay experienced an error checking your admission status";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
// Other error
continue;
}
}
}
}
// send any metadata events to the NIP-05 verifier
if nip05_active && event.is_kind_metadata() {
// we are sending this prior to even deciding if we
// persist it. this allows the nip05 module to
// inspect it, update if necessary, or persist a new
// event and broadcast it itself.
metadata_tx.send(event.clone()).ok();
}
// get a validation result for use in verification and GPRC
let validation = if nip05_active {
Some(repo.get_latest_user_verification(&event.pubkey).await)
} else {
None
};
// check for NIP-05 verification
if nip05_enabled && validation.is_some() {
match validation.as_ref().unwrap() {
Ok(uv) => {
if uv.is_valid(&settings.verified_users) {
info!(
"new event from verified author ({:?},{:?})",
uv.name.to_string(),
event.get_author_prefix()
);
} else {
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
);
notice_tx
.try_send(Notice::blocked(
event.id,
"NIP-05 verification is no longer valid (expired/wrong domain)",
))
.ok();
continue;
}
}
Err(
Error::SqlError(rusqlite::Error::QueryReturnedNoRows)
| Error::SqlxError(sqlx::Error::RowNotFound),
) => {
debug!(
"no verification records found for pubkey: {:?}",
event.get_author_prefix()
);
notice_tx
.try_send(Notice::blocked(
event.id,
"NIP-05 verification needed to publish events",
))
.ok();
continue;
}
Err(e) => {
warn!("checking nip05 verification status failed: {:?}", e);
continue;
}
}
}
// nip05 address
let nip05_address: Option<crate::nip05::Nip05Name> =
validation.and_then(|x| x.ok().map(|y| y.name));
// GRPC check
if let Some(ref mut c) = grpc_client {
trace!("checking if grpc permits");
let grpc_start = Instant::now();
let decision_res = c
.admit_event(
&event,
&subm_event.source_ip,
subm_event.origin,
subm_event.user_agent,
nip05_address,
subm_event.auth_pubkey,
)
.await;
match decision_res {
Ok(decision) => {
if !decision.permitted() {
// GPRC returned a decision to reject this event
info!(
"GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
grpc_start.elapsed(),
subm_event.source_ip
);
notice_tx
.try_send(Notice::blocked(
event.id,
&decision.message().unwrap_or_default(),
))
.ok();
continue;
}
}
Err(e) => {
warn!("GRPC server error: {:?}", e);
}
}
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
if event.is_ephemeral() {
bcast_tx.send(event.clone()).ok();
debug!(
"published ephemeral event: {:?} from: {:?} in: {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
// send OK message
notice_tx.try_send(Notice::saved(event.id)).ok();
} else {
match repo.write_event(&event).await {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate or deleted event");
notice_tx.try_send(Notice::duplicate(event.id)).ok();
} else {
info!(
"persisted event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
start.elapsed(),
subm_event.source_ip,
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
notice_tx.try_send(Notice::saved(event.id)).ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
let msg = "relay experienced an error trying to publish the latest event";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
}
}
}
// use rate limit, if defined, and if an event was actually written.
if event_write {
// If pay to relay is diabaled or the cost per event is 0
// No need to update user balance
if pay_to_relay_enabled && cost_per_event > 0 {
// If the user balance is some, user was not on whitelist
// Their balance should be reduced by the cost per event
if let Some(_balance) = user_balance {
let pubkey = Keys::from_pk_str(&event.pubkey)?;
repo.update_account_balance(&pubkey, false, cost_per_event)
.await?;
}
}
if let Some(ref lim) = lim_opt {
if let Err(n) = lim.check() {
let wait_for = n.wait_time_from(clock.now());
// check if we have recently logged rate
// limits, but print out a message only once
// per second.
if most_recent_rate_limit.elapsed().as_secs() > 10 {
warn!(
"rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)",
wait_for
);
// reset last rate limit message
most_recent_rate_limit = Instant::now();
}
// block event writes, allowing them to queue up
thread::sleep(wait_for);
continue;
}
}
}
}
info!("database connection closed");
Ok(())
}
/// Serialized event associated with a specific subscription request.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct QueryResult {
/// Subscription identifier
pub sub_id: String,
/// Serialized event
pub event: String,
}