Compare commits

...

62 Commits
0.8.0 ... 0.8.7

Author SHA1 Message Date
Greg Heartsfield
39f9984c4f build: bump version to 0.8.7 2023-02-17 21:05:36 -06:00
Greg Heartsfield
9d55731073 fix: Postgres SQL generation for expiring events 2023-02-17 21:04:30 -06:00
Greg Heartsfield
5638f70d66 fix: set SQL tracing back to appropriate level 2023-02-17 20:50:19 -06:00
Greg Heartsfield
98a08d054a improvement: advertise support for NIP-42 in relay info 2023-02-17 14:02:49 -06:00
Greg Heartsfield
0ef7d618a8 build: bump version to 0.8.6 2023-02-17 13:59:07 -06:00
Greg Heartsfield
bf06bea808 feat(NIP-40): postgres support for event expiration 2023-02-17 13:25:56 -06:00
Greg Heartsfield
e5ca8c2a86 improvement: run expired event cleanup every 10 minutes 2023-02-17 11:22:00 -06:00
Greg Heartsfield
8ea63f0b27 feat(NIP-40): sqlite support for event expiration 2023-02-17 11:15:06 -06:00
Greg Heartsfield
3229e4192f feat: publish favicon.ico 2023-02-16 18:03:28 -06:00
0xtr
7fd9b55e70 fix: typo in sqlite_migration.rs 2023-02-15 18:52:49 -06:00
rorp
5cecfba319 feat(NIP-42): pubkey authentication
Configurable in `config.toml`.  Limited functionality, but this does
send metadata to gRPC for event authorization.

fixes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/66
2023-02-15 18:51:40 -06:00
Greg Heartsfield
d0f57aea21 improvement(NIP-40): functions for checking event expiration 2023-02-15 18:47:27 -06:00
Yuval Adam
40abd6858e docs: cleanup location of documentation 2023-02-15 18:43:22 -06:00
Greg Heartsfield
136e41d234 fix: retry event writes if DB is busy 2023-02-15 18:38:34 -06:00
Yuval Adam
35a1973a46 fix: allow older versions of protobuf-compiler to work
Add --experimental_allow_proto3_optional protoc arg in build configs

fixes https://github.com/scsibug/nostr-rs-relay/issues/77
2023-02-14 16:59:41 -06:00
Kieran
1daa25600d fix: postgres tag inserts 2023-02-14 06:33:01 -06:00
Greg Heartsfield
692925942a build: bump version to 0.8.5 2023-02-13 17:53:33 -06:00
Greg Heartsfield
84afd4b64e refactor: whitespace 2023-02-13 17:52:00 -06:00
Greg Heartsfield
46160bb1f9 fix: correct name of gRPC configuration in toml 2023-02-13 17:30:26 -06:00
Greg Heartsfield
2fc9168a38 fix: SQL error with parameterized replaceable events 2023-02-13 17:10:42 -06:00
Greg Heartsfield
01d0d44868 build: bump version to 0.8.4 2023-02-13 09:34:30 -06:00
Greg Heartsfield
93f6337fda fix: upgrade docker image to include OpenSSL 3 2023-02-13 09:33:14 -06:00
Greg Heartsfield
f3a42712a6 build: bump version to 0.8.3 2023-02-13 08:08:28 -06:00
Greg Heartsfield
27361d064a improvement: upgrade multiple dependencies
Updating anyhow v1.0.68 -> v1.0.69
Updating axum v0.6.4 -> v0.6.6
Updating cxx v1.0.89 -> v1.0.90
Updating cxx-build v1.0.89 -> v1.0.90
Updating cxxbridge-flags v1.0.89 -> v1.0.90
Updating cxxbridge-macro v1.0.89 -> v1.0.90
Adding hermit-abi v0.3.1
Updating is-terminal v0.4.2 -> v0.4.3
Updating pest v2.5.4 -> v2.5.5
Updating pest_derive v2.5.4 -> v2.5.5
Updating pest_generator v2.5.4 -> v2.5.5
Updating pest_meta v2.5.4 -> v2.5.5
Updating proc-macro2 v1.0.50 -> v1.0.51
Updating raw-cpuid v10.6.0 -> v10.6.1
Updating rustix v0.36.7 -> v0.36.8
Updating serde_json v1.0.91 -> v1.0.93
Updating signal-hook-registry v1.4.0 -> v1.4.1
Updating thread_local v1.1.4 -> v1.1.7
Updating tinyvec_macros v0.1.0 -> v0.1.1
Updating tokio-native-tls v0.3.0 -> v0.3.1
Updating tokio-util v0.7.4 -> v0.7.7
2023-02-13 07:57:14 -06:00
Greg Heartsfield
3bafb611e5 build: install packages with sudo for github ci 2023-02-13 07:50:48 -06:00
Greg Heartsfield
b960ab70de build: add protobuf compiler to github ci workflow 2023-02-13 07:48:09 -06:00
Greg Heartsfield
15e2f097aa improvement: advise operator this upgrade may take a minute 2023-02-13 07:37:13 -06:00
Greg Heartsfield
185f9e7abb feat: improved query performance when looking for deletion events (improves event insert time) 2023-02-12 15:43:22 -06:00
Greg Heartsfield
f44dae6ac9 fix: use correct start time for logging SQL generation 2023-02-12 15:00:50 -06:00
Greg Heartsfield
abc356c17d perf(sqlite): index tags with their kind/created_at fields
This updates the DB schema to remove the distinction between hex and
non-hex tag values, for simplicity.  The space savings did not seem to
be worth the extra complexity.

The SQLite tags table is denormalized to duplicate kind/created_at to
improve the ability of tag indexes to filter data.
2023-02-12 14:33:40 -06:00
Greg Heartsfield
81f8256c37 fix: container builds support protobuf compilation 2023-02-11 14:30:42 -06:00
Greg Heartsfield
b3db2bd081 fix: protobuf compiler not needed in runtime container 2023-02-11 13:57:53 -06:00
Greg Heartsfield
d31e974d56 fix: add protobuf-compiler for Docker and CI builds 2023-02-11 13:56:15 -06:00
Greg Heartsfield
36eaf9fea5 improvement: make comments match code for nauthz example 2023-02-11 13:36:10 -06:00
Greg Heartsfield
a16c4e698a feat: gRPC authorization for events
closes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/46
2023-02-11 13:26:08 -06:00
Greg Heartsfield
e63d179424 fix: prevent loop when nip05 metadata channel closes 2023-02-11 13:26:08 -06:00
rorp
28b7b83a6e improvement: make config file location configurable via CLI args 2023-02-08 07:59:26 -06:00
Greg Heartsfield
2e42b1b86e improvement: log source IP for persisted events 2023-02-06 17:15:27 -06:00
Naoki Ikeguchi
bd07a11f50 refactor: Fix clippy warnings 2023-02-06 07:29:45 -06:00
Greg Heartsfield
bc4b45d4b8 docs: update DB maintenance for v0.8.x 2023-02-06 07:07:23 -06:00
thesimplekid
1ca5d652de format: postgres_migrations 2023-02-06 06:44:57 -06:00
thesimplekid
d7cceab8fc fix: tag table does not have a unique constraint
`cargo fmt` on the document.
2023-02-06 06:44:57 -06:00
Greg Heartsfield
2805a96e5b docs: nginx timeouts
suggested by Michael Dilger;
ref: https://snort.social/e/note15jtrt8zsrvckyv6hggmanwk43p50gvmxe30s62x9tt6x9hyruzaq6fca44
2023-02-05 17:17:13 -06:00
Greg Heartsfield
ac14a0759f docs: clarify wording around subscription limits 2023-02-03 13:08:31 -06:00
Greg Heartsfield
cdd4e5949f fix: correctly log SQL generation time 2023-02-03 10:39:41 -06:00
Greg Heartsfield
5999009779 improvement: increase connection cache size 2023-02-02 18:34:30 -06:00
Greg Heartsfield
e36c791c53 improvement: prevent spilling temp indexes to disk 2023-02-02 18:15:14 -06:00
Greg Heartsfield
d95adbcb3d build: bump version to 0.8.2 2023-02-02 16:21:45 -06:00
Greg Heartsfield
509736c56d improvement: update multiple dependencies
Updating cxx v1.0.88 -> v1.0.89
Updating cxx-build v1.0.88 -> v1.0.89
Updating cxxbridge-flags v1.0.88 -> v1.0.89
Updating cxxbridge-macro v1.0.88 -> v1.0.89
Updating heck v0.4.0 -> v0.4.1
Updating hyper v0.14.23 -> v0.14.24
Updating io-lifetimes v1.0.4 -> v1.0.5
Updating js-sys v0.3.60 -> v0.3.61
Updating parking_lot_core v0.9.6 -> v0.9.7
Updating sync_wrapper v0.1.1 -> v0.1.2
Updating wasm-bindgen v0.2.83 -> v0.2.84
Updating wasm-bindgen-backend v0.2.83 -> v0.2.84
Updating wasm-bindgen-futures v0.4.33 -> v0.4.34
Updating wasm-bindgen-macro v0.2.83 -> v0.2.84
Updating wasm-bindgen-macro-support v0.2.83 -> v0.2.84
Updating wasm-bindgen-shared v0.2.83 -> v0.2.84
Updating web-sys v0.3.60 -> v0.3.61
2023-02-02 16:12:49 -06:00
Greg Heartsfield
8004ea9b44 fix(NIP-33): only delete older events with matching 'd' tags 2023-02-02 16:09:17 -06:00
Greg Heartsfield
866c239cc9 improvement: simplify SQL queries for tags 2023-02-02 12:24:10 -06:00
Greg Heartsfield
6012b57e95 improvement: log connection details at INFO level 2023-02-02 11:55:41 -06:00
Greg Heartsfield
559541b160 build: bump version to 0.8.1 2023-02-01 18:16:08 -06:00
Greg Heartsfield
facaed7805 improvement: guidance for subscription limits 2023-02-01 18:09:30 -06:00
Greg Heartsfield
ba4fcd072a improvement: allow queries to be cancelled earlier (before SQL execution) 2023-02-01 18:09:30 -06:00
Greg Heartsfield
2b79099cfe improvement: drop slow readers more quickly 2023-02-01 18:09:30 -06:00
Greg Heartsfield
eb1d2d717d improvement: log sleeps due to full query_tx 2023-02-01 18:09:30 -06:00
Greg Heartsfield
e5e03d4378 improvement: log slow filter query time 2023-02-01 18:09:30 -06:00
Greg Heartsfield
c377b136aa improvement: prometheus metric for db connections (sqlite) 2023-02-01 18:09:30 -06:00
Greg Heartsfield
bca5614a82 perf: hold database handle through all filters when querying 2023-02-01 18:09:30 -06:00
Greg Heartsfield
f7550b4c61 improvement: more precise log message 2023-02-01 18:09:30 -06:00
Greg Heartsfield
1623bacd0d improvement(NIP-33): advertise support for parameterized replaceable events 2023-02-01 18:09:27 -06:00
39 changed files with 2925 additions and 351 deletions

(sourcehut CI manifest)

@@ -7,6 +7,7 @@ environment:
packages:
- cargo
- sqlite-devel
- protobuf-compiler
sources:
- https://git.sr.ht/~gheartsfield/nostr-rs-relay/
shell: false

(GitHub Actions CI workflow)

@@ -4,7 +4,7 @@ on:
push:
branches:
- master
jobs:
test_nostr-rs-relay:
runs-on: ubuntu-latest
@@ -13,26 +13,27 @@ jobs:
- name: Update local toolchain
run: |
sudo apt-get install -y protobuf-compiler
rustup update
rustup component add clippy
rustup install nightly
rustup install nightly
- name: Toolchain info
run: |
cargo --version --verbose
rustc --version
cargo clippy --version
cargo clippy --version
# - name: Lint
# run: |
# cargo fmt -- --check
# cargo clippy -- -D warnings
# cargo clippy -- -D warnings
- name: Test
run: |
cargo check
cargo test --all
cargo test --all
- name: Build
run: |
cargo build --release
cargo build --release --locked

.gitignore (vendored, 2 lines changed)

@@ -1,2 +1,2 @@
/target
**/target/
nostr.db

Cargo.lock (generated, 275 lines changed)

@@ -54,9 +54,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
[[package]]
name = "async-channel"
@@ -115,7 +115,7 @@ dependencies = [
"slab",
"socket2",
"waker-fn",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@@ -224,9 +224,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.4"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc"
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
dependencies = [
"async-trait",
"axum-core",
@@ -463,7 +463,7 @@ dependencies = [
"lazy_static",
"libc",
"unicode-width",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@@ -622,9 +622,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.88"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322296e2f2e5af4270b54df9e85a02ff037e271af20ba3e7fe1575515dc840b8"
checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -634,9 +634,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.88"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "017a1385b05d631e7875b1f151c9f012d37b53491e2a87f65bff5c262b2111d8"
checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38"
dependencies = [
"cc",
"codespan-reporting",
@@ -649,15 +649,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.88"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c26bbb078acf09bc1ecda02d4223f03bdd28bd4874edcb0379138efc499ce971"
checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03"
[[package]]
name = "cxxbridge-macro"
version = "1.0.88"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357f40d1f06a24b60ae1fe122542c1fb05d28d32acb2aed064e84bc2ad1e252e"
checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263"
dependencies = [
"proc-macro2",
"quote",
@@ -674,7 +674,7 @@ dependencies = [
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.6",
"parking_lot_core 0.9.7",
]
[[package]]
@@ -783,6 +783,12 @@ dependencies = [
"instant",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.25"
@@ -1079,9 +1085,9 @@ dependencies = [
[[package]]
name = "heck"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
dependencies = [
"unicode-segmentation",
]
@@ -1095,6 +1101,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
version = "0.4.3"
@@ -1167,9 +1179,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.23"
version = "0.14.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c"
checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c"
dependencies = [
"bytes",
"futures-channel",
@@ -1281,24 +1293,24 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
name = "is-terminal"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189"
checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -1318,9 +1330,9 @@ checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "js-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47"
checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730"
dependencies = [
"wasm-bindgen",
]
@@ -1487,9 +1499,15 @@ dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "native-tls"
version = "0.2.11"
@@ -1532,7 +1550,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.8.0"
version = "0.8.7"
dependencies = [
"anyhow",
"async-std",
@@ -1556,6 +1574,7 @@ dependencies = [
"nonzero_ext",
"parse_duration",
"prometheus",
"prost",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5",
@@ -1568,9 +1587,12 @@ dependencies = [
"thiserror",
"tokio",
"tokio-tungstenite",
"tonic",
"tonic-build",
"tracing",
"tracing-subscriber 0.2.25",
"tungstenite",
"url",
"uuid",
]
@@ -1657,7 +1679,7 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"hermit-abi 0.2.6",
"libc",
]
@@ -1758,7 +1780,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.6",
"parking_lot_core 0.9.7",
]
[[package]]
@@ -1777,15 +1799,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.6"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -1819,9 +1841,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ab62d2fa33726dbe6321cc97ef96d8cde531e3eeaf858a058de53a8a6d40d8f"
checksum = "028accff104c4e513bad663bbcd2ad7cfd5304144404c31ed0a77ac103d00660"
dependencies = [
"thiserror",
"ucd-trie",
@@ -1829,9 +1851,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf026e2d0581559db66d837fe5242320f525d85c76283c61f4d51a1238d65ea"
checksum = "2ac3922aac69a40733080f53c1ce7f91dcf57e1a5f6c52f421fadec7fbdc4b69"
dependencies = [
"pest",
"pest_generator",
@@ -1839,9 +1861,9 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b27bd18aa01d91c8ed2b61ea23406a676b42d82609c6e2581fba42f0c15f17f"
checksum = "d06646e185566b5961b4058dd107e0a7f56e77c3f484549fb119867773c0f202"
dependencies = [
"pest",
"pest_meta",
@@ -1852,15 +1874,25 @@ dependencies = [
[[package]]
name = "pest_meta"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02b677c1859756359fc9983c2e56a0237f18624a3789528804406b7e915e5d"
checksum = "e6f60b2ba541577e2a0c307c8f39d1439108120eb7903adeb6497fa880c59616"
dependencies = [
"once_cell",
"pest",
"sha2",
]
[[package]]
name = "petgraph"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@@ -1910,7 +1942,7 @@ dependencies = [
"libc",
"log",
"wepoll-ffi",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@@ -1925,6 +1957,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97e3215779627f01ee256d2fad52f3d95e8e1c11e9fc6fd08f7cd455d5d5c78"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@@ -1951,9 +1993,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
dependencies = [
"unicode-ident",
]
@@ -1983,6 +2025,28 @@ dependencies = [
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.11.6"
@@ -2196,9 +2260,9 @@ dependencies = [
[[package]]
name = "raw-cpuid"
version = "10.6.0"
version = "10.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb"
checksum = "c307f7aacdbab3f0adee67d52739a1d71112cc068d6fab169ddeb18e48877fad"
dependencies = [
"bitflags",
]
@@ -2320,16 +2384,16 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.36.7"
version = "0.36.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.45.0",
]
[[package]]
@@ -2371,7 +2435,7 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3"
dependencies = [
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@@ -2471,9 +2535,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
"indexmap",
"itoa",
@@ -2525,9 +2589,9 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
@@ -2703,9 +2767,9 @@ dependencies = [
[[package]]
name = "sync_wrapper"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "tempfile"
@@ -2752,10 +2816,11 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.1.4"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
@@ -2781,9 +2846,9 @@ dependencies = [
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
@@ -2803,7 +2868,7 @@ dependencies = [
"socket2",
"tokio-macros",
"tracing",
"windows-sys",
"windows-sys 0.42.0",
]
[[package]]
@@ -2829,9 +2894,9 @@ dependencies = [
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
@@ -2873,9 +2938,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.4"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
dependencies = [
"bytes",
"futures-core",
@@ -2926,6 +2991,19 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -3251,9 +3329,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268"
checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -3261,9 +3339,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [
"bumpalo",
"log",
@@ -3276,9 +3354,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.33"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454"
dependencies = [
"cfg-if",
"js-sys",
@@ -3288,9 +3366,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810"
checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -3298,9 +3376,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6"
dependencies = [
"proc-macro2",
"quote",
@@ -3311,15 +3389,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "web-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -3353,6 +3431,17 @@ dependencies = [
"cc",
]
[[package]]
name = "which"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269"
dependencies = [
"either",
"libc",
"once_cell",
]
[[package]]
name = "whoami"
version = "1.3.0"
@@ -3409,6 +3498,30 @@ dependencies = [
"windows_x86_64_msvc",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.1"

Cargo.toml

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.8.0"
version = "0.8.7"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
@@ -16,6 +16,8 @@ clap = { version = "4.0.32", features = ["env", "default", "derive"]}
tracing = "0.1.36"
tracing-subscriber = "0.2.0"
tokio = { version = "1", features = ["full", "tracing", "signal"] }
prost = "0.11"
tonic = "0.8.3"
console-subscriber = "0.1.8"
futures = "0.3"
futures-util = "0.3"
@@ -49,6 +51,10 @@ chrono = "0.4.23"
prometheus = "0.13.3"
indicatif = "0.17.3"
bech32 = "0.9.1"
url = "2.3.1"
[dev-dependencies]
anyhow = "1"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

Dockerfile

@@ -1,5 +1,7 @@
FROM docker.io/library/rust:1.67.0 as builder
FROM docker.io/library/rust:1-bookworm as builder
RUN apt-get update \
&& apt-get install -y cmake protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*
RUN USER=root cargo install cargo-auditable
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@@ -12,12 +14,14 @@ RUN rm src/*.rs
# copy project source code
COPY ./src ./src
COPY ./proto ./proto
COPY ./build.rs ./build.rs
# build auditable release using locked deps
RUN rm ./target/release/deps/nostr*relay*
RUN cargo auditable build --release --locked
FROM docker.io/library/debian:bullseye-slim
FROM docker.io/library/debian:bookworm-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db

README.md

@@ -35,6 +35,7 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
- [ ] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md) (_implemented, but currently disabled_)
- [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)
- [x] NIP-33: [Parameterized Replaceable Events](https://github.com/nostr-protocol/nips/blob/master/33.md)
- [x] NIP-42: [Authentication of clients to relays](https://github.com/nostr-protocol/nips/blob/master/42.md)
## Quick Start
@@ -139,7 +140,7 @@ settings.
For examples of putting the relay behind a reverse proxy (for TLS
termination, load balancing, and other features), see [Reverse
Proxy](reverse-proxy.md).
Proxy](docs/reverse-proxy.md).
## Dev Channel

build.rs (new file, 10 lines)

@@ -0,0 +1,10 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile(
&["proto/nauthz.proto"],
&["proto"],
)?;
Ok(())
}

config.toml

@@ -16,6 +16,10 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# Administrative contact URI
#contact = "mailto:contact@example.com"
# Favicon location. Relative to the current directory. Assumes an
# ICO format.
#favicon = "favicon.ico"
[diagnostics]
# Enable tokio tracing (for use with tokio-console)
#tracing = false
@@ -48,6 +52,16 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# sqlite.
#connection = "postgresql://postgres:nostr@localhost:7500/nostr"
[grpc]
# gRPC interfaces for externalized decisions and other extensions to
# functionality.
#
# Events can be authorized through an external service, by providing
# the URL below. In the event the server is not accessible, events
# will be permitted. The protobuf3 schema used is available in
# `proto/nauthz.proto`.
# event_admission_server = "http://[::1]:50051"
[network]
# Bind to this network address
address = "0.0.0.0"
@@ -79,9 +93,10 @@ reject_future_seconds = 1800
#
#messages_per_sec = 5
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to
# unlimited.
# Limit client subscriptions created, averaged over one minute. Must
# be an integer. If not set (or set to 0), defaults to unlimited.
# Strongly recommended to set this to a low value such as 10 to ensure
# fair service.
#subscriptions_per_min = 0
# UNIMPLEMENTED...
@@ -125,6 +140,8 @@ reject_future_seconds = 1800
# "35d26e4690cbe1a898af61cc3515661eb5fa763b57bd0b42e45099c8b32fd50f",
# "887645fef0ce0c3c1218d2f5d8e6132a19304cdc57cd20281d082f38cfea0072",
#]
# Enable NIP-42 authentication
#nip42_auth = false
[verified_users]
# NIP-05 verification of users. Can be "enabled" to require NIP-05

docs/database-maintenance.md

@@ -7,7 +7,7 @@ intervention. For heavily trafficked relays, there are a number of
steps that the operator may need to take to maintain performance and
limit disk usage.
This maintenance guide is current as of version `0.7.14`. Future
This maintenance guide is current as of version `0.8.2`. Future
versions may incorporate and automate some of these steps.
## Backing Up the Database
@@ -43,18 +43,15 @@ vacuum;
## Clearing Hidden Events
When events are deleted, either through deletion events, metadata or
follower updates, or a replaceable event kind, the event is not
actually removed from the database. Instead, a flag `HIDDEN` is set
to true for the event, which excludes it from search results. The
original intent was to ensure that subsequent rebroadcasts of the
event would be easily detected as having been deleted, and would not
need to be stored again. In practice, this decision causes excessive
growth of the `tags` table, since all the previous followers are
retained for those `HIDDEN` events.
When events are deleted, the event is not actually removed from the
database. Instead, a flag `HIDDEN` is set to true for the event,
which excludes it from search results. High volume replacements from
profile or other replaceable events are deleted, not hidden, in the
current version of the relay.
The `event` and especially the `tag` table can be significantly
reduced in size by running these commands:
In the current version, removing hidden events should not result in
significant space savings, but it can still be used if there is no
desire to hold on to events that can never be re-broadcast.
```console
PRAGMA foreign_keys = ON;

docs/grpc-extensions.md (new file, 79 lines)

@@ -0,0 +1,79 @@
# gRPC Extensions Design Document
The relay will be extensible through gRPC endpoints, definable in the
main configuration file. These will allow external programs to host
logic for deciding things such as: should this event be persisted,
should this connection be allowed, and should this subscription
request be registered? The primary goal is to allow relay-operator-specific
functionality that lets operators serve smaller communities and reduce
spam and abuse.
This will likely evolve substantially; the first goal is to get a
basic one-way service that lets an externalized program decide on
event persistence. This does not represent the final state of gRPC
extensibility in `nostr-rs-relay`.
## Considerations
Write event latency must not be significantly affected. However, the
primary reason we are implementing this is spam/abuse protection, so
we are willing to tolerate some increase in latency if that protects
us against outages!
The interface should provide enough information to make simple
decisions, without burdening the relay to do extra queries. The
decision endpoint will be mostly responsible for maintaining state and
gathering additional details.
## Design Overview
A gRPC server may be defined in the `config.toml` file. If it exists,
the relay will attempt to connect to it and send a message for each
`EVENT` command submitted by clients. If a successful response is
returned indicating the event is permitted, the relay continues
processing the event as normal. All existing whitelist, blacklist,
and `NIP-05` validation checks are still performed and MAY still
result in the event being rejected. If a successful response is
returned indicating the decision is anything other than permit, then
the relay MUST reject the event, and return a command result to the
user (using `NIP-20`) indicating the event was blocked (optionally
providing a message).
In the event there is an error in the gRPC interface, event processing
proceeds as if gRPC was disabled (fail open). This allows gRPC
servers to be deployed with minimal chance of causing a full relay
outage.
## Design Details
Currently one procedure call is supported, `EventAdmit`, in the
`Authorization` service. It accepts the following data in order to
support authorization decisions:
- The event itself
- The client IP that submitted the event
- The client's HTTP origin header, if one exists
- The client's HTTP user agent header, if one exists
- The public key of the client, if `NIP-42` authentication was
performed (not supported in the relay yet!)
- The `NIP-05` associated with the event's public key, if it is known
to the relay
A server providing authorization decisions will return the following:
- A decision to permit or deny the event
- An optional message that explains why the event was denied, to be
transmitted to the client
## Security Issues
There is little attempt to secure this interface, since it is intended
for use by processes running on the same host. It is recommended to
ensure that the gRPC server providing the API is not exposed to the
public Internet. Authorization server implementations should have
their own security reviews performed.
A slow gRPC server could cause availability issues for event
processing, since this is performed on a single thread. Avoid any
expensive or long-running processes that could result from submitted
events, since any client can initiate a gRPC call to the service.

docs/reverse-proxy.md

@@ -92,6 +92,8 @@ http {
location / {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_read_timeout 1d;
proxy_send_timeout 1d;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
@@ -102,7 +104,7 @@ http {
### Nginx Notes
The above configuration was tested on `nginx` `1.18.0` was tested on `Ubuntu 20.04`.
The above configuration was tested on `nginx` `1.18.0` on `Ubuntu` `20.04` and `22.04`
For help installing `nginx` on `Ubuntu`, see [this guide](https://www.digitalocean.com/community/tutorials/how-to-install-nginx-on-ubuntu-20-04).

examples/nauthz/Cargo.lock (generated, new file, 1010 lines): diff suppressed because it is too large

examples/nauthz/Cargo.toml

@@ -0,0 +1,13 @@
[package]
name = "nauthz-server"
version = "0.1.0"
edition = "2021"
[dependencies]
# Common dependencies
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
prost = "0.11"
tonic = "0.8.3"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

examples/nauthz/build.rs (new file, 7 lines)

@@ -0,0 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["../../proto/nauthz.proto"], &["../../proto"])?;
Ok(())
}

examples/nauthz/src/main.rs

@@ -0,0 +1,60 @@
use tonic::{transport::Server, Request, Response, Status};
use nauthz_grpc::authorization_server::{Authorization, AuthorizationServer};
use nauthz_grpc::{Decision, EventReply, EventRequest};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
#[derive(Default)]
pub struct EventAuthz {
allowed_kinds: Vec<u64>,
}
#[tonic::async_trait]
impl Authorization for EventAuthz {
async fn event_admit(
&self,
request: Request<EventRequest>,
) -> Result<Response<EventReply>, Status> {
let reply;
let req = request.into_inner();
let event = req.event.unwrap();
let content_prefix: String = event.content.chars().take(40).collect();
println!("recvd event, [kind={}, origin={:?}, nip05_domain={:?}, tag_count={}, content_sample={:?}]",
event.kind, req.origin, req.nip05.map(|x| x.domain), event.tags.len(), content_prefix);
// Permit any event with a whitelisted kind
if self.allowed_kinds.contains(&event.kind) {
println!("This looks fine! (kind={})", event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Permit as i32,
message: None,
};
} else {
println!("Blocked! (kind={})", event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Deny as i32,
message: Some(format!("kind {} not permitted", event.kind)),
};
}
Ok(Response::new(reply))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
// A simple authorization engine that allows kinds 0-3
let checker = EventAuthz {
allowed_kinds: vec![0, 1, 2, 3],
};
println!("EventAuthz Server listening on {}", addr);
// Start serving
Server::builder()
.add_service(AuthorizationServer::new(checker))
.serve(addr)
.await?;
Ok(())
}

proto/nauthz.proto (new file, 60 lines)

@@ -0,0 +1,60 @@
syntax = "proto3";
// Nostr Authorization Services
package nauthz;
// Authorization for actions against a relay
service Authorization {
// Determine if an event should be admitted to the relay
rpc EventAdmit(EventRequest) returns (EventReply) {}
}
message Event {
bytes id = 1; // 32-byte SHA256 hash of serialized event
bytes pubkey = 2; // 32-byte public key of event creator
fixed64 created_at = 3; // UNIX timestamp provided by event creator
uint64 kind = 4; // event kind
string content = 5; // arbitrary event contents
repeated TagEntry tags = 6; // event tag array
bytes sig = 7; // 32-byte signature of the event id
// Individual values for a single tag
message TagEntry {
repeated string values = 1;
}
}
// Event data and metadata for authorization decisions
message EventRequest {
Event event =
1; // the event to be admitted for further relay processing
optional string ip_addr =
2; // IP address of the client that submitted the event
optional string origin =
3; // HTTP origin header from the client, if one exists
optional string user_agent =
4; // HTTP user-agent header from the client, if one exists
optional bytes auth_pubkey =
5; // the public key associated with a NIP-42 AUTH'd session, if
// authentication occurred
optional Nip05Name nip05 =
6; // NIP-05 address associated with the event pubkey, if it is
// known and has been validated by the relay
// A NIP-05 verification record
message Nip05Name {
string local = 1;
string domain = 2;
}
}
// A permit or deny decision
enum Decision {
DECISION_UNSPECIFIED = 0;
DECISION_PERMIT = 1; // Admit this event for further processing
DECISION_DENY = 2; // Deny persisting or propagating this event
}
// Response to an event authorization request
message EventReply {
Decision decision = 1; // decision to enforce
optional string message = 2; // informative message for the client
}

src/bin/bulkloader.rs

@@ -20,7 +20,7 @@ pub fn main() -> Result<()> {
let _trace_sub = tracing_subscriber::fmt::try_init();
println!("Nostr-rs-relay Bulk Loader");
// check for a database file, or create one.
let settings = config::Settings::new();
let settings = config::Settings::new(&None);
if !Path::new(&settings.database.data_directory).is_dir() {
info!("Database directory does not exist");
return Err(Error::DatabaseDirError);
@@ -35,7 +35,7 @@ pub fn main() -> Result<()> {
// ensure the schema version is current.
if version != DB_VERSION {
info!("version is not current, exiting");
panic!("cannot write to schema other than v{}", DB_VERSION);
panic!("cannot write to schema other than v{DB_VERSION}");
}
}
// this channel will contain parsed events ready to be inserted

src/cli.rs

@@ -10,4 +10,11 @@ pub struct CLIArgs {
required = false,
)]
pub db: Option<String>,
#[arg(
short,
long,
help = "Use the <file name> as the location of the config file",
required = false,
)]
pub config: Option<String>,
}

src/config.rs

@@ -12,6 +12,7 @@ pub struct Info {
pub description: Option<String>,
pub pubkey: Option<String>,
pub contact: Option<String>,
pub favicon: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -25,6 +26,12 @@ pub struct Database {
pub connection: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Grpc {
pub event_admission_server: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Network {
@@ -69,6 +76,7 @@ pub struct Limits {
#[allow(unused)]
pub struct Authorization {
pub pubkey_whitelist: Option<Vec<String>>, // If present, only allow these pubkeys to publish events
pub nip42_auth: bool, // if true enables NIP-42 authentication
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -145,6 +153,7 @@ pub struct Settings {
pub info: Info,
pub diagnostics: Diagnostics,
pub database: Database,
pub grpc: Grpc,
pub network: Network,
pub limits: Limits,
pub authorization: Authorization,
@@ -155,10 +164,10 @@ pub struct Settings {
impl Settings {
#[must_use]
pub fn new() -> Self {
pub fn new(config_file_name: &Option<String>) -> Self {
let default_settings = Self::default();
// attempt to construct settings with file
let from_file = Self::new_from_default(&default_settings);
let from_file = Self::new_from_default(&default_settings, config_file_name);
match from_file {
Ok(f) => f,
Err(e) => {
@@ -168,13 +177,19 @@ impl Settings {
}
}
fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
fn new_from_default(default: &Settings, config_file_name: &Option<String>) -> Result<Self, ConfigError> {
let default_config_file_name = "config.toml".to_string();
let config: &String = match config_file_name {
Some(value) => value,
None => &default_config_file_name
};
let builder = Config::builder();
let config: Config = builder
// use defaults
.add_source(Config::try_from(default)?)
// override with file contents
.add_source(File::with_name("config.toml"))
.add_source(File::with_name(config))
.build()?;
let mut settings: Settings = config.try_deserialize()?;
// ensure connection pool size is logical
@@ -204,6 +219,7 @@ impl Default for Settings {
description: None,
pubkey: None,
contact: None,
favicon: None,
},
diagnostics: Diagnostics { tracing: false },
database: Database {
@@ -214,6 +230,9 @@ impl Default for Settings {
max_conn: 8,
connection: "".to_owned(),
},
grpc: Grpc {
event_admission_server: None,
},
network: Network {
port: 8080,
ping_interval_seconds: 300,
@@ -234,6 +253,7 @@ impl Default for Settings {
},
authorization: Authorization {
pubkey_whitelist: None, // Allow any address to publish
nip42_auth: false, // Disable NIP-42 authentication
},
verified_users: VerifiedUsers {
mode: VerifiedUsersMode::Disabled,
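
As a usage sketch (assumed wiring, not shown in this diff): the new `--config` flag flows from `CLIArgs` into the updated `Settings::new`, which falls back to `config.toml` in the working directory when the flag is absent.

```rust
// Sketch of the assumed wiring between the --config CLI flag and
// Settings::new; module paths follow the crate layout in this diff.
use clap::Parser;

fn load_settings() -> config::Settings {
    let args = cli::CLIArgs::parse();
    // A None value makes new_from_default fall back to "config.toml".
    config::Settings::new(&args.config)
}
```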

src/conn.rs

@@ -1,16 +1,30 @@
//! Client connection state
use crate::close::Close;
use crate::error::Error;
use crate::error::Result;
use crate::subscription::Subscription;
use std::collections::HashMap;
use tracing::{debug, trace};
use uuid::Uuid;
use crate::close::Close;
use crate::conn::Nip42AuthState::{AuthPubkey, Challenge, NoAuth};
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use crate::subscription::Subscription;
use crate::utils::{host_str, unix_time};
/// A subscription identifier has a maximum length
const MAX_SUBSCRIPTION_ID_LEN: usize = 256;
/// NIP-42 authentication state
pub enum Nip42AuthState {
/// The client is not authenticated yet
NoAuth,
/// The AUTH challenge sent
Challenge(String),
/// The client is authenticated
AuthPubkey(String),
}
/// State for a client connection
pub struct ClientConn {
/// Client IP (either from socket, or configured proxy header
@@ -21,6 +35,8 @@ pub struct ClientConn {
subscriptions: HashMap<String, Subscription>,
/// Per-connection maximum concurrent subscriptions
max_subs: usize,
/// NIP-42 AUTH
auth: Nip42AuthState,
}
impl Default for ClientConn {
@@ -39,15 +55,18 @@ impl ClientConn {
client_id,
subscriptions: HashMap::new(),
max_subs: 32,
auth: NoAuth,
}
}
#[must_use] pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
#[must_use]
pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
&self.subscriptions
}
/// Check if the given subscription already exists
#[must_use] pub fn has_subscription(&self, sub: &Subscription) -> bool {
#[must_use]
pub fn has_subscription(&self, sub: &Subscription) -> bool {
self.subscriptions.values().any(|x| x == sub)
}
@@ -63,6 +82,22 @@ impl ClientConn {
&self.client_ip_addr
}
#[must_use]
pub fn auth_pubkey(&self) -> Option<&String> {
match &self.auth {
AuthPubkey(pubkey) => Some(pubkey),
_ => None,
}
}
#[must_use]
pub fn auth_challenge(&self) -> Option<&String> {
match &self.auth {
Challenge(pubkey) => Some(pubkey),
_ => None,
}
}
/// Add a new subscription for this connection.
/// # Errors
///
@@ -116,4 +151,79 @@ impl ClientConn {
self.get_client_prefix(),
);
}
pub fn generate_auth_challenge(&mut self) {
self.auth = Challenge(Uuid::new_v4().to_string());
}
pub fn authenticate(&mut self, event: &Event, relay_url: &String) -> Result<()> {
match &self.auth {
Challenge(_) => (),
AuthPubkey(_) => {
// already authenticated
return Ok(())
},
NoAuth => {
// unexpected AUTH request
return Err(Error::AuthFailure);
},
}
match event.validate() {
Ok(_) => {
if event.kind != 22242 {
return Err(Error::AuthFailure);
}
let curr_time = unix_time();
let past_cutoff = curr_time - 600; // 10 minutes
let future_cutoff = curr_time + 600; // 10 minutes
if event.created_at < past_cutoff || event.created_at > future_cutoff {
return Err(Error::AuthFailure);
}
let mut challenge: Option<&String> = None;
let mut relay: Option<&String> = None;
for tag in &event.tags {
if tag.len() == 2 && tag.get(0) == Some(&"challenge".into()) {
challenge = tag.get(1);
}
if tag.len() == 2 && tag.get(0) == Some(&"relay".into()) {
relay = tag.get(1);
}
}
match (challenge, &self.auth) {
(Some(received_challenge), Challenge(sent_challenge)) => {
if received_challenge != sent_challenge {
return Err(Error::AuthFailure);
}
}
(_, _) => {
return Err(Error::AuthFailure);
}
}
match (relay.and_then(|url| host_str(url)), host_str(relay_url)) {
(Some(received_relay), Some(our_relay)) => {
if received_relay != our_relay {
return Err(Error::AuthFailure);
}
}
(_, _) => {
return Err(Error::AuthFailure);
}
}
self.auth = AuthPubkey(event.pubkey.clone());
trace!(
"authenticated pubkey {} (cid: {})",
event.pubkey.chars().take(8).collect::<String>(),
self.get_client_prefix()
);
Ok(())
}
Err(_) => Err(Error::AuthFailure),
}
}
}
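
For reference, a minimal sketch (illustrative, not part of this diff) of the client side of the exchange that `authenticate` verifies: the kind-22242 AUTH event must echo the relay's challenge and name the relay it is authenticating to, with `created_at` inside the ten-minute window checked above.

```rust
// Hypothetical helper showing the tag layout a NIP-42 client must send;
// the event itself must still be signed and carry kind 22242.
fn build_auth_tags(challenge: &str, relay_url: &str) -> Vec<Vec<String>> {
    vec![
        vec!["relay".to_string(), relay_url.to_string()],
        vec!["challenge".to_string(), challenge.to_string()],
    ]
}
```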

src/db.rs

@@ -4,6 +4,7 @@ use crate::error::{Error, Result};
use crate::event::Event;
use crate::notice::Notice;
use crate::server::NostrMetrics;
use crate::nauthz;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use r2d2;
@@ -26,6 +27,10 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
pub struct SubmittedEvent {
pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
pub source_ip: String,
pub origin: Option<String>,
pub user_agent: Option<String>,
pub auth_pubkey: Option<Vec<u8>>,
}
/// Database file
@@ -66,6 +71,8 @@ async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> Post
// Panic on migration failure
let version = repo.migrate_up().await.unwrap();
info!("Postgres migration completed, at v{}", version);
// startup scheduled tasks
repo.start().await.ok();
repo
}
@@ -100,6 +107,18 @@ pub async fn db_writer(
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
}
}
// create a client if GRPC is enabled.
// Check with externalized event admitter service, if one is defined.
let mut grpc_client = if let Some(svr) = settings.grpc.event_admission_server {
Some(nauthz::EventAuthzService::connect(&svr).await)
} else {
None
};
//let gprc_client = settings.grpc.event_admission_server.map(|s| {
// event_admitter_connect(&s);
// });
loop {
if shutdown.try_recv().is_ok() {
info!("shutting down database writer");
@@ -164,9 +183,16 @@ pub async fn db_writer(
metadata_tx.send(event.clone()).ok();
}
// get a validation result for use in verification and gRPC
let validation = if nip05_active {
Some(repo.get_latest_user_verification(&event.pubkey).await)
} else {
None
};
// check for NIP-05 verification
if nip05_enabled {
match repo.get_latest_user_verification(&event.pubkey).await {
if nip05_enabled && validation.is_some() {
match validation.as_ref().unwrap() {
Ok(uv) => {
if uv.is_valid(&settings.verified_users) {
info!(
@@ -174,6 +200,7 @@ pub async fn db_writer(
uv.name.to_string(),
event.get_author_prefix()
);
} else {
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
@@ -208,6 +235,35 @@ pub async fn db_writer(
}
}
}
// nip05 address
let nip05_address : Option<crate::nip05::Nip05Name> = validation.and_then(|x| x.ok().map(|y| y.name));
// GRPC check
if let Some(ref mut c) = grpc_client {
trace!("checking if grpc permits");
let grpc_start = Instant::now();
let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address, subm_event.auth_pubkey).await;
match decision_res {
Ok(decision) => {
if !decision.permitted() {
// gRPC returned a decision to reject this event
info!("GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
grpc_start.elapsed(),
subm_event.source_ip);
notice_tx.try_send(Notice::blocked(event.id, &decision.message().unwrap_or_else(|| "".to_string()))).ok();
continue;
}
},
Err(e) => {
warn!("GRPC server error: {:?}", e);
}
}
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
if event.is_ephemeral() {
@@ -227,11 +283,12 @@ pub async fn db_writer(
notice_tx.try_send(Notice::duplicate(event.id)).ok();
} else {
info!(
"persisted event: {:?} (kind: {}) from: {:?} in: {:?}",
"persisted event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
start.elapsed()
start.elapsed(),
subm_event.source_ip,
);
event_write = true;
// send this out to all clients

src/delegation.rs

@@ -108,7 +108,7 @@ impl ConditionQuery {
sigstr: &str,
) -> Option<ConditionQuery> {
// form the token
let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
let tok = format!("nostr:delegation:{delegatee}:{cond_query}");
// form SHA256 hash
let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
let sig = schnorr::Signature::from_str(sigstr).unwrap();

src/error.rs

@@ -62,6 +62,16 @@ pub enum Error {
HexError(hex::FromHexError),
#[error("Delegation parse error")]
DelegationParseError,
#[error("Channel closed error")]
ChannelClosed,
#[error("Authz error")]
AuthzError,
#[error("Tonic GRPC error")]
TonicError(tonic::Status),
#[error("Invalid AUTH message")]
AuthFailure,
#[error("I/O Error")]
IoError(std::io::Error),
#[error("Unknown/Undocumented")]
UnknownError,
}
@@ -130,3 +140,16 @@ impl From<config::ConfigError> for Error {
Error::ConfigError(r)
}
}
impl From<tonic::Status> for Error {
/// Wrap Config error
fn from(r: tonic::Status) -> Self {
Error::TonicError(r)
}
}
impl From<std::io::Error> for Error {
fn from(r: std::io::Error) -> Self {
Error::IoError(r)
}
}

src/event.rs

@@ -14,6 +14,8 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use tracing::{debug, info};
use crate::event::EventWrapper::WrappedEvent;
use crate::event::EventWrapper::WrappedAuth;
lazy_static! {
/// Secp256k1 verification instance.
@@ -83,17 +85,27 @@ where
}
}
pub enum EventWrapper {
WrappedEvent(Event),
WrappedAuth(Event)
}
/// Convert network event to parsed/validated event.
impl From<EventCmd> for Result<Event> {
fn from(ec: EventCmd) -> Result<Event> {
impl From<EventCmd> for Result<EventWrapper> {
fn from(ec: EventCmd) -> Result<EventWrapper> {
// ensure command is correct
if ec.cmd == "EVENT" {
ec.event.validate().map(|_| {
let mut e = ec.event;
e.build_index();
e.update_delegation();
e
WrappedEvent(e)
})
} else if ec.cmd == "AUTH" {
// we don't want to validate the event here, because NIP-42 can be disabled
// it will be validated later during the authentication process
Ok(WrappedAuth(ec.event))
} else {
Err(CommandUnknownError)
}
@@ -125,6 +137,28 @@ impl Event {
self.kind >= 20000 && self.kind < 30000
}
/// Is this event currently expired?
pub fn is_expired(&self) -> bool {
if let Some(exp) = self.expiration() {
exp <= unix_time()
} else {
false
}
}
/// Determine the time at which this event should expire
pub fn expiration(&self) -> Option<u64> {
let default = "".to_string();
let dvals:Vec<&String> = self.tags
.iter()
.filter(|x| !x.is_empty())
.filter(|x| x.get(0).unwrap() == "expiration")
.map(|x| x.get(1).unwrap_or(&default)).take(1)
.collect();
let val_first = dvals.get(0);
val_first.and_then(|t| t.parse::<u64>().ok())
}
/// Should this event be replaced with newer timestamps from same author?
#[must_use] pub fn is_replaceable(&self) -> bool {
self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000)
@@ -135,17 +169,15 @@ impl Event {
self.kind >= 30000 && self.kind < 40000
}
/// What is the replaceable `d` tag value?
/// Should this event be replaced with newer timestamps from same author, for distinct `d` tag values?
#[must_use] pub fn distinct_param(&self) -> Option<String> {
if self.is_param_replaceable() {
let default = "".to_string();
let dvals:Vec<&String> = self.tags
.iter()
.filter(|x| x.len() >= 1)
.filter(|x| !x.is_empty())
.filter(|x| x.get(0).unwrap() == "d")
.map(|x| x.get(1).unwrap_or_else(|| &default)).take(1)
.map(|x| x.get(1).unwrap_or(&default)).take(1)
.collect();
let dval_first = dvals.get(0);
match dval_first {
@@ -292,7 +324,7 @@ impl Event {
let c = c_opt.unwrap();
// * compute the sha256sum.
let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
let hex_digest = format!("{:x}", digest);
let hex_digest = format!("{digest:x}");
// * ensure the id matches the computed sha256sum.
if self.id != hex_digest {
debug!("event id does not match digest");
@@ -315,7 +347,7 @@ impl Event {
}
/// Convert event to canonical representation for signing.
fn to_canonical(&self) -> Option<String> {
pub fn to_canonical(&self) -> Option<String> {
// create a JsonValue for each event element
let mut c: Vec<Value> = vec![];
// id must be set to 0
@@ -654,4 +686,85 @@ mod tests {
assert_eq!(event.distinct_param(), Some("".to_string()));
}
#[test]
fn expiring_event_none() {
// regular events do not expire
let mut event = Event::simple_event();
event.kind = 7;
event.tags = vec![
vec!["test".to_string(), "foo".to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_empty() {
// an expiration tag without a value is ignored
let mut event = Event::simple_event();
event.kind = 7;
event.tags = vec![
vec!["expiration".to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_future() {
// a normal expiring event
let exp:u64 = 1676264138;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), Some(exp));
}
#[test]
fn expiring_event_negative() {
// expiration set to a negative value (invalid)
let exp:i64 = -90;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_zero() {
// a normal expiring event set to zero
let exp:i64 = 0;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), Some(0));
}
#[test]
fn expiring_event_fraction() {
// expiration is fractional (invalid)
let exp:f64 = 23.334;
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), exp.to_string()],
];
assert_eq!(event.expiration(), None);
}
#[test]
fn expiring_event_multiple() {
// multiple values, we just take the first
let mut event = Event::simple_event();
event.kind = 1;
event.tags = vec![
vec!["expiration".to_string(), (10).to_string()],
vec!["expiration".to_string(), (20).to_string()],
];
assert_eq!(event.expiration(), Some(10));
}
}
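As the tests above show, NIP-40 expiration rides in an ordinary tag. A client-side sketch of the wire format (all field values are placeholders):

fn expiring_event_json() -> serde_json::Value {
    serde_json::json!(["EVENT", {
        "id": "<32-byte hex event id>",
        "pubkey": "<32-byte hex pubkey>",
        "created_at": 1676264000,
        "kind": 1,
        "tags": [["expiration", "1676264138"]],
        "content": "this note may be dropped by the relay after the timestamp",
        "sig": "<64-byte hex signature>"
    }])
}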

View File

@@ -1,6 +1,6 @@
//! Relay metadata using NIP-11
/// Relay Info
use crate::config;
use crate::config::Settings;
use serde::{Deserialize, Serialize};
pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
@@ -27,15 +27,24 @@ pub struct RelayInfo {
}
/// Convert an Info configuration into public Relay Info
impl From<config::Info> for RelayInfo {
fn from(i: config::Info) -> Self {
impl From<Settings> for RelayInfo {
fn from(c: Settings) -> Self {
let mut supported_nips = vec![1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40];
if c.authorization.nip42_auth {
supported_nips.push(42);
supported_nips.sort();
}
let i = c.info;
RelayInfo {
id: i.relay_url,
name: i.name,
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22]),
supported_nips: Some(supported_nips),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(std::borrow::ToOwned::to_owned),
}
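With `nip42_auth` enabled, the NIP-11 document served for `application/nostr+json` requests now advertises 42. An abridged sketch of the resulting JSON (values illustrative):

fn sample_relay_info() -> serde_json::Value {
    serde_json::json!({
        "name": "example relay",
        "supported_nips": [1, 2, 9, 11, 12, 15, 16, 20, 22, 33, 40, 42],
        "software": "https://git.sr.ht/~gheartsfield/nostr-rs-relay",
        "version": "0.8.7"
    })
}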

View File

@@ -9,6 +9,7 @@ pub mod event;
pub mod hexrange;
pub mod info;
pub mod nip05;
pub mod nauthz;
pub mod notice;
pub mod repo;
pub mod subscription;

View File

@@ -11,9 +11,14 @@ use console_subscriber::ConsoleLayer;
/// Start running a Nostr relay server.
fn main() {
// configure settings from config.toml
// replace default settings with those read from config.toml
let mut settings = config::Settings::new();
let args = CLIArgs::parse();
// get config file name from args
let config_file_arg = args.config;
// configure settings from the config file (defaults to config.toml)
// replace default settings with those read from the config file
let mut settings = config::Settings::new(&config_file_arg);
// setup tracing
if settings.diagnostics.tracing {
@@ -25,8 +30,6 @@ fn main() {
}
info!("Starting up from main");
let args = CLIArgs::parse();
// get database directory from args
let db_dir_arg = args.db;
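For context, a sketch of what the `CLIArgs` derive behind `args.config` and `args.db` could look like; the attribute names and help text are assumptions, not the repository's actual definition:

use clap::Parser;

#[derive(Parser)]
struct CLIArgs {
    /// Path to the config file (falls back to config.toml when omitted)
    #[arg(long)]
    config: Option<String>,
    /// Directory holding the database
    #[arg(long)]
    db: Option<String>,
}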

src/nauthz.rs (new file, 111 lines)
View File

@@ -0,0 +1,111 @@
use crate::error::{Error, Result};
use crate::{event::Event, nip05::Nip05Name};
use nauthz_grpc::authorization_client::AuthorizationClient;
use nauthz_grpc::event::TagEntry;
use nauthz_grpc::{Decision, Event as GrpcEvent, EventReply, EventRequest};
use tracing::{info, warn};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
// A decision for the DB to act upon
pub trait AuthzDecision: Send + Sync {
fn permitted(&self) -> bool;
fn message(&self) -> Option<String>;
}
impl AuthzDecision for EventReply {
fn permitted(&self) -> bool {
self.decision == Decision::Permit as i32
}
fn message(&self) -> Option<String> {
self.message.clone()
}
}
// A connection to an event admission GRPC server
pub struct EventAuthzService {
server_addr: String,
conn: Option<AuthorizationClient<tonic::transport::Channel>>,
}
// conversion of Nip05Names into GRPC type
impl std::convert::From<Nip05Name> for nauthz_grpc::event_request::Nip05Name {
fn from(value: Nip05Name) -> Self {
nauthz_grpc::event_request::Nip05Name {
local: value.local.clone(),
domain: value.domain.clone(),
}
}
}
// conversion of event tags into GRPC struct
fn tags_to_protobuf(tags: &Vec<Vec<String>>) -> Vec<TagEntry> {
tags.iter()
.map(|x| TagEntry { values: x.clone() })
.collect()
}
impl EventAuthzService {
pub async fn connect(server_addr: &str) -> EventAuthzService {
let mut eas = EventAuthzService {
server_addr: server_addr.to_string(),
conn: None,
};
eas.ready_connection().await;
eas
}
pub async fn ready_connection(&mut self) {
if self.conn.is_none() {
let client = AuthorizationClient::connect(self.server_addr.to_string()).await;
if let Err(ref msg) = client {
warn!("could not connect to nostr authz GRPC server: {:?}", msg);
} else {
info!("connected to nostr authorization GRPC server");
}
self.conn = client.ok();
}
}
pub async fn admit_event(
&mut self,
event: &Event,
ip: &str,
origin: Option<String>,
user_agent: Option<String>,
nip05: Option<Nip05Name>,
auth_pubkey: Option<Vec<u8>>
) -> Result<Box<dyn AuthzDecision>> {
self.ready_connection().await;
let id_blob = hex::decode(&event.id)?;
let pubkey_blob = hex::decode(&event.pubkey)?;
let sig_blob = hex::decode(&event.sig)?;
if let Some(ref mut c) = self.conn {
let gevent = GrpcEvent {
id: id_blob,
pubkey: pubkey_blob,
sig: sig_blob,
created_at: event.created_at,
kind: event.kind,
content: event.content.clone(),
tags: tags_to_protobuf(&event.tags),
};
let svr_res = c
.event_admit(EventRequest {
event: Some(gevent),
ip_addr: Some(ip.to_string()),
origin,
user_agent,
auth_pubkey,
nip05: nip05.map(|x| nauthz_grpc::event_request::Nip05Name::from(x)),
})
.await?;
let reply = svr_res.into_inner();
return Ok(Box::new(reply));
} else {
return Err(Error::AuthzError);
}
}
}
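A minimal sketch of wiring the service into an event pipeline; the address and the surrounding function are assumptions, not code from the repository:

use crate::error::Result;
use crate::event::Event;
use crate::nauthz::EventAuthzService;
use tracing::info;

async fn authz_demo(event: &Event) -> Result<()> {
    // connect once; each admit_event call re-attempts if the connection is still down
    let mut authz = EventAuthzService::connect("http://[::1]:50051").await;
    let decision = authz
        .admit_event(event, "203.0.113.5", None, None, None, None)
        .await?;
    if !decision.permitted() {
        info!("event denied by authz server: {:?}", decision.message());
    }
    Ok(())
}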

View File

@@ -42,8 +42,8 @@ pub struct Verifier {
/// A NIP-05 identifier is a local part and domain.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct Nip05Name {
local: String,
domain: String,
pub local: String,
pub domain: String,
}
impl Nip05Name {
@@ -107,7 +107,7 @@ impl std::fmt::Display for Nip05Name {
/// Check if the specified username and address are present and match in this response body
fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes) -> Result<bool> {
// convert the body into json
let body: serde_json::Value = serde_json::from_slice(&bytes)?;
let body: serde_json::Value = serde_json::from_slice(bytes)?;
// ensure we have a names object.
let names_map = body
.as_object()
@@ -257,8 +257,15 @@ impl Verifier {
// run a loop, restarting on failure
loop {
let res = self.run_internal().await;
if let Err(e) = res {
match res {
Err(Error::ChannelClosed) => {
// channel was closed, we are shutting down
return;
},
Err(e) => {
info!("error in verifier: {:?}", e);
},
_ => {}
}
}
}
@@ -305,6 +312,7 @@ impl Verifier {
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("metadata broadcast channel closed");
return Err(Error::ChannelClosed);
}
}
},

View File

@@ -16,6 +16,7 @@ pub struct EventResult {
pub enum Notice {
Message(String),
EventResult(EventResult),
AuthChallenge(String)
}
impl EventResultStatus {

View File

@@ -15,11 +15,11 @@ use sqlx::Error::RowNotFound;
use crate::hexrange::{hex_range, HexSearch};
use crate::repo::postgres_migration::run_migrations;
use crate::server::NostrMetrics;
use crate::utils::{is_hex, is_lower_hex};
use crate::utils::{is_hex, is_lower_hex, self};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tracing::log::trace;
use tracing::{debug, error, info};
use tracing::{debug, error, warn, info};
use crate::error;
pub type PostgresPool = sqlx::pool::Pool<Postgres>;
@@ -36,13 +36,52 @@ impl PostgresRepo {
metrics: m,
}
}
}
/// Cleanup expired events on a regular basis
async fn cleanup_expired(conn: PostgresPool, frequency: Duration) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
let start = Instant::now();
let exp_res = delete_expired(conn.clone()).await;
match exp_res {
Ok(exp_count) => {
if exp_count > 0 {
info!("removed {} expired events in: {:?}", exp_count, start.elapsed());
}
},
Err(e) => {
warn!("could not remove expired events due to error: {:?}", e);
}
}
}
};
}
});
Ok(())
}
/// One-time deletion of all expired events
async fn delete_expired(conn:PostgresPool) -> Result<u64> {
let mut tx = conn.begin().await?;
let update_count = sqlx::query("DELETE FROM \"event\" WHERE expires_at <= $1;")
.bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap())
.execute(&mut tx)
.await?.rows_affected();
tx.commit().await?;
Ok(update_count)
}
#[async_trait]
impl NostrRepo for PostgresRepo {
async fn start(&self) -> Result<()> {
info!("not implemented");
// begin a cleanup task for expired events.
cleanup_expired(self.conn.clone(), Duration::from_secs(600)).await?;
Ok(())
}
@@ -66,7 +105,7 @@ impl NostrRepo for PostgresRepo {
// replaceable event or parameterized replaceable event.
if e.is_replaceable() {
let repl_count = sqlx::query(
"SELECT e.id FROM event e WHERE e.pub_key=? AND e.kind=? AND e.created_at >= ? LIMIT 1;")
"SELECT e.id FROM event e WHERE e.pub_key=$1 AND e.kind=$2 AND e.created_at >= $3 LIMIT 1;")
.bind(&pubkey_blob)
.bind(e.kind as i64)
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
@@ -77,26 +116,25 @@ impl NostrRepo for PostgresRepo {
}
}
if let Some(d_tag) = e.distinct_param() {
let repl_count:i64;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
repl_count = sqlx::query_scalar(
let repl_count:i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
sqlx::query_scalar(
"SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value_hex=$3 AND e.created_at >= $4 LIMIT 1;")
.bind(hex::decode(&e.pubkey).ok())
.bind(e.kind as i64)
.bind(hex::decode(d_tag).ok())
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.fetch_one(&mut tx)
.await?;
.await?
} else {
repl_count = sqlx::query_scalar(
sqlx::query_scalar(
"SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value=$3 AND e.created_at >= $4 LIMIT 1;")
.bind(hex::decode(&e.pubkey).ok())
.bind(e.kind as i64)
.bind(d_tag.as_bytes())
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.fetch_one(&mut tx)
.await?;
}
.await?
};
// if any rows were returned, then some newer event with
// the same author/kind/tag value exist, and we can ignore
// this event.
@@ -107,13 +145,14 @@ impl NostrRepo for PostgresRepo {
// ignore if the event hash is a duplicate.
let mut ins_count = sqlx::query(
r#"INSERT INTO "event"
(id, pub_key, created_at, kind, "content", delegated_by)
VALUES($1, $2, $3, $4, $5, $6)
(id, pub_key, created_at, expires_at, kind, "content", delegated_by)
VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING"#,
)
.bind(&id_blob)
.bind(&pubkey_blob)
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.bind(e.expiration().and_then(|x| Utc.timestamp_opt(x as i64, 0).latest()))
.bind(e.kind as i64)
.bind(event_str.into_bytes())
.bind(delegator_blob)
@@ -135,20 +174,20 @@ ON CONFLICT (id) DO NOTHING"#,
let tag_val = &tag[1];
// only single-char tags are searchable
let tag_char_opt = single_char_tagname(tag_name);
let query = "INSERT INTO tag (event_id, \"name\", value) VALUES($1, $2, $3) \
ON CONFLICT (event_id, \"name\", value) DO NOTHING";
match &tag_char_opt {
Some(_) => {
// if tag value is lowercase hex;
if is_lower_hex(tag_val) && (tag_val.len() % 2 == 0) {
sqlx::query(query)
sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, NULL, $3) \
ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING")
.bind(&id_blob)
.bind(tag_name)
.bind(hex::decode(tag_val).ok())
.execute(&mut tx)
.await?;
} else {
sqlx::query(query)
sqlx::query("INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES($1, $2, $3, NULL) \
ON CONFLICT (event_id, \"name\", value, value_hex) DO NOTHING")
.bind(&id_blob)
.bind(tag_name)
.bind(tag_val.as_bytes())
@@ -178,22 +217,21 @@ ON CONFLICT (id) DO NOTHING"#,
// parameterized replaceable events
// check for parameterized replaceable events that would be hidden; don't insert these either.
if let Some(d_tag) = e.distinct_param() {
let update_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC LIMIT 1);")
let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
.bind(e.kind as i64)
.bind(hex::decode(&e.pubkey).ok())
.bind(hex::decode(d_tag).ok())
.execute(&mut tx)
.await?.rows_affected();
.await?.rows_affected()
} else {
update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC LIMIT 1);")
sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
.bind(e.kind as i64)
.bind(hex::decode(&e.pubkey).ok())
.bind(d_tag.as_bytes())
.execute(&mut tx)
.await?.rows_affected();
}
.await?.rows_affected()
};
if update_count > 0 {
info!(
"removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -512,6 +550,7 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
let mut query = QueryBuilder::new("SELECT e.\"content\", e.created_at FROM \"event\" e WHERE ");
// This tracks whether we need to push a prefix AND before adding another clause
let mut push_and = false;
// Query for "authors", allowing prefix matches
if let Some(auth_vec) = &f.authors {
@@ -709,6 +748,10 @@ fn query_from_filter(f: &ReqFilter) -> Option<QueryBuilder<Postgres>> {
} else {
query.push("e.hidden != 1::bit(1)");
}
// never display expired events
query
.push(" AND (e.expires_at IS NULL OR e.expires_at > ")
.push_bind(Utc.timestamp_opt(utils::unix_time() as i64, 0).unwrap()).push(")");
// Apply per-filter limit to this query.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
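A small diagnostic that pairs with the cleanup task above, counting the rows the next sweep would remove. A sketch; the helper name is hypothetical, but the table and column follow the migration shown below:

use crate::error::Result;
use crate::repo::postgres::PostgresPool;

async fn count_expired(conn: &PostgresPool) -> Result<i64> {
    // mirrors the DELETE predicate used by delete_expired
    let n: i64 = sqlx::query_scalar(
        "SELECT count(*) FROM \"event\" WHERE expires_at IS NOT NULL AND expires_at <= now();",
    )
    .fetch_one(conn)
    .await?;
    Ok(n)
}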

View File

@@ -34,11 +34,16 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
if m002_result == MigrationResult::Upgraded {
m002::rebuild_tags(db).await?;
}
run_migration(m003::migration(), db).await;
run_migration(m004::migration(), db).await;
Ok(current_version(db).await as usize)
}
async fn current_version(db: &PostgresPool) -> i64 {
sqlx::query_scalar("SELECT max(serial_number) FROM migrations;").fetch_one(db).await.unwrap()
sqlx::query_scalar("SELECT max(serial_number) FROM migrations;")
.fetch_one(db)
.await
.unwrap()
}
async fn prepare_migrations_table(db: &PostgresPool) {
@@ -77,7 +82,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) -> Migratio
.unwrap();
transaction.commit().await.unwrap();
return MigrationResult::Upgraded;
MigrationResult::Upgraded
}
mod m001 {
@@ -137,15 +142,15 @@ CREATE INDEX user_verification_name_idx ON user_verification USING btree (name);
}
mod m002 {
use async_std::stream::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use sqlx::Row;
use std::time::Instant;
use tracing::info;
use async_std::stream::StreamExt;
use sqlx::Row;
use indicatif::{ProgressBar, ProgressStyle};
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
use crate::event::{single_char_tagname, Event};
use crate::repo::postgres::PostgresPool;
use crate::event::{Event, single_char_tagname};
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
use crate::utils::is_lower_hex;
pub const VERSION: i64 = 2;
@@ -172,23 +177,31 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
let mut tx = db.begin().await.unwrap();
let mut update_tx = db.begin().await.unwrap();
// Clear out table
sqlx::query("DELETE FROM tag;").execute(&mut update_tx).await?;
sqlx::query("DELETE FROM tag;")
.execute(&mut update_tx)
.await?;
{
let event_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) from event;")
let event_count: i64 = sqlx::query_scalar("SELECT COUNT(*) from event;")
.fetch_one(&mut tx)
.await
.unwrap();
let bar = ProgressBar::new(event_count.try_into().unwrap()).with_message("rebuilding tags table");
bar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}").unwrap());
let mut events = sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx);
let bar = ProgressBar::new(event_count.try_into().unwrap())
.with_message("rebuilding tags table");
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}",
)
.unwrap(),
);
let mut events =
sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx);
while let Some(row) = events.next().await {
bar.inc(1);
// get the row id and content
let row = row.unwrap();
let event_id: Vec<u8> = row.get(0);
let event_bytes: Vec<u8> = row.get(1);
let event:Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
let event: Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
@@ -201,13 +214,22 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
// insert as BLOB if we can restore it losslessly.
// this means it needs to be even length and lowercase.
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(hex::decode(tagval).ok()).execute(&mut update_tx).await?;
let q = "INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES ($1, $2, NULL, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
.bind(hex::decode(tagval).ok())
.execute(&mut update_tx)
.await?;
} else {
let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(tagval.as_bytes()).execute(&mut update_tx).await?;
let q = "INSERT INTO tag (event_id, \"name\", value, value_hex) VALUES ($1, $2, $3, NULL) ON CONFLICT DO NOTHING;";
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
.bind(tagval.as_bytes())
.execute(&mut update_tx)
.await?;
}
}
}
update_tx.commit().await?;
@@ -217,3 +239,41 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
Ok(())
}
}
mod m003 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 3;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add unique constraint on tag
ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name", value, value_hex);
"#,
],
}
}
}
mod m004 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 4;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add expiration time for events
ALTER TABLE event ADD COLUMN expires_at timestamp(0) with time zone;
-- Index expiration time
CREATE INDEX event_expires_at_idx ON "event" (expires_at);
"#,
],
}
}
}
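Future schema changes follow the same pattern. A hypothetical `m005` skeleton under the conventions above; the SQL body is a placeholder, and it would also need a `run_migration(m005::migration(), db).await;` line added to `run_migrations`:

mod m005 {
    use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};

    pub const VERSION: i64 = 5;

    pub fn migration() -> impl Migration {
        SimpleSqlMigration {
            serial_number: VERSION,
            sql: vec![
                r#"
-- placeholder for the next schema change
"#,
            ],
        }
    }
}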

View File

@@ -1,12 +1,12 @@
//! Event persistence and querying
//use crate::config::SETTINGS;
use crate::config::Settings;
use crate::error::Result;
use crate::error::{Result,Error::SqlError};
use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
use crate::utils::{is_hex, is_lower_hex};
use crate::utils::{is_hex,unix_time};
use crate::nip05::{Nip05Name, VerificationRecord};
use crate::subscription::{ReqFilter, Subscription};
use crate::server::NostrMetrics;
@@ -123,16 +123,9 @@ impl SqliteRepo {
}
// check for parameterized replaceable events that would be hidden; don't insert these either.
if let Some(d_tag) = e.distinct_param() {
let repl_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value_hex=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::<usize, usize>(0));
} else {
repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
}
let repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
// if any rows were returned, then some newer event with
// the same author/kind/tag value exist, and we can ignore
// this event.
@@ -142,8 +135,8 @@ impl SqliteRepo {
}
// ignore if the event hash is a duplicate.
let mut ins_count = tx.execute(
"INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
"INSERT OR IGNORE INTO event (event_hash, created_at, expires_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%s','now'), FALSE);",
params![id_blob, e.created_at, e.expiration(), e.kind, pubkey_blob, delegator_blob, event_str]
)? as u64;
if ins_count == 0 {
// if the event was a duplicate, no need to insert event or
@@ -163,18 +156,10 @@ impl SqliteRepo {
let tagchar_opt = single_char_tagname(tagname);
match &tagchar_opt {
Some(_) => {
// if tagvalue is lowercase hex;
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, &tagval],
)?;
}
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
params![ev_id, &tagname, &tagval, e.kind, e.created_at],
)?;
}
None => {}
}
@@ -201,16 +186,9 @@ impl SqliteRepo {
}
// if this event is parameterized replaceable, remove other events.
if let Some(d_tag) = e.distinct_param() {
let update_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value_hex=? ORDER BY created_at DESC LIMIT 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?;
} else {
update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id NOT IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
}
let update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY t.created_at DESC LIMIT -1 OFFSET 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
if update_count > 0 {
info!(
"removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -245,8 +223,8 @@ impl SqliteRepo {
// check if a deletion has already been recorded for this event.
// Only relevant for non-deletion events
let del_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
"SELECT e.id FROM event e WHERE e.author=? AND e.id IN (SELECT t.event_id FROM tag t WHERE t.name='e' AND t.kind=5 AND t.value=?) LIMIT 1;",
params![pubkey_blob, e.id], |row| row.get::<usize, usize>(0));
// check if the query returned a result, meaning we should
// hide the current event
if del_count.ok().is_some() {
@@ -273,7 +251,8 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo {
async fn start(&self) -> Result<()> {
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await
db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await?;
cleanup_expired(self.maint_pool.clone(), Duration::from_secs(600), self.write_in_progress.clone()).await
}
async fn migrate_up(&self) -> Result<usize> {
@@ -286,6 +265,8 @@ impl NostrRepo for SqliteRepo {
/// Persist event to database
async fn write_event(&self, e: &Event) -> Result<u64> {
let start = Instant::now();
let max_write_attempts = 10;
let mut attempts = 0;
let _write_guard = self.write_in_progress.lock().await;
// spawn a blocking thread
//let mut conn = self.write_pool.get()?;
@@ -293,7 +274,26 @@ impl NostrRepo for SqliteRepo {
let e = e.clone();
let event_count = task::spawn_blocking(move || {
let mut conn = pool.get()?;
SqliteRepo::persist_event(&mut conn, &e)
// this could fail because the database was busy; try
// multiple times before giving up.
loop {
attempts+=1;
let wr = SqliteRepo::persist_event(&mut conn, &e);
match wr {
Err(SqlError(rusqlite::Error::SqliteFailure(e,_))) => {
// this basically means that NIP-05 or another
// writer was using the database between us
// reading and promoting the connection to a
// write lock.
info!("event write failed, DB locked (attempt: {}); sqlite err: {}",
attempts, e.extended_code);
},
_ => {return wr;},
}
if attempts >= max_write_attempts {
return wr;
}
}
}).await?;
self.metrics
.write_events
@@ -344,27 +344,36 @@ impl NostrRepo for SqliteRepo {
db_queue_time, client_id, sub.id
);
}
// check before getting a DB connection if the client still wants the results
if abandon_query_rx.try_recv().is_ok() {
debug!("query cancelled by client (before execution) (cid: {}, sub: {:?})", client_id, sub.id);
return Ok(());
}
let start = Instant::now();
let mut row_count: usize = 0;
// cutoff for displaying slow queries
let slow_cutoff = Duration::from_millis(250);
let mut filter_count = 0;
// remove duplicates from the filter list.
for filter in sub.filters.iter() {
let filter_start = Instant::now();
filter_count += 1;
let (q, p, idx) = query_from_filter(&filter);
let sql_gen_elapsed = start.elapsed();
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
if let Ok(mut conn) = self.read_pool.get() {
{
let pool_state = self.read_pool.state();
metrics.db_connections.set((pool_state.connections - pool_state.idle_connections).into());
}
// any client that doesn't cause us to generate new rows in 5
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(5);
let mut slow_first_event;
let mut last_successful_send = Instant::now();
if let Ok(mut conn) = self.read_pool.get() {
for filter in sub.filters.iter() {
let filter_start = Instant::now();
filter_count += 1;
let sql_gen_elapsed = filter_start.elapsed();
let (q, p, idx) = query_from_filter(filter);
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
}
// any client that doesn't cause us to generate new rows in 2
// seconds gets dropped.
let abort_cutoff = Duration::from_secs(2);
let mut slow_first_event;
let mut last_successful_send = Instant::now();
// execute the query.
// make the actual SQL query (with parameters inserted) available
conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
@@ -384,8 +393,8 @@ impl NostrRepo for SqliteRepo {
// to reduce logging; only show 1/16th of clients (leading 0)
if slow_first_event && client_id.starts_with('0') {
debug!(
"filter first result (slow): {} (cid: {}, sub: {:?})",
serde_json::to_string(&filter)?, client_id, sub.id
"filter first result in {:?} (slow): {} (cid: {}, sub: {:?})",
first_event_elapsed, serde_json::to_string(&filter)?, client_id, sub.id
);
}
first_result = false;
@@ -432,7 +441,8 @@ impl NostrRepo for SqliteRepo {
return Ok(());
}
// give the queue a chance to clear before trying again
thread::sleep(Duration::from_millis(100));
debug!("query thread sleeping due to full query_tx (cid: {}, sub: {:?})", client_id, sub.id);
thread::sleep(Duration::from_millis(500));
}
// TODO: we could use try_send, but we'd have to juggle
// getting the query result back as part of the error
@@ -445,17 +455,20 @@ impl NostrRepo for SqliteRepo {
.ok();
last_successful_send = Instant::now();
}
} else {
warn!("Could not get a database connection for querying");
}
// if the filter took too much db_time, print out the JSON.
if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') {
debug!(
"query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})",
serde_json::to_string(&filter)?, client_id, sub.id, filter_count
);
}
metrics
.query_db
.observe(filter_start.elapsed().as_secs_f64());
// if the filter took too much db_time, print out the JSON.
if filter_start.elapsed() > slow_cutoff && client_id.starts_with('0') {
debug!(
"query filter req (slow): {} (cid: {}, sub: {:?}, filter: {})",
serde_json::to_string(&filter)?, client_id, sub.id, filter_count
);
}
}
} else {
warn!("Could not get a database connection for querying");
}
drop(sem); // new query can begin
debug!(
@@ -706,8 +719,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
// check if the index needs to be overriden
let idx_name = override_index(f);
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}",i));
let mut query = format!("SELECT e.content FROM event e {}", idx_stmt);
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {i}"));
let mut query = format!("SELECT e.content FROM event e {idx_stmt}");
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -790,61 +803,44 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if (v.len() % 2 == 0) && is_lower_hex(v) {
if let Ok(h) = hex::decode(v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.clone()));
}
str_vals.push(Box::new(v.clone()));
}
// do not mix value and value_hex; this is a temporary special case.
if str_vals.len() == 0 {
// create clauses with "?" params for each tag value being searched
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND {}))",
blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
} else if blob_vals.len() == 0 {
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND {}))",
str_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut str_vals);
filter_components.push(tag_clause);
// create clauses with "?" params for each tag value being searched
let str_clause = format!("AND value IN ({})", repeat_vars(str_vals.len()));
// find evidence of the target tag name/value existing for this event.
// Query for Kind/Since/Until additionally, to reduce the number of tags that come back.
let kind_clause;
let since_clause;
let until_clause;
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(std::string::ToString::to_string).collect();
kind_clause = format!("AND kind IN ({})", str_kinds.join(", "));
} else {
debug!("mixed string/blob query");
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
str_clause, blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
kind_clause = format!("");
};
if f.since.is_some() {
since_clause = format!("AND created_at > {}", f.since.unwrap());
} else {
since_clause = format!("");
};
// Query for timestamp
if f.until.is_some() {
until_clause = format!("AND created_at < {}", f.until.unwrap());
} else {
until_clause = format!("");
};
let tag_clause = format!(
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? {str_clause} {kind_clause} {since_clause} {until_clause}))"
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut str_vals);
filter_components.push(tag_clause);
}
}
// Query for timestamp
@@ -859,6 +855,9 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
}
// never display hidden events
query.push_str(" WHERE hidden!=TRUE");
// never display expired events
filter_components.push("(expires_at IS NULL OR expires_at > ?)".to_string());
params.push(Box::new(unix_time()));
// build filter component conditions
if !filter_components.is_empty() {
query.push_str(" AND ");
@@ -867,7 +866,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some(lim) = f.limit {
let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim);
let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {lim}");
} else {
query.push_str(" ORDER BY e.created_at ASC");
}
@@ -894,7 +893,7 @@ fn _query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<Stri
// encapsulate subqueries into select statements
let subqueries_selects: Vec<String> = subqueries
.iter()
.map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
.map(|s| format!("SELECT distinct content, created_at FROM ({s})"))
.collect();
let query: String = subqueries_selects.join(" UNION ");
(query, params,indexes)
@@ -948,6 +947,54 @@ pub fn build_pool(
pool
}
/// Cleanup expired events on a regular basis
async fn cleanup_expired(pool: SqlitePool, frequency: Duration, write_in_progress: Arc<Mutex<u64>>) -> Result<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(frequency) => {
if let Ok(mut conn) = pool.get() {
let mut _guard:Option<MutexGuard<u64>> = None;
// take a write lock to prevent event writes
// from proceeding while we are deleting
// events. This isn't necessary, but
// minimizes the chances of forcing event
// persistence to be retried.
_guard = Some(write_in_progress.lock().await);
let start = Instant::now();
let exp_res = tokio::task::spawn_blocking(move || {
delete_expired(&mut conn)
}).await;
match exp_res {
Ok(Ok(count)) => {
if count > 0 {
info!("removed {} expired events in: {:?}", count, start.elapsed());
}
},
_ => {
// either the task or underlying query failed
info!("there was an error cleaning up expired events: {:?}", exp_res);
}
}
}
}
};
}
});
Ok(())
}
/// Execute a query to delete all expired events
pub fn delete_expired(conn: &mut PooledConnection) -> Result<usize> {
let tx = conn.transaction()?;
let update_count = tx.execute(
"DELETE FROM event WHERE expires_at <= ?",
params![unix_time()],
)?;
tx.commit()?;
Ok(update_count)
}
/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {
// TODO: use acquire_many on the reader semaphore to stop them from interrupting this.
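Because `delete_expired` is public, an operator-facing purge at startup is also possible without waiting for the first ten-minute tick. A sketch, assuming it lives alongside the code above (the function name is hypothetical):

use crate::error::Result;
use tracing::info;

fn purge_on_boot(pool: &SqlitePool) -> Result<()> {
    let mut conn = pool.get()?;
    // same one-shot deletion the periodic task runs
    let removed = delete_expired(&mut conn)?;
    info!("purged {} expired events at startup", removed);
    Ok(())
}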

View File

@@ -10,17 +10,20 @@ use rusqlite::Connection;
use std::cmp::Ordering;
use std::time::Instant;
use tracing::{debug, error, info};
use indicatif::{ProgressBar, ProgressStyle};
/// Startup DB Pragmas
pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous = NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit = 32768;
PRAGMA temp_store = 2; -- use memory, not temp files
PRAGMA main.cache_size = 20000; -- 80MB max cache size per conn
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version
pub const DB_VERSION: usize = 15;
pub const DB_VERSION: usize = 17;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -40,6 +43,7 @@ id INTEGER PRIMARY KEY,
event_hash BLOB NOT NULL, -- 4-byte hash
first_seen INTEGER NOT NULL, -- when the event was first seen (not authored!) (seconds since 1970)
created_at INTEGER NOT NULL, -- when the event was authored
expires_at INTEGER, -- when the event expires and may be deleted
author BLOB NOT NULL, -- author pubkey
delegated_by BLOB, -- delegator pubkey (NIP-26)
kind INTEGER NOT NULL, -- event kind
@@ -58,23 +62,27 @@ CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at);
CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at);
CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
-- hex-string), or TEXT otherwise.
-- This means that searches need to select the appropriate column.
-- We duplicate the kind/created_at to make indexes much more efficient.
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
created_at INTEGER NOT NULL, -- when the event was authored
kind INTEGER NOT NULL, -- event kind
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
-- NIP-05 User Validation
CREATE TABLE IF NOT EXISTS user_verification (
@@ -199,6 +207,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
if curr_version == 14 {
curr_version = mig_14_to_15(conn)?;
}
if curr_version == 15 {
curr_version = mig_15_to_16(conn)?;
}
if curr_version == 16 {
curr_version = mig_16_to_17(conn)?;
}
if curr_version == DB_VERSION {
info!(
@@ -209,13 +223,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
}
// Database is current, all is good
Ordering::Equal => {
debug!("Database version was already current (v{})", DB_VERSION);
debug!("Database version was already current (v{DB_VERSION})");
}
// Database is newer than what this code understands, abort
Ordering::Greater => {
panic!(
"Database version is newer than supported by this executable (v{} > v{})",
curr_version, DB_VERSION
"Database version is newer than supported by this executable (v{curr_version} > v{DB_VERSION})",
);
}
}
@@ -651,3 +664,92 @@ PRAGMA user_version = 15;
}
Ok(15)
}
fn mig_15_to_16(conn: &mut PooledConnection) -> Result<usize> {
let count = db_event_count(conn)?;
info!("database schema needs update from 15->16 (this may take a few minutes)");
let upgrade_sql = r##"
DROP TABLE tag;
CREATE TABLE tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
created_at INTEGER NOT NULL, -- when the event was authored
kind INTEGER NOT NULL, -- event kind
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
"##;
let start = Instant::now();
let tx = conn.transaction()?;
let bar = ProgressBar::new(count.try_into().unwrap())
.with_message("rebuilding tags table");
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}",
)
.unwrap(),
);
{
tx.execute_batch(upgrade_sql)?;
let mut stmt = tx.prepare("select id, kind, created_at, content from event order by id;")?;
let mut tag_rows = stmt.query([])?;
let mut count = 0;
while let Some(row) = tag_rows.next()? {
count += 1;
if count%10==0 {
bar.inc(10);
}
let event_id: u64 = row.get(0)?;
let kind: u64 = row.get(1)?;
let created_at: u64 = row.get(2)?;
let event_json: String = row.get(3)?;
let event: Event = serde_json::from_str(&event_json)?;
// look at each event, and each tag, creating new tag entries if appropriate.
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
let tagnamechar_opt = single_char_tagname(tagname);
if tagnamechar_opt.is_none() {
continue;
}
// safe because len was > 1
let tagval = t.get(1).unwrap();
// insert the tag value as text
tx.execute(
"INSERT INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5);",
params![event_id, tagname, &tagval, kind, created_at],
)?;
}
}
tx.execute("PRAGMA user_version = 16;", [])?;
}
bar.finish();
tx.commit()?;
info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed());
Ok(16)
}
fn mig_16_to_17(conn: &mut PooledConnection) -> Result<usize> {
info!("database schema needs update from 16->17");
let upgrade_sql = r##"
ALTER TABLE event ADD COLUMN expires_at INTEGER;
CREATE INDEX IF NOT EXISTS event_expiration ON event(expires_at);
PRAGMA user_version = 17;
"##;
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v16 -> v17");
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
Ok(17)
}
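The migrations key off SQLite's `user_version` pragma. A small sketch for reading it back, useful when debugging upgrades (the function name is hypothetical):

use crate::error::Result;

fn schema_version(conn: &mut PooledConnection) -> Result<usize> {
    // read back the PRAGMA the migrations above maintain
    let version: usize = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
    Ok(version)
}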

View File

@@ -7,6 +7,8 @@ use crate::repo::NostrRepo;
use crate::db;
use crate::db::SubmittedEvent;
use crate::error::{Error, Result};
use crate::event::EventWrapper;
use crate::server::EventWrapper::{WrappedAuth, WrappedEvent};
use crate::event::Event;
use crate::event::EventCmd;
use crate::info::RelayInfo;
@@ -14,6 +16,7 @@ use crate::nip05;
use crate::notice::Notice;
use crate::subscription::Subscription;
use prometheus::IntCounterVec;
use prometheus::IntGauge;
use prometheus::{Encoder, Histogram, IntCounter, HistogramOpts, Opts, Registry, TextEncoder};
use futures::SinkExt;
use futures::StreamExt;
@@ -29,6 +32,9 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fs::File;
use std::io::BufReader;
use std::io::Read;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -47,8 +53,10 @@ use tungstenite::error::Error as WsError;
use tungstenite::handshake;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;
use crate::server::Error::CommandUnknownError;
/// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
#[allow(clippy::too_many_arguments)]
async fn handle_web_request(
mut request: Request<Body>,
repo: Arc<dyn NostrRepo>,
@@ -57,6 +65,7 @@ async fn handle_web_request(
broadcast: Sender<Event>,
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
shutdown: Receiver<()>,
favicon: Option<Vec<u8>>,
registry: Registry,
metrics: NostrMetrics,
) -> Result<Response<Body>, Infallible> {
@@ -126,9 +135,8 @@ async fn handle_web_request(
// todo: trace, don't print...
Err(e) => println!(
"error when trying to upgrade connection \
from address {} to websocket connection. \
Error is: {}",
remote_addr, e
from address {remote_addr} to websocket connection. \
Error is: {e}",
),
}
});
@@ -138,7 +146,7 @@ async fn handle_web_request(
Err(error) => {
warn!("websocket response failed");
let mut res =
Response::new(Body::from(format!("Failed to create websocket: {}", error)));
Response::new(Body::from(format!("Failed to create websocket: {error}")));
*res.status_mut() = StatusCode::BAD_REQUEST;
return Ok(res);
}
@@ -156,7 +164,7 @@ async fn handle_web_request(
if mt_str.contains("application/nostr+json") {
// build a relay info response
debug!("Responding to server info request");
let rinfo = RelayInfo::from(settings.info);
let rinfo = RelayInfo::from(settings);
let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
return Ok(Response::builder()
.status(200)
@@ -185,6 +193,23 @@ async fn handle_web_request(
.body(Body::from(buffer))
.unwrap())
}
("/favicon.ico", false) => {
if let Some(favicon_bytes) = favicon {
info!("returning favicon");
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "image/x-icon")
// 1 month cache
.header("Cache-Control", "public, max-age=2419200")
.body(Body::from(favicon_bytes))
.unwrap())
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(""))
.unwrap())
}
}
(_, _) => {
//handle any other url
Ok(Response::builder()
@@ -232,6 +257,10 @@ fn create_metrics() -> (Registry, NostrMetrics) {
"nostr_query_seconds",
"Subscription response times",
)).unwrap();
let query_db = Histogram::with_opts(HistogramOpts::new(
"nostr_filter_seconds",
"Filter SQL query times",
)).unwrap();
let write_events = Histogram::with_opts(HistogramOpts::new(
"nostr_events_write_seconds",
"Event writing response times",
@@ -244,6 +273,9 @@ fn create_metrics() -> (Registry, NostrMetrics) {
"nostr_connections_total",
"New connections",
)).unwrap();
let db_connections = IntGauge::with_opts(Opts::new(
"nostr_db_connections", "Active database connections"
)).unwrap();
let query_aborts = IntCounterVec::new(
Opts::new("nostr_query_abort_total", "Aborted queries"),
vec!["reason"].as_slice(),
@@ -260,33 +292,52 @@ fn create_metrics() -> (Registry, NostrMetrics) {
"nostr_cmd_close_total",
"CLOSE commands",
)).unwrap();
let cmd_auth = IntCounter::with_opts(Opts::new(
"nostr_cmd_auth_total",
"AUTH commands",
)).unwrap();
let disconnects = IntCounterVec::new(
Opts::new("nostr_disconnects_total", "Client disconnects"),
vec!["reason"].as_slice(),
).unwrap();
registry.register(Box::new(query_sub.clone())).unwrap();
registry.register(Box::new(query_db.clone())).unwrap();
registry.register(Box::new(write_events.clone())).unwrap();
registry.register(Box::new(sent_events.clone())).unwrap();
registry.register(Box::new(connections.clone())).unwrap();
registry.register(Box::new(db_connections.clone())).unwrap();
registry.register(Box::new(query_aborts.clone())).unwrap();
registry.register(Box::new(cmd_req.clone())).unwrap();
registry.register(Box::new(cmd_event.clone())).unwrap();
registry.register(Box::new(cmd_close.clone())).unwrap();
registry.register(Box::new(cmd_auth.clone())).unwrap();
registry.register(Box::new(disconnects.clone())).unwrap();
let metrics = NostrMetrics {
query_sub,
query_db,
write_events,
sent_events,
connections,
db_connections,
disconnects,
query_aborts,
cmd_req,
cmd_event,
cmd_close,
cmd_auth,
};
(registry,metrics)
}
fn file_bytes(path: &str) -> Result<Vec<u8>> {
let f = File::open(path)?;
let mut reader = BufReader::new(f);
let mut buffer = Vec::new();
// Read file into vector.
reader.read_to_end(&mut buffer)?;
Ok(buffer)
}
/// Start running a Nostr relay server.
pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Result<(), Error> {
trace!("Config: {:?}", settings);
@@ -334,7 +385,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
// give each thread a unique numeric name
static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
format!("tokio-ws-{}", id)
format!("tokio-ws-{id}")
})
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
@@ -433,6 +484,12 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
//let pool_monitor = pool.clone();
//tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});
// Read in the favicon if it exists
let favicon = settings.info.favicon.as_ref().and_then(|x| {
info!("reading favicon...");
file_bytes(x).ok()
});
// A `Service` is needed for every connection, so this
// creates one from our `handle_request` function.
let make_svc = make_service_fn(|conn: &AddrStream| {
@@ -442,6 +499,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
let event = event_tx.clone();
let stop = invoke_shutdown.clone();
let settings = settings.clone();
let favicon = favicon.clone();
let registry = registry.clone();
let metrics = metrics.clone();
async move {
@@ -455,6 +513,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
bcast.clone(),
event.clone(),
stop.subscribe(),
favicon.clone(),
registry.clone(),
metrics.clone(),
)
@@ -466,7 +525,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
.with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
// run hyper in this thread. This is why the thread does not return.
if let Err(e) = server.await {
eprintln!("server error: {}", e);
eprintln!("server error: {e}");
}
});
Ok(())
@@ -476,7 +535,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Debug)]
#[serde(untagged)]
pub enum NostrMessage {
/// An `EVENT` message
/// `EVENT` and `AUTH` messages
EventMsg(EventCmd),
/// A `REQ` message
SubMsg(Subscription),
@@ -516,6 +575,7 @@ fn make_notice_message(notice: &Notice) -> Message {
let json = match notice {
Notice::Message(ref msg) => json!(["NOTICE", msg]),
Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
Notice::AuthChallenge(ref challenge) => json!(["AUTH", challenge]),
};
Message::text(json.to_string())
@@ -529,6 +589,7 @@ struct ClientInfo {
/// Handle new client connections. This runs through an event loop
/// for all client communication.
#[allow(clippy::too_many_arguments)]
async fn nostr_server(
repo: Arc<dyn NostrRepo>,
client_info: ClientInfo,
@@ -564,7 +625,7 @@ async fn nostr_server(
// we will send out the tx handle to any query we generate.
// this has capacity for some of the larger requests we see, which
// should allow the DB thread to release the handle earlier.
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20000);
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20_000);
// Create channel for receiving NOTICEs
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
@@ -588,12 +649,14 @@ async fn nostr_server(
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
let unspec = "<unspecified>".to_string();
info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
let origin = client_info.origin.as_ref().unwrap_or_else(|| &unspec);
let user_agent = client_info
.user_agent
.unwrap_or_else(|| "<unspecified>".into());
debug!(
.user_agent.as_ref()
.unwrap_or_else(|| &unspec);
info!(
"cid: {}, origin: {:?}, user-agent: {:?}",
cid, origin, user_agent
);
@@ -601,6 +664,14 @@ async fn nostr_server(
// Measure connections
metrics.connections.inc();
if settings.authorization.nip42_auth {
conn.generate_auth_challenge();
if let Some(challenge) = conn.auth_challenge() {
ws_stream.send(
make_notice_message(&Notice::AuthChallenge(challenge.to_string()))).await.ok();
}
}
loop {
tokio::select! {
_ = shutdown.recv() => {
@@ -627,7 +698,7 @@ async fn nostr_server(
// database informed us of a query result we asked for
let subesc = query_result.sub_id.replace('"', "");
if query_result.event == "EOSE" {
let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
let send_str = format!("[\"EOSE\",\"{subesc}\"]");
ws_stream.send(Message::Text(send_str)).await.ok();
} else {
client_received_event_count += 1;
@@ -654,7 +725,7 @@ async fn nostr_server(
// create an event response and send it
let subesc = s.replace('"', "");
metrics.sent_events.with_label_values(&["realtime"]).inc();
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
} else {
warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
}
@@ -680,7 +751,7 @@ async fn nostr_server(
},
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
ws_stream.send(
make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
continue;
},
None |
@@ -714,30 +785,73 @@ async fn nostr_server(
// An EventCmd needs to be validated to be converted into an Event
// handle each type of message
let evid = ec.event_id().to_owned();
let parsed : Result<Event> = Result::<Event>::from(ec);
metrics.cmd_event.inc();
let parsed : Result<EventWrapper> = Result::<EventWrapper>::from(ec);
match parsed {
Ok(e) => {
Ok(WrappedEvent(e)) => {
metrics.cmd_event.inc();
let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {:?} (cid: {}, kind: {})", id_prefix, cid, e.kind);
// check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// check if event is expired
if e.is_expired() {
let notice = Notice::invalid(e.id, "The event has already expired");
ws_stream.send(make_notice_message(&notice)).await.ok();
// check if the event is too far in the future.
} else if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database.
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
let auth_pubkey = conn.auth_pubkey().and_then(|pubkey| hex::decode(&pubkey).ok());
let submit_event = SubmittedEvent {
event: e.clone(),
notice_tx: notice_tx.clone(),
source_ip: conn.ip().to_string(),
origin: client_info.origin.clone(),
user_agent: client_info.user_agent.clone(),
auth_pubkey };
event_tx.send(submit_event).await.ok();
client_published_event_count += 1;
} else {
info!("client: {} sent a far future-dated event", cid);
if let Some(fut_sec) = settings.options.reject_future_seconds {
let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
let notice = Notice::invalid(e.id, &msg);
ws_stream.send(make_notice_message(&notice)).await.ok();
}
}
},
Ok(WrappedAuth(event)) => {
metrics.cmd_auth.inc();
if settings.authorization.nip42_auth {
let id_prefix:String = event.id.chars().take(8).collect();
debug!("successfully parsed auth: {:?} (cid: {})", id_prefix, cid);
match &settings.info.relay_url {
None => {
error!("AUTH command received, but relay_url is not set in the config file (cid: {})", cid);
},
Some(relay) => {
match conn.authenticate(&event, &relay) {
Ok(_) => {
let pubkey = match conn.auth_pubkey() {
Some(k) => k.chars().take(8).collect(),
None => "<unspecified>".to_string(),
};
info!("client is authenticated: (cid: {}, pubkey: {:?})", cid, pubkey);
},
Err(e) => {
info!("authentication error: {} (cid: {})", e, cid);
ws_stream.send(make_notice_message(&Notice::message(format!("Authentication error: {e}")))).await.ok();
},
}
}
}
} else {
let e = CommandUnknownError;
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
}
},
Err(e) => {
metrics.cmd_event.inc();
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{}", e)))).await.ok();
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
}
}
},
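The expiration branch above is the NIP-40 path: events carrying an `expiration` tag with a timestamp in the past are rejected outright. A minimal sketch of such a check, assuming the tag layout used elsewhere in this codebase (`Vec<Vec<String>>`); this is illustrative, not the relay's actual `is_expired`:

/// Illustrative NIP-40 check: true if any `expiration` tag holds a
/// unix timestamp at or before `now`. Not the relay's real implementation.
fn is_expired(tags: &[Vec<String>], now: u64) -> bool {
    tags.iter()
        .filter(|t| t.len() >= 2 && t[0] == "expiration")
        .filter_map(|t| t[1].parse::<u64>().ok())
        .any(|expiry| expiry <= now)
}

fn main() {
    let tags = vec![vec!["expiration".to_string(), "1676700000".to_string()]];
    assert!(is_expired(&tags, 1676700001));  // timestamp has passed
    assert!(!is_expired(&tags, 1676699999)); // not yet expired
}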
@@ -770,7 +884,7 @@ async fn nostr_server(
},
Err(e) => {
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok();
ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
}
}
}
@@ -799,11 +913,11 @@ async fn nostr_server(
break;
}
Err(Error::EventMaxLengthError(s)) => {
info!("client sent event larger ({} bytes) than max size (cid: {})", s, cid);
info!("client sent command larger ({} bytes) than max size (cid: {})", s, cid);
ws_stream.send(make_notice_message(&Notice::message("event exceeded max size".into()))).await.ok();
},
Err(Error::ProtoParseError) => {
info!("client sent event that could not be parsed (cid: {})", cid);
info!("client sent command that could not be parsed (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
},
Err(e) => {
@@ -830,6 +944,8 @@ async fn nostr_server(
#[derive(Clone)]
pub struct NostrMetrics {
pub query_sub: Histogram, // response time of successful subscriptions
pub query_db: Histogram, // individual database query execution time
pub db_connections: IntGauge, // database connections in use
pub write_events: Histogram, // response time of event writes
pub sent_events: IntCounterVec, // count of events sent to clients
pub connections: IntCounter, // count of websocket connections
@@ -838,5 +954,6 @@ pub struct NostrMetrics {
pub cmd_req: IntCounter, // count of REQ commands received
pub cmd_event: IntCounter, // count of EVENT commands received
pub cmd_close: IntCounter, // count of CLOSE commands received
pub cmd_auth: IntCounter, // count of AUTH commands received
}
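For orientation, a sketch of how counters like the new `cmd_auth` and the `query_db` histogram could be created and registered with the `prometheus` crate these types come from; the metric names here are placeholders, not necessarily the ones the relay registers:

use prometheus::{Histogram, HistogramOpts, IntCounter, Registry};

fn register_metrics() -> (Registry, IntCounter, Histogram) {
    let registry = Registry::new();
    // Hypothetical metric names, for illustration only.
    let cmd_auth = IntCounter::new("cmd_auth", "count of AUTH commands received").unwrap();
    let query_db = Histogram::with_opts(HistogramOpts::new(
        "query_db",
        "individual database query execution time",
    ))
    .unwrap();
    registry.register(Box::new(cmd_auth.clone())).unwrap();
    registry.register(Box::new(query_db.clone())).unwrap();
    (registry, cmd_auth, query_db)
}

With handles like these held in a `NostrMetrics`, the per-command paths above reduce to calls such as `metrics.cmd_auth.inc()` or `observe(...)` on the histograms.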

View File

@@ -70,7 +70,7 @@ impl Serialize for ReqFilter {
if let Some(tags) = &self.tags {
for (k,v) in tags {
let vals:Vec<&String> = v.iter().collect();
map.serialize_entry(&format!("#{}",k), &vals)?;
map.serialize_entry(&format!("#{k}"), &vals)?;
}
}
map.end()
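The effect is that a filter with an `e` tag entry serializes under a `#`-prefixed key, e.g. `"#e": ["<event id>"]`, matching the NIP-01 wire format for tag queries.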

View File

@@ -1,6 +1,7 @@
//! Common utility functions
use bech32::FromBase32;
use std::time::SystemTime;
use url::Url;
/// Seconds since 1970.
#[must_use] pub fn unix_time() -> u64 {
@@ -33,6 +34,10 @@ pub fn nip19_to_hex(s: &str) -> Result<String, bech32::Error> {
})
}
pub fn host_str(url: &String) -> Option<String> {
Url::parse(url).ok().and_then(|u| u.host_str().map(|s| s.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
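A quick usage sketch for the new helper, assuming it is exported like the module's other utilities:

use nostr_rs_relay::utils::host_str;

fn main() {
    // A well-formed relay URL: the host component is extracted.
    assert_eq!(
        host_str(&"wss://nostr.example.com/".to_string()),
        Some("nostr.example.com".to_string())
    );
    // Unparseable input maps to None rather than an error.
    assert_eq!(host_str(&"not a url".to_string()), None);
}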

tests/conn.rs (Normal file, 356 lines)
View File

@@ -0,0 +1,356 @@
#[cfg(test)]
mod tests {
use bitcoin_hashes::hex::ToHex;
use bitcoin_hashes::sha256;
use bitcoin_hashes::Hash;
use secp256k1::rand;
use secp256k1::{KeyPair, Secp256k1, XOnlyPublicKey};
use nostr_rs_relay::conn::ClientConn;
use nostr_rs_relay::error::Error;
use nostr_rs_relay::event::Event;
use nostr_rs_relay::utils::unix_time;
const RELAY: &str = "wss://nostr.example.com/";
#[test]
fn test_generate_auth_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let last_auth_challenge = client_conn.auth_challenge().cloned();
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_ne!(
client_conn.auth_challenge().unwrap(),
&last_auth_challenge.unwrap()
);
assert_eq!(client_conn.auth_pubkey(), None);
}
#[test]
fn test_authenticate_with_valid_event() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event(challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
}
#[test]
fn test_fail_to_authenticate_in_invalid_state() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event(&"challenge".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_authenticate_when_already_authenticated() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap().clone();
let event = auth_event(&challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
let event1 = auth_event(&challenge);
let result1 = client_conn.authenticate(&event1, &RELAY.into());
assert!(matches!(result1, Ok(())));
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), Some(&event.pubkey));
assert_ne!(client_conn.auth_pubkey(), Some(&event1.pubkey));
}
#[test]
fn test_fail_to_authenticate_with_invalid_event() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let mut event = auth_event(challenge);
event.sig = event.sig.chars().rev().collect::<String>();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_event_kind() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_kind(challenge, 9999999999999999);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_expired_timestamp() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_created_at(challenge, unix_time() - 1200); // 20 minutes
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_future_timestamp() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_created_at(challenge, unix_time() + 1200); // 20 minutes
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_tags() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event_without_tags();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event_without_challenge();
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_without_relay() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_without_relay(challenge);
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_challenge() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let event = auth_event(&"invalid challenge".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
#[test]
fn test_fail_to_authenticate_with_invalid_relay() {
let mut client_conn = ClientConn::new("127.0.0.1".into());
assert_eq!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
client_conn.generate_auth_challenge();
assert_ne!(client_conn.auth_challenge(), None);
assert_eq!(client_conn.auth_pubkey(), None);
let challenge = client_conn.auth_challenge().unwrap();
let event = auth_event_with_relay(challenge, &"xyz".into());
let result = client_conn.authenticate(&event, &RELAY.into());
assert!(matches!(result, Err(Error::AuthFailure)));
}
fn auth_event(challenge: &String) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), 22242, unix_time())
}
fn auth_event_with_kind(challenge: &String, kind: u64) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), kind, unix_time())
}
fn auth_event_with_created_at(challenge: &String, created_at: u64) -> Event {
create_auth_event(Some(challenge), Some(&RELAY.into()), 22242, created_at)
}
fn auth_event_without_challenge() -> Event {
create_auth_event(None, Some(&RELAY.into()), 22242, unix_time())
}
fn auth_event_without_relay(challenge: &String) -> Event {
create_auth_event(Some(challenge), None, 22242, unix_time())
}
fn auth_event_without_tags() -> Event {
create_auth_event(None, None, 22242, unix_time())
}
fn auth_event_with_relay(challenge: &String, relay: &String) -> Event {
create_auth_event(Some(challenge), Some(relay), 22242, unix_time())
}
fn create_auth_event(
challenge: Option<&String>,
relay: Option<&String>,
kind: u64,
created_at: u64,
) -> Event {
let secp = Secp256k1::new();
let key_pair = KeyPair::new(&secp, &mut rand::thread_rng());
let public_key = XOnlyPublicKey::from_keypair(&key_pair);
let mut tags: Vec<Vec<String>> = vec![];
if let Some(c) = challenge {
let tag = vec!["challenge".into(), c.into()];
tags.push(tag);
}
if let Some(r) = relay {
let tag = vec!["relay".into(), r.into()];
tags.push(tag);
}
let mut event = Event {
id: "0".to_owned(),
pubkey: public_key.to_hex(),
delegated_by: None,
created_at: created_at,
kind: kind,
tags: tags,
content: "".to_owned(),
sig: "0".to_owned(),
tagidx: None,
};
let c = event.to_canonical().unwrap();
let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
let msg = secp256k1::Message::from_slice(digest.as_ref()).unwrap();
let sig = secp.sign_schnorr(&msg, &key_pair);
event.id = format!("{digest:x}");
event.sig = sig.to_hex();
event
}
}
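For context, the NIP-42 exchange these tests exercise looks roughly like this on the wire; the sketch below uses `serde_json` and illustrative values:

use serde_json::json;

fn main() {
    // Relay -> client: challenge issued at connect time when nip42_auth is on.
    let challenge = json!(["AUTH", "<server-generated-challenge>"]);
    // Client -> relay: a kind-22242 event echoing the relay URL and challenge.
    // A real event must also carry valid id, pubkey, and sig fields.
    let response = json!(["AUTH", {
        "kind": 22242,
        "created_at": 1676700000,
        "tags": [
            ["relay", "wss://nostr.example.com/"],
            ["challenge", "<server-generated-challenge>"]
        ],
        "content": ""
    }]);
    println!("{challenge}\n{response}");
}

Since these integration tests live at tests/conn.rs, they can be run in isolation with `cargo test --test conn`.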