mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2024-06-02 18:14:08 -04:00
Compare commits
67 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
b04ab76e73 | ||
|
39a3a258a0 | ||
|
44c6e3d88b | ||
|
767b76b2b3 | ||
|
c5fb16cd98 | ||
|
9c86f03902 | ||
|
971889f9a6 | ||
|
388eadf880 | ||
|
1ce029860c | ||
|
b7e10e26a2 | ||
|
ab736f5f98 | ||
|
b4471a6698 | ||
|
7120de4ff8 | ||
|
4ff77ab537 | ||
|
84f60f0abc | ||
|
8a67770206 | ||
|
7650f5f4a3 | ||
|
a7b169c0d3 | ||
|
24b1705a08 | ||
|
9d0a98f8bf | ||
|
26f296f76f | ||
|
c3c9b5dcd2 | ||
|
da29bdd837 | ||
|
bacb85024c | ||
|
7a77c459bb | ||
|
34c8b04926 | ||
|
1032a51220 | ||
|
79abd981e1 | ||
|
b1957ab2b1 | ||
|
23aa6e7313 | ||
|
fb751ba252 | ||
|
7c5e851b82 | ||
|
f965c53434 | ||
|
74376d94e5 | ||
|
21d1bbcfe3 | ||
|
c3e13af9e3 | ||
|
05f70112e8 | ||
|
eab522dc39 | ||
|
edf7af1573 | ||
|
34f497a650 | ||
|
4adad4c3a9 | ||
|
70dfcb6a04 | ||
|
c50e10aa21 | ||
|
9e22776227 | ||
|
dad6911807 | ||
|
ddc58a2f1c | ||
|
1131c1986e | ||
|
06fcaad9a1 | ||
|
087b68128f | ||
|
4647476622 | ||
|
7a72e588ea | ||
|
9237eed735 | ||
|
f4beb884b3 | ||
|
73285683a3 | ||
|
2f10271903 | ||
|
a34516628b | ||
|
eba7a32615 | ||
|
4d746fad85 | ||
|
0582a891cc | ||
|
2bcddf8bbf | ||
|
1595ec783d | ||
|
a2d1d78e23 | ||
|
04db2203bb | ||
|
1c1b1a1802 | ||
|
993fec4eed | ||
|
beffeb4d86 | ||
|
5135f3b007 |
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
@ -9,7 +9,7 @@ jobs:
|
|||
test_nostr-rs-relay:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Update local toolchain
|
||||
run: |
|
||||
|
|
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,4 +1,4 @@
|
|||
**/target/
|
||||
nostr.db
|
||||
nostr.db-*
|
||||
justfile
|
||||
justfile
|
||||
|
|
2192
Cargo.lock
generated
2192
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.8.9"
|
||||
version = "0.8.13"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
|
@ -13,8 +13,9 @@ categories = ["network-programming", "web-programming"]
|
|||
|
||||
[dependencies]
|
||||
clap = { version = "4.0.32", features = ["env", "default", "derive"]}
|
||||
tracing = "0.1.36"
|
||||
tracing-subscriber = "0.2.0"
|
||||
tracing = "0.1.37"
|
||||
tracing-appender = "0.2.2"
|
||||
tracing-subscriber = "0.3.16"
|
||||
tokio = { version = "1", features = ["full", "tracing", "signal"] }
|
||||
prost = "0.11"
|
||||
tonic = "0.8.3"
|
||||
|
@ -38,7 +39,7 @@ lazy_static = "1.4"
|
|||
governor = "0.4"
|
||||
nonzero_ext = "0.3"
|
||||
hyper = { version="0.14", features=["client", "server","http1","http2","tcp"] }
|
||||
hyper-tls = "0.5"
|
||||
hyper-rustls = { version = "0.24" }
|
||||
http = { version = "0.2" }
|
||||
parse_duration = "2"
|
||||
rand = "0.8"
|
||||
|
@ -56,6 +57,7 @@ qrcode = { version = "0.12.0", default-features = false, features = ["svg"] }
|
|||
nostr = { version = "0.18.0", default-features = false, features = ["base", "nip04", "nip19"] }
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
tikv-jemallocator = "0.5"
|
||||
log = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1"
|
||||
|
|
|
@ -49,7 +49,7 @@ The examples below start a rootless podman container, mapping a local
|
|||
data directory and config file.
|
||||
|
||||
```console
|
||||
$ podman build -t nostr-rs-relay .
|
||||
$ podman build --pull -t nostr-rs-relay .
|
||||
|
||||
$ mkdir data
|
||||
|
||||
|
@ -93,6 +93,11 @@ https://hub.docker.com/r/scsibug/nostr-rs-relay
|
|||
|
||||
Building `nostr-rs-relay` requires an installation of Cargo & Rust: https://www.rust-lang.org/tools/install
|
||||
|
||||
The following OS packages will be helpful; on Debian/Ubuntu:
|
||||
```console
|
||||
$ sudo apt-get install build-essential cmake protobuf-compiler pkg-config libssl-dev
|
||||
```
|
||||
|
||||
Clone this repository, and then build a release version of the relay:
|
||||
|
||||
```console
|
||||
|
|
33
config.toml
33
config.toml
|
@ -10,7 +10,7 @@ name = "nostr-rs-relay"
|
|||
# Description
|
||||
description = "A newly created nostr-rs-relay.\n\nCustomize this with your own info."
|
||||
|
||||
# Administrative contact pubkey
|
||||
# Administrative contact pubkey (32-byte hex, not npub)
|
||||
#pubkey = "0c2d168a4ae8ca58c9f1ab237b5df682599c6c7ab74307ea8b05684b60405d41"
|
||||
|
||||
# Administrative contact URI
|
||||
|
@ -20,6 +20,9 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
|
|||
# ICO format.
|
||||
#favicon = "favicon.ico"
|
||||
|
||||
# URL of Relay's icon.
|
||||
#relay_icon = "https://example.test/img.png"
|
||||
|
||||
[diagnostics]
|
||||
# Enable tokio tracing (for use with tokio-console)
|
||||
#tracing = false
|
||||
|
@ -42,7 +45,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
|
|||
# Database connection pool settings for subscribers:
|
||||
|
||||
# Minimum number of SQLite reader connections
|
||||
#min_conn = 4
|
||||
#min_conn = 0
|
||||
|
||||
# Maximum number of SQLite reader connections. Recommend setting this
|
||||
# to approx the number of cores.
|
||||
|
@ -52,6 +55,15 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
|
|||
# sqlite.
|
||||
#connection = "postgresql://postgres:nostr@localhost:7500/nostr"
|
||||
|
||||
# Optional database connection string for writing. Use this for
|
||||
# postgres clusters where you want to separate reads and writes to
|
||||
# different nodes. Ignore for single-database instances.
|
||||
#connection_write = "postgresql://postgres:nostr@localhost:7500/nostr"
|
||||
|
||||
[logging]
|
||||
# Directory to store log files. Log files roll over daily.
|
||||
#folder_path = "./log"
|
||||
#file_prefix = "nostr-relay"
|
||||
|
||||
[grpc]
|
||||
# gRPC interfaces for externalized decisions and other extensions to
|
||||
|
@ -63,6 +75,11 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
|
|||
# `proto/nauthz.proto`.
|
||||
# event_admission_server = "http://[::1]:50051"
|
||||
|
||||
# If the event admission server denies writes
|
||||
# in any case (excluding spam filtering).
|
||||
# This is reflected in the relay information document.
|
||||
# restricts_write = true
|
||||
|
||||
[network]
|
||||
# Bind to this network address
|
||||
address = "0.0.0.0"
|
||||
|
@ -138,6 +155,11 @@ reject_future_seconds = 1800
|
|||
# 0, 1, 2, 3, 7, 40, 41, 42, 43, 44, 30023,
|
||||
#]
|
||||
|
||||
# Rejects imprecise requests (kind only and author only etc)
|
||||
# This is a temperary measure to improve the adoption of outbox model
|
||||
# Its recommended to have this enabled
|
||||
limit_scrapers = false
|
||||
|
||||
[authorization]
|
||||
# Pubkey addresses in this array are whitelisted for event publishing.
|
||||
# Only valid events by these authors will be accepted, if the variable
|
||||
|
@ -148,7 +170,7 @@ reject_future_seconds = 1800
|
|||
#]
|
||||
# Enable NIP-42 authentication
|
||||
#nip42_auth = false
|
||||
# Send DMs events (kind 4) only to their authenticated recipients
|
||||
# Send DMs (kind 4 and 44) and gift wraps (kind 1059) only to their authenticated recipients
|
||||
#nip42_dms = false
|
||||
|
||||
[verified_users]
|
||||
|
@ -193,6 +215,9 @@ reject_future_seconds = 1800
|
|||
# LNBits api secret
|
||||
#api_secret = "<ln bits api>"
|
||||
|
||||
# Nostr direct message on signup
|
||||
#direct_message=false
|
||||
|
||||
# Terms of service
|
||||
#terms_message = """
|
||||
#This service (and supporting services) are provided "as is", without warranty of any kind, express or implied.
|
||||
|
@ -214,4 +239,6 @@ reject_future_seconds = 1800
|
|||
|
||||
# Whether or not new sign ups should be allowed
|
||||
#sign_ups = false
|
||||
|
||||
# optional if `direct_message=false`
|
||||
#secret_key = "<nostr nsec>"
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
[Unit]
|
||||
Description=nostr-rs-relay
|
||||
|
||||
[Service]
|
||||
User=REPLACE_WITH_YOUR_USERNAME
|
||||
WorkingDirectory=/usr/bin/
|
||||
WorkingDirectory=/var/lib/nostr-rs-relay
|
||||
Environment=RUST_LOG=warn,nostr_rs_relay=info
|
||||
ExecStart=nostr-rs-relay --config /etc/nostr-rs-relay/config.toml
|
||||
ExecStart=/usr/bin/nostr-rs-relay --config /etc/nostr-rs-relay/config.toml
|
||||
TimeoutStopSec=10
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
WantedBy=multi-user.target
|
||||
|
|
|
@ -78,18 +78,24 @@ PRAGMA foreign_keys = ON;
|
|||
delete from event where event_hash=x'00000000000c1271675dc86e3e1dd1336827bccabb90dc4c9d3b4465efefe00e';
|
||||
```
|
||||
|
||||
### Deleting All Events for Pubkey
|
||||
### Querying and Deleting All Events for Pubkey
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
|
||||
select lower(hex(author)) as author, count(*) as c from event group by author order by c asc;
|
||||
|
||||
delete from event where author=x'000000000002c7831d9c5a99f183afc2813a6f69a16edda7f6fc0ed8110566e6';
|
||||
```
|
||||
|
||||
### Deleting All Events of a Kind
|
||||
### Querying and Deleting All Events of a Kind
|
||||
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
|
||||
select printf('%7d', kind), count(*) as c from event group by kind order by c;
|
||||
|
||||
delete from event where kind=70202;
|
||||
```
|
||||
|
||||
|
@ -106,7 +112,8 @@ seen" policy.
|
|||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
TODO!
|
||||
|
||||
DELETE FROM event WHERE first_seen < CAST(strftime('%s', date('now', '-30 day')) AS INT);
|
||||
```
|
||||
|
||||
### Delete Profile Events with No Recent Events
|
||||
|
|
|
@ -10,7 +10,7 @@ and reduce spam and abuse.
|
|||
|
||||
This will likely evolve substantially, the first goal is to get a
|
||||
basic one-way service that lets an externalized program decide on
|
||||
event persistance. This does not represent the final state of gRPC
|
||||
event persistence. This does not represent the final state of gRPC
|
||||
extensibility in `nostr-rs-relay`.
|
||||
|
||||
## Considerations
|
||||
|
|
|
@ -22,18 +22,18 @@ api_secret = "<LNbits api key>"
|
|||
# Terms of service
|
||||
terms_message = """This service ....
|
||||
"""
|
||||
# Whether or not new sign ups should be allowed
|
||||
sign_ups = true
|
||||
# Whether or not new sign ups should be allowed
|
||||
sign_ups = true
|
||||
secret_key = "<nostr secret key to send dms>"
|
||||
```
|
||||
|
||||
The LNBits instance must have a signed HTTPS a self signed certificate will not work.
|
||||
The LNBits instance must have a signed HTTPS a self signed certificate will not work.
|
||||
|
||||
## Design Overview
|
||||
|
||||
### Concepts
|
||||
|
||||
All authors are initially not admitted to write to the relay. There are two ways to gain access write to the relay. The first is by attempting to post the the relay, upon receiving an event from an author that is not admitted, the relay will send a direct message including the terms of service of the relay and a lighting invoice for the admission cost. Once this invoice is payed the author can write to the relay. For this method to work the author must be reading from the relay. An author can also pay and accept the terms of service via a webpage `https://<relay-url>/join`.
|
||||
All authors are initially not admitted to write to the relay. There are two ways to gain access write to the relay. The first is by attempting to post the the relay, upon receiving an event from an author that is not admitted, the relay will send a direct message including the terms of service of the relay and a lighting invoice for the admission cost. Once this invoice is paid the author can write to the relay. For this method to work the author must be reading from the relay. An author can also pay and accept the terms of service via a webpage `https://<relay-url>/join`.
|
||||
|
||||
## Design Details
|
||||
|
||||
|
@ -54,7 +54,7 @@ Invoice information is stored in a dedicated table. This tracks:
|
|||
* `created_at` timestamp of creation
|
||||
* `confirmed_at` timestamp of payment
|
||||
|
||||
### Event Handling
|
||||
### Event Handling
|
||||
|
||||
If "pay to relay" is enabled, all incoming events are evaluated to determine whether the author is on the relay's whitelist or if they have paid the admission fee and accepted the terms. If "pay per note" is enabled, there is an additional check to ensure that the author has enough balance, which is then reduced by the cost per note. If the author is on the whitelist, this balance check is not necessary.
|
||||
|
||||
|
@ -77,9 +77,8 @@ simply to demonstrate a mitigation is possible.
|
|||
|
||||
*Mitigation*: Rate limit number of new sign ups
|
||||
|
||||
### Admitted Author Spamming
|
||||
### Admitted Author Spamming
|
||||
|
||||
*Threat*: An attacker gains write access by paying the admission fee, and then floods the relay with a large number of spam events.
|
||||
|
||||
*Mitigation*: The attacker's admission can be revoked and their admission fee will not be refunded. Enabling "cost per event" and increasing the admission cost can also discourage this type of behavior.
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ frontend fe_prod
|
|||
bind :80
|
||||
http-request set-header X-Forwarded-Proto https if { ssl_fc }
|
||||
redirect scheme https code 301 if !{ ssl_fc }
|
||||
acl host_relay hdr(host) -i relay.example.com
|
||||
acl host_relay hdr(host) -i -m beg relay.example.com
|
||||
use_backend relay if host_relay
|
||||
# HSTS (1 year)
|
||||
http-response set-header Strict-Transport-Security max-age=31536000
|
||||
|
@ -117,10 +117,10 @@ Assumptions:
|
|||
|
||||
* `Traefik` version is `2.9` (other versions not tested).
|
||||
* `Traefik` is used for provisioning of Let's Encrypt certificates.
|
||||
* `Traefik` is running in `Docker`, using `docker compose` and labels for the static configuration. An equivalent setup useing a Traefik config file is possible too (but not covered here).
|
||||
* `Traefik` is running in `Docker`, using `docker compose` and labels for the static configuration. An equivalent setup using a Traefik config file is possible too (but not covered here).
|
||||
* Strict Transport Security is enabled.
|
||||
* Hostname for the relay is `relay.example.com`, email adres for ACME certificates provider is `name@example.com`.
|
||||
* ipv6 is enabled, a viable private ipv6 subnet is specified in the example below.
|
||||
* Hostname for the relay is `relay.example.com`, email address for ACME certificates provider is `name@example.com`.
|
||||
* ipv6 is enabled, a viable private ipv6 subnet is specified in the example below.
|
||||
* Relay is running on port `8080`.
|
||||
|
||||
```
|
||||
|
@ -196,4 +196,4 @@ services:
|
|||
|
||||
### Traefik Notes
|
||||
|
||||
Traefik will take care of the provisioning and renewal of certificates. In case of an ipv4-only relay, simply detele the `enable_ipv6:` and `ipam:` entries in the `networks:` section of the docker-compose file.
|
||||
Traefik will take care of the provisioning and renewal of certificates. In case of an ipv4-only relay, simply detele the `enable_ipv6:` and `ipam:` entries in the `networks:` section of the docker-compose file.
|
||||
|
|
|
@ -12,13 +12,13 @@ Start by building the application from source. Here is how to do that:
|
|||
3. `cargo build --release`
|
||||
|
||||
### Place the files where they belong
|
||||
We want to palce the nostr-rs-relay binary and the config.toml file where they belong. While still in the root level of the nostr-rs-relay folder you cloned in last step, run the following commands:
|
||||
We want to place the nostr-rs-relay binary and the config.toml file where they belong. While still in the root level of the nostr-rs-relay folder you cloned in last step, run the following commands:
|
||||
1. `sudo cp target/release/nostr-rs-relay /usr/local/bin/`
|
||||
2. `sudo mkdir /etc/nostr-rs-relay`
|
||||
2. `sudo cp config.toml /etc/nostr-rs-relay`
|
||||
|
||||
### Create the Systemd service file
|
||||
We need to create a new Systemd service file. These files are placed in the `/etc/systemd/system/` folder where you will find many other services running.
|
||||
We need to create a new Systemd service file. These files are placed in the `/etc/systemd/system/` folder where you will find many other services running.
|
||||
|
||||
1. `sudo vim /etc/systemd/system/nostr-rs-relay.service`
|
||||
2. Paste in the contents of [this service file](../contrib/nostr-rs-relay.service). Remember to replace the `User` value with your own username.
|
||||
|
@ -30,10 +30,11 @@ To get the service running, we need to reload the systemd daemon and enable the
|
|||
|
||||
1. `sudo systemctl daemon-reload`
|
||||
2. `sudo systemctl start nostr-rs-relay.service`
|
||||
3. `sudo systemctl status nostr-rs-relay.service`
|
||||
3. `sudo systemctl enable nostr-rs-relay.service`
|
||||
4. `sudo systemctl status nostr-rs-relay.service`
|
||||
|
||||
|
||||
### Tips
|
||||
|
||||
#### Logs
|
||||
The application will write logs to the journal. To read it, execute `sudo journalctl -f -u nostr-rs-relay`
|
||||
The application will write logs to the journal. To read it, execute `sudo journalctl -f -u nostr-rs-relay`
|
||||
|
|
|
@ -179,7 +179,7 @@ attempts to persist them to disk. Once validated and persisted, these
|
|||
events are broadcast to all subscribers.
|
||||
|
||||
When verification is enabled, the writer must check to ensure a valid,
|
||||
unexpired verification record exists for the auther. All metadata
|
||||
unexpired verification record exists for the author. All metadata
|
||||
events (regardless of verification status) are forwarded to a verifier
|
||||
module. If the verifier determines a new verification record is
|
||||
needed, it is also responsible for persisting and broadcasting the
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tonic_build::configure()
|
||||
.build_server(false)
|
||||
.build_server(true)
|
||||
.protoc_arg("--experimental_allow_proto3_optional")
|
||||
.compile(&["../../proto/nauthz.proto"], &["../../proto"])?;
|
||||
Ok(())
|
||||
|
|
|
@ -20,7 +20,7 @@ pub fn main() -> Result<()> {
|
|||
let _trace_sub = tracing_subscriber::fmt::try_init();
|
||||
println!("Nostr-rs-relay Bulk Loader");
|
||||
// check for a database file, or create one.
|
||||
let settings = config::Settings::new(&None);
|
||||
let settings = config::Settings::new(&None)?;
|
||||
if !Path::new(&settings.database.data_directory).is_dir() {
|
||||
info!("Database directory does not exist");
|
||||
return Err(Error::DatabaseDirError);
|
||||
|
@ -143,7 +143,7 @@ fn write_event(tx: &Transaction, e: Event) -> Result<usize> {
|
|||
let event_id = tx.last_insert_rowid();
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in e.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
//! Configuration file and settings management
|
||||
use crate::payment::Processor;
|
||||
use config::{Config, ConfigError, File};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::payment::Processor;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[allow(unused)]
|
||||
|
@ -15,6 +13,7 @@ pub struct Info {
|
|||
pub pubkey: Option<String>,
|
||||
pub contact: Option<String>,
|
||||
pub favicon: Option<String>,
|
||||
pub relay_icon: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -26,12 +25,14 @@ pub struct Database {
|
|||
pub min_conn: u32,
|
||||
pub max_conn: u32,
|
||||
pub connection: String,
|
||||
pub connection_write: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Grpc {
|
||||
pub event_admission_server: Option<String>,
|
||||
pub restricts_write: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -73,6 +74,7 @@ pub struct Limits {
|
|||
pub event_persist_buffer: usize, // events to buffer for database commits (block senders if database writes are too slow)
|
||||
pub event_kind_blacklist: Option<Vec<u64>>,
|
||||
pub event_kind_allowlist: Option<Vec<u64>>,
|
||||
pub limit_scrapers: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -80,7 +82,7 @@ pub struct Limits {
|
|||
pub struct Authorization {
|
||||
pub pubkey_whitelist: Option<Vec<String>>, // If present, only allow these pubkeys to publish events
|
||||
pub nip42_auth: bool, // if true enables NIP-42 authentication
|
||||
pub nip42_dms: bool, // if true send DMs only to their authenticated recipients
|
||||
pub nip42_dms: bool, // if true send DMs only to their authenticated recipients
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -92,8 +94,9 @@ pub struct PayToRelay {
|
|||
pub node_url: String,
|
||||
pub api_secret: String,
|
||||
pub terms_message: String,
|
||||
pub sign_ups: bool, // allow new users to sign up to relay
|
||||
pub secret_key: String,
|
||||
pub sign_ups: bool, // allow new users to sign up to relay
|
||||
pub direct_message: bool, // Send direct message to user with invoice and terms
|
||||
pub secret_key: Option<String>,
|
||||
pub processor: Processor,
|
||||
}
|
||||
|
||||
|
@ -165,6 +168,13 @@ impl VerifiedUsers {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Logging {
|
||||
pub folder_path: Option<String>,
|
||||
pub file_prefix: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[allow(unused)]
|
||||
pub struct Settings {
|
||||
|
@ -179,20 +189,27 @@ pub struct Settings {
|
|||
pub verified_users: VerifiedUsers,
|
||||
pub retention: Retention,
|
||||
pub options: Options,
|
||||
pub logging: Logging,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
#[must_use]
|
||||
pub fn new(config_file_name: &Option<String>) -> Self {
|
||||
pub fn new(config_file_name: &Option<String>) -> Result<Self, ConfigError> {
|
||||
let default_settings = Self::default();
|
||||
// attempt to construct settings with file
|
||||
let from_file = Self::new_from_default(&default_settings, config_file_name);
|
||||
match from_file {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
warn!("Error reading config file ({:?})", e);
|
||||
default_settings
|
||||
// pass up the parse error if the config file was specified,
|
||||
// otherwise use the default config (with a warning).
|
||||
if config_file_name.is_some() {
|
||||
Err(e)
|
||||
} else {
|
||||
eprintln!("Error reading config file ({:?})", e);
|
||||
eprintln!("WARNING: Default configuration settings will be used");
|
||||
Ok(default_settings)
|
||||
}
|
||||
}
|
||||
ok => ok,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,7 +251,14 @@ impl Settings {
|
|||
// Should check that url is valid
|
||||
assert_ne!(settings.pay_to_relay.node_url, "");
|
||||
assert_ne!(settings.pay_to_relay.terms_message, "");
|
||||
assert_ne!(settings.pay_to_relay.secret_key, "");
|
||||
|
||||
if settings.pay_to_relay.direct_message {
|
||||
assert_ne!(
|
||||
settings.pay_to_relay.secret_key,
|
||||
Some("<nostr nsec>".to_string())
|
||||
);
|
||||
assert!(settings.pay_to_relay.secret_key.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(settings)
|
||||
|
@ -251,6 +275,7 @@ impl Default for Settings {
|
|||
pubkey: None,
|
||||
contact: None,
|
||||
favicon: None,
|
||||
relay_icon: None,
|
||||
},
|
||||
diagnostics: Diagnostics { tracing: false },
|
||||
database: Database {
|
||||
|
@ -260,9 +285,11 @@ impl Default for Settings {
|
|||
min_conn: 4,
|
||||
max_conn: 8,
|
||||
connection: "".to_owned(),
|
||||
connection_write: None,
|
||||
},
|
||||
grpc: Grpc {
|
||||
event_admission_server: None,
|
||||
restricts_write: false,
|
||||
},
|
||||
network: Network {
|
||||
port: 8080,
|
||||
|
@ -282,6 +309,7 @@ impl Default for Settings {
|
|||
event_persist_buffer: 4096,
|
||||
event_kind_blacklist: None,
|
||||
event_kind_allowlist: None,
|
||||
limit_scrapers: false
|
||||
},
|
||||
authorization: Authorization {
|
||||
pubkey_whitelist: None, // Allow any address to publish
|
||||
|
@ -296,7 +324,8 @@ impl Default for Settings {
|
|||
node_url: "".to_string(),
|
||||
api_secret: "".to_string(),
|
||||
sign_ups: false,
|
||||
secret_key: "".to_string(),
|
||||
direct_message: false,
|
||||
secret_key: None,
|
||||
processor: Processor::LNBits,
|
||||
},
|
||||
verified_users: VerifiedUsers {
|
||||
|
@ -318,6 +347,10 @@ impl Default for Settings {
|
|||
options: Options {
|
||||
reject_future_seconds: None, // Reject events in the future if defined
|
||||
},
|
||||
logging: Logging {
|
||||
folder_path: None,
|
||||
file_prefix: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
16
src/conn.rs
16
src/conn.rs
|
@ -156,7 +156,7 @@ impl ClientConn {
|
|||
self.auth = Challenge(Uuid::new_v4().to_string());
|
||||
}
|
||||
|
||||
pub fn authenticate(&mut self, event: &Event, relay_url: &String) -> Result<()> {
|
||||
pub fn authenticate(&mut self, event: &Event, relay_url: &str) -> Result<()> {
|
||||
match &self.auth {
|
||||
Challenge(_) => (),
|
||||
AuthPubkey(_) => {
|
||||
|
@ -181,15 +181,15 @@ impl ClientConn {
|
|||
return Err(Error::AuthFailure);
|
||||
}
|
||||
|
||||
let mut challenge: Option<&String> = None;
|
||||
let mut relay: Option<&String> = None;
|
||||
let mut challenge: Option<&str> = None;
|
||||
let mut relay: Option<&str> = None;
|
||||
|
||||
for tag in &event.tags {
|
||||
if tag.len() == 2 && tag.get(0) == Some(&"challenge".into()) {
|
||||
challenge = tag.get(1);
|
||||
if tag.len() == 2 && tag.first() == Some(&"challenge".into()) {
|
||||
challenge = tag.get(1).map(|x| x.as_str());
|
||||
}
|
||||
if tag.len() == 2 && tag.get(0) == Some(&"relay".into()) {
|
||||
relay = tag.get(1);
|
||||
if tag.len() == 2 && tag.first() == Some(&"relay".into()) {
|
||||
relay = tag.get(1).map(|x| x.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -204,7 +204,7 @@ impl ClientConn {
|
|||
}
|
||||
}
|
||||
|
||||
match (relay.and_then(|url| host_str(url)), host_str(relay_url)) {
|
||||
match (relay.and_then(host_str), host_str(relay_url)) {
|
||||
(Some(received_relay), Some(our_relay)) => {
|
||||
if received_relay != our_relay {
|
||||
return Err(Error::AuthFailure);
|
||||
|
|
30
src/db.rs
30
src/db.rs
|
@ -11,6 +11,7 @@ use crate::repo::NostrRepo;
|
|||
use crate::server::NostrMetrics;
|
||||
use governor::clock::Clock;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use log::LevelFilter;
|
||||
use nostr::key::FromPkStr;
|
||||
use nostr::key::Keys;
|
||||
use r2d2;
|
||||
|
@ -20,7 +21,6 @@ use sqlx::ConnectOptions;
|
|||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::log::LevelFilter;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
|
||||
|
@ -70,7 +70,26 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
|
|||
.connect_with(options)
|
||||
.await
|
||||
.unwrap();
|
||||
let repo = PostgresRepo::new(pool, metrics);
|
||||
|
||||
let write_pool: PostgresPool = match &settings.database.connection_write {
|
||||
Some(cfg_write) => {
|
||||
let mut options_write: PgConnectOptions = cfg_write.as_str().parse().unwrap();
|
||||
options_write.log_statements(LevelFilter::Debug);
|
||||
options_write.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60));
|
||||
|
||||
PoolOptions::new()
|
||||
.max_connections(settings.database.max_conn)
|
||||
.min_connections(settings.database.min_conn)
|
||||
.idle_timeout(Duration::from_secs(60))
|
||||
.connect_with(options_write)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
None => pool.clone(),
|
||||
};
|
||||
|
||||
let repo = PostgresRepo::new(pool, write_pool, metrics);
|
||||
|
||||
// Panic on migration failure
|
||||
let version = repo.migrate_up().await.unwrap();
|
||||
info!("Postgres migration completed, at v{}", version);
|
||||
|
@ -242,7 +261,7 @@ pub async fn db_writer(
|
|||
) => {
|
||||
// User does not exist
|
||||
info!("Unregistered user");
|
||||
if settings.pay_to_relay.sign_ups {
|
||||
if settings.pay_to_relay.sign_ups && settings.pay_to_relay.direct_message {
|
||||
payment_tx
|
||||
.send(PaymentMessage::NewAccount(event.pubkey))
|
||||
.ok();
|
||||
|
@ -359,7 +378,7 @@ pub async fn db_writer(
|
|||
notice_tx
|
||||
.try_send(Notice::blocked(
|
||||
event.id,
|
||||
&decision.message().unwrap_or_else(|| "".to_string()),
|
||||
&decision.message().unwrap_or_default(),
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
|
@ -382,6 +401,9 @@ pub async fn db_writer(
|
|||
start.elapsed()
|
||||
);
|
||||
event_write = true;
|
||||
|
||||
// send OK message
|
||||
notice_tx.try_send(Notice::saved(event.id)).ok();
|
||||
} else {
|
||||
match repo.write_event(&event).await {
|
||||
Ok(updated) => {
|
||||
|
|
23
src/event.rs
23
src/event.rs
|
@ -160,11 +160,11 @@ impl Event {
|
|||
.tags
|
||||
.iter()
|
||||
.filter(|x| !x.is_empty())
|
||||
.filter(|x| x.get(0).unwrap() == "expiration")
|
||||
.filter(|x| x.first().unwrap() == "expiration")
|
||||
.map(|x| x.get(1).unwrap_or(&default))
|
||||
.take(1)
|
||||
.collect();
|
||||
let val_first = dvals.get(0);
|
||||
let val_first = dvals.first();
|
||||
val_first.and_then(|t| t.parse::<u64>().ok())
|
||||
}
|
||||
|
||||
|
@ -192,11 +192,11 @@ impl Event {
|
|||
.tags
|
||||
.iter()
|
||||
.filter(|x| !x.is_empty())
|
||||
.filter(|x| x.get(0).unwrap() == "d")
|
||||
.filter(|x| x.first().unwrap() == "d")
|
||||
.map(|x| x.get(1).unwrap_or(&default))
|
||||
.take(1)
|
||||
.collect();
|
||||
let dval_first = dvals.get(0);
|
||||
let dval_first = dvals.first();
|
||||
match dval_first {
|
||||
Some(_) => dval_first.map(|x| x.to_string()),
|
||||
None => Some(default),
|
||||
|
@ -232,7 +232,7 @@ impl Event {
|
|||
.tags
|
||||
.iter()
|
||||
.filter(|x| x.len() == 4)
|
||||
.filter(|x| x.get(0).unwrap() == "delegation")
|
||||
.filter(|x| x.first().unwrap() == "delegation")
|
||||
.take(1)
|
||||
.next()?
|
||||
.clone(); // get first tag
|
||||
|
@ -277,7 +277,7 @@ impl Event {
|
|||
let mut idx: HashMap<char, HashSet<String>> = HashMap::new();
|
||||
// iterate over tags that have at least 2 elements
|
||||
for t in self.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
@ -285,7 +285,7 @@ impl Event {
|
|||
let tagnamechar = tagnamechar_opt.unwrap();
|
||||
let tagval = t.get(1).unwrap();
|
||||
// ensure a vector exists for this tag
|
||||
idx.entry(tagnamechar).or_insert_with(HashSet::new);
|
||||
idx.entry(tagnamechar).or_default();
|
||||
// get the tag vec and insert entry
|
||||
let idx_tag_vec = idx.get_mut(&tagnamechar).expect("could not get tag vector");
|
||||
idx_tag_vec.insert(tagval.clone());
|
||||
|
@ -310,7 +310,7 @@ impl Event {
|
|||
self.tags
|
||||
.iter()
|
||||
.filter(|x| x.len() > 1)
|
||||
.filter(|x| x.get(0).unwrap() == tag_name)
|
||||
.filter(|x| x.first().unwrap() == tag_name)
|
||||
.map(|x| x.get(1).unwrap().clone())
|
||||
.collect()
|
||||
}
|
||||
|
@ -355,7 +355,7 @@ impl Event {
|
|||
return Err(EventInvalidId);
|
||||
}
|
||||
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
|
||||
let sig = schnorr::Signature::from_str(&self.sig).unwrap();
|
||||
let sig = schnorr::Signature::from_str(&self.sig).map_err(|_| EventInvalidSignature)?;
|
||||
if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
|
||||
if let Ok(pubkey) = XOnlyPublicKey::from_str(&self.pubkey) {
|
||||
SECP.verify_schnorr(&sig, &msg, &pubkey)
|
||||
|
@ -472,12 +472,11 @@ mod tests {
|
|||
let mut event = Event::simple_event();
|
||||
event.tags = vec![vec!["e".to_owned(), "foo".to_owned()]];
|
||||
event.build_index();
|
||||
assert_eq!(
|
||||
assert!(
|
||||
event.generic_tag_val_intersect(
|
||||
'e',
|
||||
&HashSet::from(["foo".to_owned(), "bar".to_owned()])
|
||||
),
|
||||
true
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
159
src/hexrange.rs
159
src/hexrange.rs
|
@ -1,159 +0,0 @@
|
|||
//! Utilities for searching hexadecimal
|
||||
use crate::utils::is_hex;
|
||||
use hex;
|
||||
|
||||
/// Types of hexadecimal queries.
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
|
||||
pub enum HexSearch {
|
||||
// when no range is needed, exact 32-byte
|
||||
Exact(Vec<u8>),
|
||||
// lower (inclusive) and upper range (exclusive)
|
||||
Range(Vec<u8>, Vec<u8>),
|
||||
// lower bound only, upper bound is MAX inclusive
|
||||
LowerOnly(Vec<u8>),
|
||||
}
|
||||
|
||||
/// Check if a string contains only f chars
|
||||
fn is_all_fs(s: &str) -> bool {
|
||||
s.chars().all(|x| x == 'f' || x == 'F')
|
||||
}
|
||||
|
||||
/// Find the next hex sequence greater than the argument.
|
||||
#[must_use]
|
||||
pub fn hex_range(s: &str) -> Option<HexSearch> {
|
||||
let mut hash_base = s.to_owned();
|
||||
if !is_hex(&hash_base) || hash_base.len() > 64 {
|
||||
return None;
|
||||
}
|
||||
if hash_base.len() == 64 {
|
||||
return Some(HexSearch::Exact(hex::decode(&hash_base).ok()?));
|
||||
}
|
||||
// if s is odd, add a zero
|
||||
let mut odd = hash_base.len() % 2 != 0;
|
||||
if odd {
|
||||
// extend the string to make it even
|
||||
hash_base.push('0');
|
||||
}
|
||||
let base = hex::decode(hash_base).ok()?;
|
||||
// check for all ff's
|
||||
if is_all_fs(s) {
|
||||
// there is no higher bound, we only want to search for blobs greater than this.
|
||||
return Some(HexSearch::LowerOnly(base));
|
||||
}
|
||||
|
||||
// return a range
|
||||
let mut upper = base.clone();
|
||||
let mut byte_len = upper.len();
|
||||
|
||||
// for odd strings, we made them longer, but we want to increment the upper char (+16).
|
||||
// we know we can do this without overflowing because we explicitly set the bottom half to 0's.
|
||||
while byte_len > 0 {
|
||||
byte_len -= 1;
|
||||
// check if byte can be incremented, or if we need to carry.
|
||||
let b = upper[byte_len];
|
||||
if b == u8::MAX {
|
||||
// reset and carry
|
||||
upper[byte_len] = 0;
|
||||
} else if odd {
|
||||
// check if first char in this byte is NOT 'f'
|
||||
if b < 240 {
|
||||
// bump up the first character in this byte
|
||||
upper[byte_len] = b + 16;
|
||||
// increment done, stop iterating through the vec
|
||||
break;
|
||||
}
|
||||
// if it is 'f', reset the byte to 0 and do a carry
|
||||
// reset and carry
|
||||
upper[byte_len] = 0;
|
||||
// done with odd logic, so don't repeat this
|
||||
odd = false;
|
||||
} else {
|
||||
// bump up the first character in this byte
|
||||
upper[byte_len] = b + 1;
|
||||
// increment done, stop iterating
|
||||
break;
|
||||
}
|
||||
}
|
||||
Some(HexSearch::Range(base, upper))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::error::Result;
|
||||
|
||||
#[test]
|
||||
fn hex_range_exact() -> Result<()> {
|
||||
let hex = "abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00abcdef00";
|
||||
let r = hex_range(hex);
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::Exact(hex::decode(hex).expect("invalid hex")))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
#[test]
|
||||
fn hex_full_range() -> Result<()> {
|
||||
let hex = "aaaa";
|
||||
let hex_upper = "aaab";
|
||||
let r = hex_range(hex);
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::Range(
|
||||
hex::decode(hex).expect("invalid hex"),
|
||||
hex::decode(hex_upper).expect("invalid hex")
|
||||
))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hex_full_range_odd() -> Result<()> {
|
||||
let r = hex_range("abc");
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::Range(
|
||||
hex::decode("abc0").expect("invalid hex"),
|
||||
hex::decode("abd0").expect("invalid hex")
|
||||
))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hex_full_range_odd_end_f() -> Result<()> {
|
||||
let r = hex_range("abf");
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::Range(
|
||||
hex::decode("abf0").expect("invalid hex"),
|
||||
hex::decode("ac00").expect("invalid hex")
|
||||
))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hex_no_upper() -> Result<()> {
|
||||
let r = hex_range("ffff");
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::LowerOnly(
|
||||
hex::decode("ffff").expect("invalid hex")
|
||||
))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hex_no_upper_odd() -> Result<()> {
|
||||
let r = hex_range("fff");
|
||||
assert_eq!(
|
||||
r,
|
||||
Some(HexSearch::LowerOnly(
|
||||
hex::decode("fff0").expect("invalid hex")
|
||||
))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
20
src/info.rs
20
src/info.rs
|
@ -4,7 +4,7 @@ use crate::config::Settings;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
|
||||
pub const UNIT: &str = "sats";
|
||||
pub const UNIT: &str = "msats";
|
||||
|
||||
/// Limitations of the relay as specified in NIP-111
|
||||
/// (This nip isn't finalized so may change)
|
||||
|
@ -13,6 +13,9 @@ pub const UNIT: &str = "sats";
|
|||
pub struct Limitation {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
payment_required: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
restricted_writes: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
@ -45,6 +48,8 @@ pub struct RelayInfo {
|
|||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub contact: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub icon: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub supported_nips: Option<Vec<i64>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub software: Option<String>,
|
||||
|
@ -61,7 +66,7 @@ pub struct RelayInfo {
|
|||
/// Convert an Info configuration into public Relay Info
|
||||
impl From<Settings> for RelayInfo {
|
||||
fn from(c: Settings) -> Self {
|
||||
let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40, 42];
|
||||
let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40];
|
||||
|
||||
if c.authorization.nip42_auth {
|
||||
supported_nips.push(42);
|
||||
|
@ -73,12 +78,18 @@ impl From<Settings> for RelayInfo {
|
|||
|
||||
let limitations = Limitation {
|
||||
payment_required: Some(p.enabled),
|
||||
restricted_writes: Some(
|
||||
p.enabled
|
||||
|| c.verified_users.is_enabled()
|
||||
|| c.authorization.pubkey_whitelist.is_some()
|
||||
|| c.grpc.restricts_write,
|
||||
),
|
||||
};
|
||||
|
||||
let (payment_url, fees) = if p.enabled {
|
||||
let admission_fee = if p.admission_cost > 0 {
|
||||
Some(vec![Fee {
|
||||
amount: p.admission_cost,
|
||||
amount: p.admission_cost * 1000,
|
||||
unit: UNIT.to_string(),
|
||||
}])
|
||||
} else {
|
||||
|
@ -87,7 +98,7 @@ impl From<Settings> for RelayInfo {
|
|||
|
||||
let post_fee = if p.cost_per_event > 0 {
|
||||
Some(vec![Fee {
|
||||
amount: p.cost_per_event,
|
||||
amount: p.cost_per_event * 1000,
|
||||
unit: UNIT.to_string(),
|
||||
}])
|
||||
} else {
|
||||
|
@ -124,6 +135,7 @@ impl From<Settings> for RelayInfo {
|
|||
limitation: Some(limitations),
|
||||
payment_url,
|
||||
fees,
|
||||
icon: i.relay_icon,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ pub mod db;
|
|||
pub mod delegation;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod hexrange;
|
||||
pub mod info;
|
||||
pub mod nauthz;
|
||||
pub mod nip05;
|
||||
|
@ -14,6 +13,6 @@ pub mod notice;
|
|||
pub mod repo;
|
||||
pub mod subscription;
|
||||
pub mod utils;
|
||||
// Public API for creating relays programatically
|
||||
// Public API for creating relays programmatically
|
||||
pub mod payment;
|
||||
pub mod server;
|
||||
|
|
58
src/main.rs
58
src/main.rs
|
@ -4,13 +4,17 @@ use console_subscriber::ConsoleLayer;
|
|||
use nostr_rs_relay::cli::CLIArgs;
|
||||
use nostr_rs_relay::config;
|
||||
use nostr_rs_relay::server::start_server;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::process;
|
||||
use std::sync::mpsc as syncmpsc;
|
||||
use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender};
|
||||
use std::thread;
|
||||
use tracing::info;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
use tikv_jemallocator::Jemalloc;
|
||||
use tracing::info;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[cfg(not(target_env = "msvc"))]
|
||||
#[global_allocator]
|
||||
|
@ -23,9 +27,35 @@ fn main() {
|
|||
// get config file name from args
|
||||
let config_file_arg = args.config;
|
||||
|
||||
// Ensure the config file is readable if it was explicitly set
|
||||
if let Some(config_path) = config_file_arg.as_ref() {
|
||||
let path = Path::new(&config_path);
|
||||
if !path.exists() {
|
||||
eprintln!("Config file not found: {}", &config_path);
|
||||
process::exit(1);
|
||||
}
|
||||
if !path.is_file() {
|
||||
eprintln!("Invalid config file path: {}", &config_path);
|
||||
process::exit(1);
|
||||
}
|
||||
if let Err(err) = fs::metadata(path) {
|
||||
eprintln!("Error while accessing file metadata: {}", err);
|
||||
process::exit(1);
|
||||
}
|
||||
if let Err(err) = fs::File::open(path) {
|
||||
eprintln!("Config file is not readable: {}", err);
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
let mut _log_guard: Option<WorkerGuard> = None;
|
||||
|
||||
// configure settings from the config file (defaults to config.toml)
|
||||
// replace default settings with those read from the config file
|
||||
let mut settings = config::Settings::new(&config_file_arg);
|
||||
let mut settings = config::Settings::new(&config_file_arg).unwrap_or_else(|e| {
|
||||
eprintln!("Error reading config file ({:?})", e);
|
||||
process::exit(1);
|
||||
});
|
||||
|
||||
// setup tracing
|
||||
if settings.diagnostics.tracing {
|
||||
|
@ -33,7 +63,27 @@ fn main() {
|
|||
ConsoleLayer::builder().with_default_env().init();
|
||||
} else {
|
||||
// standard logging
|
||||
tracing_subscriber::fmt::try_init().unwrap();
|
||||
if let Some(path) = &settings.logging.folder_path {
|
||||
// write logs to a folder
|
||||
let prefix = match &settings.logging.file_prefix {
|
||||
Some(p) => p.as_str(),
|
||||
None => "relay",
|
||||
};
|
||||
let file_appender = tracing_appender::rolling::daily(path, prefix);
|
||||
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
|
||||
let filter = EnvFilter::from_default_env();
|
||||
// assign to a variable that is not dropped till the program ends
|
||||
_log_guard = Some(guard);
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(filter)
|
||||
.with_writer(non_blocking)
|
||||
.try_init()
|
||||
.unwrap();
|
||||
} else {
|
||||
// write to stdout
|
||||
tracing_subscriber::fmt::try_init().unwrap();
|
||||
}
|
||||
}
|
||||
info!("Starting up from main");
|
||||
|
||||
|
|
|
@ -35,13 +35,13 @@ impl std::convert::From<Nip05Name> for nauthz_grpc::event_request::Nip05Name {
|
|||
fn from(value: Nip05Name) -> Self {
|
||||
nauthz_grpc::event_request::Nip05Name {
|
||||
local: value.local.clone(),
|
||||
domain: value.domain.clone(),
|
||||
domain: value.domain,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// conversion of event tags into gprc struct
|
||||
fn tags_to_protobuf(tags: &Vec<Vec<String>>) -> Vec<TagEntry> {
|
||||
fn tags_to_protobuf(tags: &[Vec<String>]) -> Vec<TagEntry> {
|
||||
tags.iter()
|
||||
.map(|x| TagEntry { values: x.clone() })
|
||||
.collect()
|
||||
|
@ -57,7 +57,7 @@ impl EventAuthzService {
|
|||
eas
|
||||
}
|
||||
|
||||
pub async fn ready_connection(self: &mut Self) {
|
||||
pub async fn ready_connection(&mut self) {
|
||||
if self.conn.is_none() {
|
||||
let client = AuthorizationClient::connect(self.server_addr.to_string()).await;
|
||||
if let Err(ref msg) = client {
|
||||
|
@ -70,7 +70,7 @@ impl EventAuthzService {
|
|||
}
|
||||
|
||||
pub async fn admit_event(
|
||||
self: &mut Self,
|
||||
&mut self,
|
||||
event: &Event,
|
||||
ip: &str,
|
||||
origin: Option<String>,
|
||||
|
@ -99,13 +99,13 @@ impl EventAuthzService {
|
|||
origin,
|
||||
user_agent,
|
||||
auth_pubkey,
|
||||
nip05: nip05.map(|x| nauthz_grpc::event_request::Nip05Name::from(x)),
|
||||
nip05: nip05.map(nauthz_grpc::event_request::Nip05Name::from),
|
||||
})
|
||||
.await?;
|
||||
let reply = svr_res.into_inner();
|
||||
return Ok(Box::new(reply));
|
||||
Ok(Box::new(reply))
|
||||
} else {
|
||||
return Err(Error::AuthzError);
|
||||
Err(Error::AuthzError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ use crate::repo::NostrRepo;
|
|||
use hyper::body::HttpBody;
|
||||
use hyper::client::connect::HttpConnector;
|
||||
use hyper::Client;
|
||||
use hyper_tls::HttpsConnector;
|
||||
use hyper_rustls::HttpsConnector;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
@ -133,7 +133,12 @@ impl Verifier {
|
|||
) -> Result<Self> {
|
||||
info!("creating NIP-05 verifier");
|
||||
// setup hyper client
|
||||
let https = HttpsConnector::new();
|
||||
let https = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.build();
|
||||
|
||||
let client = Client::builder().build::<_, hyper::Body>(https);
|
||||
|
||||
// After all accounts have been re-verified, don't check again
|
||||
|
|
|
@ -5,6 +5,7 @@ pub enum EventResultStatus {
|
|||
Blocked,
|
||||
RateLimited,
|
||||
Error,
|
||||
Restricted,
|
||||
}
|
||||
|
||||
pub struct EventResult {
|
||||
|
@ -24,7 +25,7 @@ impl EventResultStatus {
|
|||
pub fn to_bool(&self) -> bool {
|
||||
match self {
|
||||
Self::Duplicate | Self::Saved => true,
|
||||
Self::Invalid | Self::Blocked | Self::RateLimited | Self::Error => false,
|
||||
Self::Invalid | Self::Blocked | Self::RateLimited | Self::Error | Self::Restricted => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,6 +38,7 @@ impl EventResultStatus {
|
|||
Self::Blocked => "blocked",
|
||||
Self::RateLimited => "rate-limited",
|
||||
Self::Error => "error",
|
||||
Self::Restricted => "restricted",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -81,6 +83,11 @@ impl Notice {
|
|||
Notice::prefixed(id, msg, EventResultStatus::Error)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn restricted(id: String, msg: &str) -> Notice {
|
||||
Notice::prefixed(id, msg, EventResultStatus::Restricted)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn saved(id: String) -> Notice {
|
||||
Notice::EventResult(EventResult {
|
||||
|
|
|
@ -2,13 +2,13 @@
|
|||
use http::Uri;
|
||||
use hyper::client::connect::HttpConnector;
|
||||
use hyper::Client;
|
||||
use hyper_tls::HttpsConnector;
|
||||
use hyper_rustls::HttpsConnector;
|
||||
use nostr::Keys;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tracing::debug;
|
||||
|
||||
use std::str::FromStr;
|
||||
use url::Url;
|
||||
|
@ -72,7 +72,11 @@ pub struct LNBitsPaymentProcessor {
|
|||
impl LNBitsPaymentProcessor {
|
||||
pub fn new(settings: &Settings) -> Self {
|
||||
// setup hyper client
|
||||
let https = HttpsConnector::new();
|
||||
let https = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_only()
|
||||
.enable_http1()
|
||||
.build();
|
||||
let client = Client::builder().build::<_, hyper::Body>(https);
|
||||
|
||||
Self {
|
||||
|
@ -110,8 +114,7 @@ impl PaymentProcessor for LNBitsPaymentProcessor {
|
|||
expiry: 3600,
|
||||
};
|
||||
let url = Url::parse(&self.settings.pay_to_relay.node_url)?.join(APIPATH)?;
|
||||
let uri = Uri::from_str(url.as_str().strip_suffix("/").unwrap_or(url.as_str())).unwrap();
|
||||
debug!("{uri}");
|
||||
let uri = Uri::from_str(url.as_str().strip_suffix('/').unwrap_or(url.as_str())).unwrap();
|
||||
|
||||
let req = hyper::Request::builder()
|
||||
.method(hyper::Method::POST)
|
||||
|
@ -122,14 +125,10 @@ impl PaymentProcessor for LNBitsPaymentProcessor {
|
|||
|
||||
let res = self.client.request(req).await?;
|
||||
|
||||
debug!("{res:?}");
|
||||
|
||||
// Json to Struct of LNbits callback
|
||||
let body = hyper::body::to_bytes(res.into_body()).await?;
|
||||
let invoice_response: LNBitsCreateInvoiceResponse = serde_json::from_slice(&body)?;
|
||||
|
||||
debug!("{:?}", invoice_response);
|
||||
|
||||
Ok(InvoiceInfo {
|
||||
pubkey: key.public_key().to_string(),
|
||||
payment_hash: invoice_response.payment_hash,
|
||||
|
@ -147,7 +146,6 @@ impl PaymentProcessor for LNBitsPaymentProcessor {
|
|||
.join(APIPATH)?
|
||||
.join(payment_hash)?;
|
||||
let uri = Uri::from_str(url.as_str()).unwrap();
|
||||
debug!("{uri}");
|
||||
|
||||
let req = hyper::Request::builder()
|
||||
.method(hyper::Method::GET)
|
||||
|
@ -159,13 +157,18 @@ impl PaymentProcessor for LNBitsPaymentProcessor {
|
|||
let res = self.client.request(req).await?;
|
||||
// Json to Struct of LNbits callback
|
||||
let body = hyper::body::to_bytes(res.into_body()).await?;
|
||||
debug!("check invoice: {body:?}");
|
||||
let invoice_response: LNBitsCheckInvoiceResponse = serde_json::from_slice(&body)?;
|
||||
let invoice_response: Value = serde_json::from_slice(&body)?;
|
||||
|
||||
let status = if invoice_response.paid {
|
||||
InvoiceStatus::Paid
|
||||
let status = if let Ok(invoice_response) =
|
||||
serde_json::from_value::<LNBitsCheckInvoiceResponse>(invoice_response)
|
||||
{
|
||||
if invoice_response.paid {
|
||||
InvoiceStatus::Paid
|
||||
} else {
|
||||
InvoiceStatus::Unpaid
|
||||
}
|
||||
} else {
|
||||
InvoiceStatus::Unpaid
|
||||
InvoiceStatus::Expired
|
||||
};
|
||||
|
||||
Ok(status)
|
||||
|
|
|
@ -25,7 +25,7 @@ pub struct Payment {
|
|||
/// Settings
|
||||
settings: crate::config::Settings,
|
||||
// Nostr Keys
|
||||
nostr_keys: Keys,
|
||||
nostr_keys: Option<Keys>,
|
||||
/// Payment Processor
|
||||
processor: Arc<dyn PaymentProcessor>,
|
||||
}
|
||||
|
@ -102,7 +102,11 @@ impl Payment {
|
|||
info!("Create payment handler");
|
||||
|
||||
// Create nostr key from sk string
|
||||
let nostr_keys = Keys::from_sk_str(&settings.pay_to_relay.secret_key)?;
|
||||
let nostr_keys = if let Some(secret_key) = &settings.pay_to_relay.secret_key {
|
||||
Some(Keys::from_sk_str(secret_key)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Create processor kind defined in settings
|
||||
let processor = match &settings.pay_to_relay.processor {
|
||||
|
@ -130,7 +134,7 @@ impl Payment {
|
|||
}
|
||||
}
|
||||
|
||||
/// Internal select loop for preforming payment operatons
|
||||
/// Internal select loop for preforming payment operations
|
||||
async fn run_internal(&mut self) -> Result<()> {
|
||||
tokio::select! {
|
||||
m = self.payment_rx.recv() => {
|
||||
|
@ -148,7 +152,7 @@ impl Payment {
|
|||
Ok(PaymentMessage::CheckAccount(pubkey)) => {
|
||||
let keys = Keys::from_pk_str(&pubkey)?;
|
||||
|
||||
if let Some(invoice_info) = self.repo.get_unpaid_invoice(&keys).await? {
|
||||
if let Ok(Some(invoice_info)) = self.repo.get_unpaid_invoice(&keys).await {
|
||||
match self.check_invoice_status(&invoice_info.payment_hash).await? {
|
||||
InvoiceStatus::Paid => {
|
||||
self.repo.admit_account(&keys, self.settings.pay_to_relay.admission_cost).await?;
|
||||
|
@ -158,6 +162,10 @@ impl Payment {
|
|||
self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let amount = self.settings.pay_to_relay.admission_cost;
|
||||
let invoice_info = self.get_invoice_info(&pubkey, amount).await?;
|
||||
self.payment_tx.send(PaymentMessage::Invoice(pubkey, invoice_info)).ok();
|
||||
}
|
||||
}
|
||||
Ok(PaymentMessage::InvoicePaid(payment_hash)) => {
|
||||
|
@ -189,6 +197,11 @@ impl Payment {
|
|||
pubkey: &str,
|
||||
invoice_info: &InvoiceInfo,
|
||||
) -> Result<()> {
|
||||
let nostr_keys = match &self.nostr_keys {
|
||||
Some(key) => key,
|
||||
None => return Err(Error::CustomError("Nostr key not defined".to_string())),
|
||||
};
|
||||
|
||||
// Create Nostr key from pk
|
||||
let key = Keys::from_pk_str(pubkey)?;
|
||||
|
||||
|
@ -196,16 +209,16 @@ impl Payment {
|
|||
|
||||
// Event DM with terms of service
|
||||
let message_event: NostrEvent = EventBuilder::new_encrypted_direct_msg(
|
||||
&self.nostr_keys,
|
||||
nostr_keys,
|
||||
pubkey,
|
||||
&self.settings.pay_to_relay.terms_message,
|
||||
)?
|
||||
.to_event(&self.nostr_keys)?;
|
||||
.to_event(nostr_keys)?;
|
||||
|
||||
// Event DM with invoice
|
||||
let invoice_event: NostrEvent =
|
||||
EventBuilder::new_encrypted_direct_msg(&self.nostr_keys, pubkey, &invoice_info.bolt11)?
|
||||
.to_event(&self.nostr_keys)?;
|
||||
EventBuilder::new_encrypted_direct_msg(nostr_keys, pubkey, &invoice_info.bolt11)?
|
||||
.to_event(nostr_keys)?;
|
||||
|
||||
// Persist DM events to DB
|
||||
self.repo.write_event(&message_event.clone().into()).await?;
|
||||
|
@ -242,8 +255,10 @@ impl Payment {
|
|||
.create_invoice_record(&key, invoice_info.clone())
|
||||
.await?;
|
||||
|
||||
// Admission event invoice and terms to pubkey that is joining
|
||||
self.send_admission_message(pubkey, &invoice_info).await?;
|
||||
if self.settings.pay_to_relay.direct_message {
|
||||
// Admission event invoice and terms to pubkey that is joining
|
||||
self.send_admission_message(pubkey, &invoice_info).await?;
|
||||
}
|
||||
|
||||
Ok(invoice_info)
|
||||
}
|
||||
|
|
|
@ -14,27 +14,27 @@ use sqlx::{Error, Execute, FromRow, Postgres, QueryBuilder, Row};
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::error;
|
||||
use crate::hexrange::{hex_range, HexSearch};
|
||||
use crate::repo::postgres_migration::run_migrations;
|
||||
use crate::server::NostrMetrics;
|
||||
use crate::utils::{self, is_hex, is_lower_hex};
|
||||
use nostr::key::Keys;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot::Receiver;
|
||||
use tracing::log::trace;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
pub type PostgresPool = sqlx::pool::Pool<Postgres>;
|
||||
|
||||
pub struct PostgresRepo {
|
||||
conn: PostgresPool,
|
||||
conn_write: PostgresPool,
|
||||
metrics: NostrMetrics,
|
||||
}
|
||||
|
||||
impl PostgresRepo {
|
||||
pub fn new(c: PostgresPool, m: NostrMetrics) -> PostgresRepo {
|
||||
pub fn new(c: PostgresPool, cw: PostgresPool, m: NostrMetrics) -> PostgresRepo {
|
||||
PostgresRepo {
|
||||
conn: c,
|
||||
conn_write: cw,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
|
@ -81,17 +81,17 @@ async fn delete_expired(conn: PostgresPool) -> Result<u64> {
|
|||
impl NostrRepo for PostgresRepo {
|
||||
async fn start(&self) -> Result<()> {
|
||||
// begin a cleanup task for expired events.
|
||||
cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
|
||||
cleanup_expired(self.conn_write.clone(), Duration::from_secs(600)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_up(&self) -> Result<usize> {
|
||||
Ok(run_migrations(&self.conn).await?)
|
||||
Ok(run_migrations(&self.conn_write).await?)
|
||||
}
|
||||
|
||||
async fn write_event(&self, e: &Event) -> Result<u64> {
|
||||
// start transaction
|
||||
let mut tx = self.conn.begin().await?;
|
||||
let mut tx = self.conn_write.begin().await?;
|
||||
let start = Instant::now();
|
||||
|
||||
// get relevant fields from event and convert to blobs.
|
||||
|
@ -455,7 +455,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
}
|
||||
|
||||
async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> {
|
||||
let mut tx = self.conn.begin().await?;
|
||||
let mut tx = self.conn_write.begin().await?;
|
||||
|
||||
sqlx::query("DELETE FROM user_verification WHERE \"name\" = $1")
|
||||
.bind(name)
|
||||
|
@ -481,7 +481,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
sqlx::query("UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2")
|
||||
.bind(Utc.timestamp_opt(verify_time as i64, 0).unwrap())
|
||||
.bind(id as i64)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
|
||||
info!("verification updated for {}", id);
|
||||
|
@ -491,7 +491,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
async fn fail_verification(&self, id: u64) -> Result<()> {
|
||||
sqlx::query("UPDATE user_verification SET failed_at = now(), fail_count = fail_count + 1 WHERE id = $1")
|
||||
.bind(id as i64)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -499,7 +499,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
async fn delete_verification(&self, id: u64) -> Result<()> {
|
||||
sqlx::query("DELETE FROM user_verification WHERE id = $1")
|
||||
.bind(id as i64)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -551,7 +551,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
|
||||
async fn create_account(&self, pub_key: &Keys) -> Result<bool> {
|
||||
let pub_key = pub_key.public_key().to_string();
|
||||
let mut tx = self.conn.begin().await?;
|
||||
let mut tx = self.conn_write.begin().await?;
|
||||
|
||||
let result = sqlx::query("INSERT INTO account (pubkey, balance) VALUES ($1, 0);")
|
||||
.bind(pub_key)
|
||||
|
@ -577,7 +577,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
)
|
||||
.bind(admission_cost as i64)
|
||||
.bind(pub_key)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -594,7 +594,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
|
||||
let result = sqlx::query_as::<_, (bool, i64)>(query)
|
||||
.bind(pub_key)
|
||||
.fetch_optional(&self.conn)
|
||||
.fetch_optional(&self.conn_write)
|
||||
.await?
|
||||
.ok_or(error::Error::SqlxError(RowNotFound))?;
|
||||
|
||||
|
@ -614,14 +614,14 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
|
||||
.bind(new_balance as i64)
|
||||
.bind(pub_key)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?
|
||||
}
|
||||
false => {
|
||||
sqlx::query("UPDATE account SET balance = balance - $1 WHERE pubkey = $2")
|
||||
.bind(new_balance as i64)
|
||||
.bind(pub_key)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
@ -631,19 +631,19 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
/// Create invoice record
|
||||
async fn create_invoice_record(&self, pub_key: &Keys, invoice_info: InvoiceInfo) -> Result<()> {
|
||||
let pub_key = pub_key.public_key().to_string();
|
||||
let mut tx = self.conn.begin().await?;
|
||||
let mut tx = self.conn_write.begin().await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO invoice (pubkey, payment_hash, amount, status, description, created_at, invoice) VALUES ($1, $2, $3, $4, $5, now(), $6)",
|
||||
)
|
||||
.bind(pub_key)
|
||||
.bind(invoice_info.payment_hash)
|
||||
.bind(invoice_info.amount as i64)
|
||||
.bind(invoice_info.status)
|
||||
.bind(invoice_info.memo)
|
||||
.bind(invoice_info.bolt11)
|
||||
.execute(&mut tx)
|
||||
.await.unwrap();
|
||||
.bind(pub_key)
|
||||
.bind(invoice_info.payment_hash)
|
||||
.bind(invoice_info.amount as i64)
|
||||
.bind(invoice_info.status)
|
||||
.bind(invoice_info.memo)
|
||||
.bind(invoice_info.bolt11)
|
||||
.execute(&mut tx)
|
||||
.await.unwrap();
|
||||
|
||||
debug!("Invoice added");
|
||||
|
||||
|
@ -658,7 +658,7 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
let (pubkey, prev_invoice_status, amount) =
|
||||
sqlx::query_as::<_, (String, InvoiceStatus, i64)>(query)
|
||||
.bind(payment_hash)
|
||||
.fetch_optional(&self.conn)
|
||||
.fetch_optional(&self.conn_write)
|
||||
.await?
|
||||
.ok_or(error::Error::SqlxError(RowNotFound))?;
|
||||
|
||||
|
@ -672,14 +672,14 @@ ON CONFLICT (id) DO NOTHING"#,
|
|||
sqlx::query(query)
|
||||
.bind(&status)
|
||||
.bind(payment_hash)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
|
||||
if prev_invoice_status.eq(&InvoiceStatus::Unpaid) && status.eq(&InvoiceStatus::Paid) {
|
||||
sqlx::query("UPDATE account SET balance = balance + $1 WHERE pubkey = $2")
|
||||
.bind(amount)
|
||||
.bind(&pubkey)
|
||||
.execute(&self.conn)
|
||||
.execute(&self.conn_write)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -698,7 +698,7 @@ LIMIT 1;
|
|||
"#;
|
||||
match sqlx::query_as::<_, (i64, String, String, String)>(query)
|
||||
.bind(pubkey.public_key().to_string())
|
||||
.fetch_optional(&self.conn)
|
||||
.fetch_optional(&self.conn_write)
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
|
@ -732,140 +732,62 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
|
|||
// filter out non-hex values
|
||||
let auth_vec: Vec<&String> = auth_vec.iter().filter(|a| is_hex(a)).collect();
|
||||
|
||||
if !auth_vec.is_empty() {
|
||||
query.push("(");
|
||||
|
||||
// shortcut authors into "IN" query
|
||||
let any_is_range = auth_vec.iter().any(|pk| pk.len() != 64);
|
||||
if !any_is_range {
|
||||
query.push("e.pub_key in (");
|
||||
let mut pk_sep = query.separated(", ");
|
||||
for pk in auth_vec.iter() {
|
||||
pk_sep.push_bind(hex::decode(pk).ok());
|
||||
}
|
||||
query.push(") OR e.delegated_by in (");
|
||||
let mut pk_delegated_sep = query.separated(", ");
|
||||
for pk in auth_vec.iter() {
|
||||
pk_delegated_sep.push_bind(hex::decode(pk).ok());
|
||||
}
|
||||
query.push(")");
|
||||
push_and = true;
|
||||
} else {
|
||||
let mut range_authors = query.separated(" OR ");
|
||||
for auth in auth_vec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
range_authors
|
||||
.push("(e.pub_key = ")
|
||||
.push_bind_unseparated(ex.clone())
|
||||
.push_unseparated(" OR e.delegated_by = ")
|
||||
.push_bind_unseparated(ex)
|
||||
.push_unseparated(")");
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
range_authors
|
||||
.push("((e.pub_key > ")
|
||||
.push_bind_unseparated(lower.clone())
|
||||
.push_unseparated(" AND e.pub_key < ")
|
||||
.push_bind_unseparated(upper.clone())
|
||||
.push_unseparated(") OR (e.delegated_by > ")
|
||||
.push_bind_unseparated(lower)
|
||||
.push_unseparated(" AND e.delegated_by < ")
|
||||
.push_bind_unseparated(upper)
|
||||
.push_unseparated("))");
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
range_authors
|
||||
.push("(e.pub_key > ")
|
||||
.push_bind_unseparated(lower.clone())
|
||||
.push_unseparated(" OR e.delegated_by > ")
|
||||
.push_bind_unseparated(lower)
|
||||
.push_unseparated(")");
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from author {:?}", auth);
|
||||
}
|
||||
}
|
||||
push_and = true;
|
||||
}
|
||||
}
|
||||
query.push(")");
|
||||
if auth_vec.is_empty() {
|
||||
return None;
|
||||
}
|
||||
query.push("(e.pub_key in (");
|
||||
|
||||
let mut pk_sep = query.separated(", ");
|
||||
for pk in auth_vec.iter() {
|
||||
pk_sep.push_bind(hex::decode(pk).ok());
|
||||
}
|
||||
query.push(") OR e.delegated_by in (");
|
||||
let mut pk_delegated_sep = query.separated(", ");
|
||||
for pk in auth_vec.iter() {
|
||||
pk_delegated_sep.push_bind(hex::decode(pk).ok());
|
||||
}
|
||||
push_and = true;
|
||||
query.push("))");
|
||||
}
|
||||
|
||||
// Query for Kind
|
||||
if let Some(ks) = &f.kinds {
|
||||
if !ks.is_empty() {
|
||||
if push_and {
|
||||
query.push(" AND ");
|
||||
}
|
||||
push_and = true;
|
||||
|
||||
query.push("e.kind in (");
|
||||
let mut list_query = query.separated(", ");
|
||||
for k in ks.iter() {
|
||||
list_query.push_bind(*k as i64);
|
||||
}
|
||||
query.push(")");
|
||||
if ks.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if push_and {
|
||||
query.push(" AND ");
|
||||
}
|
||||
push_and = true;
|
||||
|
||||
query.push("e.kind in (");
|
||||
let mut list_query = query.separated(", ");
|
||||
for k in ks.iter() {
|
||||
list_query.push_bind(*k as i64);
|
||||
}
|
||||
query.push(")");
|
||||
}
|
||||
|
||||
// Query for event, allowing prefix matches
|
||||
// Query for event,
|
||||
if let Some(id_vec) = &f.ids {
|
||||
// filter out non-hex values
|
||||
let id_vec: Vec<&String> = id_vec.iter().filter(|a| is_hex(a)).collect();
|
||||
|
||||
if !id_vec.is_empty() {
|
||||
if push_and {
|
||||
query.push(" AND (");
|
||||
} else {
|
||||
query.push("(");
|
||||
}
|
||||
push_and = true;
|
||||
|
||||
// shortcut ids into "IN" query
|
||||
let any_is_range = id_vec.iter().any(|pk| pk.len() != 64);
|
||||
if !any_is_range {
|
||||
query.push("id in (");
|
||||
let mut sep = query.separated(", ");
|
||||
for id in id_vec.iter() {
|
||||
sep.push_bind(hex::decode(id).ok());
|
||||
}
|
||||
query.push(")");
|
||||
} else {
|
||||
// take each author and convert to a hex search
|
||||
let mut id_query = query.separated(" OR ");
|
||||
for id in id_vec {
|
||||
match hex_range(id) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
id_query
|
||||
.push("(id = ")
|
||||
.push_bind_unseparated(ex)
|
||||
.push_unseparated(")");
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
id_query
|
||||
.push("(id > ")
|
||||
.push_bind_unseparated(lower)
|
||||
.push_unseparated(" AND id < ")
|
||||
.push_bind_unseparated(upper)
|
||||
.push_unseparated(")");
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
id_query
|
||||
.push("(id > ")
|
||||
.push_bind_unseparated(lower)
|
||||
.push_unseparated(")");
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from id {:?}", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
query.push(")");
|
||||
if id_vec.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if push_and {
|
||||
query.push(" AND (");
|
||||
} else {
|
||||
query.push("(");
|
||||
}
|
||||
push_and = true;
|
||||
|
||||
query.push("id in (");
|
||||
let mut sep = query.separated(", ");
|
||||
for id in id_vec.iter() {
|
||||
sep.push_bind(hex::decode(id).ok());
|
||||
}
|
||||
query.push("))");
|
||||
}
|
||||
|
||||
// Query for tags
|
||||
|
@ -876,22 +798,46 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
|
|||
}
|
||||
push_and = true;
|
||||
|
||||
let mut push_or = false;
|
||||
query.push("e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and ");
|
||||
for (key, val) in map.iter() {
|
||||
query.push("e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = ")
|
||||
if val.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if push_or {
|
||||
query.push(" OR ");
|
||||
}
|
||||
query
|
||||
.push("(t.\"name\" = ")
|
||||
.push_bind(key.to_string())
|
||||
.push(" AND (value in (");
|
||||
.push(" AND (");
|
||||
|
||||
// plain value match first
|
||||
let mut tag_query = query.separated(", ");
|
||||
for v in val.iter() {
|
||||
if (v.len() % 2 != 0) && !is_lower_hex(v) {
|
||||
let has_plain_values = val.iter().any(|v| (v.len() % 2 != 0 || !is_lower_hex(v)));
|
||||
let has_hex_values = val.iter().any(|v| v.len() % 2 == 0 && is_lower_hex(v));
|
||||
if has_plain_values {
|
||||
query.push("value in (");
|
||||
// plain value match first
|
||||
let mut tag_query = query.separated(", ");
|
||||
for v in val.iter().filter(|v| v.len() % 2 != 0 || !is_lower_hex(v)) {
|
||||
tag_query.push_bind(v.as_bytes());
|
||||
} else {
|
||||
}
|
||||
}
|
||||
if has_plain_values && has_hex_values {
|
||||
query.push(") OR ");
|
||||
}
|
||||
if has_hex_values {
|
||||
query.push("value_hex in (");
|
||||
// plain value match first
|
||||
let mut tag_query = query.separated(", ");
|
||||
for v in val.iter().filter(|v| v.len() % 2 == 0 && is_lower_hex(v)) {
|
||||
tag_query.push_bind(hex::decode(v).ok());
|
||||
}
|
||||
}
|
||||
query.push("))))");
|
||||
|
||||
query.push(")))");
|
||||
push_or = true;
|
||||
}
|
||||
query.push(")");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -902,7 +848,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
|
|||
}
|
||||
push_and = true;
|
||||
query
|
||||
.push("e.created_at > ")
|
||||
.push("e.created_at >= ")
|
||||
.push_bind(Utc.timestamp_opt(f.since.unwrap() as i64, 0).unwrap());
|
||||
}
|
||||
|
||||
|
@ -913,7 +859,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
|
|||
}
|
||||
push_and = true;
|
||||
query
|
||||
.push("e.created_at < ")
|
||||
.push("e.created_at <= ")
|
||||
.push_bind(Utc.timestamp_opt(f.until.unwrap() as i64, 0).unwrap());
|
||||
}
|
||||
|
||||
|
@ -924,10 +870,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
|
|||
query.push("e.hidden != 1::bit(1)");
|
||||
}
|
||||
// never display expired events
|
||||
query
|
||||
.push(" AND (e.expires_at IS NULL OR e.expires_at > ")
|
||||
.push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap())
|
||||
.push(")");
|
||||
query.push(" AND (e.expires_at IS NULL OR e.expires_at > now())");
|
||||
|
||||
// Apply per-filter limit to this query.
|
||||
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
|
||||
|
@ -962,3 +905,111 @@ impl FromRow<'_, PgRow> for VerificationRecord {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
#[test]
|
||||
fn test_query_gen_tag_value_hex() {
|
||||
let filter = ReqFilter {
|
||||
ids: None,
|
||||
kinds: Some(vec![1000]),
|
||||
since: None,
|
||||
until: None,
|
||||
authors: Some(vec![
|
||||
"84de35e2584d2b144aae823c9ed0b0f3deda09648530b93d1a2a146d1dea9864".to_owned(),
|
||||
]),
|
||||
limit: None,
|
||||
tags: Some(HashMap::from([(
|
||||
'p',
|
||||
HashSet::from([
|
||||
"63fe6318dc58583cfe16810f86dd09e18bfd76aabc24a0081ce2856f330504ed".to_owned(),
|
||||
]),
|
||||
)])),
|
||||
force_no_match: false,
|
||||
};
|
||||
|
||||
let q = query_from_filter(&filter).unwrap();
|
||||
assert_eq!(q.sql(), "SELECT e.\"content\", e.created_at FROM \"event\" e WHERE (e.pub_key in ($1) OR e.delegated_by in ($2)) AND e.kind in ($3) AND e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = $4 AND (value_hex in ($5)))) AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_gen_tag_value() {
|
||||
let filter = ReqFilter {
|
||||
ids: None,
|
||||
kinds: Some(vec![1000]),
|
||||
since: None,
|
||||
until: None,
|
||||
authors: Some(vec![
|
||||
"84de35e2584d2b144aae823c9ed0b0f3deda09648530b93d1a2a146d1dea9864".to_owned(),
|
||||
]),
|
||||
limit: None,
|
||||
tags: Some(HashMap::from([('d', HashSet::from(["test".to_owned()]))])),
|
||||
force_no_match: false,
|
||||
};
|
||||
|
||||
let q = query_from_filter(&filter).unwrap();
|
||||
assert_eq!(q.sql(), "SELECT e.\"content\", e.created_at FROM \"event\" e WHERE (e.pub_key in ($1) OR e.delegated_by in ($2)) AND e.kind in ($3) AND e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = $4 AND (value in ($5)))) AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_gen_tag_value_and_value_hex() {
|
||||
let filter = ReqFilter {
|
||||
ids: None,
|
||||
kinds: Some(vec![1000]),
|
||||
since: None,
|
||||
until: None,
|
||||
authors: Some(vec![
|
||||
"84de35e2584d2b144aae823c9ed0b0f3deda09648530b93d1a2a146d1dea9864".to_owned(),
|
||||
]),
|
||||
limit: None,
|
||||
tags: Some(HashMap::from([(
|
||||
'd',
|
||||
HashSet::from([
|
||||
"test".to_owned(),
|
||||
"63fe6318dc58583cfe16810f86dd09e18bfd76aabc24a0081ce2856f330504ed".to_owned(),
|
||||
]),
|
||||
)])),
|
||||
force_no_match: false,
|
||||
};
|
||||
|
||||
let q = query_from_filter(&filter).unwrap();
|
||||
assert_eq!(q.sql(), "SELECT e.\"content\", e.created_at FROM \"event\" e WHERE (e.pub_key in ($1) OR e.delegated_by in ($2)) AND e.kind in ($3) AND e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = $4 AND (value in ($5) OR value_hex in ($6)))) AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_multiple_tags() {
|
||||
let filter = ReqFilter {
|
||||
ids: None,
|
||||
kinds: Some(vec![30_001]),
|
||||
since: None,
|
||||
until: None,
|
||||
authors: None,
|
||||
limit: None,
|
||||
tags: Some(HashMap::from([
|
||||
('d', HashSet::from(["follow".to_owned()])),
|
||||
('t', HashSet::from(["siamstr".to_owned()])),
|
||||
])),
|
||||
force_no_match: false,
|
||||
};
|
||||
let q = query_from_filter(&filter).unwrap();
|
||||
assert_eq!(q.sql(), "SELECT e.\"content\", e.created_at FROM \"event\" e WHERE e.kind in ($1) AND e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = $2 AND (value in ($3))) OR (t.\"name\" = $4 AND (value in ($5)))) AND e.hidden != 1::bit(1) AND (e.expires_at IS NULL OR e.expires_at > now()) ORDER BY e.created_at ASC LIMIT 1000")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_query_empty_tags() {
|
||||
let filter = ReqFilter {
|
||||
ids: None,
|
||||
kinds: Some(vec![1, 6, 16, 30023, 1063, 6969]),
|
||||
since: Some(1700697846),
|
||||
until: None,
|
||||
authors: None,
|
||||
limit: None,
|
||||
tags: Some(HashMap::from([('a', HashSet::new())])),
|
||||
force_no_match: false,
|
||||
};
|
||||
assert!(query_from_filter(&filter).is_none());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,7 +123,7 @@ CREATE TABLE "tag" (
|
|||
CREATE INDEX tag_event_id_idx ON tag USING btree (event_id, name);
|
||||
CREATE INDEX tag_value_idx ON tag USING btree (value);
|
||||
|
||||
-- NIP-05 Verfication table
|
||||
-- NIP-05 Verification table
|
||||
CREATE TABLE "user_verification" (
|
||||
id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
|
||||
event_id bytea NOT NULL,
|
||||
|
@ -205,7 +205,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
|
|||
let event: Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
|
||||
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
|
|
@ -4,8 +4,6 @@ use crate::config::Settings;
|
|||
use crate::db::QueryResult;
|
||||
use crate::error::{Error::SqlError, Result};
|
||||
use crate::event::{single_char_tagname, Event};
|
||||
use crate::hexrange::hex_range;
|
||||
use crate::hexrange::HexSearch;
|
||||
use crate::nip05::{Nip05Name, VerificationRecord};
|
||||
use crate::payment::{InvoiceInfo, InvoiceStatus};
|
||||
use crate::repo::sqlite_migration::{upgrade_db, STARTUP_SQL};
|
||||
|
@ -62,7 +60,7 @@ impl SqliteRepo {
|
|||
"writer",
|
||||
settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
0,
|
||||
2,
|
||||
false,
|
||||
);
|
||||
|
@ -70,7 +68,7 @@ impl SqliteRepo {
|
|||
"maintenance",
|
||||
settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
0,
|
||||
2,
|
||||
true,
|
||||
);
|
||||
|
@ -842,7 +840,8 @@ impl NostrRepo for SqliteRepo {
|
|||
async fn update_invoice(&self, payment_hash: &str, status: InvoiceStatus) -> Result<String> {
|
||||
let mut conn = self.write_pool.get()?;
|
||||
let payment_hash = payment_hash.to_owned();
|
||||
let pub_key = tokio::task::spawn_blocking(move || {
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let tx = conn.transaction()?;
|
||||
let pubkey: String;
|
||||
{
|
||||
|
@ -884,8 +883,7 @@ impl NostrRepo for SqliteRepo {
|
|||
let ok: Result<String> = Ok(pubkey);
|
||||
ok
|
||||
})
|
||||
.await?;
|
||||
pub_key
|
||||
.await?
|
||||
}
|
||||
|
||||
/// Get the most recent invoice for a given pubkey
|
||||
|
@ -978,7 +976,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
return (empty_query, empty_params, None);
|
||||
}
|
||||
|
||||
// check if the index needs to be overriden
|
||||
// check if the index needs to be overridden
|
||||
let idx_name = override_index(f);
|
||||
let idx_stmt = idx_name
|
||||
.as_ref()
|
||||
|
@ -994,24 +992,9 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
// take each author and convert to a hexsearch
|
||||
let mut auth_searches: Vec<String> = vec![];
|
||||
for auth in authvec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
auth_searches.push("author=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
auth_searches.push("(author>? AND author<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
auth_searches.push("author>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from author {:?}", auth);
|
||||
}
|
||||
}
|
||||
auth_searches.push("author=?".to_owned());
|
||||
let auth_bin = hex::decode(auth).ok();
|
||||
params.push(Box::new(auth_bin));
|
||||
}
|
||||
if !authvec.is_empty() {
|
||||
let auth_clause = format!("({})", auth_searches.join(" OR "));
|
||||
|
@ -1032,24 +1015,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
// take each author and convert to a hexsearch
|
||||
let mut id_searches: Vec<String> = vec![];
|
||||
for id in idvec {
|
||||
match hex_range(id) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
id_searches.push("event_hash=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
id_searches.push("event_hash>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from id {:?}", id);
|
||||
}
|
||||
}
|
||||
id_searches.push("event_hash=?".to_owned());
|
||||
params.push(Box::new(id.clone()));
|
||||
}
|
||||
if idvec.is_empty() {
|
||||
// if the ids list was empty, we should never return
|
||||
|
@ -1072,26 +1039,24 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
// find evidence of the target tag name/value existing for this event.
|
||||
// Query for Kind/Since/Until additionally, to reduce the number of tags that come back.
|
||||
let kind_clause;
|
||||
let since_clause;
|
||||
let until_clause;
|
||||
if let Some(ks) = &f.kinds {
|
||||
// kind is number, no escaping needed
|
||||
let str_kinds: Vec<String> =
|
||||
ks.iter().map(std::string::ToString::to_string).collect();
|
||||
kind_clause = format!("AND kind IN ({})", str_kinds.join(", "));
|
||||
} else {
|
||||
kind_clause = format!("");
|
||||
kind_clause = String::new();
|
||||
};
|
||||
if f.since.is_some() {
|
||||
since_clause = format!("AND created_at > {}", f.since.unwrap());
|
||||
let since_clause = if f.since.is_some() {
|
||||
format!("AND created_at >= {}", f.since.unwrap())
|
||||
} else {
|
||||
since_clause = format!("");
|
||||
String::new()
|
||||
};
|
||||
// Query for timestamp
|
||||
if f.until.is_some() {
|
||||
until_clause = format!("AND created_at < {}", f.until.unwrap());
|
||||
let until_clause = if f.until.is_some() {
|
||||
format!("AND created_at <= {}", f.until.unwrap())
|
||||
} else {
|
||||
until_clause = format!("");
|
||||
String::new()
|
||||
};
|
||||
|
||||
let tag_clause = format!(
|
||||
|
@ -1107,12 +1072,12 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
|
|||
}
|
||||
// Query for timestamp
|
||||
if f.since.is_some() {
|
||||
let created_clause = format!("created_at > {}", f.since.unwrap());
|
||||
let created_clause = format!("created_at >= {}", f.since.unwrap());
|
||||
filter_components.push(created_clause);
|
||||
}
|
||||
// Query for timestamp
|
||||
if f.until.is_some() {
|
||||
let until_clause = format!("created_at < {}", f.until.unwrap());
|
||||
let until_clause = format!("created_at <= {}", f.until.unwrap());
|
||||
filter_components.push(until_clause);
|
||||
}
|
||||
// never display hidden events
|
||||
|
@ -1199,9 +1164,15 @@ pub fn build_pool(
|
|||
.test_on_check_out(true) // no noticeable performance hit
|
||||
.min_idle(Some(min_size))
|
||||
.max_size(max_size)
|
||||
.idle_timeout(Some(Duration::from_secs(10)))
|
||||
.max_lifetime(Some(Duration::from_secs(30)))
|
||||
.build(manager)
|
||||
.unwrap();
|
||||
// retrieve a connection to ensure the startup statements run immediately
|
||||
{
|
||||
let _ = pool.get();
|
||||
}
|
||||
|
||||
info!(
|
||||
"Built a connection pool {:?} (min={}, max={})",
|
||||
name, min_size, max_size
|
||||
|
@ -1311,6 +1282,7 @@ pub async fn db_checkpoint_task(
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum SqliteStatus {
|
||||
Ok,
|
||||
Busy,
|
||||
|
|
|
@ -19,7 +19,7 @@ PRAGMA foreign_keys = ON;
|
|||
PRAGMA journal_size_limit = 32768;
|
||||
PRAGMA temp_store = 2; -- use memory, not temp files
|
||||
PRAGMA main.cache_size = 20000; -- 80MB max cache size per conn
|
||||
pragma mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
pragma mmap_size = 0; -- disable mmap (default)
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
|
@ -159,7 +159,7 @@ fn mig_init(conn: &mut PooledConnection) -> usize {
|
|||
);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (init) failed: {}", err);
|
||||
panic!("database could not be initialized");
|
||||
}
|
||||
}
|
||||
|
@ -295,7 +295,7 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
|
|||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
@ -325,7 +325,7 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
//// Migration Scripts
|
||||
// Migration Scripts
|
||||
|
||||
fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> {
|
||||
// only change is adding a hidden column to events.
|
||||
|
@ -339,7 +339,7 @@ PRAGMA user_version = 2;
|
|||
info!("database schema upgraded v1 -> v2");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v1->v2) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ PRAGMA user_version = 3;
|
|||
info!("database schema upgraded v2 -> v3");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v2->v3) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -416,7 +416,7 @@ PRAGMA user_version = 4;
|
|||
info!("database schema upgraded v3 -> v4");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v3->v4) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -435,7 +435,7 @@ PRAGMA user_version=5;
|
|||
info!("database schema upgraded v4 -> v5");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v4->v5) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -461,7 +461,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
|
|||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
@ -507,7 +507,7 @@ PRAGMA user_version = 7;
|
|||
info!("database schema upgraded v6 -> v7");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v6->v7) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -528,7 +528,7 @@ PRAGMA user_version = 8;
|
|||
info!("database schema upgraded v7 -> v8");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v7->v8) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -548,7 +548,7 @@ PRAGMA user_version = 9;
|
|||
info!("database schema upgraded v8 -> v9");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v8->v9) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -567,7 +567,7 @@ PRAGMA user_version = 10;
|
|||
info!("database schema upgraded v9 -> v10");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v9->v10) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -588,7 +588,7 @@ PRAGMA user_version = 11;
|
|||
info!("database schema upgraded v10 -> v11");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v10->v11) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -643,7 +643,7 @@ PRAGMA user_version = 13;
|
|||
info!("database schema upgraded v12 -> v13");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v12->v13) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -663,7 +663,7 @@ PRAGMA user_version = 14;
|
|||
info!("database schema upgraded v13 -> v14");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v13->v14) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -682,7 +682,7 @@ PRAGMA user_version = 15;
|
|||
info!("database schema upgraded v14 -> v15");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v14->v15) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -749,7 +749,7 @@ CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,
|
|||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagname = t.first().unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
|
@ -786,7 +786,7 @@ PRAGMA user_version = 17;
|
|||
info!("database schema upgraded v16 -> v17");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v16->v17) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
@ -833,7 +833,7 @@ PRAGMA user_version = 18;
|
|||
info!("database schema upgraded v17 -> v18");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
error!("update (v17->v18) failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ use hyper::upgrade::Upgraded;
|
|||
use hyper::{
|
||||
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
|
||||
};
|
||||
use nostr::key::FromPkStr;
|
||||
use nostr::key::Keys;
|
||||
use prometheus::IntCounterVec;
|
||||
use prometheus::IntGauge;
|
||||
use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Opts, Registry, TextEncoder};
|
||||
|
@ -60,8 +62,6 @@ use tungstenite::error::Error as WsError;
|
|||
use tungstenite::handshake;
|
||||
use tungstenite::protocol::Message;
|
||||
use tungstenite::protocol::WebSocketConfig;
|
||||
use nostr::key::FromPkStr;
|
||||
use nostr::key::Keys;
|
||||
|
||||
/// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
@ -653,6 +653,7 @@ fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
|
|||
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
|
||||
let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
.expect("could not define signal");
|
||||
#[allow(clippy::never_loop)]
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_signal.recv() => {
|
||||
|
@ -827,7 +828,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
|
|||
info!("listening on: {}", socket_addr);
|
||||
// all client-submitted valid events are broadcast to every
|
||||
// other client on this channel. This should be large enough
|
||||
// to accomodate slower readers (messages are dropped if
|
||||
// to accommodate slower readers (messages are dropped if
|
||||
// clients can not keep up).
|
||||
let (bcast_tx, _) = broadcast::channel::<Event>(broadcast_buffer_limit);
|
||||
// validated events that need to be persisted are sent to the
|
||||
|
@ -1029,25 +1030,23 @@ fn make_notice_message(notice: &Notice) -> Message {
|
|||
Message::text(json.to_string())
|
||||
}
|
||||
|
||||
fn allowed_to_send(event_str: &String, conn: &conn::ClientConn, settings: &Settings) -> bool {
|
||||
fn allowed_to_send(event_str: &str, conn: &conn::ClientConn, settings: &Settings) -> bool {
|
||||
// TODO: pass in kind so that we can avoid deserialization for most events
|
||||
if settings.authorization.nip42_dms {
|
||||
match serde_json::from_str::<Event>(event_str) {
|
||||
Ok(event) => {
|
||||
if event.kind == 4 {
|
||||
if event.kind == 4 || event.kind == 44 || event.kind == 1059 {
|
||||
match (conn.auth_pubkey(), event.tag_values_by_name("p").first()) {
|
||||
(Some(auth_pubkey), Some(recipient_pubkey)) => {
|
||||
recipient_pubkey == auth_pubkey || &event.pubkey == auth_pubkey
|
||||
},
|
||||
(_, _) => {
|
||||
false
|
||||
},
|
||||
}
|
||||
(_, _) => false,
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
},
|
||||
Err(_) => false
|
||||
}
|
||||
Err(_) => false,
|
||||
}
|
||||
} else {
|
||||
true
|
||||
|
@ -1125,8 +1124,8 @@ async fn nostr_server(
|
|||
|
||||
let unspec = "<unspecified>".to_string();
|
||||
info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
|
||||
let origin = client_info.origin.as_ref().unwrap_or_else(|| &unspec);
|
||||
let user_agent = client_info.user_agent.as_ref().unwrap_or_else(|| &unspec);
|
||||
let origin = client_info.origin.as_ref().unwrap_or(&unspec);
|
||||
let user_agent = client_info.user_agent.as_ref().unwrap_or(&unspec);
|
||||
info!(
|
||||
"cid: {}, origin: {:?}, user-agent: {:?}",
|
||||
cid, origin, user_agent
|
||||
|
@ -1175,14 +1174,12 @@ async fn nostr_server(
|
|||
if query_result.event == "EOSE" {
|
||||
let send_str = format!("[\"EOSE\",\"{subesc}\"]");
|
||||
ws_stream.send(Message::Text(send_str)).await.ok();
|
||||
} else {
|
||||
if allowed_to_send(&query_result.event, &conn, &settings) {
|
||||
metrics.sent_events.with_label_values(&["db"]).inc();
|
||||
client_received_event_count += 1;
|
||||
// send a result
|
||||
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
|
||||
ws_stream.send(Message::Text(send_str)).await.ok();
|
||||
}
|
||||
} else if allowed_to_send(&query_result.event, &conn, &settings) {
|
||||
metrics.sent_events.with_label_values(&["db"]).inc();
|
||||
client_received_event_count += 1;
|
||||
// send a result
|
||||
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
|
||||
ws_stream.send(Message::Text(send_str)).await.ok();
|
||||
}
|
||||
},
|
||||
// TODO: consider logging the LaggedRecv error
|
||||
|
@ -1265,7 +1262,6 @@ async fn nostr_server(
|
|||
// handle each type of message
|
||||
let evid = ec.event_id().to_owned();
|
||||
let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
|
||||
metrics.cmd_event.inc();
|
||||
match parsed {
|
||||
Ok(WrappedEvent(e)) => {
|
||||
metrics.cmd_event.inc();
|
||||
|
@ -1278,7 +1274,7 @@ async fn nostr_server(
|
|||
// check if the event is too far in the future.
|
||||
} else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
|
||||
// Write this to the database.
|
||||
let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(&pubkey).ok());
|
||||
let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(pubkey).ok());
|
||||
let submit_event = SubmittedEvent {
|
||||
event: e.clone(),
|
||||
notice_tx: notice_tx.clone(),
|
||||
|
@ -1307,7 +1303,7 @@ async fn nostr_server(
|
|||
error!("AUTH command received, but relay_url is not set in the config file (cid: {})", cid);
|
||||
},
|
||||
Some(relay) => {
|
||||
match conn.authenticate(&event, &relay) {
|
||||
match conn.authenticate(&event, relay) {
|
||||
Ok(_) => {
|
||||
let pubkey = match conn.auth_pubkey() {
|
||||
Some(k) => k.chars().take(8).collect(),
|
||||
|
@ -1317,7 +1313,7 @@ async fn nostr_server(
|
|||
},
|
||||
Err(e) => {
|
||||
info!("authentication error: {} (cid: {})", e, cid);
|
||||
ws_stream.send(make_notice_message(&Notice::message(format!("Authentication error: {e}")))).await.ok();
|
||||
ws_stream.send(make_notice_message(&Notice::restricted(event.id, format!("authentication error: {e}").as_str()))).await.ok();
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -1346,10 +1342,15 @@ async fn nostr_server(
|
|||
if conn.has_subscription(&s) {
|
||||
info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
|
||||
} else {
|
||||
metrics.cmd_req.inc();
|
||||
metrics.cmd_req.inc();
|
||||
if let Some(ref lim) = sub_lim_opt {
|
||||
lim.until_ready_with_jitter(jitter).await;
|
||||
}
|
||||
if settings.limits.limit_scrapers && s.is_scraper() {
|
||||
info!("subscription was scraper, ignoring (cid: {}, sub: {:?})", cid, s.id);
|
||||
ws_stream.send(Message::Text(format!("[\"EOSE\",\"{}\"]", s.id))).await.ok();
|
||||
continue
|
||||
}
|
||||
let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
|
||||
match conn.subscribe(s.clone()) {
|
||||
Ok(()) => {
|
||||
|
@ -1373,7 +1374,7 @@ async fn nostr_server(
|
|||
// closing a request simply removes the subscription.
|
||||
let parsed : Result<Close> = Result::<Close>::from(cc);
|
||||
if let Ok(c) = parsed {
|
||||
metrics.cmd_close.inc();
|
||||
metrics.cmd_close.inc();
|
||||
// check if a query is currently
|
||||
// running, and remove it if so.
|
||||
let stop_tx = running_queries.remove(&c.id);
|
||||
|
|
|
@ -45,8 +45,8 @@ pub struct ReqFilter {
|
|||
|
||||
impl Serialize for ReqFilter {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut map = serializer.serialize_map(None)?;
|
||||
if let Some(ids) = &self.ids {
|
||||
|
@ -80,8 +80,8 @@ impl Serialize for ReqFilter {
|
|||
|
||||
impl<'de> Deserialize<'de> for ReqFilter {
|
||||
fn deserialize<D>(deserializer: D) -> Result<ReqFilter, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let received: Value = Deserialize::deserialize(deserializer)?;
|
||||
let filter = received.as_object().ok_or_else(|| {
|
||||
|
@ -184,11 +184,11 @@ impl<'de> Deserialize<'de> for Subscription {
|
|||
/// Custom deserializer for subscriptions, which have a more
|
||||
/// complex structure than the other message types.
|
||||
fn deserialize<D>(deserializer: D) -> Result<Subscription, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let mut v: Value = Deserialize::deserialize(deserializer)?;
|
||||
// this shoud be a 3-or-more element array.
|
||||
// this should be a 3-or-more element array.
|
||||
// verify the first element is a String, REQ
|
||||
// get the subscription from the second element.
|
||||
// convert each of the remaining objects into filters
|
||||
|
@ -258,6 +258,29 @@ impl Subscription {
|
|||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Is this subscription defined as a scraper query
|
||||
pub fn is_scraper(&self) -> bool {
|
||||
for f in &self.filters {
|
||||
let mut precision = 0;
|
||||
if f.ids.is_some() {
|
||||
precision += 2;
|
||||
}
|
||||
if f.authors.is_some() {
|
||||
precision += 1;
|
||||
}
|
||||
if f.kinds.is_some() {
|
||||
precision += 1;
|
||||
}
|
||||
if f.tags.is_some() {
|
||||
precision += 1;
|
||||
}
|
||||
if precision < 2 {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn prefix_match(prefixes: &[String], target: &str) -> bool {
|
||||
|
@ -319,8 +342,8 @@ impl ReqFilter {
|
|||
pub fn interested_in_event(&self, event: &Event) -> bool {
|
||||
// self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
|
||||
self.ids_match(event)
|
||||
&& self.since.map_or(true, |t| event.created_at > t)
|
||||
&& self.until.map_or(true, |t| event.created_at < t)
|
||||
&& self.since.map_or(true, |t| event.created_at >= t)
|
||||
&& self.until.map_or(true, |t| event.created_at <= t)
|
||||
&& self.kind_match(event.kind)
|
||||
&& (self.authors_match(event) || self.delegated_authors_match(event))
|
||||
&& self.tag_match(event)
|
||||
|
@ -338,7 +361,7 @@ mod tests {
|
|||
let s: Subscription = serde_json::from_str(raw_json)?;
|
||||
assert_eq!(s.id, "some-id");
|
||||
assert_eq!(s.filters.len(), 1);
|
||||
assert_eq!(s.filters.get(0).unwrap().authors, None);
|
||||
assert_eq!(s.filters.first().unwrap().authors, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -402,7 +425,7 @@ mod tests {
|
|||
let s: Subscription = serde_json::from_str(raw_json)?;
|
||||
assert_eq!(s.id, "some-id");
|
||||
assert_eq!(s.filters.len(), 1);
|
||||
let first_filter = s.filters.get(0).unwrap();
|
||||
let first_filter = s.filters.first().unwrap();
|
||||
assert_eq!(
|
||||
first_filter.authors,
|
||||
Some(vec!("test-author-id".to_owned()))
|
||||
|
@ -633,11 +656,11 @@ mod tests {
|
|||
let s: Subscription = serde_json::from_str(
|
||||
r##"["REQ","xyz",{"authors":["abc", "bcd"], "since": 10, "until": 20, "limit":100, "#e": ["foo", "bar"], "#d": ["test"]}]"##,
|
||||
)?;
|
||||
let f = s.filters.get(0);
|
||||
let f = s.filters.first();
|
||||
let serialized = serde_json::to_string(&f)?;
|
||||
let serialized_wrapped = format!(r##"["REQ", "xyz",{}]"##, serialized);
|
||||
let parsed: Subscription = serde_json::from_str(&serialized_wrapped)?;
|
||||
let parsed_filter = parsed.filters.get(0);
|
||||
let parsed_filter = parsed.filters.first();
|
||||
if let Some(pf) = parsed_filter {
|
||||
assert_eq!(pf.since, Some(10));
|
||||
assert_eq!(pf.until, Some(20));
|
||||
|
@ -647,4 +670,14 @@ mod tests {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_scraper() -> Result<()> {
|
||||
assert!(serde_json::from_str::<Subscription>(r#"["REQ","some-id",{"kinds": [1984],"since": 123,"limit":1}]"#)?.is_scraper());
|
||||
assert!(serde_json::from_str::<Subscription>(r#"["REQ","some-id",{"kinds": [1984]},{"kinds": [1984],"authors":["aaaa"]}]"#)?.is_scraper());
|
||||
assert!(!serde_json::from_str::<Subscription>(r#"["REQ","some-id",{"kinds": [1984],"authors":["aaaa"]}]"#)?.is_scraper());
|
||||
assert!(!serde_json::from_str::<Subscription>(r#"["REQ","some-id",{"ids": ["aaaa"]}]"#)?.is_scraper());
|
||||
assert!(!serde_json::from_str::<Subscription>(r##"["REQ","some-id",{"#p": ["aaaa"],"kinds":[1,4]}]"##)?.is_scraper());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ pub fn is_lower_hex(s: &str) -> bool {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn host_str(url: &String) -> Option<String> {
|
||||
pub fn host_str(url: &str) -> Option<String> {
|
||||
Url::parse(url)
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
|
@ -50,15 +50,15 @@ mod tests {
|
|||
#[test]
|
||||
fn lower_hex() {
|
||||
let hexstr = "abcd0123";
|
||||
assert_eq!(is_lower_hex(hexstr), true);
|
||||
assert!(is_lower_hex(hexstr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nip19() {
|
||||
let hexkey = "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d";
|
||||
let nip19key = "npub180cvv07tjdrrgpa0j7j7tmnyl2yr6yr7l8j4s3evf6u64th6gkwsyjh6w6";
|
||||
assert_eq!(is_nip19(hexkey), false);
|
||||
assert_eq!(is_nip19(nip19key), true);
|
||||
assert!(!is_nip19(hexkey));
|
||||
assert!(is_nip19(nip19key));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -103,8 +103,5 @@ fn get_available_port() -> Option<u16> {
|
|||
}
|
||||
pub fn port_is_available(port: u16) -> bool {
|
||||
info!("checking on port {}", port);
|
||||
match TcpListener::bind(("127.0.0.1", port)) {
|
||||
Ok(_) => true,
|
||||
Err(_) => false,
|
||||
}
|
||||
TcpListener::bind(("127.0.0.1", port)).is_ok()
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event(challenge);
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Ok(())));
|
||||
assert_eq!(client_conn.auth_challenge(), None);
|
||||
|
@ -67,7 +67,7 @@ mod tests {
|
|||
assert_eq!(client_conn.auth_pubkey(), None);
|
||||
|
||||
let event = auth_event(&"challenge".into());
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -87,14 +87,14 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap().clone();
|
||||
|
||||
let event = auth_event(&challenge);
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Ok(())));
|
||||
assert_eq!(client_conn.auth_challenge(), None);
|
||||
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
|
||||
|
||||
let event1 = auth_event(&challenge);
|
||||
let result1 = client_conn.authenticate(&event1, &RELAY.into());
|
||||
let result1 = client_conn.authenticate(&event1, RELAY);
|
||||
|
||||
assert!(matches!(result1, Ok(())));
|
||||
assert_eq!(client_conn.auth_challenge(), None);
|
||||
|
@ -118,7 +118,7 @@ mod tests {
|
|||
let mut event = auth_event(challenge);
|
||||
event.sig = event.sig.chars().rev().collect::<String>();
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event_with_kind(challenge, 9999999999999999);
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event_with_created_at(challenge, unix_time() - 1200); // 20 minutes
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event_with_created_at(challenge, unix_time() + 1200); // 20 minutes
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ mod tests {
|
|||
|
||||
let event = auth_event_without_tags();
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -216,7 +216,7 @@ mod tests {
|
|||
|
||||
let event = auth_event_without_challenge();
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -236,7 +236,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event_without_relay(challenge);
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -255,7 +255,7 @@ mod tests {
|
|||
|
||||
let event = auth_event(&"invalid challenge".into());
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ mod tests {
|
|||
let challenge = client_conn.auth_challenge().unwrap();
|
||||
let event = auth_event_with_relay(challenge, &"xyz".into());
|
||||
|
||||
let result = client_conn.authenticate(&event, &RELAY.into());
|
||||
let result = client_conn.authenticate(&event, RELAY);
|
||||
|
||||
assert!(matches!(result, Err(Error::AuthFailure)));
|
||||
}
|
||||
|
@ -334,9 +334,9 @@ mod tests {
|
|||
id: "0".to_owned(),
|
||||
pubkey: public_key.to_hex(),
|
||||
delegated_by: None,
|
||||
created_at: created_at,
|
||||
kind: kind,
|
||||
tags: tags,
|
||||
created_at,
|
||||
kind,
|
||||
tags,
|
||||
content: "".to_owned(),
|
||||
sig: "0".to_owned(),
|
||||
tagidx: None,
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
use anyhow::Result;
|
||||
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tracing::info;
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -45,3 +47,33 @@ async fn relay_home_page() -> Result<()> {
|
|||
let _res = relay.shutdown_tx.send(());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//#[tokio::test]
|
||||
// Still inwork
|
||||
async fn publish_test() -> Result<()> {
|
||||
// get a relay and wait for startup
|
||||
let relay = common::start_relay()?;
|
||||
common::wait_for_healthy_relay(&relay).await?;
|
||||
// open a non-secure websocket connection.
|
||||
let (mut ws, _res) = connect_async(format!("ws://localhost:{}", relay.port)).await?;
|
||||
// send a simple pre-made message
|
||||
let simple_event = r#"["EVENT", {"content": "hello world","created_at": 1691239763,
|
||||
"id":"f3ce6798d70e358213ebbeba4886bbdfacf1ecfd4f65ee5323ef5f404de32b86",
|
||||
"kind": 1,
|
||||
"pubkey": "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798",
|
||||
"sig": "30ca29e8581eeee75bf838171dec818af5e6de2b74f5337de940f5cc91186534c0b20d6cf7ad1043a2c51dbd60b979447720a471d346322103c83f6cb66e4e98",
|
||||
"tags": []}]"#;
|
||||
ws.send(simple_event.into()).await?;
|
||||
// get response from server, confirm it is an array with first element "OK"
|
||||
let event_confirm = ws.next().await;
|
||||
ws.close(None).await?;
|
||||
info!("event confirmed: {:?}", event_confirm);
|
||||
// open a new connection, and wait for some time to get the event.
|
||||
let (mut sub_ws, _res) = connect_async(format!("ws://localhost:{}", relay.port)).await?;
|
||||
let event_sub = r#"["REQ", "simple", {}]"#;
|
||||
sub_ws.send(event_sub.into()).await?;
|
||||
// read from subscription
|
||||
let _ws_next = sub_ws.next().await;
|
||||
let _res = relay.shutdown_tx.send(());
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user