feat(NIP-111): pay to relay (experimental)

This commit is contained in:
thesimplekid 2023-02-25 15:38:26 -06:00 committed by Greg Heartsfield
parent 164603dedd
commit c0158af18b
19 changed files with 1935 additions and 28 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
**/target/
nostr.db
nostr.db-*
justfile

143
Cargo.lock generated
View File

@ -8,6 +8,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aes"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433cfd6710c9986c576a25ca913c39d66a6474107b406f34f91d4a8923395241"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "ahash"
version = "0.4.7"
@ -286,6 +297,18 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
name = "bitcoin"
version = "0.29.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0694ea59225b0c5f3cb405ff3f670e4828358ed26aec49dc352f730f0cb1a8a3"
dependencies = [
"bech32",
"bitcoin_hashes 0.11.0",
"secp256k1 0.24.3",
"serde",
]
[[package]]
name = "bitcoin_hashes"
version = "0.10.0"
@ -295,6 +318,15 @@ dependencies = [
"serde",
]
[[package]]
name = "bitcoin_hashes"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90064b8dee6815a6470d60bad07bbbaee885c0e12d04177138fa3291a01b7bc4"
dependencies = [
"serde",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -310,6 +342,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block-padding"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a90ec2df9600c28a01c56c4784c9207a96d2451833aeceb8cc97e4c9548bb78"
dependencies = [
"generic-array",
]
[[package]]
name = "blocking"
version = "1.3.0"
@ -342,6 +383,15 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher",
]
[[package]]
name = "cc"
version = "1.0.79"
@ -354,6 +404,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "checked_int_cast"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17cc5e6b5ab06331c33589842070416baa137e8b0eb912b008cfd4a78ada7919"
[[package]]
name = "chrono"
version = "0.4.23"
@ -369,6 +425,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "cipher"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.1.4"
@ -973,8 +1039,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
@ -1282,6 +1350,16 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"block-padding",
"generic-array",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -1289,6 +1367,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
@ -1548,6 +1629,27 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28f6282699b1a163c0ac3f12cc1297974d0f60602fa057a3e047a6439ed741b0"
dependencies = [
"aes",
"base64 0.21.0",
"bitcoin",
"cbc",
"getrandom",
"instant",
"log",
"once_cell",
"regex",
"serde",
"serde_json",
"thiserror",
"url",
]
[[package]]
name = "nostr-rs-relay"
version = "0.8.8"
@ -1556,7 +1658,7 @@ dependencies = [
"async-std",
"async-trait",
"bech32",
"bitcoin_hashes",
"bitcoin_hashes 0.10.0",
"chrono",
"clap",
"config",
@ -1572,15 +1674,17 @@ dependencies = [
"indicatif",
"lazy_static",
"nonzero_ext",
"nostr",
"parse_duration",
"prometheus",
"prost",
"qrcode",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5",
"regex",
"rusqlite",
"secp256k1",
"secp256k1 0.21.3",
"serde",
"serde_json",
"sqlx",
@ -2076,6 +2180,15 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "qrcode"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d2f1455f3630c6e5107b4f2b94e74d76dea80736de0981fd27644216cff57f"
dependencies = [
"checked_int_cast",
]
[[package]]
name = "quanta"
version = "0.9.3"
@ -2475,9 +2588,21 @@ version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c42e6f1735c5f00f51e43e28d6634141f2bcad10931b2609ddd74a86d751260"
dependencies = [
"bitcoin_hashes",
"bitcoin_hashes 0.10.0",
"rand 0.6.5",
"secp256k1-sys",
"secp256k1-sys 0.4.2",
"serde",
]
[[package]]
name = "secp256k1"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b1629c9c557ef9b293568b338dddfc8208c98a18c59d722a9d53f859d9c9b62"
dependencies = [
"bitcoin_hashes 0.11.0",
"rand 0.8.5",
"secp256k1-sys 0.6.1",
"serde",
]
@ -2490,6 +2615,15 @@ dependencies = [
"cc",
]
[[package]]
name = "secp256k1-sys"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83080e2c2fc1006e625be82e5d1eb6a43b7fd9578b617fcc55814daf286bba4b"
dependencies = [
"cc",
]
[[package]]
name = "security-framework"
version = "2.8.2"
@ -3254,6 +3388,7 @@ dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]

View File

@ -52,6 +52,8 @@ prometheus = "0.13.3"
indicatif = "0.17.3"
bech32 = "0.9.1"
url = "2.3.1"
qrcode = { version = "0.12.0", default-features = false, features = ["svg"] }
nostr = { version = "0.18.0", default-features = false, features = ["base", "nip04", "nip19"] }
[dev-dependencies]
anyhow = "1"

View File

@ -52,6 +52,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# sqlite.
#connection = "postgresql://postgres:nostr@localhost:7500/nostr"
[grpc]
# gRPC interfaces for externalized decisions and other extensions to
# functionality.
@ -173,3 +174,37 @@ reject_future_seconds = 1800
# How many consecutive failed checks before we give up on verifying
# this author.
#max_consecutive_failures = 20
[pay_to_relay]
# Enable pay to relay
enabled = true
# The cost to be admitted to relay
admission_cost = 1000
# The cost in sats per post
cost_per_event = 0
# Url of lnbits api
node_url = "<node url>"
# LNBits api secret
api_secret = "<ln bits api>"
# Terms of service
terms_message = """
This service (and supporting services) are provided "as is", without warranty of any kind, express or implied.
By using this service, you agree:
* Not to engage in spam or abuse the relay service
* Not to disseminate illegal content
* That requests to delete content cannot be guaranteed
* To use the service in compliance with all applicable laws
* To grant necessary rights to your content for unlimited time
* To be of legal age and have capacity to use this service
* That the service may be terminated at any time without notice
* That the content you publish may be removed at any time without notice
* To have your IP address collected to detect abuse or misuse
* To cooperate with the relay to combat abuse or misuse
* You may be exposed to content that you might find triggering or distasteful
* The relay operator is not liable for content produced by users of the relay
"""
# Whether or not new sign ups should be allowed
sign_ups = true
secret_key = "<nostr nsec>"

85
docs/pay-to-relay.md Normal file
View File

@ -0,0 +1,85 @@
# Pay to Relay Design Document
The relay with use payment as a form of spam prevention. In order to post to the relay a user must pay a set rate. There is also the option to require a payment for each note posted to the relay. There is no cost to read from the relay.
## Configuration
Currently, [LNBits](https://github.com/lnbits/lnbits) is implemented as the payment processor. LNBits exposes a simple API for creating invoices, to use this API create a wallet and on the right side find "API info" you will need to add the invoice/read key to this relays config file.
The below configuration will need to be added to config.toml
```
[pay_to_relay]
# Enable pay to relay
enabled = true
# The cost to be admitted to relay
admission_cost = 1000
# The cost in sats per post
cost_per_event = 0
# Url of lnbits api
node_url = "https://<IP of node>:5001/api/v1/payments"
# LNBits api secret
api_secret = "<LNbits api key>"
# Terms of service
terms_message = """This service ....
"""
# Whether or not new sign ups should be allowed
sign_ups = true
secret_key = "<nostr secret key to send dms>"
```
The LNBits instance must have a signed HTTPS a self signed certificate will not work.
## Design Overview
### Concepts
All authors are initially not admitted to write to the relay. There are two ways to gain access write to the relay. The first is by attempting to post the the relay, upon receiving an event from an author that is not admitted, the relay will send a direct message including the terms of service of the relay and a lighting invoice for the admission cost. Once this invoice is payed the author can write to the relay. For this method to work the author must be reading from the relay. An author can also pay and accept the terms of service via a webpage `https://<relay-url>/join`.
## Design Details
Authors are stored in a dedicated table. This tracks:
* `pubkey`
* `is_admitted` whether on no the admission invoice has been paid, accepting the terms of service.
* `balance` the current balance in sats of the author, used if there is a cost per post
* `tos_accepted_at` the timestamp of when the author accepted the tos
Invoice information is stored in a dedicated table. This tracks:
* `payment_hash` the payment hash of the lighting invoice
* `pubkey` of the author the invoice is issued to
* `invoice` bolt11 invoice
* `amount` in sats
* `status` (Paid/Unpaid/Expired)
* `description`
* `created_at` timestamp of creation
* `confirmed_at` timestamp of payment
### Event Handling
If "pay to relay" is enabled, all incoming events are evaluated to determine whether the author is on the relay's whitelist or if they have paid the admission fee and accepted the terms. If "pay per note" is enabled, there is an additional check to ensure that the author has enough balance, which is then reduced by the cost per note. If the author is on the whitelist, this balance check is not necessary.
### Integration
We have an existing database writer thread, which receives events and
attempts to persist them to disk. Once validated and persisted, these
events are broadcast to all subscribers.
When "pay to relay" is enabled, the writer must check if the author is admitted to post. If the author is not admitted to post the event is forwarded to the payment module. Where an invoice is generated, persisted and broadcast as an direct message to the author.
### Threat Scenarios
Some of these mitigation's are fully implemented, others are documented
simply to demonstrate a mitigation is possible.
### Sign up Spamming
*Threat*: An attacker generates a large number of new pubkeys publishing to the relays. Causing a large number of new invoices to be created for each new pubkey.
*Mitigation*: Rate limit number of new sign ups
### Admitted Author Spamming
*Threat*: An attacker gains write access by paying the admission fee, and then floods the relay with a large number of spam events.
*Mitigation*: The attacker's admission can be revoked and their admission fee will not be refunded. Enabling "cost per event" and increasing the admission cost can also discourage this type of behavior.

View File

@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::warn;
use crate::payment::Processor;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[allow(unused)]
pub struct Info {
@ -80,6 +82,20 @@ pub struct Authorization {
pub nip42_auth: bool, // if true enables NIP-42 authentication
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct PayToRelay {
pub enabled: bool,
pub admission_cost: u64, // Cost to have pubkey whitelisted
pub cost_per_event: u64, // Cost author to pay per event
pub node_url: String,
pub api_secret: String,
pub terms_message: String,
pub sign_ups: bool, // allow new users to sign up to relay
pub secret_key: String,
pub processor: Processor,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Diagnostics {
@ -158,6 +174,7 @@ pub struct Settings {
pub network: Network,
pub limits: Limits,
pub authorization: Authorization,
pub pay_to_relay: PayToRelay,
pub verified_users: VerifiedUsers,
pub retention: Retention,
pub options: Options,
@ -209,6 +226,16 @@ impl Settings {
);
// initialize durations for verified users
settings.verified_users.init();
// Validate pay to relay settings
if settings.pay_to_relay.enabled {
assert_ne!(settings.pay_to_relay.api_secret, "");
// Should check that url is valid
assert_ne!(settings.pay_to_relay.node_url, "");
assert_ne!(settings.pay_to_relay.terms_message, "");
assert_ne!(settings.pay_to_relay.secret_key, "");
}
Ok(settings)
}
}
@ -259,6 +286,17 @@ impl Default for Settings {
pubkey_whitelist: None, // Allow any address to publish
nip42_auth: false, // Disable NIP-42 authentication
},
pay_to_relay: PayToRelay {
enabled: false,
admission_cost: 4200,
cost_per_event: 0,
terms_message: "".to_string(),
node_url: "".to_string(),
api_secret: "".to_string(),
sign_ups: false,
secret_key: "".to_string(),
processor: Processor::LNBits,
},
verified_users: VerifiedUsers {
mode: VerifiedUsersMode::Disabled,
domain_whitelist: None,

122
src/db.rs
View File

@ -4,6 +4,7 @@ use crate::error::{Error, Result};
use crate::event::Event;
use crate::nauthz;
use crate::notice::Notice;
use crate::payment::PaymentMessage;
use crate::repo::postgres::{PostgresPool, PostgresRepo};
use crate::repo::sqlite::SqliteRepo;
use crate::repo::NostrRepo;
@ -19,6 +20,8 @@ use std::thread;
use std::time::{Duration, Instant};
use tracing::log::LevelFilter;
use tracing::{debug, info, trace, warn};
use nostr::key::FromPkStr;
use nostr::key::Keys;
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
@ -83,6 +86,7 @@ pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
metadata_tx: tokio::sync::broadcast::Sender<Event>,
payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
// are we performing NIP-05 checking?
@ -90,6 +94,10 @@ pub async fn db_writer(
// are we requriing NIP-05 user verification?
let nip05_enabled = settings.verified_users.is_enabled();
let pay_to_relay_enabled = settings.pay_to_relay.enabled;
let cost_per_event = settings.pay_to_relay.cost_per_event;
debug!("Pay to relay: {}", pay_to_relay_enabled);
//upgrade_db(&mut pool.get()?)?;
// Make a copy of the whitelist
@ -136,24 +144,6 @@ pub async fn db_writer(
let subm_event = next_event.unwrap();
let event = subm_event.event;
let notice_tx = subm_event.notice_tx;
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
// TODO: incorporate delegated pubkeys
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
debug!(
"rejecting event: {}, unauthorized author",
event.get_event_id_prefix()
);
notice_tx
.try_send(Notice::blocked(
event.id,
"pubkey is not allowed to publish to this relay",
))
.ok();
continue;
}
}
// Check that event kind isn't blacklisted
let kinds_blacklist = &settings.limits.event_kind_blacklist.clone();
@ -187,6 +177,91 @@ pub async fn db_writer(
}
}
// Set to none until balance is got from db
// Will stay none if user in whitelisted and does not have to pay to post
// When pay to relay is enabled the whitelist is not a list of who can post
// It is a list of who can post for free
let mut user_balance: Option<u64> = None;
if !pay_to_relay_enabled {
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
// TODO: incorporate delegated pubkeys
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
debug!(
"rejecting event: {}, unauthorized author",
event.get_event_id_prefix()
);
notice_tx
.try_send(Notice::blocked(
event.id,
"pubkey is not allowed to publish to this relay",
))
.ok();
continue;
}
}
} else {
// If the user is on whitelist there is no need to check if the user is admitted or has balance to post
if whitelist.is_none()
|| (whitelist.is_some() && !whitelist.as_ref().unwrap().contains(&event.pubkey))
{
let key = Keys::from_pk_str(&event.pubkey).unwrap();
match repo.get_account_balance(&key).await {
Ok((user_admitted, balance)) => {
// Checks to make sure user is admitted
if !user_admitted {
debug!("user: {}, is not admitted", &event.pubkey);
// If the user is in DB but not admitted
// Send meeage to payment thread to check if outstanding invoice has been paid
payment_tx
.send(PaymentMessage::CheckAccount(event.pubkey))
.ok();
notice_tx
.try_send(Notice::blocked(event.id, "User is not admitted"))
.ok();
continue;
}
// Checks that user has enough balance to post
// TODO: this should send an invoice to user to top up
if balance < cost_per_event {
debug!("user: {}, does not have a balance", &event.pubkey,);
notice_tx
.try_send(Notice::blocked(event.id, "Insufficient balance"))
.ok();
continue;
}
user_balance = Some(balance);
debug!("User balance: {:?}", user_balance);
}
Err(
Error::SqlError(rusqlite::Error::QueryReturnedNoRows)
| Error::SqlxError(sqlx::Error::RowNotFound),
) => {
// User does not exist
info!("Unregistered user");
if settings.pay_to_relay.sign_ups {
payment_tx
.send(PaymentMessage::NewAccount(event.pubkey))
.ok();
}
let msg = "Pubkey not registered";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
continue;
}
Err(err) => {
warn!("Error checking admission status: {:?}", err);
let msg = "relay experienced an error checking your admission status";
notice_tx.try_send(Notice::error(event.id, msg)).ok();
// Other error
continue;
}
}
}
}
// send any metadata events to the NIP-05 verifier
if nip05_active && event.is_kind_metadata() {
// we are sending this prior to even deciding if we
@ -335,6 +410,17 @@ pub async fn db_writer(
// use rate limit, if defined, and if an event was actually written.
if event_write {
// If pay to relay is diabaled or the cost per event is 0
// No need to update user balance
if pay_to_relay_enabled && cost_per_event > 0 {
// If the user balance is some, user was not on whitelist
// Their balance should be reduced by the cost per event
if let Some(_balance) = user_balance {
let pubkey = Keys::from_pk_str(&event.pubkey)?;
repo.update_account_balance(&pubkey, false, cost_per_event)
.await?;
}
}
if let Some(ref lim) = lim_opt {
if let Err(n) = lim.check() {
let wait_for = n.wait_time_from(clock.now());

View File

@ -72,6 +72,16 @@ pub enum Error {
AuthFailure,
#[error("I/O Error")]
IoError(std::io::Error),
#[error("Event builder error")]
EventError(nostr::event::builder::Error),
#[error("Nostr key error")]
NostrKeyError(nostr::key::Error),
#[error("Payment hash mismatch")]
PaymentHash,
#[error("Error parsing url")]
URLParseError(url::ParseError),
#[error("HTTP error")]
HTTPError(http::Error),
#[error("Unknown/Undocumented")]
UnknownError,
}
@ -153,3 +163,30 @@ impl From<std::io::Error> for Error {
Error::IoError(r)
}
}
impl From<nostr::event::builder::Error> for Error {
/// Wrap event builder error
fn from(r: nostr::event::builder::Error) -> Self {
Error::EventError(r)
}
}
impl From<nostr::key::Error> for Error {
/// Wrap nostr key error
fn from(r: nostr::key::Error) -> Self {
Error::NostrKeyError(r)
}
}
impl From<url::ParseError> for Error {
/// Wrap nostr key error
fn from(r: url::ParseError) -> Self {
Error::URLParseError(r)
}
}
impl From<http::Error> for Error {
/// Wrap nostr key error
fn from(r: http::Error) -> Self {
Error::HTTPError(r)
}
}

View File

@ -424,6 +424,22 @@ impl Event {
}
}
impl From<nostr::Event> for Event {
fn from(nostr_event: nostr::Event) -> Self {
Event {
id: nostr_event.id.to_hex(),
pubkey: nostr_event.pubkey.to_string(),
created_at: nostr_event.created_at.as_u64(),
kind: nostr_event.kind.as_u64(),
tags: nostr_event.tags.iter().map(|x| x.as_vec()).collect(),
content: nostr_event.content,
sig: nostr_event.sig.to_string(),
delegated_by: None,
tagidx: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -4,6 +4,32 @@ use crate::config::Settings;
use serde::{Deserialize, Serialize};
pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
pub const UNIT: &str = "sats";
/// Limitations of the relay as specified in NIP-111
/// (This nip isn't finalized so may change)
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Limitation {
#[serde(skip_serializing_if = "Option::is_none")]
payment_required: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug)]
#[allow(unused)]
pub struct Fees {
#[serde(skip_serializing_if = "Option::is_none")]
admission: Option<Vec<Fee>>,
#[serde(skip_serializing_if = "Option::is_none")]
publication: Option<Vec<Fee>>,
}
#[derive(Serialize, Deserialize, Debug)]
#[allow(unused)]
pub struct Fee {
amount: u64,
unit: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
@ -24,6 +50,12 @@ pub struct RelayInfo {
pub software: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub limitation: Option<Limitation>,
#[serde(skip_serializing_if = "Option::is_none")]
pub payment_url: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub fees: Option<Fees>,
}
/// Convert an Info configuration into public Relay Info
@ -37,6 +69,48 @@ impl From<Settings> for RelayInfo {
}
let i = c.info;
let p = c.pay_to_relay;
let limitations = Limitation {
payment_required: Some(p.enabled),
};
let (payment_url, fees) = if p.enabled {
let admission_fee = if p.admission_cost > 0 {
Some(vec![Fee {
amount: p.admission_cost,
unit: UNIT.to_string(),
}])
} else {
None
};
let post_fee = if p.cost_per_event > 0 {
Some(vec![Fee {
amount: p.cost_per_event,
unit: UNIT.to_string(),
}])
} else {
None
};
let fees = Fees {
admission: admission_fee,
publication: post_fee,
};
let payment_url = if p.enabled && i.relay_url.is_some() {
Some(format!(
"{}join",
i.relay_url.clone().unwrap().replace("ws", "http")
))
} else {
None
};
(payment_url, Some(fees))
} else {
(None, None)
};
RelayInfo {
id: i.relay_url,
@ -47,6 +121,9 @@ impl From<Settings> for RelayInfo {
supported_nips: Some(supported_nips),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(std::borrow::ToOwned::to_owned),
limitation: Some(limitations),
payment_url,
fees,
}
}
}

View File

@ -15,4 +15,5 @@ pub mod repo;
pub mod subscription;
pub mod utils;
// Public API for creating relays programatically
pub mod payment;
pub mod server;

173
src/payment/lnbits.rs Normal file
View File

@ -0,0 +1,173 @@
//! LNBits payment processor
use http::Uri;
use hyper::client::connect::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use nostr::Keys;
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use rand::Rng;
use tracing::debug;
use std::str::FromStr;
use url::Url;
use crate::{config::Settings, error::Error};
use super::{InvoiceInfo, InvoiceStatus, PaymentProcessor};
const APIPATH: &str = "/api/v1/payments/";
/// Info LNBits expects in create invoice request
#[derive(Serialize, Deserialize, Debug)]
pub struct LNBitsCreateInvoice {
out: bool,
amount: u64,
memo: String,
webhook: String,
unit: String,
internal: bool,
expiry: u64,
}
/// Invoice response for LN bits
#[derive(Debug, Serialize, Deserialize)]
pub struct LNBitsCreateInvoiceResponse {
payment_hash: String,
payment_request: String,
}
/// LNBits call back response
/// Used when an invoice is paid
/// lnbits to post the status change to relay
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LNBitsCallback {
pub checking_id: String,
pub pending: bool,
pub amount: u64,
pub memo: String,
pub time: u64,
pub bolt11: String,
pub preimage: String,
pub payment_hash: String,
pub wallet_id: String,
pub webhook: String,
pub webhook_status: Option<String>,
}
/// LN Bits repose for check invoice endpoint
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LNBitsCheckInvoiceResponse {
paid: bool,
}
#[derive(Clone)]
pub struct LNBitsPaymentProcessor {
/// HTTP client
client: hyper::Client<HttpsConnector<HttpConnector>, hyper::Body>,
settings: Settings,
}
impl LNBitsPaymentProcessor {
pub fn new(settings: &Settings) -> Self {
// setup hyper client
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
Self {
client,
settings: settings.clone(),
}
}
}
#[async_trait]
impl PaymentProcessor for LNBitsPaymentProcessor {
/// Calls LNBits api to ger new invoice
async fn get_invoice(&self, key: &Keys, amount: u64) -> Result<InvoiceInfo, Error> {
let random_number: u16 = rand::thread_rng().gen();
let memo = format!("{}: {}", random_number, key.public_key());
let callback_url = Url::parse(
&self
.settings
.info
.relay_url
.clone()
.unwrap()
.replace("ws", "http"),
)?
.join("lnbits")?;
let body = LNBitsCreateInvoice {
out: false,
amount,
memo: memo.clone(),
webhook: callback_url.to_string(),
unit: "sat".to_string(),
internal: false,
expiry: 3600,
};
let url = Url::parse(&self.settings.pay_to_relay.node_url)?.join(APIPATH)?;
let uri = Uri::from_str(url.as_str().strip_suffix("/").unwrap_or(url.as_str())).unwrap();
debug!("{uri}");
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(uri)
.header("X-Api-Key", &self.settings.pay_to_relay.api_secret)
.body(hyper::Body::from(serde_json::to_string(&body)?))
.expect("request builder");
let res = self.client.request(req).await?;
debug!("{res:?}");
// Json to Struct of LNbits callback
let body = hyper::body::to_bytes(res.into_body()).await?;
let invoice_response: LNBitsCreateInvoiceResponse = serde_json::from_slice(&body)?;
debug!("{:?}", invoice_response);
Ok(InvoiceInfo {
pubkey: key.public_key().to_string(),
payment_hash: invoice_response.payment_hash,
bolt11: invoice_response.payment_request,
amount,
memo,
status: InvoiceStatus::Unpaid,
confirmed_at: None,
})
}
/// Calls LNBits Api to check the payment status of invoice
async fn check_invoice(&self, payment_hash: &str) -> Result<InvoiceStatus, Error> {
let url = Url::parse(&self.settings.pay_to_relay.node_url)?
.join(APIPATH)?
.join(payment_hash)?;
let uri = Uri::from_str(url.as_str()).unwrap();
debug!("{uri}");
let req = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(uri)
.header("X-Api-Key", &self.settings.pay_to_relay.api_secret)
.body(hyper::Body::empty())
.expect("request builder");
let res = self.client.request(req).await?;
// Json to Struct of LNbits callback
let body = hyper::body::to_bytes(res.into_body()).await?;
debug!("check invoice: {body:?}");
let invoice_response: LNBitsCheckInvoiceResponse = serde_json::from_slice(&body)?;
let status = if invoice_response.paid {
InvoiceStatus::Paid
} else {
InvoiceStatus::Unpaid
};
Ok(status)
}
}

261
src/payment/mod.rs Normal file
View File

@ -0,0 +1,261 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::payment::lnbits::LNBitsPaymentProcessor;
use crate::repo::NostrRepo;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{info, warn};
use async_trait::async_trait;
use nostr::key::{FromPkStr, FromSkStr};
use nostr::{key::Keys, Event as NostrEvent, EventBuilder};
pub mod lnbits;
/// Payment handler
pub struct Payment {
/// Repository for saving/retrieving events and events
repo: Arc<dyn NostrRepo>,
/// Newly validated events get written and then broadcast on this channel to subscribers
event_tx: tokio::sync::broadcast::Sender<Event>,
/// Payment message sender
payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
/// Payment message receiver
payment_rx: tokio::sync::broadcast::Receiver<PaymentMessage>,
/// Settings
settings: crate::config::Settings,
// Nostr Keys
nostr_keys: Keys,
/// Payment Processor
processor: Arc<dyn PaymentProcessor>,
}
#[async_trait]
pub trait PaymentProcessor: Send + Sync {
/// Get invoice from processor
async fn get_invoice(&self, keys: &Keys, amount: u64) -> Result<InvoiceInfo, Error>;
/// Check payment status of an invoice
async fn check_invoice(&self, payment_hash: &str) -> Result<InvoiceStatus, Error>;
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum Processor {
LNBits,
}
/// Possible states of an invoice
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, sqlx::Type)]
#[sqlx(type_name = "status")]
pub enum InvoiceStatus {
Unpaid,
Paid,
Expired,
}
impl ToString for InvoiceStatus {
fn to_string(&self) -> String {
match self {
InvoiceStatus::Paid => "Paid".to_string(),
InvoiceStatus::Unpaid => "Unpaid".to_string(),
InvoiceStatus::Expired => "Expired".to_string(),
}
}
}
/// Invoice information
#[derive(Debug, Clone)]
pub struct InvoiceInfo {
pub pubkey: String,
pub payment_hash: String,
pub bolt11: String,
pub amount: u64,
pub status: InvoiceStatus,
pub memo: String,
pub confirmed_at: Option<u64>,
}
/// Message variants for the payment channel
#[derive(Debug, Clone)]
pub enum PaymentMessage {
/// New account
NewAccount(String),
/// Check account,
CheckAccount(String),
/// Account Admitted
AccountAdmitted(String),
/// Invoice generated
Invoice(String, InvoiceInfo),
/// Invoice call back
/// Payment hash is passed
// This may have to be changed to better support other processors
InvoicePaid(String),
}
impl Payment {
pub fn new(
repo: Arc<dyn NostrRepo>,
payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
payment_rx: tokio::sync::broadcast::Receiver<PaymentMessage>,
event_tx: tokio::sync::broadcast::Sender<Event>,
settings: crate::config::Settings,
) -> Result<Self> {
info!("Create payment handler");
// Create nostr key from sk string
let nostr_keys = Keys::from_sk_str(&settings.pay_to_relay.secret_key)?;
// Create processor kind defined in settings
let processor = match &settings.pay_to_relay.processor {
Processor::LNBits => Arc::new(LNBitsPaymentProcessor::new(&settings)),
};
Ok(Payment {
repo,
payment_tx,
payment_rx,
event_tx,
settings,
nostr_keys,
processor,
})
}
/// Perform Payment tasks
pub async fn run(&mut self) {
loop {
let res = self.run_internal().await;
if let Err(e) = res {
info!("error in payment: {:?}", e);
}
}
}
/// Internal select loop for preforming payment operatons
async fn run_internal(&mut self) -> Result<()> {
tokio::select! {
m = self.payment_rx.recv() => {
match m {
Ok(PaymentMessage::NewAccount(pubkey)) => {
info!("payment event for {:?}", pubkey);
// REVIEW: This will need to change for cost per event
let amount = self.settings.pay_to_relay.admission_cost;
let invoice_info = self.get_invoice_info(&pubkey, amount).await?;
// TODO: should handle this error
self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok();
},
// Gets the most recent unpaid invoice from database
// Checks LNbits to verify if paid/unpaid
Ok(PaymentMessage::CheckAccount(pubkey)) => {
let keys = Keys::from_pk_str(&pubkey)?;
if let Some(invoice_info) = self.repo.get_unpaid_invoice(&keys).await? {
match self.check_invoice_status(&invoice_info.payment_hash).await? {
InvoiceStatus::Paid => {
self.repo.admit_account(&keys, self.settings.pay_to_relay.admission_cost).await?;
self.payment_tx.send(PaymentMessage::AccountAdmitted(pubkey)).ok();
}
_ => {
self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok();
}
}
}
}
Ok(PaymentMessage::InvoicePaid(payment_hash)) => {
if self.check_invoice_status(&payment_hash).await?.eq(&InvoiceStatus::Paid) {
let pubkey = self.repo
.update_invoice(&payment_hash, InvoiceStatus::Paid)
.await?;
let key = Keys::from_pk_str(&pubkey)?;
self.repo.admit_account(&key, self.settings.pay_to_relay.admission_cost).await?;
}
}
Ok(_) => {
// For this variant nothing need to be done here
// it is used by `server`
}
Err(err) => warn!("Payment RX: {err}")
}
}
}
Ok(())
}
/// Sends Nostr DM to pubkey that requested invoice
/// Two events the terms followed by the bolt11 invoice
pub async fn send_admission_message(
&self,
pubkey: &str,
invoice_info: &InvoiceInfo,
) -> Result<()> {
// Create Nostr key from pk
let key = Keys::from_pk_str(pubkey)?;
let pubkey = key.public_key();
// Event DM with terms of service
let message_event: NostrEvent = EventBuilder::new_encrypted_direct_msg(
&self.nostr_keys,
pubkey,
&self.settings.pay_to_relay.terms_message,
)?
.to_event(&self.nostr_keys)?;
// Event DM with invoice
let invoice_event: NostrEvent =
EventBuilder::new_encrypted_direct_msg(&self.nostr_keys, pubkey, &invoice_info.bolt11)?
.to_event(&self.nostr_keys)?;
// Persist DM events to DB
self.repo.write_event(&message_event.clone().into()).await?;
self.repo.write_event(&invoice_event.clone().into()).await?;
// Broadcast DM events
self.event_tx.send(message_event.clone().into()).ok();
self.event_tx.send(invoice_event.clone().into()).ok();
Ok(())
}
/// Get Invoice Info
/// If the has an active invoice that will be return
/// Otherwise a new invoice will be generated by the payment processor
pub async fn get_invoice_info(&self, pubkey: &str, amount: u64) -> Result<InvoiceInfo> {
// If user is already in DB this will be false
// This avoids recreating admission invoices
// I think it will continue to send DMs with the invoice
// If client continues to try and write to the relay (will be same invoice)
let key = Keys::from_pk_str(pubkey)?;
if !self.repo.create_account(&key).await? {
if let Ok(Some(invoice_info)) = self.repo.get_unpaid_invoice(&key).await {
return Ok(invoice_info);
}
}
let key = Keys::from_pk_str(pubkey)?;
let invoice_info = self.processor.get_invoice(&key, amount).await?;
// Persist invoice to DB
self.repo
.create_invoice_record(&key, invoice_info.clone())
.await?;
// Admission event invoice and terms to pubkey that is joining
self.send_admission_message(pubkey, &invoice_info).await?;
Ok(invoice_info)
}
/// Check paid status of invoice with LNbits
pub async fn check_invoice_status(&self, payment_hash: &str) -> Result<InvoiceStatus, Error> {
// Check base if passed expiry time
let status = self.processor.check_invoice(payment_hash).await?;
self.repo
.update_invoice(payment_hash, status.clone())
.await?;
Ok(status)
}
}

View File

@ -2,9 +2,11 @@ use crate::db::QueryResult;
use crate::error::Result;
use crate::event::Event;
use crate::nip05::VerificationRecord;
use crate::payment::{InvoiceInfo, InvoiceStatus};
use crate::subscription::Subscription;
use crate::utils::unix_time;
use async_trait::async_trait;
use nostr::Keys;
use rand::Rng;
pub mod postgres;
@ -57,6 +59,33 @@ pub trait NostrRepo: Send + Sync {
/// Get oldest verification before timestamp
async fn get_oldest_user_verification(&self, before: u64) -> Result<VerificationRecord>;
/// Create a new account
async fn create_account(&self, pubkey: &Keys) -> Result<bool>;
/// Admit an account
async fn admit_account(&self, pubkey: &Keys, admission_cost: u64) -> Result<()>;
/// Gets user balance if they are an admitted pubkey
async fn get_account_balance(&self, pubkey: &Keys) -> Result<(bool, u64)>;
/// Update account balance
async fn update_account_balance(
&self,
pub_key: &Keys,
positive: bool,
new_balance: u64,
) -> Result<()>;
/// Create invoice record
async fn create_invoice_record(&self, pubkey: &Keys, invoice_info: InvoiceInfo) -> Result<()>;
/// Update Invoice for given payment hash
async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result<String>;
/// Get the most recent invoice for a given pubkey
/// invoice must be unpaid and not expired
async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result<Option<InvoiceInfo>>;
}
// Current time, with a slight forward jitter in seconds

View File

@ -2,6 +2,7 @@ use crate::db::QueryResult;
use crate::error::Result;
use crate::event::{single_char_tagname, Event};
use crate::nip05::{Nip05Name, VerificationRecord};
use crate::payment::{InvoiceInfo, InvoiceStatus};
use crate::repo::{now_jitter, NostrRepo};
use crate::subscription::{ReqFilter, Subscription};
use async_std::stream::StreamExt;
@ -17,6 +18,7 @@ use crate::hexrange::{hex_range, HexSearch};
use crate::repo::postgres_migration::run_migrations;
use crate::server::NostrMetrics;
use crate::utils::{self, is_hex, is_lower_hex};
use nostr::key::Keys;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tracing::log::trace;
@ -160,6 +162,7 @@ ON CONFLICT (id) DO NOTHING"#,
.execute(&mut tx)
.await?
.rows_affected();
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
// pubkey references. This will abort the txn.
@ -184,7 +187,8 @@ ON CONFLICT (id) DO NOTHING"#,
.bind(tag_name)
.bind(hex::decode(tag_val).ok())
.execute(&mut tx)
.await?;
.await
.unwrap();
} else {
sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, $3, NULL) \
ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING")
@ -192,7 +196,8 @@ ON CONFLICT (id) DO NOTHING"#,
.bind(tag_name)
.bind(tag_val.as_bytes())
.execute(&mut tx)
.await?;
.await
.unwrap();
}
}
None => {}
@ -543,6 +548,172 @@ ON CONFLICT (id) DO NOTHING"#,
.await?
.ok_or(error::Error::SqlxError(RowNotFound))
}
async fn create_account(&self, pub_key: &Keys) -> Result<bool> {
let pub_key = pub_key.public_key().to_string();
let mut tx = self.conn.begin().await?;
let result = sqlx::query("INSERT INTO account (pubkey, balance) VALUES ($1, 0);")
.bind(pub_key)
.execute(&mut tx)
.await;
let success = match result {
Ok(res) => {
tx.commit().await?;
res.rows_affected() == 1
}
Err(_err) => false,
};
Ok(success)
}
/// Admit account
async fn admit_account(&self, pub_key: &Keys, admission_cost: u64) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
sqlx::query(
"UPDATE account SET is_admitted = TRUE, balance = balance - $1 WHERE pubkey = $2",
)
.bind(admission_cost as i64)
.bind(pub_key)
.execute(&self.conn)
.await?;
Ok(())
}
/// Gets if the account is admitted and balance
async fn get_account_balance(&self, pub_key: &Keys) -> Result<(bool, u64)> {
let pub_key = pub_key.public_key().to_string();
let query = r#"SELECT
is_admitted,
balance
FROM account
WHERE pubkey = $1
LIMIT 1"#;
let result = sqlx::query_as::<_, (bool, i64)>(query)
.bind(pub_key)
.fetch_optional(&self.conn)
.await?
.ok_or(error::Error::SqlxError(RowNotFound))?;
Ok((result.0, result.1 as u64))
}
/// Update account balance
async fn update_account_balance(
&self,
pub_key: &Keys,
positive: bool,
new_balance: u64,
) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
match positive {
true => {
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
.bind(new_balance as i64)
.bind(pub_key)
.execute(&self.conn)
.await?
}
false => {
sqlx::query("UPDATE account SET balance = balance - $1 WHERE pubkey = $2")
.bind(new_balance as i64)
.bind(pub_key)
.execute(&self.conn)
.await?
}
};
Ok(())
}
/// Create invoice record
async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
let mut tx = self.conn.begin().await?;
sqlx::query(
"INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES ($1, $2, $3, $4, $5, now(), $6)",
)
.bind(pub_key)
.bind(invoice_info.payment_hash)
.bind(invoice_info.amount as i64)
.bind(invoice_info.status)
.bind(invoice_info.memo)
.bind(invoice_info.bolt11)
.execute(&mut tx)
.await.unwrap();
debug!("Invoice added");
tx.commit().await?;
Ok(())
}
/// Update invoice record
async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result<String> {
debug!("Payment Hash: {}", payment_hash);
let query = "SELECT pubkey, status, amount FROM invoice WHERE payment_hash=$1;";
let (pubkey, prev_invoice_status, amount) =
sqlx::query_as::<_, (String, InvoiceStatus, i64)>(query)
.bind(payment_hash)
.fetch_optional(&self.conn)
.await?
.ok_or(error::Error::SqlxError(RowNotFound))?;
// If the invoice is paid update the confirmed at timestamp
let query = if status.eq(&InvoiceStatus::Paid) {
"UPDATE invoice SET status=$1, confirmed_at = now() WHERE payment_hash=$2;"
} else {
"UPDATE invoice SET status=$1 WHERE payment_hash=$2;"
};
sqlx::query(query)
.bind(&status)
.bind(payment_hash)
.execute(&self.conn)
.await?;
if prev_invoice_status.eq(&InvoiceStatus::Unpaid) && status.eq(&InvoiceStatus::Paid) {
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
.bind(amount)
.bind(&pubkey)
.execute(&self.conn)
.await?;
}
Ok(pubkey)
}
/// Get the most recent invoice for a given pubkey
/// invoice must be unpaid and not expired
async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result<Option<InvoiceInfo>> {
let query = r#"
SELECT amount, payment_hash, description, invoice
FROM invoice
WHERE pubkey = $1
ORDER BY created_at DESC
LIMIT 1;
"#;
match sqlx::query_as::<_, (i64, String, String, String)>(query)
.bind(pubkey.public_key().to_string())
.fetch_optional(&self.conn)
.await
.unwrap()
{
Some((amount, payment_hash, description, invoice)) => Ok(Some(InvoiceInfo {
pubkey: pubkey.public_key().to_string(),
payment_hash,
bolt11: invoice,
amount: amount as u64,
status: InvoiceStatus::Unpaid,
memo: description,
confirmed_at: None,
})),
None => Ok(None),
}
}
}
/// Create a dynamic SQL query and params from a subscription filter.

