diff --git a/.gitignore b/.gitignore index a62c7a1..d9f2b5d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ **/target/ nostr.db +nostr.db-* +justfile \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f82e754..0c7c1ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aes" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433cfd6710c9986c576a25ca913c39d66a6474107b406f34f91d4a8923395241" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.4.7" @@ -286,6 +297,18 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" +[[package]] +name = "bitcoin" +version = "0.29.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0694ea59225b0c5f3cb405ff3f670e4828358ed26aec49dc352f730f0cb1a8a3" +dependencies = [ + "bech32", + "bitcoin_hashes 0.11.0", + "secp256k1 0.24.3", + "serde", +] + [[package]] name = "bitcoin_hashes" version = "0.10.0" @@ -295,6 +318,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitcoin_hashes" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90064b8dee6815a6470d60bad07bbbaee885c0e12d04177138fa3291a01b7bc4" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -310,6 +342,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a90ec2df9600c28a01c56c4784c9207a96d2451833aeceb8cc97e4c9548bb78" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.3.0" @@ -342,6 +383,15 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.79" @@ -354,6 +404,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "checked_int_cast" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17cc5e6b5ab06331c33589842070416baa137e8b0eb912b008cfd4a78ada7919" + [[package]] name = "chrono" version = "0.4.23" @@ -369,6 +425,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "cipher" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.1.4" @@ -973,8 +1039,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1282,6 +1350,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1289,6 +1367,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", ] [[package]] @@ -1548,6 +1629,27 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" +[[package]] +name = "nostr" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28f6282699b1a163c0ac3f12cc1297974d0f60602fa057a3e047a6439ed741b0" +dependencies = [ + "aes", + "base64 0.21.0", + "bitcoin", + "cbc", + "getrandom", + "instant", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "thiserror", + "url", +] + [[package]] name = "nostr-rs-relay" version = "0.8.8" @@ -1556,7 +1658,7 @@ dependencies = [ "async-std", "async-trait", "bech32", - "bitcoin_hashes", + "bitcoin_hashes 0.10.0", "chrono", "clap", "config", @@ -1572,15 +1674,17 @@ dependencies = [ "indicatif", "lazy_static", "nonzero_ext", + "nostr", "parse_duration", "prometheus", "prost", + "qrcode", "r2d2", "r2d2_sqlite", "rand 0.8.5", "regex", "rusqlite", - "secp256k1", + "secp256k1 0.21.3", "serde", "serde_json", "sqlx", @@ -2076,6 +2180,15 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "qrcode" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d2f1455f3630c6e5107b4f2b94e74d76dea80736de0981fd27644216cff57f" +dependencies = [ + "checked_int_cast", +] + [[package]] name = "quanta" version = "0.9.3" @@ -2475,9 +2588,21 @@ version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c42e6f1735c5f00f51e43e28d6634141f2bcad10931b2609ddd74a86d751260" dependencies = [ - "bitcoin_hashes", + "bitcoin_hashes 0.10.0", "rand 0.6.5", - "secp256k1-sys", + "secp256k1-sys 0.4.2", + "serde", +] + +[[package]] +name = "secp256k1" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1629c9c557ef9b293568b338dddfc8208c98a18c59d722a9d53f859d9c9b62" +dependencies = [ + "bitcoin_hashes 0.11.0", + "rand 0.8.5", + "secp256k1-sys 0.6.1", "serde", ] @@ -2490,6 +2615,15 @@ dependencies = [ "cc", ] +[[package]] +name = "secp256k1-sys" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83080e2c2fc1006e625be82e5d1eb6a43b7fd9578b617fcc55814daf286bba4b" +dependencies = [ + "cc", +] + [[package]] name = "security-framework" version = "2.8.2" @@ -3254,6 +3388,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8a8a4ca..aeddb05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,8 @@ prometheus = "0.13.3" indicatif = "0.17.3" bech32 = "0.9.1" url = "2.3.1" +qrcode = { version = "0.12.0", default-features = false, features = ["svg"] } +nostr = { version = "0.18.0", default-features = false, features = ["base", "nip04", "nip19"] } [dev-dependencies] anyhow = "1" diff --git a/config.toml b/config.toml index 4d8d3f7..be1c02e 100644 --- a/config.toml +++ b/config.toml @@ -52,6 +52,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i # sqlite. #connection = "postgresql://postgres:nostr@localhost:7500/nostr" + [grpc] # gRPC interfaces for externalized decisions and other extensions to # functionality. @@ -173,3 +174,37 @@ reject_future_seconds = 1800 # How many consecutive failed checks before we give up on verifying # this author. #max_consecutive_failures = 20 + +[pay_to_relay] +# Enable pay to relay +enabled = true +# The cost to be admitted to relay +admission_cost = 1000 +# The cost in sats per post +cost_per_event = 0 +# Url of lnbits api +node_url = "" +# LNBits api secret +api_secret = "" +# Terms of service +terms_message = """ +This service (and supporting services) are provided "as is", without warranty of any kind, express or implied. + +By using this service, you agree: +* Not to engage in spam or abuse the relay service +* Not to disseminate illegal content +* That requests to delete content cannot be guaranteed +* To use the service in compliance with all applicable laws +* To grant necessary rights to your content for unlimited time +* To be of legal age and have capacity to use this service +* That the service may be terminated at any time without notice +* That the content you publish may be removed at any time without notice +* To have your IP address collected to detect abuse or misuse +* To cooperate with the relay to combat abuse or misuse +* You may be exposed to content that you might find triggering or distasteful +* The relay operator is not liable for content produced by users of the relay +""" +# Whether or not new sign ups should be allowed +sign_ups = true +secret_key = "" + diff --git a/docs/pay-to-relay.md b/docs/pay-to-relay.md new file mode 100644 index 0000000..c4ccb4f --- /dev/null +++ b/docs/pay-to-relay.md @@ -0,0 +1,85 @@ +# Pay to Relay Design Document + +The relay with use payment as a form of spam prevention. In order to post to the relay a user must pay a set rate. There is also the option to require a payment for each note posted to the relay. There is no cost to read from the relay. + +## Configuration + +Currently, [LNBits](https://github.com/lnbits/lnbits) is implemented as the payment processor. LNBits exposes a simple API for creating invoices, to use this API create a wallet and on the right side find "API info" you will need to add the invoice/read key to this relays config file. + +The below configuration will need to be added to config.toml +``` +[pay_to_relay] +# Enable pay to relay +enabled = true +# The cost to be admitted to relay +admission_cost = 1000 +# The cost in sats per post +cost_per_event = 0 +# Url of lnbits api +node_url = "https://:5001/api/v1/payments" +# LNBits api secret +api_secret = "" +# Terms of service +terms_message = """This service .... +""" +# Whether or not new sign ups should be allowed +sign_ups = true +secret_key = "" +``` + +The LNBits instance must have a signed HTTPS a self signed certificate will not work. + +## Design Overview + +### Concepts + +All authors are initially not admitted to write to the relay. There are two ways to gain access write to the relay. The first is by attempting to post the the relay, upon receiving an event from an author that is not admitted, the relay will send a direct message including the terms of service of the relay and a lighting invoice for the admission cost. Once this invoice is payed the author can write to the relay. For this method to work the author must be reading from the relay. An author can also pay and accept the terms of service via a webpage `https:///join`. + +## Design Details + +Authors are stored in a dedicated table. This tracks: + +* `pubkey` +* `is_admitted` whether on no the admission invoice has been paid, accepting the terms of service. +* `balance` the current balance in sats of the author, used if there is a cost per post +* `tos_accepted_at` the timestamp of when the author accepted the tos + +Invoice information is stored in a dedicated table. This tracks: +* `payment_hash` the payment hash of the lighting invoice +* `pubkey` of the author the invoice is issued to +* `invoice` bolt11 invoice +* `amount` in sats +* `status` (Paid/Unpaid/Expired) +* `description` +* `created_at` timestamp of creation +* `confirmed_at` timestamp of payment + +### Event Handling + +If "pay to relay" is enabled, all incoming events are evaluated to determine whether the author is on the relay's whitelist or if they have paid the admission fee and accepted the terms. If "pay per note" is enabled, there is an additional check to ensure that the author has enough balance, which is then reduced by the cost per note. If the author is on the whitelist, this balance check is not necessary. + +### Integration + +We have an existing database writer thread, which receives events and +attempts to persist them to disk. Once validated and persisted, these +events are broadcast to all subscribers. + +When "pay to relay" is enabled, the writer must check if the author is admitted to post. If the author is not admitted to post the event is forwarded to the payment module. Where an invoice is generated, persisted and broadcast as an direct message to the author. + +### Threat Scenarios + +Some of these mitigation's are fully implemented, others are documented +simply to demonstrate a mitigation is possible. + +### Sign up Spamming + +*Threat*: An attacker generates a large number of new pubkeys publishing to the relays. Causing a large number of new invoices to be created for each new pubkey. + +*Mitigation*: Rate limit number of new sign ups + +### Admitted Author Spamming + +*Threat*: An attacker gains write access by paying the admission fee, and then floods the relay with a large number of spam events. + +*Mitigation*: The attacker's admission can be revoked and their admission fee will not be refunded. Enabling "cost per event" and increasing the admission cost can also discourage this type of behavior. + diff --git a/src/config.rs b/src/config.rs index ff31f2a..b40423b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; use tracing::warn; +use crate::payment::Processor; + #[derive(Debug, Serialize, Deserialize, Clone)] #[allow(unused)] pub struct Info { @@ -80,6 +82,20 @@ pub struct Authorization { pub nip42_auth: bool, // if true enables NIP-42 authentication } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(unused)] +pub struct PayToRelay { + pub enabled: bool, + pub admission_cost: u64, // Cost to have pubkey whitelisted + pub cost_per_event: u64, // Cost author to pay per event + pub node_url: String, + pub api_secret: String, + pub terms_message: String, + pub sign_ups: bool, // allow new users to sign up to relay + pub secret_key: String, + pub processor: Processor, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[allow(unused)] pub struct Diagnostics { @@ -158,6 +174,7 @@ pub struct Settings { pub network: Network, pub limits: Limits, pub authorization: Authorization, + pub pay_to_relay: PayToRelay, pub verified_users: VerifiedUsers, pub retention: Retention, pub options: Options, @@ -209,6 +226,16 @@ impl Settings { ); // initialize durations for verified users settings.verified_users.init(); + + // Validate pay to relay settings + if settings.pay_to_relay.enabled { + assert_ne!(settings.pay_to_relay.api_secret, ""); + // Should check that url is valid + assert_ne!(settings.pay_to_relay.node_url, ""); + assert_ne!(settings.pay_to_relay.terms_message, ""); + assert_ne!(settings.pay_to_relay.secret_key, ""); + } + Ok(settings) } } @@ -259,6 +286,17 @@ impl Default for Settings { pubkey_whitelist: None, // Allow any address to publish nip42_auth: false, // Disable NIP-42 authentication }, + pay_to_relay: PayToRelay { + enabled: false, + admission_cost: 4200, + cost_per_event: 0, + terms_message: "".to_string(), + node_url: "".to_string(), + api_secret: "".to_string(), + sign_ups: false, + secret_key: "".to_string(), + processor: Processor::LNBits, + }, verified_users: VerifiedUsers { mode: VerifiedUsersMode::Disabled, domain_whitelist: None, diff --git a/src/db.rs b/src/db.rs index a52199a..b020187 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,6 +4,7 @@ use crate::error::{Error, Result}; use crate::event::Event; use crate::nauthz; use crate::notice::Notice; +use crate::payment::PaymentMessage; use crate::repo::postgres::{PostgresPool, PostgresRepo}; use crate::repo::sqlite::SqliteRepo; use crate::repo::NostrRepo; @@ -19,6 +20,8 @@ use std::thread; use std::time::{Duration, Instant}; use tracing::log::LevelFilter; use tracing::{debug, info, trace, warn}; +use nostr::key::FromPkStr; +use nostr::key::Keys; pub type SqlitePool = r2d2::Pool; pub type PooledConnection = r2d2::PooledConnection; @@ -83,6 +86,7 @@ pub async fn db_writer( mut event_rx: tokio::sync::mpsc::Receiver, bcast_tx: tokio::sync::broadcast::Sender, metadata_tx: tokio::sync::broadcast::Sender, + payment_tx: tokio::sync::broadcast::Sender, mut shutdown: tokio::sync::broadcast::Receiver<()>, ) -> Result<()> { // are we performing NIP-05 checking? @@ -90,6 +94,10 @@ pub async fn db_writer( // are we requriing NIP-05 user verification? let nip05_enabled = settings.verified_users.is_enabled(); + let pay_to_relay_enabled = settings.pay_to_relay.enabled; + let cost_per_event = settings.pay_to_relay.cost_per_event; + debug!("Pay to relay: {}", pay_to_relay_enabled); + //upgrade_db(&mut pool.get()?)?; // Make a copy of the whitelist @@ -136,24 +144,6 @@ pub async fn db_writer( let subm_event = next_event.unwrap(); let event = subm_event.event; let notice_tx = subm_event.notice_tx; - // check if this event is authorized. - if let Some(allowed_addrs) = whitelist { - // TODO: incorporate delegated pubkeys - // if the event address is not in allowed_addrs. - if !allowed_addrs.contains(&event.pubkey) { - debug!( - "rejecting event: {}, unauthorized author", - event.get_event_id_prefix() - ); - notice_tx - .try_send(Notice::blocked( - event.id, - "pubkey is not allowed to publish to this relay", - )) - .ok(); - continue; - } - } // Check that event kind isn't blacklisted let kinds_blacklist = &settings.limits.event_kind_blacklist.clone(); @@ -187,6 +177,91 @@ pub async fn db_writer( } } + // Set to none until balance is got from db + // Will stay none if user in whitelisted and does not have to pay to post + // When pay to relay is enabled the whitelist is not a list of who can post + // It is a list of who can post for free + let mut user_balance: Option = None; + if !pay_to_relay_enabled { + // check if this event is authorized. + if let Some(allowed_addrs) = whitelist { + // TODO: incorporate delegated pubkeys + // if the event address is not in allowed_addrs. + if !allowed_addrs.contains(&event.pubkey) { + debug!( + "rejecting event: {}, unauthorized author", + event.get_event_id_prefix() + ); + notice_tx + .try_send(Notice::blocked( + event.id, + "pubkey is not allowed to publish to this relay", + )) + .ok(); + continue; + } + } + } else { + // If the user is on whitelist there is no need to check if the user is admitted or has balance to post + if whitelist.is_none() + || (whitelist.is_some() && !whitelist.as_ref().unwrap().contains(&event.pubkey)) + { + let key = Keys::from_pk_str(&event.pubkey).unwrap(); + match repo.get_account_balance(&key).await { + Ok((user_admitted, balance)) => { + // Checks to make sure user is admitted + if !user_admitted { + debug!("user: {}, is not admitted", &event.pubkey); + + // If the user is in DB but not admitted + // Send meeage to payment thread to check if outstanding invoice has been paid + payment_tx + .send(PaymentMessage::CheckAccount(event.pubkey)) + .ok(); + notice_tx + .try_send(Notice::blocked(event.id, "User is not admitted")) + .ok(); + continue; + } + + // Checks that user has enough balance to post + // TODO: this should send an invoice to user to top up + if balance < cost_per_event { + debug!("user: {}, does not have a balance", &event.pubkey,); + notice_tx + .try_send(Notice::blocked(event.id, "Insufficient balance")) + .ok(); + continue; + } + user_balance = Some(balance); + debug!("User balance: {:?}", user_balance); + } + Err( + Error::SqlError(rusqlite::Error::QueryReturnedNoRows) + | Error::SqlxError(sqlx::Error::RowNotFound), + ) => { + // User does not exist + info!("Unregistered user"); + if settings.pay_to_relay.sign_ups { + payment_tx + .send(PaymentMessage::NewAccount(event.pubkey)) + .ok(); + } + let msg = "Pubkey not registered"; + notice_tx.try_send(Notice::error(event.id, msg)).ok(); + continue; + } + Err(err) => { + warn!("Error checking admission status: {:?}", err); + let msg = "relay experienced an error checking your admission status"; + notice_tx.try_send(Notice::error(event.id, msg)).ok(); + // Other error + continue; + } + } + } + } + // send any metadata events to the NIP-05 verifier if nip05_active && event.is_kind_metadata() { // we are sending this prior to even deciding if we @@ -335,6 +410,17 @@ pub async fn db_writer( // use rate limit, if defined, and if an event was actually written. if event_write { + // If pay to relay is diabaled or the cost per event is 0 + // No need to update user balance + if pay_to_relay_enabled && cost_per_event > 0 { + // If the user balance is some, user was not on whitelist + // Their balance should be reduced by the cost per event + if let Some(_balance) = user_balance { + let pubkey = Keys::from_pk_str(&event.pubkey)?; + repo.update_account_balance(&pubkey, false, cost_per_event) + .await?; + } + } if let Some(ref lim) = lim_opt { if let Err(n) = lim.check() { let wait_for = n.wait_time_from(clock.now()); diff --git a/src/error.rs b/src/error.rs index 790486f..ecfa97f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -72,6 +72,16 @@ pub enum Error { AuthFailure, #[error("I/O Error")] IoError(std::io::Error), + #[error("Event builder error")] + EventError(nostr::event::builder::Error), + #[error("Nostr key error")] + NostrKeyError(nostr::key::Error), + #[error("Payment hash mismatch")] + PaymentHash, + #[error("Error parsing url")] + URLParseError(url::ParseError), + #[error("HTTP error")] + HTTPError(http::Error), #[error("Unknown/Undocumented")] UnknownError, } @@ -153,3 +163,30 @@ impl From for Error { Error::IoError(r) } } +impl From for Error { + /// Wrap event builder error + fn from(r: nostr::event::builder::Error) -> Self { + Error::EventError(r) + } +} + +impl From for Error { + /// Wrap nostr key error + fn from(r: nostr::key::Error) -> Self { + Error::NostrKeyError(r) + } +} + +impl From for Error { + /// Wrap nostr key error + fn from(r: url::ParseError) -> Self { + Error::URLParseError(r) + } +} + +impl From for Error { + /// Wrap nostr key error + fn from(r: http::Error) -> Self { + Error::HTTPError(r) + } +} diff --git a/src/event.rs b/src/event.rs index 6f5fde3..6d16fc3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -424,6 +424,22 @@ impl Event { } } +impl From for Event { + fn from(nostr_event: nostr::Event) -> Self { + Event { + id: nostr_event.id.to_hex(), + pubkey: nostr_event.pubkey.to_string(), + created_at: nostr_event.created_at.as_u64(), + kind: nostr_event.kind.as_u64(), + tags: nostr_event.tags.iter().map(|x| x.as_vec()).collect(), + content: nostr_event.content, + sig: nostr_event.sig.to_string(), + delegated_by: None, + tagidx: None, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/info.rs b/src/info.rs index 5aae143..d2233ca 100644 --- a/src/info.rs +++ b/src/info.rs @@ -4,6 +4,32 @@ use crate::config::Settings; use serde::{Deserialize, Serialize}; pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); +pub const UNIT: &str = "sats"; + +/// Limitations of the relay as specified in NIP-111 +/// (This nip isn't finalized so may change) +#[derive(Debug, Serialize, Deserialize)] +#[allow(unused)] +pub struct Limitation { + #[serde(skip_serializing_if = "Option::is_none")] + payment_required: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +#[allow(unused)] +pub struct Fees { + #[serde(skip_serializing_if = "Option::is_none")] + admission: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + publication: Option>, +} + +#[derive(Serialize, Deserialize, Debug)] +#[allow(unused)] +pub struct Fee { + amount: u64, + unit: String, +} #[derive(Debug, Serialize, Deserialize)] #[allow(unused)] @@ -24,6 +50,12 @@ pub struct RelayInfo { pub software: Option, #[serde(skip_serializing_if = "Option::is_none")] pub version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub limitation: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payment_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub fees: Option, } /// Convert an Info configuration into public Relay Info @@ -37,6 +69,48 @@ impl From for RelayInfo { } let i = c.info; + let p = c.pay_to_relay; + + let limitations = Limitation { + payment_required: Some(p.enabled), + }; + + let (payment_url, fees) = if p.enabled { + let admission_fee = if p.admission_cost > 0 { + Some(vec![Fee { + amount: p.admission_cost, + unit: UNIT.to_string(), + }]) + } else { + None + }; + + let post_fee = if p.cost_per_event > 0 { + Some(vec![Fee { + amount: p.cost_per_event, + unit: UNIT.to_string(), + }]) + } else { + None + }; + + let fees = Fees { + admission: admission_fee, + publication: post_fee, + }; + + let payment_url = if p.enabled && i.relay_url.is_some() { + Some(format!( + "{}join", + i.relay_url.clone().unwrap().replace("ws", "http") + )) + } else { + None + }; + (payment_url, Some(fees)) + } else { + (None, None) + }; RelayInfo { id: i.relay_url, @@ -47,6 +121,9 @@ impl From for RelayInfo { supported_nips: Some(supported_nips), software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()), version: CARGO_PKG_VERSION.map(std::borrow::ToOwned::to_owned), + limitation: Some(limitations), + payment_url, + fees, } } } diff --git a/src/lib.rs b/src/lib.rs index ba496b9..7f2c25d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,4 +15,5 @@ pub mod repo; pub mod subscription; pub mod utils; // Public API for creating relays programatically +pub mod payment; pub mod server; diff --git a/src/payment/lnbits.rs b/src/payment/lnbits.rs new file mode 100644 index 0000000..b9acbcd --- /dev/null +++ b/src/payment/lnbits.rs @@ -0,0 +1,173 @@ +//! LNBits payment processor +use http::Uri; +use hyper::client::connect::HttpConnector; +use hyper::Client; +use hyper_tls::HttpsConnector; +use nostr::Keys; +use serde::{Deserialize, Serialize}; + +use async_trait::async_trait; +use rand::Rng; +use tracing::debug; + +use std::str::FromStr; +use url::Url; + +use crate::{config::Settings, error::Error}; + +use super::{InvoiceInfo, InvoiceStatus, PaymentProcessor}; + +const APIPATH: &str = "/api/v1/payments/"; + +/// Info LNBits expects in create invoice request +#[derive(Serialize, Deserialize, Debug)] +pub struct LNBitsCreateInvoice { + out: bool, + amount: u64, + memo: String, + webhook: String, + unit: String, + internal: bool, + expiry: u64, +} + +/// Invoice response for LN bits +#[derive(Debug, Serialize, Deserialize)] +pub struct LNBitsCreateInvoiceResponse { + payment_hash: String, + payment_request: String, +} + +/// LNBits call back response +/// Used when an invoice is paid +/// lnbits to post the status change to relay +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LNBitsCallback { + pub checking_id: String, + pub pending: bool, + pub amount: u64, + pub memo: String, + pub time: u64, + pub bolt11: String, + pub preimage: String, + pub payment_hash: String, + pub wallet_id: String, + pub webhook: String, + pub webhook_status: Option, +} + +/// LN Bits repose for check invoice endpoint +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LNBitsCheckInvoiceResponse { + paid: bool, +} + +#[derive(Clone)] +pub struct LNBitsPaymentProcessor { + /// HTTP client + client: hyper::Client, hyper::Body>, + settings: Settings, +} + +impl LNBitsPaymentProcessor { + pub fn new(settings: &Settings) -> Self { + // setup hyper client + let https = HttpsConnector::new(); + let client = Client::builder().build::<_, hyper::Body>(https); + + Self { + client, + settings: settings.clone(), + } + } +} + +#[async_trait] +impl PaymentProcessor for LNBitsPaymentProcessor { + /// Calls LNBits api to ger new invoice + async fn get_invoice(&self, key: &Keys, amount: u64) -> Result { + let random_number: u16 = rand::thread_rng().gen(); + let memo = format!("{}: {}", random_number, key.public_key()); + + let callback_url = Url::parse( + &self + .settings + .info + .relay_url + .clone() + .unwrap() + .replace("ws", "http"), + )? + .join("lnbits")?; + + let body = LNBitsCreateInvoice { + out: false, + amount, + memo: memo.clone(), + webhook: callback_url.to_string(), + unit: "sat".to_string(), + internal: false, + expiry: 3600, + }; + let url = Url::parse(&self.settings.pay_to_relay.node_url)?.join(APIPATH)?; + let uri = Uri::from_str(url.as_str().strip_suffix("/").unwrap_or(url.as_str())).unwrap(); + debug!("{uri}"); + + let req = hyper::Request::builder() + .method(hyper::Method::POST) + .uri(uri) + .header("X-Api-Key", &self.settings.pay_to_relay.api_secret) + .body(hyper::Body::from(serde_json::to_string(&body)?)) + .expect("request builder"); + + let res = self.client.request(req).await?; + + debug!("{res:?}"); + + // Json to Struct of LNbits callback + let body = hyper::body::to_bytes(res.into_body()).await?; + let invoice_response: LNBitsCreateInvoiceResponse = serde_json::from_slice(&body)?; + + debug!("{:?}", invoice_response); + + Ok(InvoiceInfo { + pubkey: key.public_key().to_string(), + payment_hash: invoice_response.payment_hash, + bolt11: invoice_response.payment_request, + amount, + memo, + status: InvoiceStatus::Unpaid, + confirmed_at: None, + }) + } + + /// Calls LNBits Api to check the payment status of invoice + async fn check_invoice(&self, payment_hash: &str) -> Result { + let url = Url::parse(&self.settings.pay_to_relay.node_url)? + .join(APIPATH)? + .join(payment_hash)?; + let uri = Uri::from_str(url.as_str()).unwrap(); + debug!("{uri}"); + + let req = hyper::Request::builder() + .method(hyper::Method::GET) + .uri(uri) + .header("X-Api-Key", &self.settings.pay_to_relay.api_secret) + .body(hyper::Body::empty()) + .expect("request builder"); + + let res = self.client.request(req).await?; + // Json to Struct of LNbits callback + let body = hyper::body::to_bytes(res.into_body()).await?; + debug!("check invoice: {body:?}"); + let invoice_response: LNBitsCheckInvoiceResponse = serde_json::from_slice(&body)?; + + let status = if invoice_response.paid { + InvoiceStatus::Paid + } else { + InvoiceStatus::Unpaid + }; + + Ok(status) + } +} diff --git a/src/payment/mod.rs b/src/payment/mod.rs new file mode 100644 index 0000000..7247638 --- /dev/null +++ b/src/payment/mod.rs @@ -0,0 +1,261 @@ +use crate::error::{Error, Result}; +use crate::event::Event; +use crate::payment::lnbits::LNBitsPaymentProcessor; +use crate::repo::NostrRepo; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::{info, warn}; + +use async_trait::async_trait; +use nostr::key::{FromPkStr, FromSkStr}; +use nostr::{key::Keys, Event as NostrEvent, EventBuilder}; + +pub mod lnbits; + +/// Payment handler +pub struct Payment { + /// Repository for saving/retrieving events and events + repo: Arc, + /// Newly validated events get written and then broadcast on this channel to subscribers + event_tx: tokio::sync::broadcast::Sender, + /// Payment message sender + payment_tx: tokio::sync::broadcast::Sender, + /// Payment message receiver + payment_rx: tokio::sync::broadcast::Receiver, + /// Settings + settings: crate::config::Settings, + // Nostr Keys + nostr_keys: Keys, + /// Payment Processor + processor: Arc, +} + +#[async_trait] +pub trait PaymentProcessor: Send + Sync { + /// Get invoice from processor + async fn get_invoice(&self, keys: &Keys, amount: u64) -> Result; + /// Check payment status of an invoice + async fn check_invoice(&self, payment_hash: &str) -> Result; +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub enum Processor { + LNBits, +} + +/// Possible states of an invoice +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, sqlx::Type)] +#[sqlx(type_name = "status")] +pub enum InvoiceStatus { + Unpaid, + Paid, + Expired, +} + +impl ToString for InvoiceStatus { + fn to_string(&self) -> String { + match self { + InvoiceStatus::Paid => "Paid".to_string(), + InvoiceStatus::Unpaid => "Unpaid".to_string(), + InvoiceStatus::Expired => "Expired".to_string(), + } + } +} + +/// Invoice information +#[derive(Debug, Clone)] +pub struct InvoiceInfo { + pub pubkey: String, + pub payment_hash: String, + pub bolt11: String, + pub amount: u64, + pub status: InvoiceStatus, + pub memo: String, + pub confirmed_at: Option, +} + +/// Message variants for the payment channel +#[derive(Debug, Clone)] +pub enum PaymentMessage { + /// New account + NewAccount(String), + /// Check account, + CheckAccount(String), + /// Account Admitted + AccountAdmitted(String), + /// Invoice generated + Invoice(String, InvoiceInfo), + /// Invoice call back + /// Payment hash is passed + // This may have to be changed to better support other processors + InvoicePaid(String), +} + +impl Payment { + pub fn new( + repo: Arc, + payment_tx: tokio::sync::broadcast::Sender, + payment_rx: tokio::sync::broadcast::Receiver, + event_tx: tokio::sync::broadcast::Sender, + settings: crate::config::Settings, + ) -> Result { + info!("Create payment handler"); + + // Create nostr key from sk string + let nostr_keys = Keys::from_sk_str(&settings.pay_to_relay.secret_key)?; + + // Create processor kind defined in settings + let processor = match &settings.pay_to_relay.processor { + Processor::LNBits => Arc::new(LNBitsPaymentProcessor::new(&settings)), + }; + + Ok(Payment { + repo, + payment_tx, + payment_rx, + event_tx, + settings, + nostr_keys, + processor, + }) + } + + /// Perform Payment tasks + pub async fn run(&mut self) { + loop { + let res = self.run_internal().await; + if let Err(e) = res { + info!("error in payment: {:?}", e); + } + } + } + + /// Internal select loop for preforming payment operatons + async fn run_internal(&mut self) -> Result<()> { + tokio::select! { + m = self.payment_rx.recv() => { + match m { + Ok(PaymentMessage::NewAccount(pubkey)) => { + info!("payment event for {:?}", pubkey); + // REVIEW: This will need to change for cost per event + let amount = self.settings.pay_to_relay.admission_cost; + let invoice_info = self.get_invoice_info(&pubkey, amount).await?; + // TODO: should handle this error + self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok(); + }, + // Gets the most recent unpaid invoice from database + // Checks LNbits to verify if paid/unpaid + Ok(PaymentMessage::CheckAccount(pubkey)) => { + let keys = Keys::from_pk_str(&pubkey)?; + + if let Some(invoice_info) = self.repo.get_unpaid_invoice(&keys).await? { + match self.check_invoice_status(&invoice_info.payment_hash).await? { + InvoiceStatus::Paid => { + self.repo.admit_account(&keys, self.settings.pay_to_relay.admission_cost).await?; + self.payment_tx.send(PaymentMessage::AccountAdmitted(pubkey)).ok(); + } + _ => { + self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok(); + } + } + } + } + Ok(PaymentMessage::InvoicePaid(payment_hash)) => { + if self.check_invoice_status(&payment_hash).await?.eq(&InvoiceStatus::Paid) { + let pubkey = self.repo + .update_invoice(&payment_hash, InvoiceStatus::Paid) + .await?; + + let key = Keys::from_pk_str(&pubkey)?; + self.repo.admit_account(&key, self.settings.pay_to_relay.admission_cost).await?; + } + } + Ok(_) => { + // For this variant nothing need to be done here + // it is used by `server` + } + Err(err) => warn!("Payment RX: {err}") + } + } + } + + Ok(()) + } + + /// Sends Nostr DM to pubkey that requested invoice + /// Two events the terms followed by the bolt11 invoice + pub async fn send_admission_message( + &self, + pubkey: &str, + invoice_info: &InvoiceInfo, + ) -> Result<()> { + // Create Nostr key from pk + let key = Keys::from_pk_str(pubkey)?; + + let pubkey = key.public_key(); + + // Event DM with terms of service + let message_event: NostrEvent = EventBuilder::new_encrypted_direct_msg( + &self.nostr_keys, + pubkey, + &self.settings.pay_to_relay.terms_message, + )? + .to_event(&self.nostr_keys)?; + + // Event DM with invoice + let invoice_event: NostrEvent = + EventBuilder::new_encrypted_direct_msg(&self.nostr_keys, pubkey, &invoice_info.bolt11)? + .to_event(&self.nostr_keys)?; + + // Persist DM events to DB + self.repo.write_event(&message_event.clone().into()).await?; + self.repo.write_event(&invoice_event.clone().into()).await?; + + // Broadcast DM events + self.event_tx.send(message_event.clone().into()).ok(); + self.event_tx.send(invoice_event.clone().into()).ok(); + + Ok(()) + } + + /// Get Invoice Info + /// If the has an active invoice that will be return + /// Otherwise a new invoice will be generated by the payment processor + pub async fn get_invoice_info(&self, pubkey: &str, amount: u64) -> Result { + // If user is already in DB this will be false + // This avoids recreating admission invoices + // I think it will continue to send DMs with the invoice + // If client continues to try and write to the relay (will be same invoice) + let key = Keys::from_pk_str(pubkey)?; + if !self.repo.create_account(&key).await? { + if let Ok(Some(invoice_info)) = self.repo.get_unpaid_invoice(&key).await { + return Ok(invoice_info); + } + } + + let key = Keys::from_pk_str(pubkey)?; + + let invoice_info = self.processor.get_invoice(&key, amount).await?; + + // Persist invoice to DB + self.repo + .create_invoice_record(&key, invoice_info.clone()) + .await?; + + // Admission event invoice and terms to pubkey that is joining + self.send_admission_message(pubkey, &invoice_info).await?; + + Ok(invoice_info) + } + + /// Check paid status of invoice with LNbits + pub async fn check_invoice_status(&self, payment_hash: &str) -> Result { + // Check base if passed expiry time + let status = self.processor.check_invoice(payment_hash).await?; + self.repo + .update_invoice(payment_hash, status.clone()) + .await?; + + Ok(status) + } +} diff --git a/src/repo/mod.rs b/src/repo/mod.rs index a0ddea6..b4dafb9 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -2,9 +2,11 @@ use crate::db::QueryResult; use crate::error::Result; use crate::event::Event; use crate::nip05::VerificationRecord; +use crate::payment::{InvoiceInfo, InvoiceStatus}; use crate::subscription::Subscription; use crate::utils::unix_time; use async_trait::async_trait; +use nostr::Keys; use rand::Rng; pub mod postgres; @@ -57,6 +59,33 @@ pub trait NostrRepo: Send + Sync { /// Get oldest verification before timestamp async fn get_oldest_user_verification(&self, before: u64) -> Result; + + /// Create a new account + async fn create_account(&self, pubkey: &Keys) -> Result; + + /// Admit an account + async fn admit_account(&self, pubkey: &Keys, admission_cost: u64) -> Result<()>; + + /// Gets user balance if they are an admitted pubkey + async fn get_account_balance(&self, pubkey: &Keys) -> Result<(bool, u64)>; + + /// Update account balance + async fn update_account_balance( + &self, + pub_key: &Keys, + positive: bool, + new_balance: u64, + ) -> Result<()>; + + /// Create invoice record + async fn create_invoice_record(&self, pubkey: &Keys, invoice_info: InvoiceInfo) -> Result<()>; + + /// Update Invoice for given payment hash + async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result; + + /// Get the most recent invoice for a given pubkey + /// invoice must be unpaid and not expired + async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result>; } // Current time, with a slight forward jitter in seconds diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 56a2696..93b5a43 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -2,6 +2,7 @@ use crate::db::QueryResult; use crate::error::Result; use crate::event::{single_char_tagname, Event}; use crate::nip05::{Nip05Name, VerificationRecord}; +use crate::payment::{InvoiceInfo, InvoiceStatus}; use crate::repo::{now_jitter, NostrRepo}; use crate::subscription::{ReqFilter, Subscription}; use async_std::stream::StreamExt; @@ -17,6 +18,7 @@ use crate::hexrange::{hex_range, HexSearch}; use crate::repo::postgres_migration::run_migrations; use crate::server::NostrMetrics; use crate::utils::{self, is_hex, is_lower_hex}; +use nostr::key::Keys; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Receiver; use tracing::log::trace; @@ -160,6 +162,7 @@ ON CONFLICT (id) DO NOTHING"#, .execute(&mut tx) .await? .rows_affected(); + if ins_count == 0 { // if the event was a duplicate, no need to insert event or // pubkey references. This will abort the txn. @@ -184,7 +187,8 @@ ON CONFLICT (id) DO NOTHING"#, .bind(tag_name) .bind(hex::decode(tag_val).ok()) .execute(&mut tx) - .await?; + .await + .unwrap(); } else { sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, $3, NULL) \ ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING") @@ -192,7 +196,8 @@ ON CONFLICT (id) DO NOTHING"#, .bind(tag_name) .bind(tag_val.as_bytes()) .execute(&mut tx) - .await?; + .await + .unwrap(); } } None => {} @@ -543,6 +548,172 @@ ON CONFLICT (id) DO NOTHING"#, .await? .ok_or(error::Error::SqlxError(RowNotFound)) } + + async fn create_account(&self, pub_key: &Keys) -> Result { + let pub_key = pub_key.public_key().to_string(); + let mut tx = self.conn.begin().await?; + + let result = sqlx::query("INSERT INTO account (pubkey, balance) VALUES ($1, 0);") + .bind(pub_key) + .execute(&mut tx) + .await; + + let success = match result { + Ok(res) => { + tx.commit().await?; + res.rows_affected() == 1 + } + Err(_err) => false, + }; + + Ok(success) + } + + /// Admit account + async fn admit_account(&self, pub_key: &Keys, admission_cost: u64) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + sqlx::query( + "UPDATE account SET is_admitted = TRUE, balance = balance - $1 WHERE pubkey = $2", + ) + .bind(admission_cost as i64) + .bind(pub_key) + .execute(&self.conn) + .await?; + Ok(()) + } + + /// Gets if the account is admitted and balance + async fn get_account_balance(&self, pub_key: &Keys) -> Result<(bool, u64)> { + let pub_key = pub_key.public_key().to_string(); + let query = r#"SELECT + is_admitted, + balance + FROM account + WHERE pubkey = $1 + LIMIT 1"#; + + let result = sqlx::query_as::<_, (bool, i64)>(query) + .bind(pub_key) + .fetch_optional(&self.conn) + .await? + .ok_or(error::Error::SqlxError(RowNotFound))?; + + Ok((result.0, result.1 as u64)) + } + + /// Update account balance + async fn update_account_balance( + &self, + pub_key: &Keys, + positive: bool, + new_balance: u64, + ) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + match positive { + true => { + sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2") + .bind(new_balance as i64) + .bind(pub_key) + .execute(&self.conn) + .await? + } + false => { + sqlx::query("UPDATE account SET balance = balance - $1 WHERE pubkey = $2") + .bind(new_balance as i64) + .bind(pub_key) + .execute(&self.conn) + .await? + } + }; + Ok(()) + } + + /// Create invoice record + async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + let mut tx = self.conn.begin().await?; + + sqlx::query( + "INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES ($1, $2, $3, $4, $5, now(), $6)", + ) + .bind(pub_key) + .bind(invoice_info.payment_hash) + .bind(invoice_info.amount as i64) + .bind(invoice_info.status) + .bind(invoice_info.memo) + .bind(invoice_info.bolt11) + .execute(&mut tx) + .await.unwrap(); + + debug!("Invoice added"); + + tx.commit().await?; + Ok(()) + } + + /// Update invoice record + async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result { + debug!("Payment Hash: {}", payment_hash); + let query = "SELECT pubkey, status, amount FROM invoice WHERE payment_hash=$1;"; + let (pubkey, prev_invoice_status, amount) = + sqlx::query_as::<_, (String, InvoiceStatus, i64)>(query) + .bind(payment_hash) + .fetch_optional(&self.conn) + .await? + .ok_or(error::Error::SqlxError(RowNotFound))?; + + // If the invoice is paid update the confirmed at timestamp + let query = if status.eq(&InvoiceStatus::Paid) { + "UPDATE invoice SET status=$1, confirmed_at = now() WHERE payment_hash=$2;" + } else { + "UPDATE invoice SET status=$1 WHERE payment_hash=$2;" + }; + + sqlx::query(query) + .bind(&status) + .bind(payment_hash) + .execute(&self.conn) + .await?; + + if prev_invoice_status.eq(&InvoiceStatus::Unpaid) && status.eq(&InvoiceStatus::Paid) { + sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2") + .bind(amount) + .bind(&pubkey) + .execute(&self.conn) + .await?; + } + + Ok(pubkey) + } + + /// Get the most recent invoice for a given pubkey + /// invoice must be unpaid and not expired + async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result> { + let query = r#" +SELECT amount, payment_hash, description, invoice +FROM invoice +WHERE pubkey = $1 +ORDER BY created_at DESC +LIMIT 1; + "#; + match sqlx::query_as::<_, (i64, String, String, String)>(query) + .bind(pubkey.public_key().to_string()) + .fetch_optional(&self.conn) + .await + .unwrap() + { + Some((amount, payment_hash, description, invoice)) => Ok(Some(InvoiceInfo { + pubkey: pubkey.public_key().to_string(), + payment_hash, + bolt11: invoice, + amount: amount as u64, + status: InvoiceStatus::Unpaid, + memo: description, + confirmed_at: None, + })), + None => Ok(None), + } + } } /// Create a dynamic SQL query and params from a subscription filter. diff --git a/src/repo/postgres_migration.rs b/src/repo/postgres_migration.rs index 2c16d13..d6b7aa2 100644 --- a/src/repo/postgres_migration.rs +++ b/src/repo/postgres_migration.rs @@ -36,6 +36,7 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result { } run_migration(m003::migration(), db).await; run_migration(m004::migration(), db).await; + run_migration(m005::migration(), db).await; Ok(current_version(db).await as usize) } @@ -277,3 +278,43 @@ CREATE INDEX event_expires_at_idx ON "event" (expires_at); } } } + +mod m005 { + use crate::repo::postgres_migration::{Migration, SimpleSqlMigration}; + + pub const VERSION: i64 = 5; + + pub fn migration() -> impl Migration { + SimpleSqlMigration { + serial_number: VERSION, + sql: vec![ + r#" +-- Create account table +CREATE TABLE "account" ( + pubkey varchar NOT NULL, + is_admitted BOOLEAN NOT NULL DEFAULT FALSE, + balance BIGINT NOT NULL DEFAULT 0, + tos_accepted_at TIMESTAMP, + CONSTRAINT account_pkey PRIMARY KEY (pubkey) +); + +CREATE TYPE status AS ENUM ('Paid', 'Unpaid', 'Expired'); + + +CREATE TABLE "invoice" ( + payment_hash varchar NOT NULL, + pubkey varchar NOT NULL, + invoice varchar NOT NULL, + amount BIGINT NOT NULL, + status status NOT NULL DEFAULT 'Unpaid', + description varchar, + created_at timestamp, + confirmed_at timestamp, + CONSTRAINT invoice_payment_hash PRIMARY KEY (payment_hash), + CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE +); + "#, + ], + } + } +} diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs index db933b4..91b48e6 100644 --- a/src/repo/sqlite.rs +++ b/src/repo/sqlite.rs @@ -2,12 +2,12 @@ //use crate::config::SETTINGS; use crate::config::Settings; use crate::db::QueryResult; -use crate::error::Error::SqlError; -use crate::error::Result; +use crate::error::{Error::SqlError, Result}; use crate::event::{single_char_tagname, Event}; use crate::hexrange::hex_range; use crate::hexrange::HexSearch; use crate::nip05::{Nip05Name, VerificationRecord}; +use crate::payment::{InvoiceInfo, InvoiceStatus}; use crate::repo::sqlite_migration::{upgrade_db, STARTUP_SQL}; use crate::server::NostrMetrics; use crate::subscription::{ReqFilter, Subscription}; @@ -30,6 +30,7 @@ use tokio::task; use tracing::{debug, info, trace, warn}; use crate::repo::{now_jitter, NostrRepo}; +use nostr::key::Keys; pub type SqlitePool = r2d2::Pool; pub type PooledConnection = r2d2::PooledConnection; @@ -723,6 +724,209 @@ impl NostrRepo for SqliteRepo { Ok(vr) }).await? } + + /// Create account + async fn create_account(&self, pub_key: &Keys) -> Result { + let pub_key = pub_key.public_key().to_string(); + + let mut conn = self.write_pool.get()?; + let ins_count = tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + let ins_count: u64; + { + // Ignore if user is already in db + let query = "INSERT OR IGNORE INTO account (pubkey, is_admitted, balance) VALUES (?1, ?2, ?3);"; + let mut stmt = tx.prepare(query)?; + ins_count = stmt.execute(params![&pub_key, false, 0])? as u64; + } + tx.commit()?; + let ok: Result = Ok(ins_count); + ok + }).await??; + + if ins_count != 1 { + return Ok(false); + } + + Ok(true) + } + + /// Admit account + async fn admit_account(&self, pub_key: &Keys, admission_cost: u64) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + let mut conn = self.write_pool.get()?; + let pub_key = pub_key.to_owned(); + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + { + let query = "UPDATE account SET is_admitted = TRUE, tos_accepted_at = strftime('%s','now'), balance = balance - ?1 WHERE pubkey=?2;"; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![admission_cost, pub_key])?; + } + tx.commit()?; + let ok: Result<()> = Ok(()); + ok + }) + .await? + } + + /// Gets if the account is admitted and balance + async fn get_account_balance(&self, pub_key: &Keys) -> Result<(bool, u64)> { + let pub_key = pub_key.public_key().to_string(); + let mut conn = self.write_pool.get()?; + let pub_key = pub_key.to_owned(); + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + let query = "SELECT is_admitted, balance FROM account WHERE pubkey = ?1;"; + let mut stmt = tx.prepare_cached(query)?; + let fields = stmt.query_row(params![pub_key], |r| { + let is_admitted: bool = r.get(0)?; + let balance: u64 = r.get(1)?; + // create a tuple since we can't throw non-rusqlite errors in this closure + Ok((is_admitted, balance)) + })?; + Ok(fields) + }) + .await? + } + + /// Update account balance + async fn update_account_balance( + &self, + pub_key: &Keys, + positive: bool, + new_balance: u64, + ) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + + let mut conn = self.write_pool.get()?; + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + { + let query = if positive { + "UPDATE account SET balance=balance + ?1 WHERE pubkey=?2" + } else { + "UPDATE account SET balance=balance - ?1 WHERE pubkey=?2" + }; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![new_balance, pub_key])?; + } + tx.commit()?; + let ok: Result<()> = Ok(()); + ok + }) + .await? + } + + /// Create invoice record + async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> { + let pub_key = pub_key.public_key().to_string(); + let pub_key = pub_key.to_owned(); + let mut conn = self.write_pool.get()?; + tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + { + let query = "INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES (?1, ?2, ?3, ?4, ?5, strftime('%s','now'), ?6);"; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![&pub_key, invoice_info.payment_hash, invoice_info.amount, invoice_info.status.to_string(), invoice_info.memo, invoice_info.bolt11])?; + } + tx.commit()?; + let ok: Result<()> = Ok(()); + ok + }).await??; + + Ok(()) + } + + /// Update invoice record + async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result { + let mut conn = self.write_pool.get()?; + let payment_hash = payment_hash.to_owned(); + let pub_key = tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + let pubkey: String; + { + + // Get required invoice info for given payment hash + let query = "SELECT pubkey, status, amount FROM invoice WHERE payment_hash=?1;"; + let mut stmt = tx.prepare(query)?; + let (pub_key, prev_status, amount) = stmt.query_row(params![payment_hash], |r| { + let pub_key: String = r.get(0)?; + let status: String = r.get(1)?; + let amount: u64 = r.get(2)?; + + + Ok((pub_key, status, amount)) + + })?; + + // If the invoice is paid update the confirmed_at timestamp + let query = if status.eq(&InvoiceStatus::Paid) { + "UPDATE invoice SET status=?1, confirmed_at = strftime('%s', 'now') WHERE payment_hash=?2;" + } else { + "UPDATE invoice SET status=?1 WHERE payment_hash=?2;" + }; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![status.to_string(), payment_hash])?; + + // Increase account balance by given invoice amount + if prev_status == "Unpaid" && status.eq(&InvoiceStatus::Paid) { + let query = + "UPDATE account SET balance = balance + ?1 WHERE pubkey = ?2;"; + let mut stmt = tx.prepare(query)?; + stmt.execute(params![amount, pub_key])?; + } + + pubkey = pub_key; + } + + tx.commit()?; + let ok: Result = Ok(pubkey); + ok + }) + .await?; + pub_key + } + + /// Get the most recent invoice for a given pubkey + /// invoice must be unpaid and not expired + async fn get_unpaid_invoice(&self, pubkey: &Keys) -> Result> { + let mut conn = self.write_pool.get()?; + + let pubkey = pubkey.to_owned(); + let pubkey_str = pubkey.clone().public_key().to_string(); + let (payment_hash, invoice, amount, description) = tokio::task::spawn_blocking(move || { + let tx = conn.transaction()?; + + let query = r#" +SELECT amount, payment_hash, description, invoice +FROM invoice +WHERE pubkey = ?1 AND status = 'Unpaid' +ORDER BY created_at DESC +LIMIT 1; + "#; + let mut stmt = tx.prepare(query).unwrap(); + stmt.query_row(params![&pubkey_str], |r| { + let amount: u64 = r.get(0)?; + let payment_hash: String = r.get(1)?; + let description: String = r.get(2)?; + let invoice: String = r.get(3)?; + + Ok((payment_hash, invoice, amount, description)) + }) + }) + .await??; + + Ok(Some(InvoiceInfo { + pubkey: pubkey.public_key().to_string(), + payment_hash, + bolt11: invoice, + amount, + status: InvoiceStatus::Unpaid, + memo: description, + confirmed_at: None, + })) + } } /// Decide if there is an index that should be used explicitly diff --git a/src/repo/sqlite_migration.rs b/src/repo/sqlite_migration.rs index b058e30..a68a880 100644 --- a/src/repo/sqlite_migration.rs +++ b/src/repo/sqlite_migration.rs @@ -23,7 +23,7 @@ pragma mmap_size = 17179869184; -- cap mmap at 16GB "##; /// Latest database version -pub const DB_VERSION: usize = 17; +pub const DB_VERSION: usize = 18; /// Schema definition const INIT_SQL: &str = formatcp!( @@ -96,6 +96,35 @@ FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CAS ); CREATE INDEX IF NOT EXISTS user_verification_name_index ON user_verification(name); CREATE INDEX IF NOT EXISTS user_verification_event_index ON user_verification(metadata_event); + +-- Create account table +CREATE TABLE IF NOT EXISTS account ( +pubkey TEXT PRIMARY KEY, +is_admitted INTEGER NOT NULL DEFAULT 0, +balance INTEGER NOT NULL DEFAULT 0, +tos_accepted_at INTEGER +); + +-- Create account index +CREATE INDEX IF NOT EXISTS user_pubkey_index ON account(pubkey); + +-- Invoice table +CREATE TABLE IF NOT EXISTS invoice ( +payment_hash TEXT PRIMARY KEY, +pubkey TEXT NOT NULL, +invoice TEXT NOT NULL, +amount INTEGER NOT NULL, +status TEXT CHECK ( status IN ('Paid', 'Unpaid', 'Expired' ) ) NOT NUll DEFAULT 'Unpaid', +description TEXT, +created_at INTEGER NOT NULL, +confirmed_at INTEGER, +CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE +); + +-- Create invoice index +CREATE INDEX IF NOT EXISTS invoice_pubkey_index ON invoice(pubkey); + + "##, DB_VERSION ); @@ -213,6 +242,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result { if curr_version == 16 { curr_version = mig_16_to_17(conn)?; } + if curr_version == 17 { + curr_version = mig_17_to_18(conn)?; + } if curr_version == DB_VERSION { info!( @@ -760,3 +792,50 @@ PRAGMA user_version = 17; } Ok(17) } + +fn mig_17_to_18(conn: &mut PooledConnection) -> Result { + info!("database schema needs update from 17->18"); + let upgrade_sql = r##" +-- Create invoices table +CREATE TABLE IF NOT EXISTS invoice ( +payment_hash TEXT PRIMARY KEY, +pubkey TEXT NOT NULL, +invoice TEXT NOT NULL, +amount INTEGER NOT NULL, +status TEXT CHECK ( status IN ('Paid', 'Unpaid', 'Expired' ) ) NOT NUll DEFAULT 'Unpaid', +description TEXT, +created_at INTEGER NOT NULL, +confirmed_at INTEGER, +CONSTRAINT invoice_pubkey_fkey FOREIGN KEY (pubkey) REFERENCES account (pubkey) ON DELETE CASCADE +); + +-- Create invoice index +CREATE INDEX IF NOT EXISTS invoice_pubkey_index ON invoice(pubkey); + +-- Create account table + +CREATE TABLE IF NOT EXISTS account ( +pubkey TEXT PRIMARY KEY, +is_admitted INTEGER NOT NULL DEFAULT 0, +balance INTEGER NOT NULL DEFAULT 0, +tos_accepted_at INTEGER +); + +-- Create account index +CREATE INDEX IF NOT EXISTS account_pubkey_index ON account(pubkey); + + +pragma optimize; +PRAGMA user_version = 17; +"##; + match conn.execute_batch(upgrade_sql) { + Ok(()) => { + info!("database schema upgraded v17 -> v18"); + } + Err(err) => { + error!("update failed: {}", err); + panic!("database could not be upgraded"); + } + } + Ok(18) +} diff --git a/src/server.rs b/src/server.rs index e8d40f4..7555593 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,6 +12,9 @@ use crate::event::EventWrapper; use crate::info::RelayInfo; use crate::nip05; use crate::notice::Notice; +use crate::payment; +use crate::payment::InvoiceInfo; +use crate::payment::PaymentMessage; use crate::repo::NostrRepo; use crate::server::Error::CommandUnknownError; use crate::server::EventWrapper::{WrappedAuth, WrappedEvent}; @@ -20,6 +23,7 @@ use futures::SinkExt; use futures::StreamExt; use governor::{Jitter, Quota, RateLimiter}; use http::header::HeaderMap; +use hyper::body::to_bytes; use hyper::header::ACCEPT; use hyper::service::{make_service_fn, service_fn}; use hyper::upgrade::Upgraded; @@ -29,6 +33,8 @@ use hyper::{ use prometheus::IntCounterVec; use prometheus::IntGauge; use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder}; +use qrcode::render::svg; +use qrcode::QrCode; use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; @@ -54,6 +60,8 @@ use tungstenite::error::Error as WsError; use tungstenite::handshake; use tungstenite::protocol::Message; use tungstenite::protocol::WebSocketConfig; +use nostr::key::FromPkStr; +use nostr::key::Keys; /// Handle arbitrary HTTP requests, including for `WebSocket` upgrades. #[allow(clippy::too_many_arguments)] @@ -64,11 +72,13 @@ async fn handle_web_request( remote_addr: SocketAddr, broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, + payment_tx: tokio::sync::broadcast::Sender, shutdown: Receiver<()>, favicon: Option>, registry: Registry, metrics: NostrMetrics, ) -> Result, Infallible> { + debug!("{:?}", request); match ( request.uri().path(), request.headers().contains_key(header::UPGRADE), @@ -175,6 +185,16 @@ async fn handle_web_request( } } } + + // Redirect users to join page when pay to relay enabled + if settings.pay_to_relay.enabled { + return Ok(Response::builder() + .status(StatusCode::TEMPORARY_REDIRECT) + .header("location", "/join") + .body(Body::empty()) + .unwrap()); + } + Ok(Response::builder() .status(200) .header("Content-Type", "text/plain") @@ -210,8 +230,384 @@ async fn handle_web_request( .unwrap()) } } + // LN bits callback endpoint for paid invoices + ("/lnbits", false) => { + let callback: payment::lnbits::LNBitsCallback = + serde_json::from_slice(&to_bytes(request.into_body()).await.unwrap()).unwrap(); + debug!("LNBits callback: {callback:?}"); + + if let Err(e) = payment_tx.send(PaymentMessage::InvoicePaid(callback.payment_hash)) { + warn!("Could not send invoice update: {}", e); + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("Error processing callback")) + .unwrap()); + } + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from("ok")) + .unwrap()) + } + // Endpoint for relays terms + ("/terms", false) => Ok(Response::builder() + .status(200) + .header("Content-Type", "text/plain") + .body(Body::from(settings.pay_to_relay.terms_message)) + .unwrap()), + // Endpoint to allow users to sign up + ("/join", false) => { + // Stops sign ups if disabled + if !settings.pay_to_relay.sign_ups { + return Ok(Response::builder() + .status(401) + .header("Content-Type", "text/plain") + .body(Body::from("Sorry, joining is not allowed at the moment")) + .unwrap()); + } + + let html = r#" + + + + + + +
+

Enter your pubkey

+
+

+ +

+ +
+
+ + + + "#; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(html)) + .unwrap()) + } + // Endpoint to display invoice + ("/invoice", false) => { + // Stops sign ups if disabled + if !settings.pay_to_relay.sign_ups { + return Ok(Response::builder() + .status(401) + .header("Content-Type", "text/plain") + .body(Body::from("Sorry, joining is not allowed at the moment")) + .unwrap()); + } + + // Get query pubkey from query string + let pubkey = get_pubkey(request); + + // Redirect back to join page if no pub key is found in query string + if pubkey.is_none() { + return Ok(Response::builder() + .status(404) + .header("location", "/join") + .body(Body::empty()) + .unwrap()); + } + + // Checks key is valid + let pubkey = pubkey.unwrap(); + let key = Keys::from_pk_str(&pubkey); + if key.is_err() { + return Ok(Response::builder() + .status(401) + .header("Content-Type", "text/plain") + .body(Body::from("Looks like your key is invalid")) + .unwrap()); + } + + // Checks if user is already admitted + let payment_message; + if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await { + if admission_status { + return Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from("Already admitted")) + .unwrap()); + } else { + payment_message = PaymentMessage::CheckAccount(pubkey.clone()); + } + } else { + payment_message = PaymentMessage::NewAccount(pubkey.clone()); + } + + // Send message on payment channel requesting invoice + if payment_tx.send(payment_message).is_err() { + warn!("Could not send payment tx"); + return Ok(Response::builder() + .status(501) + .header("Content-Type", "text/plain") + .body(Body::from("Sorry, something went wrong")) + .unwrap()); + } + + // wait for message with invoice back that matched the pub key + let mut invoice_info: Option = None; + while let Ok(msg) = payment_tx.subscribe().recv().await { + match msg { + PaymentMessage::Invoice(m_pubkey, m_invoice_info) => { + if m_pubkey == pubkey.clone() { + invoice_info = Some(m_invoice_info); + break; + } + } + PaymentMessage::AccountAdmitted(m_pubkey) => { + if m_pubkey == pubkey.clone() { + return Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from("Already admitted")) + .unwrap()); + } + } + _ => (), + } + } + + // Return early if cant get invoice + if invoice_info.is_none() { + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("Sorry, could not get invoice")) + .unwrap()); + } + + // Since invoice is checked to be not none, unwrap + let invoice_info = invoice_info.unwrap(); + + let qr_code: String; + if let Ok(code) = QrCode::new(invoice_info.bolt11.as_bytes()) { + qr_code = code + .render() + .min_dimensions(200, 200) + .dark_color(svg::Color("#800000")) + .light_color(svg::Color("#ffff80")) + .build(); + } else { + qr_code = "Could not render image".to_string(); + } + + let html_result = format!( + r#" + + + + + + + +
+

+ To use this relay, an admission fee of {} sats is required. By paying the fee, you agree to the terms. +

+
+
+
+ {} +
+
+
+
+

{}

+ +
+
+

This page will not refresh

+

Verify admission here once you have paid

+
+
+ + + + + +"#, + settings.pay_to_relay.admission_cost, + qr_code, + invoice_info.bolt11, + pubkey, + invoice_info.bolt11 + ); + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(html_result)) + .unwrap()) + } + ("/account", false) => { + // Stops sign ups if disabled + if !settings.pay_to_relay.enabled { + return Ok(Response::builder() + .status(401) + .header("Content-Type", "text/plain") + .body(Body::from("This relay is not paid")) + .unwrap()); + } + + // Gets the pubkey from query string + let pubkey = get_pubkey(request); + + // Redirect back to join page if no pub key is found in query string + if pubkey.is_none() { + return Ok(Response::builder() + .status(404) + .header("location", "/join") + .body(Body::empty()) + .unwrap()); + } + + // Checks key is valid + let pubkey = pubkey.unwrap(); + let key = Keys::from_pk_str(&pubkey); + if key.is_err() { + return Ok(Response::builder() + .status(401) + .header("Content-Type", "text/plain") + .body(Body::from("Looks like your key is invalid")) + .unwrap()); + } + + // Checks if user is already admitted + let text = + if let Ok((admission_status, _)) = repo.get_account_balance(&key.unwrap()).await { + if admission_status { + r#"is"# + } else { + r#"is not"# + } + } else { + "Could not get admission status" + }; + + let html_result = format!( + r#" + + + + + + + +
+
{} {} admitted
+
+ + + + + "#, + pubkey, text + ); + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(html_result)) + .unwrap()) + } + // later balance (_, _) => { - //handle any other url + // handle any other url Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from("Nothing here.")) @@ -220,6 +616,22 @@ async fn handle_web_request( } } +// Get pubkey from request query string +fn get_pubkey(request: Request) -> Option { + let query = request.uri().query().unwrap_or("").to_string(); + + // Gets the pubkey value from query string + query.split('&').fold(None, |acc, pair| { + let mut parts = pair.splitn(2, '='); + let key = parts.next(); + let value = parts.next(); + if key == Some("pubkey") { + return value.map(|s| s.to_owned()); + } + acc + }) +} + fn get_header_string(header: &str, headers: &HeaderMap) -> Option { headers .get(header) @@ -423,7 +835,10 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul // metadata events. let (metadata_tx, metadata_rx) = broadcast::channel::(4096); + let (payment_tx, payment_rx) = broadcast::channel::(4096); + let (registry, metrics) = create_metrics(); + // build a repository for events let repo = db::build_repo(&settings, metrics.clone()).await; // start the database writer task. Give it a channel for @@ -435,6 +850,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul event_rx, bcast_tx.clone(), metadata_tx.clone(), + payment_tx.clone(), shutdown_listen, )); info!("db writer created"); @@ -457,6 +873,23 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul } } + // Create payments thread if pay to relay enabled + if settings.pay_to_relay.enabled { + let payment_opt = payment::Payment::new( + repo.clone(), + payment_tx.clone(), + payment_rx, + bcast_tx.clone(), + settings.clone(), + ); + if let Ok(mut p) = payment_opt { + tokio::task::spawn(async move { + info!("starting payment process ..."); + p.run().await; + }); + } + } + // listen for (external to tokio) shutdown request let controlled_shutdown = invoke_shutdown.clone(); tokio::spawn(async move { @@ -498,6 +931,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul let remote_addr = conn.remote_addr(); let bcast = bcast_tx.clone(); let event = event_tx.clone(); + let payment_tx = payment_tx.clone(); let stop = invoke_shutdown.clone(); let settings = settings.clone(); let favicon = favicon.clone(); @@ -513,6 +947,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul remote_addr, bcast.clone(), event.clone(), + payment_tx.clone(), stop.subscribe(), favicon.clone(), registry.clone(),