mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
41 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ffd4e6f997 | ||
|
bbd716963e | ||
|
ca95e8cf22 | ||
|
e9d2a2cbd0 | ||
|
39a945b493 | ||
|
9a84dc19e9 | ||
|
20c4bb42eb | ||
|
0e519f6b77 | ||
|
3dd0f2c9c6 | ||
|
b7c8737166 | ||
|
c0b112c094 | ||
|
cb283ac316 | ||
|
2c6ac69bfd | ||
|
d929ae2752 | ||
|
14fe9f9ee1 | ||
|
7774db8c47 | ||
|
104ef2b9e1 | ||
|
c06139ec99 | ||
|
19ec89593d | ||
|
27902bc5f4 | ||
|
d2adddaee4 | ||
|
b23b3ce8ec | ||
|
5f9fe1ce59 | ||
|
6a8c4ed1b5 | ||
|
966c853700 | ||
|
65fd0ed08b | ||
|
0b51675b38 | ||
|
2e22334631 | ||
|
cb2ac4bf0f | ||
|
38dc7789dc | ||
|
ce0e00ffb3 | ||
|
3e4ae4aeec | ||
|
c6a8807485 | ||
|
8137b6211c | ||
|
29effaae23 | ||
|
e5074f2e46 | ||
|
4fd7643907 | ||
|
1e1ec69175 | ||
|
e08647867c | ||
|
ae0f7171ed | ||
|
4f1a912f36 |
@@ -11,6 +11,6 @@ repos:
|
||||
- repo: https://github.com/doublify/pre-commit-rust
|
||||
rev: v1.0
|
||||
hooks:
|
||||
- id: fmt
|
||||
# - id: fmt
|
||||
- id: cargo-check
|
||||
- id: clippy
|
||||
|
106
Cargo.lock
generated
106
Cargo.lock
generated
@@ -54,9 +54,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.67"
|
||||
version = "1.0.68"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7724808837b77f4b4de9d283820f9d98bcf496d5692934b857a2399d31ff22e6"
|
||||
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
@@ -382,9 +382,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx"
|
||||
version = "1.0.84"
|
||||
version = "1.0.85"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "27874566aca772cb515af4c6e997b5fe2119820bca447689145e39bb734d19a0"
|
||||
checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cxxbridge-flags",
|
||||
@@ -394,9 +394,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx-build"
|
||||
version = "1.0.84"
|
||||
version = "1.0.85"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7bb951f2523a49533003656a72121306b225ec16a49a09dc6b0ba0d6f3ec3c0"
|
||||
checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"codespan-reporting",
|
||||
@@ -409,15 +409,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-flags"
|
||||
version = "1.0.84"
|
||||
version = "1.0.85"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be778b6327031c1c7b61dd2e48124eee5361e6aa76b8de93692f011b08870ab4"
|
||||
checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59"
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-macro"
|
||||
version = "1.0.84"
|
||||
version = "1.0.85"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b8a2b87662fe5a0a0b38507756ab66aff32638876a0866e5a5fc82ceb07ee49"
|
||||
checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -729,9 +729,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
|
||||
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
@@ -933,9 +933,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.138"
|
||||
version = "0.2.139"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
|
||||
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
|
||||
|
||||
[[package]]
|
||||
name = "libsqlite3-sys"
|
||||
@@ -1080,9 +1080,9 @@ checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.1"
|
||||
version = "7.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
|
||||
checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"minimal-lexical",
|
||||
@@ -1096,7 +1096,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.13"
|
||||
version = "0.7.16"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitcoin_hashes",
|
||||
@@ -1209,9 +1209,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.14.0"
|
||||
version = "1.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
|
||||
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
@@ -1219,15 +1219,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.16.0"
|
||||
version = "1.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
|
||||
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.44"
|
||||
version = "0.10.45"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29d971fd5722fec23977260f6e81aa67d2f22cadbdc2aa049f1022d9a3be1566"
|
||||
checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
@@ -1257,9 +1257,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.79"
|
||||
version = "0.9.80"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5454462c0eced1e97f2ec09036abc8da362e66802f66fd20f86854d9d8cbcbc4"
|
||||
checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7"
|
||||
dependencies = [
|
||||
"autocfg 1.1.0",
|
||||
"cc",
|
||||
@@ -1326,9 +1326,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
|
||||
|
||||
[[package]]
|
||||
name = "pest"
|
||||
version = "2.5.1"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cc8bed3549e0f9b0a2a78bf7c0018237a2cdf085eecbbc048e52612438e4e9d0"
|
||||
checksum = "0f6e86fb9e7026527a0d46bc308b841d73170ef8f443e1807f6ef88526a816d4"
|
||||
dependencies = [
|
||||
"thiserror",
|
||||
"ucd-trie",
|
||||
@@ -1336,9 +1336,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pest_derive"
|
||||
version = "2.5.1"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cdc078600d06ff90d4ed238f0119d84ab5d43dbaad278b0e33a8820293b32344"
|
||||
checksum = "96504449aa860c8dcde14f9fba5c58dc6658688ca1fe363589d6327b8662c603"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_generator",
|
||||
@@ -1346,9 +1346,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pest_generator"
|
||||
version = "2.5.1"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28a1af60b1c4148bb269006a750cff8e2ea36aff34d2d96cf7be0b14d1bed23c"
|
||||
checksum = "798e0220d1111ae63d66cb66a5dcb3fc2d986d520b98e49e1852bfdb11d7c5e7"
|
||||
dependencies = [
|
||||
"pest",
|
||||
"pest_meta",
|
||||
@@ -1359,9 +1359,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pest_meta"
|
||||
version = "2.5.1"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fec8605d59fc2ae0c6c1aefc0c7c7a9769732017c0ce07f7a9cfffa7b4404f20"
|
||||
checksum = "984298b75898e30a843e278a9f2452c31e349a073a0ce6fd950a12a74464e065"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"pest",
|
||||
@@ -1414,18 +1414,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.48"
|
||||
version = "1.0.49"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9d89e5dba24725ae5678020bf8f1357a9aa7ff10736b551adbcd3f8d17d766f"
|
||||
checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.11.3"
|
||||
version = "0.11.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0b18e655c21ff5ac2084a5ad0611e827b3f92badf79f4910b5a5c58f4d87ff0"
|
||||
checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
@@ -1433,9 +1433,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.11.2"
|
||||
version = "0.11.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306"
|
||||
checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
@@ -1446,9 +1446,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.11.2"
|
||||
version = "0.11.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a"
|
||||
checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost",
|
||||
@@ -1472,9 +1472,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.22"
|
||||
version = "1.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "556d0f47a940e895261e77dc200d5eadfc6ef644c179c6f5edfc105e3a2292c8"
|
||||
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
@@ -1823,18 +1823,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.151"
|
||||
version = "1.0.152"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
|
||||
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.151"
|
||||
version = "1.0.152"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
|
||||
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1843,9 +1843,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.90"
|
||||
version = "1.0.91"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8778cc0b528968fe72abec38b5db5a20a70d148116cd9325d2bc5f5180ca3faf"
|
||||
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"itoa",
|
||||
@@ -1920,9 +1920,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.106"
|
||||
version = "1.0.107"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09ee3a69cd2c7e06684677e5629b3878b253af05e4714964204279c6bc02cf0b"
|
||||
checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -2004,9 +2004,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.23.0"
|
||||
version = "1.23.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
|
||||
checksum = "38a54aca0c15d014013256222ba0ebed095673f89345dd79119d912eb561b7a8"
|
||||
dependencies = [
|
||||
"autocfg 1.1.0",
|
||||
"bytes",
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.7.13"
|
||||
version = "0.7.16"
|
||||
edition = "2021"
|
||||
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
|
||||
description = "A relay implementation for the Nostr protocol"
|
||||
@@ -28,7 +28,7 @@ secp256k1 = {version = "0.21", features = ["rand", "rand-std", "serde", "bitcoin
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = {version = "1.0", features = ["preserve_order"]}
|
||||
hex = "0.4"
|
||||
rusqlite = { version = "0.26", features = ["limits","bundled"]}
|
||||
rusqlite = { version = "0.26", features = ["limits","bundled","modern_sqlite", "trace"]}
|
||||
r2d2 = "0.8"
|
||||
r2d2_sqlite = "0.19"
|
||||
lazy_static = "1.4"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
FROM docker.io/library/rust:1.66.0@sha256:359949280cebefe93ccb33089fe25111a3aadfe99eac4b6cbe8ec3e1b571dacb as builder
|
||||
FROM docker.io/library/rust:1.66.0 as builder
|
||||
|
||||
RUN USER=root cargo install cargo-auditable
|
||||
RUN USER=root cargo new --bin nostr-rs-relay
|
||||
@@ -17,7 +17,7 @@ COPY ./src ./src
|
||||
RUN rm ./target/release/deps/nostr*relay*
|
||||
RUN cargo auditable build --release --locked
|
||||
|
||||
FROM docker.io/library/debian:bullseye-20221205-slim@sha256:25f10b4f1ded5341a3ca0a30290ff3cd5639415f0c5a2222d5e7d5dd72952aa1
|
||||
FROM docker.io/library/debian:bullseye-slim
|
||||
|
||||
ARG APP=/usr/src/app
|
||||
ARG APP_DATA=/usr/src/app/db
|
||||
|
35
README.md
35
README.md
@@ -28,7 +28,8 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
|
||||
- [x] NIP-16: [Event Treatment](https://github.com/nostr-protocol/nips/blob/master/16.md)
|
||||
- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
|
||||
- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
|
||||
- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
|
||||
- [ ] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md) (_implemented, but currently disabled_)
|
||||
- [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)
|
||||
|
||||
## Quick Start
|
||||
|
||||
@@ -81,6 +82,38 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
|
||||
A pre-built container is also available on DockerHub:
|
||||
https://hub.docker.com/r/scsibug/nostr-rs-relay
|
||||
|
||||
## Build and Run (without Docker)
|
||||
|
||||
Building `nostr-rs-relay` requires an installation of Cargo & Rust: https://www.rust-lang.org/tools/install
|
||||
|
||||
Clone this repository, and then build a release version of the relay:
|
||||
|
||||
```console
|
||||
$ git clone -q https://git.sr.ht/\~gheartsfield/nostr-rs-relay
|
||||
$ cd nostr-rs-relay
|
||||
$ cargo build -q -r
|
||||
```
|
||||
|
||||
The relay executable is now located in
|
||||
`target/release/nostr-rs-relay`. In order to run it with logging
|
||||
enabled, execute it with the `RUST_LOG` variable set:
|
||||
|
||||
```console
|
||||
$ RUST_LOG=warn,nostr_rs_relay=info ./target/release/nostr-rs-relay
|
||||
Dec 26 10:31:56.455 INFO nostr_rs_relay: Starting up from main
|
||||
Dec 26 10:31:56.464 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
|
||||
Dec 26 10:31:56.466 INFO nostr_rs_relay::server: db writer created
|
||||
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=2)
|
||||
Dec 26 10:31:56.466 INFO nostr_rs_relay::db: opened database "./nostr.db" for writing
|
||||
Dec 26 10:31:56.466 INFO nostr_rs_relay::schema: DB version = 11
|
||||
Dec 26 10:31:56.467 INFO nostr_rs_relay::db: Built a connection pool "maintenance writer" (min=1, max=2)
|
||||
Dec 26 10:31:56.467 INFO nostr_rs_relay::server: control message listener started
|
||||
Dec 26 10:31:56.468 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=8)
|
||||
```
|
||||
|
||||
You now have a running relay, on port `8080`. Use a `nostr` client or
|
||||
`websocat` to connect and send/query for events.
|
||||
|
||||
## Configuration
|
||||
|
||||
The sample [`config.toml`](config.toml) file demonstrates the
|
||||
|
10
config.toml
10
config.toml
@@ -36,8 +36,9 @@ data_directory = "."
|
||||
# Minimum number of SQLite reader connections
|
||||
#min_conn = 4
|
||||
|
||||
# Maximum number of SQLite reader connections
|
||||
#max_conn = 128
|
||||
# Maximum number of SQLite reader connections. Recommend setting this
|
||||
# to approx the number of cores.
|
||||
#max_conn = 8
|
||||
|
||||
[network]
|
||||
# Bind to this network address
|
||||
@@ -99,6 +100,11 @@ reject_future_seconds = 1800
|
||||
# backpressure to senders if writes are slow.
|
||||
#event_persist_buffer = 4096
|
||||
|
||||
# Event kind blacklist. Events with these kinds will be discarded.
|
||||
#event_kind_blacklist = [
|
||||
# 70202,
|
||||
#]
|
||||
|
||||
[authorization]
|
||||
# Pubkey addresses in this array are whitelisted for event publishing.
|
||||
# Only valid events by these authors will be accepted, if the variable
|
||||
|
125
docs/database-maintenance.md
Normal file
125
docs/database-maintenance.md
Normal file
@@ -0,0 +1,125 @@
|
||||
# Database Maintenance
|
||||
|
||||
`nostr-rs-relay` uses the SQLite embedded database to minimize
|
||||
dependencies and overall footprint of running a relay. If traffic is
|
||||
light, the relay should just run with very little need for
|
||||
intervention. For heavily trafficked relays, there are a number of
|
||||
steps that the operator may need to take to maintain performance and
|
||||
limit disk usage.
|
||||
|
||||
This maintenance guide is current as of version `0.7.14`. Future
|
||||
versions may incorporate and automate some of these steps.
|
||||
|
||||
## Backing Up the Database
|
||||
|
||||
To prevent data loss, the database should be backed up regularly. The
|
||||
recommended method is to use the `sqlite3` command to perform an
|
||||
"Online Backup". This can be done while the relay is running, queries
|
||||
can still run and events will be persisted during the backup.
|
||||
|
||||
The following commands will perform a backup of the database to a
|
||||
dated file, and then compress to minimize size:
|
||||
|
||||
```console
|
||||
BACKUP_FILE=/var/backups/nostr/`date +%Y%m%d_%H%M`.db
|
||||
sqlite3 -readonly /apps/nostr-relay/nostr.db ".backup $BACKUP_FILE
|
||||
sqlite3 $BACKUP_FILE "vacuum;"
|
||||
bzip2 -9 $BACKUP_FILE
|
||||
```
|
||||
|
||||
Nostr events are very compressible. Expect a compression ratio on the
|
||||
order of 4:1, resulting in a 75% space saving.
|
||||
|
||||
## Vacuuming the Database
|
||||
|
||||
As the database is updated, it can become fragmented. Performing a
|
||||
full `vacuum` will rebuild the entire database file, and can reduce
|
||||
space. Running this may reduce the size of the database file,
|
||||
especially if a large amount of data was updated or deleted.
|
||||
|
||||
```console
|
||||
vacuum;
|
||||
```
|
||||
|
||||
## Clearing Hidden Events
|
||||
|
||||
When events are deleted, either through deletion events, metadata or
|
||||
follower updates, or a replaceable event kind, the event is not
|
||||
actually removed from the database. Instead, a flag `HIDDEN` is set
|
||||
to true for the event, which excludes it from search results. The
|
||||
original intent was to ensure that subsequent rebroadcasts of the
|
||||
event would be easily detected as having been deleted, and would not
|
||||
need to be stored again. In practice, this decision causes excessive
|
||||
growth of the `tags` table, since all the previous followers are
|
||||
retained for those `HIDDEN` events.
|
||||
|
||||
The `event` and especially the `tag` table can be significantly
|
||||
reduced in size by running these commands:
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
delete from event where HIDDEN=true;
|
||||
```
|
||||
|
||||
## Manually Removing Events
|
||||
|
||||
For a variety of reasons, an operator may wish to remove some events
|
||||
from the database. The only way of achieving this today is with
|
||||
manually run SQL commands.
|
||||
|
||||
It is recommended to have a good backup prior to manually running SQL
|
||||
commands!
|
||||
|
||||
In all cases, it is mandatory to enable foreign keys, and this must be
|
||||
done for every connection. Otherwise, you will likely orphan rows in
|
||||
the `tag` table.
|
||||
|
||||
### Deleting Specific Event
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
delete from event where event_hash=x'00000000000c1271675dc86e3e1dd1336827bccabb90dc4c9d3b4465efefe00e';
|
||||
```
|
||||
|
||||
### Deleting All Events for Pubkey
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
delete from event where author=x'000000000002c7831d9c5a99f183afc2813a6f69a16edda7f6fc0ed8110566e6';
|
||||
```
|
||||
|
||||
### Deleting All Events of a Kind
|
||||
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
delete from event where kind=70202;
|
||||
```
|
||||
|
||||
### Deleting Old Events
|
||||
|
||||
In this scenario, we wish to delete any event that has been stored by
|
||||
our relay for more than 1 month. Crucially, this is based on when the
|
||||
event was stored, not when the event says it was created. If an event
|
||||
has a `created` field of 2 years ago, but was first sent to our relay
|
||||
yesterday, it would not be deleted in this scenario. Keep in mind, we
|
||||
do not track anything for re-broadcast events that we already have, so
|
||||
this is not a very effective way of implementing a "least recently
|
||||
seen" policy.
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
TODO!
|
||||
```
|
||||
|
||||
### Delete Profile Events with No Recent Events
|
||||
|
||||
Many users create profiles, post a "hello world" event, and then never
|
||||
appear again (likely using an ephemeral keypair that was lost in the
|
||||
browser cache). We can find these accounts and remove them after some
|
||||
time.
|
||||
|
||||
```console
|
||||
PRAGMA foreign_keys = ON;
|
||||
TODO!
|
||||
```
|
@@ -1,3 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
sed -E 's/@sha256:[[:alnum:]]+//g' Dockerfile > Dockerfile.any-platform
|
||||
echo "Created platform-agnostic Dockerfile in 'Dockerfile.any-platform'"
|
@@ -60,6 +60,7 @@ pub struct Limits {
|
||||
pub max_ws_frame_bytes: Option<usize>,
|
||||
pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
|
||||
pub event_persist_buffer: usize, // events to buffer for database commits (block senders if database writes are too slow)
|
||||
pub event_kind_blacklist: Option<Vec<u64>>
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -207,7 +208,7 @@ impl Default for Settings {
|
||||
data_directory: ".".to_owned(),
|
||||
in_memory: false,
|
||||
min_conn: 4,
|
||||
max_conn: 128,
|
||||
max_conn: 8,
|
||||
},
|
||||
network: Network {
|
||||
port: 8080,
|
||||
@@ -225,6 +226,7 @@ impl Default for Settings {
|
||||
max_ws_frame_bytes: Some(2 << 17), // 128K
|
||||
broadcast_buffer: 16384,
|
||||
event_persist_buffer: 4096,
|
||||
event_kind_blacklist: None,
|
||||
},
|
||||
authorization: Authorization {
|
||||
pubkey_whitelist: None, // Allow any address to publish
|
||||
|
238
src/db.rs
238
src/db.rs
@@ -19,8 +19,10 @@ use r2d2_sqlite::SqliteConnectionManager;
|
||||
use rusqlite::params;
|
||||
use rusqlite::types::ToSql;
|
||||
use rusqlite::OpenFlags;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use std::fmt::Write as _;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -39,13 +41,8 @@ pub struct SubmittedEvent {
|
||||
/// Database file
|
||||
pub const DB_FILE: &str = "nostr.db";
|
||||
|
||||
/// How frequently to run maintenance
|
||||
/// How many persisted events before DB maintenannce is triggered.
|
||||
pub const EVENT_MAINTENANCE_FREQ_SEC: u64 = 60;
|
||||
|
||||
/// How many persisted events before we pause for backups.
|
||||
/// It isn't clear this is enough to make the online backup API work yet.
|
||||
pub const EVENT_COUNT_BACKUP_PAUSE_TRIGGER: usize = 1000;
|
||||
/// How frequently to attempt checkpointing
|
||||
pub const CHECKPOINT_FREQ_SEC: u64 = 60;
|
||||
|
||||
/// Build a database connection pool.
|
||||
/// # Panics
|
||||
@@ -94,6 +91,16 @@ pub fn build_pool(
|
||||
pool
|
||||
}
|
||||
|
||||
/// Display database pool stats every 1 minute
|
||||
pub async fn monitor_pool(name: &str, pool: SqlitePool) {
|
||||
let sleep_dur = Duration::from_secs(60);
|
||||
loop {
|
||||
log_pool_stats(name, &pool);
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Perform normal maintenance
|
||||
pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
let start = Instant::now();
|
||||
@@ -102,15 +109,15 @@ pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
#[derive(Debug)]
|
||||
enum SqliteReturnStatus {
|
||||
SqliteOk,
|
||||
SqliteBusy,
|
||||
SqliteError,
|
||||
SqliteOther(u64),
|
||||
enum SqliteStatus {
|
||||
Ok,
|
||||
Busy,
|
||||
Error,
|
||||
Other(u64),
|
||||
}
|
||||
|
||||
/// Checkpoint/Truncate WAL
|
||||
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
|
||||
pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
|
||||
let query = "PRAGMA wal_checkpoint(TRUNCATE);";
|
||||
let start = Instant::now();
|
||||
let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
|
||||
@@ -120,10 +127,10 @@ pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
Ok((checkpoint_result, wal_size, frames_checkpointed))
|
||||
})?;
|
||||
let result = match cp_result {
|
||||
0 => SqliteReturnStatus::SqliteOk,
|
||||
1 => SqliteReturnStatus::SqliteBusy,
|
||||
2 => SqliteReturnStatus::SqliteError,
|
||||
x => SqliteReturnStatus::SqliteOther(x),
|
||||
0 => SqliteStatus::Ok,
|
||||
1 => SqliteStatus::Busy,
|
||||
2 => SqliteStatus::Error,
|
||||
x => SqliteStatus::Other(x),
|
||||
};
|
||||
info!(
|
||||
"checkpoint ran in {:?} (result: {:?}, WAL size: {})",
|
||||
@@ -131,7 +138,7 @@ pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
result,
|
||||
wal_size
|
||||
);
|
||||
Ok(())
|
||||
Ok(wal_size as usize)
|
||||
}
|
||||
|
||||
/// Spawn a database writer that persists events to the SQLite store.
|
||||
@@ -173,10 +180,6 @@ pub async fn db_writer(
|
||||
let rps_setting = settings.limits.messages_per_sec;
|
||||
let mut most_recent_rate_limit = Instant::now();
|
||||
let mut lim_opt = None;
|
||||
// Constant writing has interfered with online backups. Keep
|
||||
// track of how long since we've given the backups a chance to
|
||||
// run.
|
||||
let mut backup_pause_counter: usize = 0;
|
||||
let clock = governor::clock::QuantaClock::default();
|
||||
if let Some(rps) = rps_setting {
|
||||
if rps > 0 {
|
||||
@@ -221,6 +224,24 @@ pub async fn db_writer(
|
||||
}
|
||||
}
|
||||
|
||||
// Check that event kind isn't blacklisted
|
||||
let kinds_blacklist = &settings.limits.event_kind_blacklist.clone();
|
||||
if let Some(event_kind_blacklist) = kinds_blacklist {
|
||||
if event_kind_blacklist.contains(&event.kind) {
|
||||
info!(
|
||||
"Rejecting event {}, blacklisted kind",
|
||||
&event.get_event_id_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send(Notice::blocked(
|
||||
event.id,
|
||||
"event kind is blocked by relay"
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// send any metadata events to the NIP-05 verifier
|
||||
if nip05_active && event.is_kind_metadata() {
|
||||
// we are sending this prior to even deciding if we
|
||||
@@ -311,12 +332,6 @@ pub async fn db_writer(
|
||||
notice_tx.try_send(Notice::error(event.id, msg)).ok();
|
||||
}
|
||||
}
|
||||
backup_pause_counter += 1;
|
||||
if backup_pause_counter > EVENT_COUNT_BACKUP_PAUSE_TRIGGER {
|
||||
info!("pausing db write thread for a moment...");
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
backup_pause_counter = 0
|
||||
}
|
||||
}
|
||||
|
||||
// use rate limit, if defined, and if an event was actually written.
|
||||
@@ -349,6 +364,9 @@ pub async fn db_writer(
|
||||
|
||||
/// Persist an event to the database, returning rows added.
|
||||
pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
// enable auto vacuum
|
||||
conn.execute_batch("pragma auto_vacuum = FULL")?;
|
||||
|
||||
// start transaction
|
||||
let tx = conn.transaction()?;
|
||||
// get relevant fields from event and convert to blobs.
|
||||
@@ -399,10 +417,12 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
// if this event is replaceable update, hide every other replaceable
|
||||
// event with the same kind from the same author that was issued
|
||||
// earlier than this.
|
||||
if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
|
||||
if e.kind == 0 || e.kind == 3 || e.kind == 41 || (e.kind >= 10000 && e.kind < 20000) {
|
||||
let author = hex::decode(&e.pubkey).ok();
|
||||
// this is a backwards check - hide any events that were older.
|
||||
let update_count = tx.execute(
|
||||
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
|
||||
params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
|
||||
"UPDATE event SET hidden=TRUE WHERE hidden!=TRUE and kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)",
|
||||
params![e.kind, author, e.kind, author],
|
||||
)?;
|
||||
if update_count > 0 {
|
||||
info!(
|
||||
@@ -509,22 +529,18 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
for auth in authvec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
auth_searches.push("author=? OR delegated_by=?".to_owned());
|
||||
params.push(Box::new(ex.clone()));
|
||||
auth_searches.push("author=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
auth_searches.push(
|
||||
"(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
|
||||
"(author>? AND author<?)".to_owned(),
|
||||
);
|
||||
params.push(Box::new(lower.clone()));
|
||||
params.push(Box::new(upper.clone()));
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
auth_searches.push("author>? OR delegated_by>?".to_owned());
|
||||
params.push(Box::new(lower.clone()));
|
||||
auth_searches.push("author>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
@@ -533,13 +549,11 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
}
|
||||
}
|
||||
if !authvec.is_empty() {
|
||||
let authors_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(authors_clause);
|
||||
let auth_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(auth_clause);
|
||||
} else {
|
||||
// if the authors list was empty, we should never return
|
||||
// any results.
|
||||
filter_components.push("false".to_owned());
|
||||
}
|
||||
filter_components.push("false".to_owned());
|
||||
}
|
||||
}
|
||||
// Query for Kind
|
||||
if let Some(ks) = &f.kinds {
|
||||
@@ -671,35 +685,70 @@ fn _pool_at_capacity(pool: &SqlitePool) -> bool {
|
||||
fn log_pool_stats(name: &str, pool: &SqlitePool) {
|
||||
let state: r2d2::State = pool.state();
|
||||
let in_use_cxns = state.connections - state.idle_connections;
|
||||
trace!(
|
||||
"DB pool {:?} usage (in_use: {}, available: {})",
|
||||
debug!(
|
||||
"DB pool {:?} usage (in_use: {}, available: {}, max: {})",
|
||||
name,
|
||||
in_use_cxns,
|
||||
state.connections
|
||||
state.connections,
|
||||
pool.max_size()
|
||||
);
|
||||
if state.connections == in_use_cxns {
|
||||
debug!("DB pool {:?} is empty (in_use: {})", name, in_use_cxns);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Perform database maintenance on a regular basis
|
||||
pub async fn db_maintenance(pool: SqlitePool) {
|
||||
pub async fn db_optimize_task(pool: SqlitePool) {
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(EVENT_MAINTENANCE_FREQ_SEC)) => {
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
// the busy timer will block writers, so don't set
|
||||
// this any higher than you want max latency for event
|
||||
// writes.
|
||||
conn.busy_timeout(Duration::from_secs(1)).ok();
|
||||
debug!("running database optimizer");
|
||||
optimize_db(&mut conn).ok();
|
||||
debug!("running wal_checkpoint(TRUNCATE)");
|
||||
checkpoint_db(&mut conn).ok();
|
||||
}
|
||||
}
|
||||
};
|
||||
_ = tokio::time::sleep(Duration::from_secs(60*60)) => {
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
// the busy timer will block writers, so don't set
|
||||
// this any higher than you want max latency for event
|
||||
// writes.
|
||||
info!("running database optimizer");
|
||||
optimize_db(&mut conn).ok();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Perform database WAL checkpoint on a regular basis
|
||||
pub async fn db_checkpoint_task(pool: SqlitePool, safe_to_read: Arc<Mutex<u64>>) {
|
||||
tokio::task::spawn(async move {
|
||||
// WAL size in pages.
|
||||
let mut current_wal_size = 0;
|
||||
// WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
|
||||
let wal_threshold = 1000*10;
|
||||
// default threshold for the busy timer
|
||||
let busy_wait_default = Duration::from_secs(1);
|
||||
// if the WAL file is getting too big, switch to this
|
||||
let busy_wait_default_long = Duration::from_secs(10);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => {
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
let mut _guard:Option<MutexGuard<u64>> = None;
|
||||
// the busy timer will block writers, so don't set
|
||||
// this any higher than you want max latency for event
|
||||
// writes.
|
||||
if current_wal_size <= wal_threshold {
|
||||
conn.busy_timeout(busy_wait_default).ok();
|
||||
} else {
|
||||
// if the wal size has exceeded a threshold, increase the busy timeout.
|
||||
conn.busy_timeout(busy_wait_default_long).ok();
|
||||
// take a lock that will prevent new readers.
|
||||
info!("blocking new readers to perform wal_checkpoint");
|
||||
_guard = Some(safe_to_read.lock().await);
|
||||
}
|
||||
debug!("running wal_checkpoint(TRUNCATE)");
|
||||
if let Ok(new_size) = checkpoint_db(&mut conn) {
|
||||
current_wal_size = new_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -716,14 +765,19 @@ pub async fn db_query(
|
||||
pool: SqlitePool,
|
||||
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
safe_to_read: Arc<Mutex<u64>>,
|
||||
) {
|
||||
let pre_spawn_start = Instant::now();
|
||||
task::spawn_blocking(move || {
|
||||
{
|
||||
// if we are waiting on a checkpoint, stop until it is complete
|
||||
let _ = safe_to_read.blocking_lock();
|
||||
}
|
||||
let db_queue_time = pre_spawn_start.elapsed();
|
||||
// if the queue time was very long (>5 seconds), spare the DB and abort.
|
||||
if db_queue_time > Duration::from_secs(5) {
|
||||
info!(
|
||||
"shedding DB query load from {:?} (cid: {}, sub: {:?})",
|
||||
"shedding DB query load queued for {:?} (cid: {}, sub: {:?})",
|
||||
db_queue_time, client_id, sub.id
|
||||
);
|
||||
return Ok(());
|
||||
@@ -743,8 +797,6 @@ pub async fn db_query(
|
||||
if sql_gen_elapsed > Duration::from_millis(10) {
|
||||
debug!("SQL (slow) generated in {:?}", start.elapsed());
|
||||
}
|
||||
// show pool stats
|
||||
log_pool_stats("reader", &pool);
|
||||
// cutoff for displaying slow queries
|
||||
let slow_cutoff = Duration::from_millis(2000);
|
||||
// any client that doesn't cause us to generate new rows in 5
|
||||
@@ -753,10 +805,13 @@ pub async fn db_query(
|
||||
let start = Instant::now();
|
||||
let mut slow_first_event;
|
||||
let mut last_successful_send = Instant::now();
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
// execute the query.
|
||||
// make the actual SQL query (with parameters inserted) available
|
||||
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
|
||||
let mut stmt = conn.prepare_cached(&q)?;
|
||||
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
||||
|
||||
let mut first_result = true;
|
||||
while let Some(row) = event_rows.next()? {
|
||||
let first_event_elapsed = start.elapsed();
|
||||
@@ -770,29 +825,23 @@ pub async fn db_query(
|
||||
}
|
||||
// logging for slow queries; show sub and SQL.
|
||||
// to reduce logging; only show 1/16th of clients (leading 0)
|
||||
if row_count == 0 && slow_first_event && client_id.starts_with("0") {
|
||||
if row_count == 0 && slow_first_event && client_id.starts_with('0') {
|
||||
debug!(
|
||||
"query req (slow): {:?} (cid: {}, sub: {:?})",
|
||||
sub, client_id, sub.id
|
||||
);
|
||||
debug!(
|
||||
"query string (slow): {} (cid: {}, sub: {:?})",
|
||||
q, client_id, sub.id
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
"query req: {:?} (cid: {}, sub: {:?})",
|
||||
sub,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
trace!(
|
||||
"query string: {} (cid: {}, sub: {:?})",
|
||||
q,
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
}
|
||||
}
|
||||
// check if a checkpoint is trying to run, and abort
|
||||
if row_count % 100 == 0 {
|
||||
{
|
||||
if let Err(_) = safe_to_read.try_lock() {
|
||||
// lock was held, abort this query
|
||||
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if this is still active; every 100 rows
|
||||
if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
|
||||
debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
|
||||
@@ -809,10 +858,17 @@ pub async fn db_query(
|
||||
trace!("db reader thread is stalled");
|
||||
if last_successful_send + abort_cutoff < Instant::now() {
|
||||
// the queue has been full for too long, abort
|
||||
info!("aborting database query due to slow client");
|
||||
info!("aborting database query due to slow client (cid: {}, sub: {:?})",
|
||||
client_id, sub.id);
|
||||
let ok: Result<()> = Ok(());
|
||||
return ok;
|
||||
}
|
||||
// check if a checkpoint is trying to run, and abort
|
||||
if let Err(_) = safe_to_read.try_lock() {
|
||||
// lock was held, abort this query
|
||||
debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
|
||||
return Ok(());
|
||||
}
|
||||
// give the queue a chance to clear before trying again
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
|
@@ -80,7 +80,7 @@ impl FromStr for Operator {
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct ConditionQuery {
|
||||
pub(crate) conditions: Vec<Condition>,
|
||||
pub conditions: Vec<Condition>,
|
||||
}
|
||||
|
||||
impl ConditionQuery {
|
||||
@@ -137,9 +137,9 @@ pub fn validate_delegation(
|
||||
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct Condition {
|
||||
pub(crate) field: Field,
|
||||
pub(crate) operator: Operator,
|
||||
pub(crate) values: Vec<u64>,
|
||||
pub field: Field,
|
||||
pub operator: Operator,
|
||||
pub values: Vec<u64>,
|
||||
}
|
||||
|
||||
impl Condition {
|
||||
@@ -332,19 +332,6 @@ mod tests {
|
||||
assert_eq!(parsed, cq);
|
||||
Ok(())
|
||||
}
|
||||
fn simple_event() -> Event {
|
||||
Event {
|
||||
id: "0".to_owned(),
|
||||
pubkey: "0".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: vec![],
|
||||
content: "".to_owned(),
|
||||
sig: "0".to_owned(),
|
||||
tagidx: None,
|
||||
}
|
||||
}
|
||||
// Check for condition logic on event w/ empty values
|
||||
#[test]
|
||||
fn condition_with_empty_values() {
|
||||
@@ -353,7 +340,7 @@ mod tests {
|
||||
operator: Operator::GreaterThan,
|
||||
values: vec![],
|
||||
};
|
||||
let e = simple_event();
|
||||
let e = Event::simple_event();
|
||||
assert!(!c.allows_event(&e));
|
||||
c.operator = Operator::LessThan;
|
||||
assert!(!c.allows_event(&e));
|
||||
@@ -373,7 +360,7 @@ mod tests {
|
||||
operator: Operator::GreaterThan,
|
||||
values: vec![10],
|
||||
};
|
||||
let mut e = simple_event();
|
||||
let mut e = Event::simple_event();
|
||||
// kind is not greater than 10, not allowed
|
||||
e.kind = 1;
|
||||
assert!(!c.allows_event(&e));
|
||||
@@ -392,7 +379,7 @@ mod tests {
|
||||
operator: Operator::Equals,
|
||||
values: vec![0, 10, 20],
|
||||
};
|
||||
let mut e = simple_event();
|
||||
let mut e = Event::simple_event();
|
||||
// Allow if event kind is in list for Equals
|
||||
e.kind = 10;
|
||||
assert!(c.allows_event(&e));
|
||||
|
56
src/event.rs
56
src/event.rs
@@ -37,19 +37,19 @@ impl EventCmd {
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
|
||||
pub struct Event {
|
||||
pub id: String,
|
||||
pub(crate) pubkey: String,
|
||||
pub pubkey: String,
|
||||
#[serde(skip)]
|
||||
pub(crate) delegated_by: Option<String>,
|
||||
pub(crate) created_at: u64,
|
||||
pub(crate) kind: u64,
|
||||
pub delegated_by: Option<String>,
|
||||
pub created_at: u64,
|
||||
pub kind: u64,
|
||||
#[serde(deserialize_with = "tag_from_string")]
|
||||
// NOTE: array-of-arrays may need to be more general than a string container
|
||||
pub(crate) tags: Vec<Vec<String>>,
|
||||
pub(crate) content: String,
|
||||
pub(crate) sig: String,
|
||||
pub tags: Vec<Vec<String>>,
|
||||
pub content: String,
|
||||
pub sig: String,
|
||||
// Optimization for tag search, built on demand.
|
||||
#[serde(skip)]
|
||||
pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
|
||||
pub tagidx: Option<HashMap<char, HashSet<String>>>,
|
||||
}
|
||||
|
||||
/// Simple tag type for array of array of strings.
|
||||
@@ -101,6 +101,21 @@ impl From<EventCmd> for Result<Event> {
|
||||
}
|
||||
|
||||
impl Event {
|
||||
#[cfg(test)]
|
||||
pub fn simple_event() -> Event {
|
||||
Event {
|
||||
id: "0".to_owned(),
|
||||
pubkey: "0".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: vec![],
|
||||
content: "".to_owned(),
|
||||
sig: "0".to_owned(),
|
||||
tagidx: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_kind_metadata(&self) -> bool {
|
||||
self.kind == 0
|
||||
}
|
||||
@@ -226,7 +241,7 @@ impl Event {
|
||||
}
|
||||
|
||||
/// Check if this event has a valid signature.
|
||||
fn validate(&self) -> Result<()> {
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
// TODO: return a Result with a reason for invalid events
|
||||
// validation is performed by:
|
||||
// * parsing JSON string into event fields
|
||||
@@ -319,31 +334,18 @@ impl Event {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
fn simple_event() -> Event {
|
||||
Event {
|
||||
id: "0".to_owned(),
|
||||
pubkey: "0".to_owned(),
|
||||
delegated_by: None,
|
||||
created_at: 0,
|
||||
kind: 0,
|
||||
tags: vec![],
|
||||
content: "".to_owned(),
|
||||
sig: "0".to_owned(),
|
||||
tagidx: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_creation() {
|
||||
// create an event
|
||||
let event = simple_event();
|
||||
let event = Event::simple_event();
|
||||
assert_eq!(event.id, "0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_serialize() -> Result<()> {
|
||||
// serialize an event to JSON string
|
||||
let event = simple_event();
|
||||
let event = Event::simple_event();
|
||||
let j = serde_json::to_string(&event)?;
|
||||
assert_eq!(j, "{\"id\":\"0\",\"pubkey\":\"0\",\"created_at\":0,\"kind\":0,\"tags\":[],\"content\":\"\",\"sig\":\"0\"}");
|
||||
Ok(())
|
||||
@@ -351,14 +353,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn empty_event_tag_match() {
|
||||
let event = simple_event();
|
||||
let event = Event::simple_event();
|
||||
assert!(!event
|
||||
.generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_event_tag_match() {
|
||||
let mut event = simple_event();
|
||||
let mut event = Event::simple_event();
|
||||
event.tags = vec![vec!["e".to_owned(), "foo".to_owned()]];
|
||||
event.build_index();
|
||||
assert_eq!(
|
||||
@@ -373,7 +375,7 @@ mod tests {
|
||||
#[test]
|
||||
fn event_tags_serialize() -> Result<()> {
|
||||
// serialize an event with tags to JSON string
|
||||
let mut event = simple_event();
|
||||
let mut event = Event::simple_event();
|
||||
event.tags = vec![
|
||||
vec![
|
||||
"e".to_owned(),
|
||||
|
@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
|
||||
description: i.description,
|
||||
pubkey: i.pubkey,
|
||||
contact: i.contact,
|
||||
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 26]),
|
||||
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22]),
|
||||
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
|
||||
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
|
||||
}
|
||||
|
27
src/main.rs
27
src/main.rs
@@ -18,6 +18,18 @@ fn db_from_args(args: &[String]) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
fn print_version() {
|
||||
println!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
|
||||
}
|
||||
|
||||
fn print_help() {
|
||||
println!("Usage: nostr-rs-relay [OPTION]...\n");
|
||||
println!("Options:");
|
||||
println!(" --help Show this help message and exit");
|
||||
println!(" --version Show version information and exit");
|
||||
println!(" --db <directory> Use the <directory> as the location of the database");
|
||||
}
|
||||
|
||||
/// Start running a Nostr relay server.
|
||||
fn main() {
|
||||
// setup tracing
|
||||
@@ -25,6 +37,21 @@ fn main() {
|
||||
info!("Starting up from main");
|
||||
// get database directory from args
|
||||
let args: Vec<String> = env::args().collect();
|
||||
|
||||
let help_flag: bool = args.contains(&"--help".to_owned());
|
||||
// if --help flag was passed, display help and exit
|
||||
if help_flag {
|
||||
print_help();
|
||||
return;
|
||||
}
|
||||
|
||||
let version_flag: bool = args.contains(&"--version".to_owned());
|
||||
// if --version flag was passed, display version and exit
|
||||
if version_flag {
|
||||
print_version();
|
||||
return;
|
||||
}
|
||||
|
||||
let db_dir: Option<String> = db_from_args(&args);
|
||||
// configure settings from config.toml
|
||||
// replace default settings with those read from config.toml
|
||||
|
@@ -13,21 +13,22 @@ use tracing::{debug, error, info};
|
||||
|
||||
/// Startup DB Pragmas
|
||||
pub const STARTUP_SQL: &str = r##"
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA main.synchronous = NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
PRAGMA journal_size_limit = 32768;
|
||||
pragma mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
"##;
|
||||
|
||||
/// Latest database version
|
||||
pub const DB_VERSION: usize = 11;
|
||||
pub const DB_VERSION: usize = 13;
|
||||
|
||||
/// Schema definition
|
||||
const INIT_SQL: &str = formatcp!(
|
||||
r##"
|
||||
-- Database settings
|
||||
PRAGMA encoding = "UTF-8";
|
||||
PRAGMA journal_mode=WAL;
|
||||
PRAGMA journal_mode = WAL;
|
||||
PRAGMA auto_vacuum = FULL;
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA application_id = 1654008667;
|
||||
@@ -52,6 +53,7 @@ CREATE INDEX IF NOT EXISTS author_index ON event(author);
|
||||
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
|
||||
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
|
||||
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
|
||||
CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
|
||||
|
||||
-- Tag Table
|
||||
-- Tag values are stored as either a BLOB (if they come in as a
|
||||
@@ -171,6 +173,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
if curr_version == 10 {
|
||||
curr_version = mig_10_to_11(conn)?;
|
||||
}
|
||||
if curr_version == 11 {
|
||||
curr_version = mig_11_to_12(conn)?;
|
||||
}
|
||||
if curr_version == 12 {
|
||||
curr_version = mig_12_to_13(conn)?;
|
||||
}
|
||||
|
||||
if curr_version == DB_VERSION {
|
||||
info!(
|
||||
@@ -371,7 +379,6 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
|
||||
|
||||
fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 6->7");
|
||||
// only change is adding a hidden column to events.
|
||||
let upgrade_sql = r##"
|
||||
ALTER TABLE event ADD delegated_by BLOB;
|
||||
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
|
||||
@@ -469,3 +476,51 @@ PRAGMA user_version = 11;
|
||||
}
|
||||
Ok(11)
|
||||
}
|
||||
|
||||
fn mig_11_to_12(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 11->12");
|
||||
let start = Instant::now();
|
||||
let tx = conn.transaction()?;
|
||||
{
|
||||
// Lookup every replaceable event
|
||||
let mut stmt = tx.prepare("select kind,author from event where kind in (0,3,41) or (kind>=10000 and kind<20000) order by id;")?;
|
||||
let mut replaceable_rows = stmt.query([])?;
|
||||
while let Some(row) = replaceable_rows.next()? {
|
||||
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
||||
let event_kind: u64 = row.get(0)?;
|
||||
let event_author: Vec<u8> = row.get(1)?;
|
||||
tx.execute(
|
||||
"UPDATE event SET hidden=TRUE WHERE hidden!=TRUE and kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)",
|
||||
params![event_kind, event_author, event_kind, event_author],
|
||||
)?;
|
||||
}
|
||||
tx.execute("PRAGMA user_version = 12;", [])?;
|
||||
}
|
||||
tx.commit()?;
|
||||
info!("database schema upgraded v11 -> v12 in {:?}", start.elapsed());
|
||||
// vacuum after large table modification
|
||||
let start = Instant::now();
|
||||
conn.execute("VACUUM;", [])?;
|
||||
info!("vacuumed DB after hidden event cleanup in {:?}", start.elapsed());
|
||||
Ok(12)
|
||||
}
|
||||
|
||||
fn mig_12_to_13(conn: &mut PooledConnection) -> Result<usize> {
|
||||
info!("database schema needs update from 12->13");
|
||||
let upgrade_sql = r##"
|
||||
CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
|
||||
reindex;
|
||||
pragma optimize;
|
||||
PRAGMA user_version = 13;
|
||||
"##;
|
||||
match conn.execute_batch(upgrade_sql) {
|
||||
Ok(()) => {
|
||||
info!("database schema upgraded v12 -> v13");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
Ok(13)
|
||||
}
|
||||
|
@@ -25,10 +25,13 @@ use hyper::{
|
||||
use rusqlite::OpenFlags;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::mpsc::Receiver as MpscReceiver;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -53,6 +56,7 @@ async fn handle_web_request(
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
|
||||
shutdown: Receiver<()>,
|
||||
safe_to_read: Arc<Mutex<u64>>,
|
||||
) -> Result<Response<Body>, Infallible> {
|
||||
match (
|
||||
request.uri().path(),
|
||||
@@ -75,6 +79,7 @@ async fn handle_web_request(
|
||||
Ok(upgraded) => {
|
||||
// set WebSocket configuration options
|
||||
let config = WebSocketConfig {
|
||||
max_send_queue: Some(1024),
|
||||
max_message_size: settings.limits.max_ws_message_bytes,
|
||||
max_frame_size: settings.limits.max_ws_frame_bytes,
|
||||
..Default::default()
|
||||
@@ -88,6 +93,7 @@ async fn handle_web_request(
|
||||
Some(config),
|
||||
)
|
||||
.await;
|
||||
let origin = get_header_string("origin", request.headers());
|
||||
let user_agent = get_header_string("user-agent", request.headers());
|
||||
// determine the remote IP from headers if the exist
|
||||
let header_ip = settings
|
||||
@@ -101,6 +107,7 @@ async fn handle_web_request(
|
||||
let client_info = ClientInfo {
|
||||
remote_ip,
|
||||
user_agent,
|
||||
origin,
|
||||
};
|
||||
// spawn a nostr server with our websocket
|
||||
tokio::spawn(nostr_server(
|
||||
@@ -111,6 +118,7 @@ async fn handle_web_request(
|
||||
broadcast,
|
||||
event_tx,
|
||||
shutdown,
|
||||
safe_to_read,
|
||||
));
|
||||
}
|
||||
// todo: trace, don't print...
|
||||
@@ -245,14 +253,19 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
// configure tokio runtime
|
||||
let rt = Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("tokio-ws")
|
||||
.thread_name_fn(|| {
|
||||
// give each thread a unique numeric name
|
||||
static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
|
||||
let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
|
||||
format!("tokio-ws-{}", id)
|
||||
})
|
||||
// limit concurrent SQLite blocking threads
|
||||
.max_blocking_threads(settings.limits.max_blocking_threads)
|
||||
.on_thread_start(|| {
|
||||
trace!("started new thread");
|
||||
trace!("started new thread: {:?}", std::thread::current().name());
|
||||
})
|
||||
.on_thread_stop(|| {
|
||||
trace!("stopping thread");
|
||||
trace!("stopped thread: {:?}", std::thread::current().name());
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
@@ -317,10 +330,16 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
&settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
1,
|
||||
2,
|
||||
false,
|
||||
);
|
||||
db::db_maintenance(maintenance_pool).await;
|
||||
|
||||
// Create a mutex that will block readers, so that a
|
||||
// checkpoint can be performed quickly.
|
||||
let safe_to_read = Arc::new(Mutex::new(0));
|
||||
|
||||
db::db_optimize_task(maintenance_pool.clone()).await;
|
||||
db::db_checkpoint_task(maintenance_pool, safe_to_read.clone()).await;
|
||||
|
||||
// listen for (external to tokio) shutdown request
|
||||
let controlled_shutdown = invoke_shutdown.clone();
|
||||
@@ -356,6 +375,10 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
db_max_conn,
|
||||
true,
|
||||
);
|
||||
// spawn a task to check the pool size.
|
||||
let pool_monitor = pool.clone();
|
||||
tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});
|
||||
|
||||
// A `Service` is needed for every connection, so this
|
||||
// creates one from our `handle_request` function.
|
||||
let make_svc = make_service_fn(|conn: &AddrStream| {
|
||||
@@ -365,6 +388,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let event = event_tx.clone();
|
||||
let stop = invoke_shutdown.clone();
|
||||
let settings = settings.clone();
|
||||
let safe_to_read = safe_to_read.clone();
|
||||
async move {
|
||||
// service_fn converts our function into a `Service`
|
||||
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
|
||||
@@ -376,6 +400,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
bcast.clone(),
|
||||
event.clone(),
|
||||
stop.subscribe(),
|
||||
safe_to_read.clone(),
|
||||
)
|
||||
}))
|
||||
}
|
||||
@@ -439,6 +464,7 @@ fn make_notice_message(notice: Notice) -> Message {
|
||||
struct ClientInfo {
|
||||
remote_ip: String,
|
||||
user_agent: Option<String>,
|
||||
origin: Option<String>,
|
||||
}
|
||||
|
||||
/// Handle new client connections. This runs through an event loop
|
||||
@@ -451,6 +477,7 @@ async fn nostr_server(
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: mpsc::Sender<SubmittedEvent>,
|
||||
mut shutdown: Receiver<()>,
|
||||
safe_to_read: Arc<Mutex<u64>>,
|
||||
) {
|
||||
// the time this websocket nostr server started
|
||||
let orig_start = Instant::now();
|
||||
@@ -502,9 +529,14 @@ async fn nostr_server(
|
||||
let mut client_published_event_count: usize = 0;
|
||||
let mut client_received_event_count: usize = 0;
|
||||
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
|
||||
if let Some(ua) = client_info.user_agent {
|
||||
debug!("cid: {}, user-agent: {:?}", cid, ua);
|
||||
}
|
||||
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
|
||||
let user_agent = client_info
|
||||
.user_agent
|
||||
.unwrap_or_else(|| "<unspecified>".into());
|
||||
debug!(
|
||||
"cid: {}, origin: {:?}, user-agent: {:?}",
|
||||
cid, origin, user_agent
|
||||
);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.recv() => {
|
||||
@@ -655,8 +687,10 @@ async fn nostr_server(
|
||||
if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
if s.needs_historical_events() {
|
||||
// start a database query. this spawns a blocking database query on a worker thread.
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx,safe_to_read.clone()).await;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
|
||||
|
@@ -65,12 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter {
|
||||
tags: None,
|
||||
force_no_match: false,
|
||||
};
|
||||
let empty_string = "".into();
|
||||
let mut ts = None;
|
||||
// iterate through each key, and assign values that exist
|
||||
for (key, val) in filter.into_iter() {
|
||||
// ids
|
||||
if key == "ids" {
|
||||
rf.ids = Deserialize::deserialize(val).ok();
|
||||
let raw_ids: Option<Vec<String>>= Deserialize::deserialize(val).ok();
|
||||
if let Some(a) = raw_ids.as_ref() {
|
||||
if a.contains(&empty_string) {
|
||||
return Err(serde::de::Error::invalid_type(
|
||||
Unexpected::Other("prefix matches must not be empty strings"),
|
||||
&"a json object"));
|
||||
}
|
||||
}
|
||||
rf.ids =raw_ids;
|
||||
} else if key == "kinds" {
|
||||
rf.kinds = Deserialize::deserialize(val).ok();
|
||||
} else if key == "since" {
|
||||
@@ -80,7 +89,15 @@ impl<'de> Deserialize<'de> for ReqFilter {
|
||||
} else if key == "limit" {
|
||||
rf.limit = Deserialize::deserialize(val).ok();
|
||||
} else if key == "authors" {
|
||||
rf.authors = Deserialize::deserialize(val).ok();
|
||||
let raw_authors: Option<Vec<String>>= Deserialize::deserialize(val).ok();
|
||||
if let Some(a) = raw_authors.as_ref() {
|
||||
if a.contains(&empty_string) {
|
||||
return Err(serde::de::Error::invalid_type(
|
||||
Unexpected::Other("prefix matches must not be empty strings"),
|
||||
&"a json object"));
|
||||
}
|
||||
}
|
||||
rf.authors = raw_authors;
|
||||
} else if key.starts_with('#') && key.len() > 1 && val.is_array() {
|
||||
if let Some(tag_search) = tag_search_char_from_filter(key) {
|
||||
if ts.is_none() {
|
||||
@@ -183,6 +200,13 @@ impl Subscription {
|
||||
pub fn get_id(&self) -> String {
|
||||
self.id.clone()
|
||||
}
|
||||
|
||||
/// Determine if any filter is requesting historical (database)
|
||||
/// queries. If every filter has limit:0, we do not need to query the DB.
|
||||
pub fn needs_historical_events(&self) -> bool {
|
||||
self.filters.iter().any(|f| f.limit!=Some(0))
|
||||
}
|
||||
|
||||
/// Determine if this subscription matches a given [`Event`]. Any
|
||||
/// individual filter match is sufficient.
|
||||
pub fn interested_in_event(&self, event: &Event) -> bool {
|
||||
@@ -294,6 +318,24 @@ mod tests {
|
||||
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn req_empty_authors_prefix() {
|
||||
let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]";
|
||||
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn req_empty_ids_prefix() {
|
||||
let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]";
|
||||
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn req_empty_ids_prefix_mixed() {
|
||||
let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]";
|
||||
assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn legacy_filter() {
|
||||
// legacy field in filter
|
||||
|
Reference in New Issue
Block a user