View File

@ -36,6 +36,7 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
}
run_migration(m003::migration(), db).await;
run_migration(m004::migration(), db).await;
run_migration(m005::migration(), db).await;
Ok(current_version(db).await as usize)
}
@ -277,3 +278,43 @@ CREATE INDEX event_expires_at_idx ON "event" (expires_at);
}
}
}
mod m005 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 5;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Create account table
CREATE TABLE "account" (
pubkey varchar NOT NULL,
is_admitted BOOLEAN NOT NULL DEFAULT FALSE,
balance BIGINT NOT NULL DEFAULT 0,
tos_accepted_at TIMESTAMP,
CONSTRAINT account_pkey PRIMARY KEY (pubkey)
);
CREATE TYPE status AS ENUM ('Paid', 'Unpaid', 'Expired');
CREATE TABLE "invoice" (
payment_hash varchar NOT NULL,
pubkey varchar NOT NULL,
invoice varchar NOT NULL,
amount BIGINT NOT NULL,
status status NOT NULL DEFAULT 'Unpaid',
description varchar,
created_at timestamp,
confirmed_at timestamp,
CONSTRAINT invoice_payment_hash PRIMARY KEY (payment_hash),
CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE
);
"#,
],
}
}
}

View File

@ -2,12 +2,12 @@
//use crate::config::SETTINGS;
use crate::config::Settings;
use crate::db::QueryResult;
use crate::error::Error::SqlError;
use crate::error::Result;
use crate::error::{Error::SqlError, Result};
use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05::{Nip05Name, VerificationRecord};
use crate::payment::{InvoiceInfo, InvoiceStatus};
use crate::repo::sqlite_migration::{upgrade_db, STARTUP_SQL};
use crate::server::NostrMetrics;
use crate::subscription::{ReqFilter, Subscription};
@ -30,6 +30,7 @@ use tokio::task;
use tracing::{debug, info, trace, warn};
use crate::repo::{now_jitter, NostrRepo};
use nostr::key::Keys;
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
@ -723,6 +724,209 @@ impl NostrRepo for SqliteRepo {
Ok(vr)
}).await?
}
/// Create account
async fn create_account(&self, pub_key: &Keys) -> Result<bool> {
let pub_key = pub_key.public_key().to_string();
let mut conn = self.write_pool.get()?;
let ins_count = tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
let ins_count: u64;
{
// Ignore if user is already in db
let query = "INSERT OR IGNORE INTO account (pubkey, is_admitted, balance) VALUES (?1, ?2, ?3);";
let mut stmt = tx.prepare(query)?;
ins_count = stmt.execute(params![&pub_key, false, 0])? as u64;
}
tx.commit()?;
let ok: Result<u64> = Ok(ins_count);
ok
}).await??;
if ins_count != 1 {
return Ok(false);
}
Ok(true)
}
/// Admit account
async fn admit_account(&self, pub_key: &Keys, admission_cost: u64) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
let mut conn = self.write_pool.get()?;
let pub_key = pub_key.to_owned();
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
let query = "UPDATE account SET is_admitted = TRUE, tos_accepted_at = strftime('%s','now'), balance = balance - ?1 WHERE pubkey=?2;";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![admission_cost, pub_key])?;
}
tx.commit()?;
let ok: Result<()> = Ok(());
ok
})
.await?
}
/// Gets if the account is admitted and balance
async fn get_account_balance(&self, pub_key: &Keys) -> Result<(bool, u64)> {
let pub_key = pub_key.public_key().to_string();
let mut conn = self.write_pool.get()?;
let pub_key = pub_key.to_owned();
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
let query = "SELECT is_admitted, balance FROM account WHERE pubkey = ?1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![pub_key], |r| {
let is_admitted: bool = r.get(0)?;
let balance: u64 = r.get(1)?;
// create a tuple since we can't throw non-rusqlite errors in this closure
Ok((is_admitted, balance))
})?;
Ok(fields)
})
.await?
}
/// Update account balance
async fn update_account_balance(
&self,
pub_key: &Keys,
positive: bool,
new_balance: u64,
) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
let mut conn = self.write_pool.get()?;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
let query = if positive {
"UPDATE account SET balance=balance + ?1 WHERE pubkey=?2"
} else {
"UPDATE account SET balance=balance - ?1 WHERE pubkey=?2"
};
let mut stmt = tx.prepare(query)?;
stmt.execute(params![new_balance, pub_key])?;
}
tx.commit()?;
let ok: Result<()> = Ok(());
ok
})
.await?
}
/// Create invoice record
async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> {
let pub_key = pub_key.public_key().to_string();
let pub_key = pub_key.to_owned();
let mut conn = self.write_pool.get()?;
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
let query = "INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), ?6);";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![&pub_key, invoice_info.payment_hash, invoice_info.amount, invoice_info.status.to_string(), invoice_info.memo, invoice_info.bolt11])?;
}
tx.commit()?;
let ok: Result<()> = Ok(());
ok
}).await??;
Ok(())
}
/// Update invoice record
async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result<String> {
let mut conn = self.write_pool.get()?;
let payment_hash = payment_hash.to_owned();
let pub_key = tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
let pubkey: String;
{
// Get required invoice info for given payment hash
let query = "SELECT pubkey, status, amount FROM invoice WHERE payment_hash=?1;";
let mut stmt = tx.prepare(query)?;
let (pub_key, prev_status, amount) = stmt.query_row(params![payment_hash], |r| {
let pub_key: String = r.get(0)?;
let status: String = r.get(1)?;
let amount: u64 = r.get(2)?;
Ok((pub_key, status, amount))
})?;
// If the invoice is paid update the confirmed_at timestamp
let query = if status.eq(&InvoiceStatus::Paid) {
"UPDATE invoice SET status=?1, confirmed_at = strftime('%s', 'now') WHERE payment_hash=?2;"
} else {
"UPDATE invoice SET status=?1 WHERE payment_hash=?2;"
};
let mut stmt = tx.prepare(query)?;
stmt.execute(params![status.to_string(), payment_hash])?;
// Increase account balance by given invoice amount
if prev_status == "Unpaid" && status.eq(&InvoiceStatus::Paid) {
let query =
"UPDATE account SET balance = balance + ?1 WHERE pubkey = ?2;";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![amount, pub_key])?;
}
pubkey = pub_key;
}
tx.commit()?;
let ok: Result<String> = Ok(pubkey);
ok
})
.await?;
pub_key
}
/// Get the most recent invoice for a given pubkey
/// invoice must be unpaid and not expired
async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result<Option<InvoiceInfo>> {
let mut conn = self.write_pool.get()?;
let pubkey = pubkey.to_owned();
let pubkey_str = pubkey.clone().public_key().to_string();
let (payment_hash, invoice, amount, description) = tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
let query = r#"
SELECT amount, payment_hash, description, invoice
FROM invoice
WHERE pubkey = ?1 AND status = 'Unpaid'
ORDER BY created_at DESC
LIMIT 1;
"#;
let mut stmt = tx.prepare(query).unwrap();
stmt.query_row(params![&pubkey_str], |r| {
let amount: u64 = r.get(0)?;
let payment_hash: String = r.get(1)?;
let description: String = r.get(2)?;
let invoice: String = r.get(3)?;
Ok((payment_hash, invoice, amount, description))
})
})
.await??;
Ok(Some(InvoiceInfo {
pubkey: pubkey.public_key().to_string(),
payment_hash,
bolt11: invoice,
amount,
status: InvoiceStatus::Unpaid,
memo: description,
confirmed_at: None,
}))
}
}
/// Decide if there is an index that should be used explicitly

View File

@ -23,7 +23,7 @@ pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version
pub const DB_VERSION: usize = 17;
pub const DB_VERSION: usize = 18;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@ -96,6 +96,35 @@ FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CAS
);
CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name);
CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event);
-- Create account table
CREATE TABLE IF NOT EXISTS account (
pubkey TEXT PRIMARY KEY,
is_admitted INTEGER NOT NULL DEFAULT 0,
balance INTEGER NOT NULL DEFAULT 0,
tos_accepted_at INTEGER
);
-- Create account index
CREATE INDEX IF NOT EXISTS user_pubkey_index ON account(pubkey);
-- Invoice table
CREATE TABLE IF NOT EXISTS invoice (
payment_hash TEXT PRIMARY KEY,
pubkey TEXT NOT NULL,
invoice TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT CHECK ( status IN ('Paid', 'Unpaid', 'Expired' ) ) NOT NUll DEFAULT 'Unpaid',
description TEXT,
created_at INTEGER NOT NULL,
confirmed_at INTEGER,
CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE
);
-- Create invoice index
CREATE INDEX IF NOT EXISTS invoice_pubkey_index ON invoice(pubkey);
"##,
DB_VERSION
);
@ -213,6 +242,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
if curr_version == 16 {
curr_version = mig_16_to_17(conn)?;
}
if curr_version == 17 {
curr_version = mig_17_to_18(conn)?;
}
if curr_version == DB_VERSION {
info!(
@ -760,3 +792,50 @@ PRAGMA user_version = 17;
}
Ok(17)
}
fn mig_17_to_18(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 17->18");
let upgrade_sql = r##"
-- Create invoices table
CREATE TABLE IF NOT EXISTS invoice (
payment_hash TEXT PRIMARY KEY,
pubkey TEXT NOT NULL,
invoice TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT CHECK ( status IN ('Paid', 'Unpaid', 'Expired' ) ) NOT NUll DEFAULT 'Unpaid',
description TEXT,
created_at INTEGER NOT NULL,
confirmed_at INTEGER,
CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE
);
-- Create invoice index
CREATE INDEX IF NOT EXISTS invoice_pubkey_index ON invoice(pubkey);
-- Create account table
CREATE TABLE IF NOT EXISTS account (
pubkey TEXT PRIMARY KEY,
is_admitted INTEGER NOT NULL DEFAULT 0,
balance INTEGER NOT NULL DEFAULT 0,
tos_accepted_at INTEGER
);
-- Create account index
CREATE INDEX IF NOT EXISTS account_pubkey_index ON account(pubkey);
pragma optimize;
PRAGMA user_version = 17;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v17 -> v18");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(18)
}

View File

@ -12,6 +12,9 @@ use crate::event::EventWrapper;
use crate::info::RelayInfo;
use crate::nip05;
use crate::notice::Notice;
use crate::payment;
use crate::payment::InvoiceInfo;
use crate::payment::PaymentMessage;
use crate::repo::NostrRepo;
use crate::server::Error::CommandUnknownError;
use crate::server::EventWrapper::{WrappedAuth, WrappedEvent};
@ -20,6 +23,7 @@ use futures::SinkExt;
use futures::StreamExt;
use governor::{Jitter, Quota, RateLimiter};
use http::header::HeaderMap;
use hyper::body::to_bytes;
use hyper::header::ACCEPT;
use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
@ -29,6 +33,8 @@ use hyper::{
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder};
use qrcode::render::svg;
use qrcode::QrCode;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
@ -54,6 +60,8 @@ use tungstenite::error::Error as WsError;
use tungstenite::handshake;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;
use nostr::key::FromPkStr;
use nostr::key::Keys;
/// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
#[allow(clippy::too_many_arguments)]
@ -64,11 +72,13 @@ async fn handle_web_request(
remote_addr: SocketAddr,
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
payment_tx: tokio::sync::broadcast::Sender<PaymentMessage>,
shutdown: Receiver<()>,
favicon: Option<Vec<u8>>,
registry: Registry,
metrics: NostrMetrics,
) -> Result<Response<Body>, Infallible> {
debug!("{:?}", request);
match (
request.uri().path(),
request.headers().contains_key(header::UPGRADE),
@ -175,6 +185,16 @@ async fn handle_web_request(
}
}
}
// Redirect users to join page when pay to relay enabled
if settings.pay_to_relay.enabled {
return Ok(Response::builder()
.status(StatusCode::TEMPORARY_REDIRECT)
.header("location", "/join")
.body(Body::empty())
.unwrap());
}
Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
@ -210,8 +230,384 @@ async fn handle_web_request(
.unwrap())
}
}
// LN bits callback endpoint for paid invoices
("/lnbits", false) => {
let callback: payment::lnbits::LNBitsCallback =
serde_json::from_slice(&to_bytes(request.into_body()).await.unwrap()).unwrap();
debug!("LNBits callback: {callback:?}");
if let Err(e) = payment_tx.send(PaymentMessage::InvoicePaid(callback.payment_hash)) {
warn!("Could not send invoice update: {}", e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Error processing callback"))
.unwrap());
}
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from("ok"))
.unwrap())
}
// Endpoint for relays terms
("/terms", false) => Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.body(Body::from(settings.pay_to_relay.terms_message))
.unwrap()),
// Endpoint to allow users to sign up
("/join", false) => {
// Stops sign ups if disabled
if !settings.pay_to_relay.sign_ups {
return Ok(Response::builder()
.status(401)
.header("Content-Type", "text/plain")
.body(Body::from("Sorry, joining is not allowed at the moment"))
.unwrap());
}
let html = r#"
<!doctype HTML>
<head>
<meta charset="UTF-8">
<style>
body {
display: flex;
flex-direction: column;
align-items: center;
text-align: center;
font-family: Arial, sans-serif;
background-color: #6320a7;
color: white;
}
.container {
display: flex;
justify-content: center;
align-items: center;
height: 400px;
}
a {
color: pink;
}
input[type="text"] {
width: 100%;
max-width: 500px;
box-sizing: border-box;
overflow-x: auto;
white-space: nowrap;
}
</style>
</head>
<body>
<div style="width:75%;">
<h1>Enter your pubkey</h1>
<form action="/invoice" onsubmit="return checkForm(this);">
<input type="text" name="pubkey"><br><br>
<input type="checkbox" id="terms" required>
<label for="terms">I agree to the <a href="/terms">terms and conditions</a></label><br><br>
<button type="submit">Submit</button>
</form>
</div>
<script>
function checkForm(form) {
if (!form.terms.checked) {
alert("Please agree to the terms and conditions");
return false;
}
return true;
}
</script>
</body>
</html>
"#;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(html))
.unwrap())
}
// Endpoint to display invoice
("/invoice", false) => {
// Stops sign ups if disabled
if !settings.pay_to_relay.sign_ups {
return Ok(Response::builder()
.status(401)
.header("Content-Type", "text/plain")
.body(Body::from("Sorry, joining is not allowed at the moment"))
.unwrap());
}
// Get query pubkey from query string
let pubkey = get_pubkey(request);
// Redirect back to join page if no pub key is found in query string
if pubkey.is_none() {
return Ok(Response::builder()
.status(404)
.header("location", "/join")
.body(Body::empty())
.unwrap());
}
// Checks key is valid
let pubkey = pubkey.unwrap();
let key = Keys::from_pk_str(&pubkey);
if key.is_err() {
return Ok(Response::builder()
.status(401)
.header("Content-Type", "text/plain")
.body(Body::from("Looks like your key is invalid"))
.unwrap());
}
// Checks if user is already admitted
let payment_message;
if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
if admission_status {
return Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from("Already admitted"))
.unwrap());
} else {
payment_message = PaymentMessage::CheckAccount(pubkey.clone());
}
} else {
payment_message = PaymentMessage::NewAccount(pubkey.clone());
}
// Send message on payment channel requesting invoice
if payment_tx.send(payment_message).is_err() {
warn!("Could not send payment tx");
return Ok(Response::builder()
.status(501)
.header("Content-Type", "text/plain")
.body(Body::from("Sorry, something went wrong"))
.unwrap());
}
// wait for message with invoice back that matched the pub key
let mut invoice_info: Option<InvoiceInfo> = None;
while let Ok(msg) = payment_tx.subscribe().recv().await {
match msg {
PaymentMessage::Invoice(m_pubkey, m_invoice_info) => {
if m_pubkey == pubkey.clone() {
invoice_info = Some(m_invoice_info);
break;
}
}
PaymentMessage::AccountAdmitted(m_pubkey) => {
if m_pubkey == pubkey.clone() {
return Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from("Already admitted"))
.unwrap());
}
}
_ => (),
}
}
// Return early if cant get invoice
if invoice_info.is_none() {
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Sorry, could not get invoice"))
.unwrap());
}
// Since invoice is checked to be not none, unwrap
let invoice_info = invoice_info.unwrap();
let qr_code: String;
if let Ok(code) = QrCode::new(invoice_info.bolt11.as_bytes()) {
qr_code = code
.render()
.min_dimensions(200, 200)
.dark_color(svg::Color("#800000"))
.light_color(svg::Color("#ffff80"))
.build();
} else {
qr_code = "Could not render image".to_string();
}
let html_result = format!(
r#"
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body {{
display: flex;
flex-direction: column;
align-items: center;
text-align: center;
font-family: Arial, sans-serif;
background-color: #6320a7 ;
color: white;
}}
#copy-button {{
background-color: #bb5f0d ;
color: white;
padding: 10px 20px;
border-radius: 5px;
border: none;
cursor: pointer;
}}
#copy-button:hover {{
background-color: #8f29f4;
}}
.container {{
display: flex;
justify-content: center;
align-items: center;
height: 400px;
}}
a {{
color: pink;
}}
</style>
</head>
<body>
<div style="width:75%;">
<h3>
To use this relay, an admission fee of {} sats is required. By paying the fee, you agree to the <a href='terms'>terms</a>.
</h3>
</div>
<div>
<div style="max-height: 300px;">
{}
</div>
</div>
<div>
<div style="width: 75%;">
<p style="overflow-wrap: break-word; width: 500px;">{}</p>
<button id="copy-button">Copy</button>
</div>
<div>
<p> This page will not refresh </p>
<p> Verify admission <a href=/account?pubkey={}>here</a> once you have paid</p>
</div>
</div>
</body>
</html>
<script>
const copyButton = document.getElementById("copy-button");
if (navigator.clipboard) {{
copyButton.addEventListener("click", function() {{
const textToCopy = "{}";
navigator.clipboard.writeText(textToCopy).then(function() {{
console.log("Text copied to clipboard");
}}, function(err) {{
console.error("Could not copy text: ", err);
}});
}});
}} else {{
copyButton.style.display = "none";
console.warn("Clipboard API is not supported in this browser");
}}
</script>
"#,
settings.pay_to_relay.admission_cost,
qr_code,
invoice_info.bolt11,
pubkey,
invoice_info.bolt11
);
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(html_result))
.unwrap())
}
("/account", false) => {
// Stops sign ups if disabled
if !settings.pay_to_relay.enabled {
return Ok(Response::builder()
.status(401)
.header("Content-Type", "text/plain")
.body(Body::from("This relay is not paid"))
.unwrap());
}
// Gets the pubkey from query string
let pubkey = get_pubkey(request);
// Redirect back to join page if no pub key is found in query string
if pubkey.is_none() {
return Ok(Response::builder()
.status(404)
.header("location", "/join")
.body(Body::empty())
.unwrap());
}
// Checks key is valid
let pubkey = pubkey.unwrap();
let key = Keys::from_pk_str(&pubkey);
if key.is_err() {
return Ok(Response::builder()
.status(401)
.header("Content-Type", "text/plain")
.body(Body::from("Looks like your key is invalid"))
.unwrap());
}
// Checks if user is already admitted
let text =
if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await {
if admission_status {
r#"<span style="color: green;">is</span>"#
} else {
r#"<span style="color: red;">is not</span>"#
}
} else {
"Could not get admission status"
};
let html_result = format!(
r#"
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body {{
display: flex;
flex-direction: column;
align-items: center;
text-align: center;
font-family: Arial, sans-serif;
background-color: #6320a7;
color: white;
height: 100vh;
}}
</style>
</head>
<body>
<div>
<h5>{} {} admitted</h5>
</div>
</body>
</html>
"#,
pubkey, text
);
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(html_result))
.unwrap())
}
// later balance
(_, _) => {
//handle any other url
// handle any other url
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Nothing here."))
@ -220,6 +616,22 @@ async fn handle_web_request(
}
}
// Get pubkey from request query string
fn get_pubkey(request: Request<Body>) -> Option<String> {
let query = request.uri().query().unwrap_or("").to_string();
// Gets the pubkey value from query string
query.split('&').fold(None, |acc, pair| {
let mut parts = pair.splitn(2, '=');
let key = parts.next();
let value = parts.next();
if key == Some("pubkey") {
return value.map(|s| s.to_owned());
}
acc
})
}
fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
headers
.get(header)
@ -423,7 +835,10 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
// metadata events.
let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
let (payment_tx, payment_rx) = broadcast::channel::<PaymentMessage>(4096);
let (registry, metrics) = create_metrics();
// build a repository for events
let repo = db::build_repo(&settings, metrics.clone()).await;
// start the database writer task. Give it a channel for
@ -435,6 +850,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
event_rx,
bcast_tx.clone(),
metadata_tx.clone(),
payment_tx.clone(),
shutdown_listen,
));
info!("db writer created");
@ -457,6 +873,23 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
}
}
// Create payments thread if pay to relay enabled
if settings.pay_to_relay.enabled {
let payment_opt = payment::Payment::new(
repo.clone(),
payment_tx.clone(),
payment_rx,
bcast_tx.clone(),
settings.clone(),
);
if let Ok(mut p) = payment_opt {
tokio::task::spawn(async move {
info!("starting payment process ...");
p.run().await;
});
}
}
// listen for (external to tokio) shutdown request
let controlled_shutdown = invoke_shutdown.clone();
tokio::spawn(async move {
@ -498,6 +931,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
let remote_addr = conn.remote_addr();
let bcast = bcast_tx.clone();
let event = event_tx.clone();
let payment_tx = payment_tx.clone();
let stop = invoke_shutdown.clone();
let settings = settings.clone();
let favicon = favicon.clone();
@ -513,6 +947,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
remote_addr,
bcast.clone(),
event.clone(),
payment_tx.clone(),
stop.subscribe(),
favicon.clone(),
registry.clone(),