Compare commits

...

31 Commits
0.8.2 ... 0.8.5

Author SHA1 Message Date
Greg Heartsfield
692925942a build: bump version to 0.8.5 2023-02-13 17:53:33 -06:00
Greg Heartsfield
84afd4b64e refactor: whitespace 2023-02-13 17:52:00 -06:00
Greg Heartsfield
46160bb1f9 fix: correct name of gRPC configuration in toml 2023-02-13 17:30:26 -06:00
Greg Heartsfield
2fc9168a38 fix: SQL error with parameterized replaceable events 2023-02-13 17:10:42 -06:00
Greg Heartsfield
01d0d44868 build: bump version to 0.8.4 2023-02-13 09:34:30 -06:00
Greg Heartsfield
93f6337fda fix: upgrade docker image to include OpenSSL 3 2023-02-13 09:33:14 -06:00
Greg Heartsfield
f3a42712a6 build: bump version to 0.8.3 2023-02-13 08:08:28 -06:00
Greg Heartsfield
27361d064a improvement: upgrade multiple dependencies
Updating anyhow v1.0.68 -> v1.0.69
Updating axum v0.6.4 -> v0.6.6
Updating cxx v1.0.89 -> v1.0.90
Updating cxx-build v1.0.89 -> v1.0.90
Updating cxxbridge-flags v1.0.89 -> v1.0.90
Updating cxxbridge-macro v1.0.89 -> v1.0.90
Adding hermit-abi v0.3.1
Updating is-terminal v0.4.2 -> v0.4.3
Updating pest v2.5.4 -> v2.5.5
Updating pest_derive v2.5.4 -> v2.5.5
Updating pest_generator v2.5.4 -> v2.5.5
Updating pest_meta v2.5.4 -> v2.5.5
Updating proc-macro2 v1.0.50 -> v1.0.51
Updating raw-cpuid v10.6.0 -> v10.6.1
Updating rustix v0.36.7 -> v0.36.8
Updating serde_json v1.0.91 -> v1.0.93
Updating signal-hook-registry v1.4.0 -> v1.4.1
Updating thread_local v1.1.4 -> v1.1.7
Updating tinyvec_macros v0.1.0 -> v0.1.1
Updating tokio-native-tls v0.3.0 -> v0.3.1
Updating tokio-util v0.7.4 -> v0.7.7
2023-02-13 07:57:14 -06:00
Greg Heartsfield
3bafb611e5 build: install packages with sudo for github ci 2023-02-13 07:50:48 -06:00
Greg Heartsfield
b960ab70de build: add protobuf compiler to github ci workflow 2023-02-13 07:48:09 -06:00
Greg Heartsfield
15e2f097aa improvement: advise operator this upgrade may take a minute 2023-02-13 07:37:13 -06:00
Greg Heartsfield
185f9e7abb feat: improved query performance when looking for deletion events (improves event insert time) 2023-02-12 15:43:22 -06:00
Greg Heartsfield
f44dae6ac9 fix: use correct start time for logging SQL generation 2023-02-12 15:00:50 -06:00
Greg Heartsfield
abc356c17d perf(sqlite): index tags with their kind/created_at fields
This updates the DB schema to remove the distinction between hex and
non-hex tag values, for simplicity.  The space savings did not seem to
be worth the extra complexity.

The SQLite tags table is denormalized to duplicate kind/created_at to
improve the ability of tag indexes to filter data.
2023-02-12 14:33:40 -06:00
Greg Heartsfield
81f8256c37 fix: container builds support protobuf compilation 2023-02-11 14:30:42 -06:00
Greg Heartsfield
b3db2bd081 fix: protobuf compiler not needed in runtime container 2023-02-11 13:57:53 -06:00
Greg Heartsfield
d31e974d56 fix: add protobuf-compiler for Docker and CI builds 2023-02-11 13:56:15 -06:00
Greg Heartsfield
36eaf9fea5 improvement: make comments match code for nauthz example 2023-02-11 13:36:10 -06:00
Greg Heartsfield
a16c4e698a feat: gRPC authorization for events
closes: https://todo.sr.ht/~gheartsfield/nostr-rs-relay/46
2023-02-11 13:26:08 -06:00
Greg Heartsfield
e63d179424 fix: prevent loop when nip05 metadata channel closes 2023-02-11 13:26:08 -06:00
rorp
28b7b83a6e improvement: make config file location configurable via CLI args 2023-02-08 07:59:26 -06:00
Greg Heartsfield
2e42b1b86e improvement: log source IP for persisted events 2023-02-06 17:15:27 -06:00
Naoki Ikeguchi
bd07a11f50 refactor: Fix clippy warnings 2023-02-06 07:29:45 -06:00
Greg Heartsfield
bc4b45d4b8 docs: update DB maintenance for v0.8.x 2023-02-06 07:07:23 -06:00
thesimplekid
1ca5d652de format: postgres_migrations 2023-02-06 06:44:57 -06:00
thesimplekid
d7cceab8fc fix: tag table does not have a unique constraint
`cargo fmt` on the document.
2023-02-06 06:44:57 -06:00
Greg Heartsfield
2805a96e5b docs: nginx timeouts
suggested by Michael Dilger;
ref: https://snort.social/e/note15jtrt8zsrvckyv6hggmanwk43p50gvmxe30s62x9tt6x9hyruzaq6fca44
2023-02-05 17:17:13 -06:00
Greg Heartsfield
ac14a0759f docs: clarify wording around subscription limits 2023-02-03 13:08:31 -06:00
Greg Heartsfield
cdd4e5949f fix: correctly log SQL generation time 2023-02-03 10:39:41 -06:00
Greg Heartsfield
5999009779 improvement: increase connection cache size 2023-02-02 18:34:30 -06:00
Greg Heartsfield
e36c791c53 improvement: prevent spilling temp indexes to disk 2023-02-02 18:15:14 -06:00
32 changed files with 1872 additions and 243 deletions

View File

@@ -7,6 +7,7 @@ environment:
packages:
- cargo
- sqlite-devel
- protobuf-compiler
sources:
- https://git.sr.ht/~gheartsfield/nostr-rs-relay/
shell: false

View File

@@ -4,7 +4,7 @@ on:
push:
branches:
- master
jobs:
test_nostr-rs-relay:
runs-on: ubuntu-latest
@@ -13,26 +13,27 @@ jobs:
- name: Update local toolchain
run: |
sudo apt-get install -y protobuf-compiler
rustup update
rustup component add clippy
rustup install nightly
rustup install nightly
- name: Toolchain info
run: |
cargo --version --verbose
rustc --version
cargo clippy --version
cargo clippy --version
# - name: Lint
# run: |
# cargo fmt -- --check
# cargo clippy -- -D warnings
# cargo clippy -- -D warnings
- name: Test
run: |
cargo check
cargo test --all
cargo test --all
- name: Build
run: |
cargo build --release
cargo build --release --locked

178
Cargo.lock generated
View File

@@ -54,9 +54,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
[[package]]
name = "async-channel"
@@ -224,9 +224,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.4"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc"
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
dependencies = [
"async-trait",
"axum-core",
@@ -622,9 +622,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9"
checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -634,9 +634,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d"
checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38"
dependencies = [
"cc",
"codespan-reporting",
@@ -649,15 +649,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a"
checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03"
[[package]]
name = "cxxbridge-macro"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2"
checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263"
dependencies = [
"proc-macro2",
"quote",
@@ -783,6 +783,12 @@ dependencies = [
"instant",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.25"
@@ -1095,6 +1101,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
version = "0.4.3"
@@ -1291,14 +1303,14 @@ dependencies = [
[[package]]
name = "is-terminal"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189"
checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -1490,6 +1502,12 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "native-tls"
version = "0.2.11"
@@ -1532,7 +1550,7 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nostr-rs-relay"
version = "0.8.2"
version = "0.8.5"
dependencies = [
"anyhow",
"async-std",
@@ -1556,6 +1574,7 @@ dependencies = [
"nonzero_ext",
"parse_duration",
"prometheus",
"prost",
"r2d2",
"r2d2_sqlite",
"rand 0.8.5",
@@ -1568,6 +1587,8 @@ dependencies = [
"thiserror",
"tokio",
"tokio-tungstenite",
"tonic",
"tonic-build",
"tracing",
"tracing-subscriber 0.2.25",
"tungstenite",
@@ -1657,7 +1678,7 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"hermit-abi 0.2.6",
"libc",
]
@@ -1819,9 +1840,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ab62d2fa33726dbe6321cc97ef96d8cde531e3eeaf858a058de53a8a6d40d8f"
checksum = "028accff104c4e513bad663bbcd2ad7cfd5304144404c31ed0a77ac103d00660"
dependencies = [
"thiserror",
"ucd-trie",
@@ -1829,9 +1850,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf026e2d0581559db66d837fe5242320f525d85c76283c61f4d51a1238d65ea"
checksum = "2ac3922aac69a40733080f53c1ce7f91dcf57e1a5f6c52f421fadec7fbdc4b69"
dependencies = [
"pest",
"pest_generator",
@@ -1839,9 +1860,9 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b27bd18aa01d91c8ed2b61ea23406a676b42d82609c6e2581fba42f0c15f17f"
checksum = "d06646e185566b5961b4058dd107e0a7f56e77c3f484549fb119867773c0f202"
dependencies = [
"pest",
"pest_meta",
@@ -1852,15 +1873,25 @@ dependencies = [
[[package]]
name = "pest_meta"
version = "2.5.4"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02b677c1859756359fc9983c2e56a0237f18624a3789528804406b7e915e5d"
checksum = "e6f60b2ba541577e2a0c307c8f39d1439108120eb7903adeb6497fa880c59616"
dependencies = [
"once_cell",
"pest",
"sha2",
]
[[package]]
name = "petgraph"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@@ -1925,6 +1956,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97e3215779627f01ee256d2fad52f3d95e8e1c11e9fc6fd08f7cd455d5d5c78"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@@ -1951,9 +1992,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
dependencies = [
"unicode-ident",
]
@@ -1983,6 +2024,28 @@ dependencies = [
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.11.6"
@@ -2196,9 +2259,9 @@ dependencies = [
[[package]]
name = "raw-cpuid"
version = "10.6.0"
version = "10.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb"
checksum = "c307f7aacdbab3f0adee67d52739a1d71112cc068d6fab169ddeb18e48877fad"
dependencies = [
"bitflags",
]
@@ -2320,16 +2383,16 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.36.7"
version = "0.36.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -2471,9 +2534,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
"indexmap",
"itoa",
@@ -2525,9 +2588,9 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
@@ -2752,10 +2815,11 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.1.4"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
@@ -2781,9 +2845,9 @@ dependencies = [
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
@@ -2829,9 +2893,9 @@ dependencies = [
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
@@ -2873,9 +2937,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.4"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
dependencies = [
"bytes",
"futures-core",
@@ -2926,6 +2990,19 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -3353,6 +3430,17 @@ dependencies = [
"cc",
]
[[package]]
name = "which"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269"
dependencies = [
"either",
"libc",
"once_cell",
]
[[package]]
name = "whoami"
version = "1.3.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.8.2"
version = "0.8.5"
edition = "2021"
authors = ["Greg Heartsfield <scsibug@imap.cc>"]
description = "A relay implementation for the Nostr protocol"
@@ -16,6 +16,8 @@ clap = { version = "4.0.32", features = ["env", "default", "derive"]}
tracing = "0.1.36"
tracing-subscriber = "0.2.0"
tokio = { version = "1", features = ["full", "tracing", "signal"] }
prost = "0.11"
tonic = "0.8.3"
console-subscriber = "0.1.8"
futures = "0.3"
futures-util = "0.3"
@@ -52,3 +54,6 @@ bech32 = "0.9.1"
[dev-dependencies]
anyhow = "1"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

View File

@@ -1,5 +1,7 @@
FROM docker.io/library/rust:1.67.0 as builder
FROM docker.io/library/rust:1-bookworm as builder
RUN apt-get update \
&& apt-get install -y cmake protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*
RUN USER=root cargo install cargo-auditable
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@@ -12,12 +14,14 @@ RUN rm src/*.rs
# copy project source code
COPY ./src ./src
COPY ./proto ./proto
COPY ./build.rs ./build.rs
# build auditable release using locked deps
RUN rm ./target/release/deps/nostr*relay*
RUN cargo auditable build --release --locked
FROM docker.io/library/debian:bullseye-slim
FROM docker.io/library/debian:bookworm-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db

4
build.rs Normal file
View File

@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/nauthz.proto")?;
Ok(())
}

View File

@@ -48,6 +48,16 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# sqlite.
#connection = "postgresql://postgres:nostr@localhost:7500/nostr"
[grpc]
# gRPC interfaces for externalized decisions and other extensions to
# functionality.
#
# Events can be authorized through an external service, by providing
# the URL below. In the event the server is not accessible, events
# will be permitted. The protobuf3 schema used is available in
# `proto/nauthz.proto`.
# event_admission_server = "http://[::1]:50051"
[network]
# Bind to this network address
address = "0.0.0.0"
@@ -79,10 +89,10 @@ reject_future_seconds = 1800
#
#messages_per_sec = 5
# Limit client subscriptions created per second, averaged over one
# minute. Must be an integer. If not set (or set to 0), defaults to
# unlimited. Strongly recommended to set this to a low value such as
# 10 to ensure fair service.
# Limit client subscriptions created, averaged over one minute. Must
# be an integer. If not set (or set to 0), defaults to unlimited.
# Strongly recommended to set this to a low value such as 10 to ensure
# fair service.
#subscriptions_per_min = 0
# UNIMPLEMENTED...

View File

@@ -7,7 +7,7 @@ intervention. For heavily trafficked relays, there are a number of
steps that the operator may need to take to maintain performance and
limit disk usage.
This maintenance guide is current as of version `0.7.14`. Future
This maintenance guide is current as of version `0.8.2`. Future
versions may incorporate and automate some of these steps.
## Backing Up the Database
@@ -43,18 +43,15 @@ vacuum;
## Clearing Hidden Events
When events are deleted, either through deletion events, metadata or
follower updates, or a replaceable event kind, the event is not
actually removed from the database. Instead, a flag `HIDDEN` is set
to true for the event, which excludes it from search results. The
original intent was to ensure that subsequent rebroadcasts of the
event would be easily detected as having been deleted, and would not
need to be stored again. In practice, this decision causes excessive
growth of the `tags` table, since all the previous followers are
retained for those `HIDDEN` events.
When events are deleted, the event is not actually removed from the
database. Instead, a flag `HIDDEN` is set to true for the event,
which excludes it from search results. High volume replacements from
profile or other replaceable events are deleted, not hidden, in the
current version of the relay.
The `event` and especially the `tag` table can be significantly
reduced in size by running these commands:
In the current version, removing hidden events should not result in
significant space savings, but it can still be used if there is no
desire to hold on to events that can never be re-broadcast.
```console
PRAGMA foreign_keys = ON;

79
docs/grpc-extensions.md Normal file
View File

@@ -0,0 +1,79 @@
# gRPC Extensions Design Document
The relay will be extensible through gRPC endpoints, definable in the
main configuration file. These will allow external programs to host
logic for deciding things such as, should this event be persisted,
should this connection be allowed, and should this subscription
request be registered. The primary goal is allow for relay operator
specific functionality that allows them to serve smaller communities
and reduce spam and abuse.
This will likely evolve substantially, the first goal is to get a
basic one-way service that lets an externalized program decide on
event persistance. This does not represent the final state of gRPC
extensibility in `nostr-rs-relay`.
## Considerations
Write event latency must not be significantly affected. However, the
primary reason we are implementing this is spam/abuse protection, so
we are willing to tolerate some increase in latency if that protects
us against outages!
The interface should provide enough information to make simple
decisions, without burdening the relay to do extra queries. The
decision endpoint will be mostly responsible for maintaining state and
gathering additional details.
## Design Overview
A gRPC server may be defined in the `config.toml` file. If it exists,
the relay will attempt to connect to it and send a message for each
`EVENT` command submitted by clients. If a successful response is
returned indicating the event is permitted, the relay continues
processing the event as normal. All existing whitelist, blacklist,
and `NIP-05` validation checks are still performed and MAY still
result in the event being rejected. If a successful response is
returned indicated the decision is anything other than permit, then
the relay MUST reject the event, and return a command result to the
user (using `NIP-20`) indicating the event was blocked (optionally
providing a message).
In the event there is an error in the gRPC interface, event processing
proceeds as if gRPC was disabled (fail open). This allows gRPC
servers to be deployed with minimal chance of causing a full relay
outage.
## Design Details
Currently one procedure call is supported, `EventAdmit`, in the
`Authorization` service. It accepts the following data in order to
support authorization decisions:
- The event itself
- The client IP that submitted the event
- The client's HTTP origin header, if one exists
- The client's HTTP user agent header, if one exists
- The public key of the client, if `NIP-42` authentication was
performed (not supported in the relay yet!)
- The `NIP-05` associated with the event's public key, if it is known
to the relay
A server providing authorization decisions will return the following:
- A decision to permit or deny the event
- An optional message that explains why the event was denied, to be
transmitted to the client
## Security Issues
There is little attempt to secure this interface, since it is intended
for use processes running on the same host. It is recommended to
ensure that the gRPC server providing the API is not exposed to the
public Internet. Authorization server implementations should have
their own security reviews performed.
A slow gRPC server could cause availability issues for event
processing, since this is performed on a single thread. Avoid any
expensive or long-running processes that could result from submitted
events, since any client can initiate a gRPC call to the service.

1010
nauthz_server_example/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,13 @@
[package]
name = "nauthz-server"
version = "0.1.0"
edition = "2021"
[dependencies]
# Common dependencies
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
prost = "0.11"
tonic = "0.8.3"
[build-dependencies]
tonic-build = { version="0.8.3", features = ["prost"] }

View File

@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/nauthz.proto")?;
Ok(())
}

View File

@@ -0,0 +1,61 @@
use tonic::{transport::Server, Request, Response, Status};
use nauthz_grpc::authorization_server::{Authorization, AuthorizationServer};
use nauthz_grpc::{EventReply, EventRequest, Decision};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
#[derive(Default)]
pub struct EventAuthz {
allowed_kinds: Vec<u64>,
}
#[tonic::async_trait]
impl Authorization for EventAuthz {
async fn event_admit(
&self,
request: Request<EventRequest>,
) -> Result<Response<EventReply>, Status> {
let reply;
let req = request.into_inner();
let event = req.event.unwrap();
let content_prefix:String = event.content.chars().take(40).collect();
println!("recvd event, [kind={}, origin={:?}, nip05_domain={:?}, tag_count={}, content_sample={:?}]",
event.kind, req.origin, req.nip05.map(|x| x.domain), event.tags.len(), content_prefix);
// Permit any event with a whitelisted kind
if self.allowed_kinds.contains(&event.kind) {
println!("This looks fine! (kind={})",event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Permit as i32,
message: None
};
} else {
println!("Blocked! (kind={})",event.kind);
reply = nauthz_grpc::EventReply {
decision: Decision::Deny as i32,
message: Some(format!("kind {} not permitted", event.kind)),
};
}
Ok(Response::new(reply))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
// A simple authorization engine that allows kinds 0-3
let checker = EventAuthz {
allowed_kinds: vec![0,1,2,3],
};
println!("EventAuthz Server listening on {}", addr);
// Start serving
Server::builder()
.add_service(AuthorizationServer::new(checker))
.serve(addr)
.await?;
Ok(())
}

60
proto/nauthz.proto Normal file
View File

@@ -0,0 +1,60 @@
syntax = "proto3";
// Nostr Authorization Services
package nauthz;
// Authorization for actions against a relay
service Authorization {
// Determine if an event should be admitted to the relay
rpc EventAdmit(EventRequest) returns (EventReply) {}
}
message Event {
bytes id = 1; // 32-byte SHA256 hash of serialized event
bytes pubkey = 2; // 32-byte public key of event creator
fixed64 created_at = 3; // UNIX timestamp provided by event creator
uint64 kind = 4; // event kind
string content = 5; // arbitrary event contents
repeated TagEntry tags = 6; // event tag array
bytes sig = 7; // 32-byte signature of the event id
// Individual values for a single tag
message TagEntry {
repeated string values = 1;
}
}
// Event data and metadata for authorization decisions
message EventRequest {
Event event =
1; // the event to be admitted for further relay processing
optional string ip_addr =
2; // IP address of the client that submitted the event
optional string origin =
3; // HTTP origin header from the client, if one exists
optional string user_agent =
4; // HTTP user-agent header from the client, if one exists
optional bytes auth_pubkey =
5; // the public key associated with a NIP-42 AUTH'd session, if
// authentication occurred
optional Nip05Name nip05 =
6; // NIP-05 address associated with the event pubkey, if it is
// known and has been validated by the relay
// A NIP_05 verification record
message Nip05Name {
string local = 1;
string domain = 2;
}
}
// A permit or deny decision
enum Decision {
DECISION_UNSPECIFIED = 0;
DECISION_PERMIT = 1; // Admit this event for further processing
DECISION_DENY = 2; // Deny persisting or propagating this event
}
// Response to a event authorization request
message EventReply {
Decision decision = 1; // decision to enforce
optional string message = 2; // informative message for the client
}

View File

@@ -92,6 +92,8 @@ http {
location / {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_read_timeout 1d;
proxy_send_timeout 1d;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;

View File

@@ -20,7 +20,7 @@ pub fn main() -> Result<()> {
let _trace_sub = tracing_subscriber::fmt::try_init();
println!("Nostr-rs-relay Bulk Loader");
// check for a database file, or create one.
let settings = config::Settings::new();
let settings = config::Settings::new(&None);
if !Path::new(&settings.database.data_directory).is_dir() {
info!("Database directory does not exist");
return Err(Error::DatabaseDirError);
@@ -35,7 +35,7 @@ pub fn main() -> Result<()> {
// ensure the schema version is current.
if version != DB_VERSION {
info!("version is not current, exiting");
panic!("cannot write to schema other than v{}", DB_VERSION);
panic!("cannot write to schema other than v{DB_VERSION}");
}
}
// this channel will contain parsed events ready to be inserted

View File

@@ -10,4 +10,11 @@ pub struct CLIArgs {
required = false,
)]
pub db: Option<String>,
#[arg(
short,
long,
help = "Use the <file name> as the location of the config file",
required = false,
)]
pub config: Option<String>,
}

View File

@@ -25,6 +25,12 @@ pub struct Database {
pub connection: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Grpc {
pub event_admission_server: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(unused)]
pub struct Network {
@@ -145,6 +151,7 @@ pub struct Settings {
pub info: Info,
pub diagnostics: Diagnostics,
pub database: Database,
pub grpc: Grpc,
pub network: Network,
pub limits: Limits,
pub authorization: Authorization,
@@ -155,10 +162,10 @@ pub struct Settings {
impl Settings {
#[must_use]
pub fn new() -> Self {
pub fn new(config_file_name: &Option<String>) -> Self {
let default_settings = Self::default();
// attempt to construct settings with file
let from_file = Self::new_from_default(&default_settings);
let from_file = Self::new_from_default(&default_settings, config_file_name);
match from_file {
Ok(f) => f,
Err(e) => {
@@ -168,13 +175,19 @@ impl Settings {
}
}
fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
fn new_from_default(default: &Settings, config_file_name: &Option<String>) -> Result<Self, ConfigError> {
let default_config_file_name = "config.toml".to_string();
let config: &String = match config_file_name {
Some(value) => value,
None => &default_config_file_name
};
let builder = Config::builder();
let config: Config = builder
// use defaults
.add_source(Config::try_from(default)?)
// override with file contents
.add_source(File::with_name("config.toml"))
.add_source(File::with_name(config))
.build()?;
let mut settings: Settings = config.try_deserialize()?;
// ensure connection pool size is logical
@@ -214,6 +227,9 @@ impl Default for Settings {
max_conn: 8,
connection: "".to_owned(),
},
grpc: Grpc {
event_admission_server: None,
},
network: Network {
port: 8080,
ping_interval_seconds: 300,

View File

@@ -4,6 +4,7 @@ use crate::error::{Error, Result};
use crate::event::Event;
use crate::notice::Notice;
use crate::server::NostrMetrics;
use crate::nauthz;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use r2d2;
@@ -26,6 +27,9 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
pub struct SubmittedEvent {
pub event: Event,
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
pub source_ip: String,
pub origin: Option<String>,
pub user_agent: Option<String>,
}
/// Database file
@@ -100,6 +104,18 @@ pub async fn db_writer(
lim_opt = Some(RateLimiter::direct(Quota::per_minute(quota)));
}
}
// create a client if GRPC is enabled.
// Check with externalized event admitter service, if one is defined.
let mut grpc_client = if let Some(svr) = settings.grpc.event_admission_server {
Some(nauthz::EventAuthzService::connect(&svr).await)
} else {
None
};
//let gprc_client = settings.grpc.event_admission_server.map(|s| {
// event_admitter_connect(&s);
// });
loop {
if shutdown.try_recv().is_ok() {
info!("shutting down database writer");
@@ -164,9 +180,16 @@ pub async fn db_writer(
metadata_tx.send(event.clone()).ok();
}
// get a validation result for use in verification and GPRC
let validation = if nip05_active {
Some(repo.get_latest_user_verification(&event.pubkey).await)
} else {
None
};
// check for NIP-05 verification
if nip05_enabled {
match repo.get_latest_user_verification(&event.pubkey).await {
if nip05_enabled && validation.is_some() {
match validation.as_ref().unwrap() {
Ok(uv) => {
if uv.is_valid(&settings.verified_users) {
info!(
@@ -174,6 +197,7 @@ pub async fn db_writer(
uv.name.to_string(),
event.get_author_prefix()
);
} else {
info!(
"rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
@@ -208,6 +232,35 @@ pub async fn db_writer(
}
}
}
// nip05 address
let nip05_address : Option<crate::nip05::Nip05Name> = validation.and_then(|x| x.ok().map(|y| y.name));
// GRPC check
if let Some(ref mut c) = grpc_client {
trace!("checking if grpc permits");
let grpc_start = Instant::now();
let decision_res = c.admit_event(&event, &subm_event.source_ip, subm_event.origin, subm_event.user_agent, nip05_address).await;
match decision_res {
Ok(decision) => {
if !decision.permitted() {
// GPRC returned a decision to reject this event
info!("GRPC rejected event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
grpc_start.elapsed(),
subm_event.source_ip);
notice_tx.try_send(Notice::blocked(event.id, &decision.message().unwrap_or_else(|| "".to_string()))).ok();
continue;
}
},
Err(e) => {
warn!("GRPC server error: {:?}", e);
}
}
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
if event.is_ephemeral() {
@@ -227,11 +280,12 @@ pub async fn db_writer(
notice_tx.try_send(Notice::duplicate(event.id)).ok();
} else {
info!(
"persisted event: {:?} (kind: {}) from: {:?} in: {:?}",
"persisted event: {:?} (kind: {}) from: {:?} in: {:?} (IP: {:?})",
event.get_event_id_prefix(),
event.kind,
event.get_author_prefix(),
start.elapsed()
start.elapsed(),
subm_event.source_ip,
);
event_write = true;
// send this out to all clients

View File

@@ -108,7 +108,7 @@ impl ConditionQuery {
sigstr: &str,
) -> Option<ConditionQuery> {
// form the token
let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
let tok = format!("nostr:delegation:{delegatee}:{cond_query}");
// form SHA256 hash
let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
let sig = schnorr::Signature::from_str(sigstr).unwrap();

View File

@@ -62,6 +62,12 @@ pub enum Error {
HexError(hex::FromHexError),
#[error("Delegation parse error")]
DelegationParseError,
#[error("Channel closed error")]
ChannelClosed,
#[error("Authz error")]
AuthzError,
#[error("Tonic GRPC error")]
TonicError(tonic::Status),
#[error("Unknown/Undocumented")]
UnknownError,
}
@@ -130,3 +136,10 @@ impl From<config::ConfigError> for Error {
Error::ConfigError(r)
}
}
impl From<tonic::Status> for Error {
/// Wrap Config error
fn from(r: tonic::Status) -> Self {
Error::TonicError(r)
}
}

View File

@@ -143,9 +143,9 @@ impl Event {
let default = "".to_string();
let dvals:Vec<&String> = self.tags
.iter()
.filter(|x| x.len() >= 1)
.filter(|x| !x.is_empty())
.filter(|x| x.get(0).unwrap() == "d")
.map(|x| x.get(1).unwrap_or_else(|| &default)).take(1)
.map(|x| x.get(1).unwrap_or(&default)).take(1)
.collect();
let dval_first = dvals.get(0);
match dval_first {
@@ -292,7 +292,7 @@ impl Event {
let c = c_opt.unwrap();
// * compute the sha256sum.
let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
let hex_digest = format!("{:x}", digest);
let hex_digest = format!("{digest:x}");
// * ensure the id matches the computed sha256sum.
if self.id != hex_digest {
debug!("event id does not match digest");

View File

@@ -9,6 +9,7 @@ pub mod event;
pub mod hexrange;
pub mod info;
pub mod nip05;
pub mod nauthz;
pub mod notice;
pub mod repo;
pub mod subscription;

View File

@@ -11,9 +11,14 @@ use console_subscriber::ConsoleLayer;
/// Start running a Nostr relay server.
fn main() {
// configure settings from config.toml
// replace default settings with those read from config.toml
let mut settings = config::Settings::new();
let args = CLIArgs::parse();
// get config file name from args
let config_file_arg = args.config;
// configure settings from the config file (defaults to config.toml)
// replace default settings with those read from the config file
let mut settings = config::Settings::new(&config_file_arg);
// setup tracing
if settings.diagnostics.tracing {
@@ -25,8 +30,6 @@ fn main() {
}
info!("Starting up from main");
let args = CLIArgs::parse();
// get database directory from args
let db_dir_arg = args.db;

110
src/nauthz.rs Normal file
View File

@@ -0,0 +1,110 @@
use crate::error::{Error, Result};
use crate::{event::Event, nip05::Nip05Name};
use nauthz_grpc::authorization_client::AuthorizationClient;
use nauthz_grpc::event::TagEntry;
use nauthz_grpc::{Decision, Event as GrpcEvent, EventReply, EventRequest};
use tracing::{info, warn};
pub mod nauthz_grpc {
tonic::include_proto!("nauthz");
}
// A decision for the DB to act upon
pub trait AuthzDecision: Send + Sync {
fn permitted(&self) -> bool;
fn message(&self) -> Option<String>;
}
impl AuthzDecision for EventReply {
fn permitted(&self) -> bool {
self.decision == Decision::Permit as i32
}
fn message(&self) -> Option<String> {
self.message.clone()
}
}
// A connection to an event admission GRPC server
pub struct EventAuthzService {
server_addr: String,
conn: Option<AuthorizationClient<tonic::transport::Channel>>,
}
// conversion of Nip05Names into GRPC type
impl std::convert::From<Nip05Name> for nauthz_grpc::event_request::Nip05Name {
fn from(value: Nip05Name) -> Self {
nauthz_grpc::event_request::Nip05Name {
local: value.local.clone(),
domain: value.domain.clone(),
}
}
}
// conversion of event tags into gprc struct
fn tags_to_protobuf(tags: &Vec<Vec<String>>) -> Vec<TagEntry> {
tags.iter()
.map(|x| TagEntry { values: x.clone() })
.collect()
}
impl EventAuthzService {
pub async fn connect(server_addr: &str) -> EventAuthzService {
let mut eas = EventAuthzService {
server_addr: server_addr.to_string(),
conn: None,
};
eas.ready_connection().await;
eas
}
pub async fn ready_connection(self: &mut Self) {
if self.conn.is_none() {
let client = AuthorizationClient::connect(self.server_addr.to_string()).await;
if let Err(ref msg) = client {
warn!("could not connect to nostr authz GRPC server: {:?}", msg);
} else {
info!("connected to nostr authorization GRPC server");
}
self.conn = client.ok();
}
}
pub async fn admit_event(
self: &mut Self,
event: &Event,
ip: &str,
origin: Option<String>,
user_agent: Option<String>,
nip05: Option<Nip05Name>,
) -> Result<Box<dyn AuthzDecision>> {
self.ready_connection().await;
let id_blob = hex::decode(&event.id)?;
let pubkey_blob = hex::decode(&event.pubkey)?;
let sig_blob = hex::decode(&event.sig)?;
if let Some(ref mut c) = self.conn {
let gevent = GrpcEvent {
id: id_blob,
pubkey: pubkey_blob,
sig: sig_blob,
created_at: event.created_at,
kind: event.kind,
content: event.content.clone(),
tags: tags_to_protobuf(&event.tags),
};
let svr_res = c
.event_admit(EventRequest {
event: Some(gevent),
ip_addr: Some(ip.to_string()),
origin,
user_agent,
auth_pubkey: None,
nip05: nip05.map(|x| nauthz_grpc::event_request::Nip05Name::from(x)),
})
.await?;
let reply = svr_res.into_inner();
return Ok(Box::new(reply));
} else {
return Err(Error::AuthzError);
}
}
}

View File

@@ -42,8 +42,8 @@ pub struct Verifier {
/// A NIP-05 identifier is a local part and domain.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct Nip05Name {
local: String,
domain: String,
pub local: String,
pub domain: String,
}
impl Nip05Name {
@@ -107,7 +107,7 @@ impl std::fmt::Display for Nip05Name {
/// Check if the specified username and address are present and match in this response body
fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes) -> Result<bool> {
// convert the body into json
let body: serde_json::Value = serde_json::from_slice(&bytes)?;
let body: serde_json::Value = serde_json::from_slice(bytes)?;
// ensure we have a names object.
let names_map = body
.as_object()
@@ -257,8 +257,15 @@ impl Verifier {
// run a loop, restarting on failure
loop {
let res = self.run_internal().await;
if let Err(e) = res {
match res {
Err(Error::ChannelClosed) => {
// channel was closed, we are shutting down
return;
},
Err(e) => {
info!("error in verifier: {:?}", e);
},
_ => {}
}
}
}
@@ -305,6 +312,7 @@ impl Verifier {
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("metadata broadcast channel closed");
return Err(Error::ChannelClosed);
}
}
},

View File

@@ -77,26 +77,25 @@ impl NostrRepo for PostgresRepo {
}
}
if let Some(d_tag) = e.distinct_param() {
let repl_count:i64;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
repl_count = sqlx::query_scalar(
let repl_count:i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
sqlx::query_scalar(
"SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value_hex=$3 AND e.created_at >= $4 LIMIT 1;")
.bind(hex::decode(&e.pubkey).ok())
.bind(e.kind as i64)
.bind(hex::decode(d_tag).ok())
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.fetch_one(&mut tx)
.await?;
.await?
} else {
repl_count = sqlx::query_scalar(
sqlx::query_scalar(
"SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value=$3 AND e.created_at >= $4 LIMIT 1;")
.bind(hex::decode(&e.pubkey).ok())
.bind(e.kind as i64)
.bind(d_tag.as_bytes())
.bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
.fetch_one(&mut tx)
.await?;
}
.await?
};
// if any rows were returned, then some newer event with
// the same author/kind/tag value exist, and we can ignore
// this event.
@@ -178,22 +177,21 @@ ON CONFLICT (id) DO NOTHING"#,
// parameterized replaceable events
// check for parameterized replaceable events that would be hidden; don't insert these either.
if let Some(d_tag) = e.distinct_param() {
let update_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
.bind(e.kind as i64)
.bind(hex::decode(&e.pubkey).ok())
.bind(hex::decode(d_tag).ok())
.execute(&mut tx)
.await?.rows_affected();
.await?.rows_affected()
} else {
update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
.bind(e.kind as i64)
.bind(hex::decode(&e.pubkey).ok())
.bind(d_tag.as_bytes())
.execute(&mut tx)
.await?.rows_affected();
}
.await?.rows_affected()
};
if update_count > 0 {
info!(
"removed {} older parameterized replaceable kind {} events for author: {:?}",

View File

@@ -34,11 +34,15 @@ pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result<usize> {
if m002_result == MigrationResult::Upgraded {
m002::rebuild_tags(db).await?;
}
run_migration(m003::migration(), db).await;
Ok(current_version(db).await as usize)
}
async fn current_version(db: &PostgresPool) -> i64 {
sqlx::query_scalar("SELECT max(serial_number) FROM migrations;").fetch_one(db).await.unwrap()
sqlx::query_scalar("SELECT max(serial_number) FROM migrations;")
.fetch_one(db)
.await
.unwrap()
}
async fn prepare_migrations_table(db: &PostgresPool) {
@@ -77,7 +81,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) -> Migratio
.unwrap();
transaction.commit().await.unwrap();
return MigrationResult::Upgraded;
MigrationResult::Upgraded
}
mod m001 {
@@ -137,15 +141,15 @@ CREATE INDEX user_verification_name_idx ON user_verification USING btree (name);
}
mod m002 {
use async_std::stream::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use sqlx::Row;
use std::time::Instant;
use tracing::info;
use async_std::stream::StreamExt;
use sqlx::Row;
use indicatif::{ProgressBar, ProgressStyle};
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
use crate::event::{single_char_tagname, Event};
use crate::repo::postgres::PostgresPool;
use crate::event::{Event, single_char_tagname};
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
use crate::utils::is_lower_hex;
pub const VERSION: i64 = 2;
@@ -172,23 +176,31 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
let mut tx = db.begin().await.unwrap();
let mut update_tx = db.begin().await.unwrap();
// Clear out table
sqlx::query("DELETE FROM tag;").execute(&mut update_tx).await?;
sqlx::query("DELETE FROM tag;")
.execute(&mut update_tx)
.await?;
{
let event_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) from event;")
let event_count: i64 = sqlx::query_scalar("SELECT COUNT(*) from event;")
.fetch_one(&mut tx)
.await
.unwrap();
let bar = ProgressBar::new(event_count.try_into().unwrap()).with_message("rebuilding tags table");
bar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}").unwrap());
let mut events = sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx);
let bar = ProgressBar::new(event_count.try_into().unwrap())
.with_message("rebuilding tags table");
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}",
)
.unwrap(),
);
let mut events =
sqlx::query("SELECT id, content FROM event ORDER BY id;").fetch(&mut tx);
while let Some(row) = events.next().await {
bar.inc(1);
// get the row id and content
let row = row.unwrap();
let event_id: Vec<u8> = row.get(0);
let event_bytes: Vec<u8> = row.get(1);
let event:Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
let event: Event = serde_json::from_str(&String::from_utf8(event_bytes).unwrap())?;
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
@@ -202,12 +214,21 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
// this means it needs to be even length and lowercase.
if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(hex::decode(tagval).ok()).execute(&mut update_tx).await?;
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
.bind(hex::decode(tagval).ok())
.execute(&mut update_tx)
.await?;
} else {
let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
sqlx::query(q).bind(&event_id).bind(&tagname).bind(tagval.as_bytes()).execute(&mut update_tx).await?;
sqlx::query(q)
.bind(&event_id)
.bind(tagname)
.bind(tagval.as_bytes())
.execute(&mut update_tx)
.await?;
}
}
}
update_tx.commit().await?;
@@ -217,3 +238,21 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
Ok(())
}
}
mod m003 {
use crate::repo::postgres_migration::{Migration, SimpleSqlMigration};
pub const VERSION: i64 = 3;
pub fn migration() -> impl Migration {
SimpleSqlMigration {
serial_number: VERSION,
sql: vec![
r#"
-- Add unique constraint on tag
ALTER TABLE tag ADD CONSTRAINT unique_constraint_name UNIQUE (event_id, "name", value);
"#,
],
}
}
}

View File

@@ -6,7 +6,7 @@ use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::repo::sqlite_migration::{STARTUP_SQL,upgrade_db};
use crate::utils::{is_hex, is_lower_hex};
use crate::utils::{is_hex};
use crate::nip05::{Nip05Name, VerificationRecord};
use crate::subscription::{ReqFilter, Subscription};
use crate::server::NostrMetrics;
@@ -123,16 +123,9 @@ impl SqliteRepo {
}
// check for parameterized replaceable events that would be hidden; don't insert these either.
if let Some(d_tag) = e.distinct_param() {
let repl_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value_hex=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::<usize, usize>(0));
} else {
repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
}
let repl_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
// if any rows were returned, then some newer event with
// the same author/kind/tag value exist, and we can ignore
// this event.
@@ -163,18 +156,10 @@ impl SqliteRepo {
let tagchar_opt = single_char_tagname(tagname);
match &tagchar_opt {
Some(_) => {
// if tagvalue is lowercase hex;
if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, &tagval],
)?;
}
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
params![ev_id, &tagname, &tagval, e.kind, e.created_at],
)?;
}
None => {}
}
@@ -201,16 +186,9 @@ impl SqliteRepo {
}
// if this event is parameterized replaceable, remove other events.
if let Some(d_tag) = e.distinct_param() {
let update_count;
if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value_hex=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?;
} else {
update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
}
let update_count = tx.execute(
"DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY t.created_at DESC LIMIT -1 OFFSET 1);",
params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
if update_count > 0 {
info!(
"removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -245,8 +223,8 @@ impl SqliteRepo {
// check if a deletion has already been recorded for this event.
// Only relevant for non-deletion events
let del_count = tx.query_row(
"SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
"SELECT e.id FROM event e WHERE e.author=? AND e.id IN (SELECT t.event_id FROM tag t WHERE t.name='e' AND t.kind=5 AND t.value=?) LIMIT 1;",
params![pubkey_blob, e.id], |row| row.get::<usize, usize>(0));
// check if a the query returned a result, meaning we should
// hid the current event
if del_count.ok().is_some() {
@@ -364,9 +342,8 @@ impl NostrRepo for SqliteRepo {
for filter in sub.filters.iter() {
let filter_start = Instant::now();
filter_count += 1;
let (q, p, idx) = query_from_filter(&filter);
let sql_gen_elapsed = start.elapsed();
let sql_gen_elapsed = filter_start.elapsed();
let (q, p, idx) = query_from_filter(filter);
if sql_gen_elapsed > Duration::from_millis(10) {
debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
}
@@ -720,8 +697,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
// check if the index needs to be overriden
let idx_name = override_index(f);
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}",i));
let mut query = format!("SELECT e.content FROM event e {}", idx_stmt);
let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {i}"));
let mut query = format!("SELECT e.content FROM event e {idx_stmt}");
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];
@@ -804,61 +781,44 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if (v.len() % 2 == 0) && is_lower_hex(v) {
if let Ok(h) = hex::decode(v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.clone()));
}
str_vals.push(Box::new(v.clone()));
}
// do not mix value and value_hex; this is a temporary special case.
if str_vals.len() == 0 {
// create clauses with "?" params for each tag value being searched
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {}))",
blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
} else if blob_vals.len() == 0 {
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {}))",
str_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut str_vals);
filter_components.push(tag_clause);
// create clauses with "?" params for each tag value being searched
let str_clause = format!("AND value IN ({})", repeat_vars(str_vals.len()));
// find evidence of the target tag name/value existing for this event.
// Query for Kind/Since/Until additionally, to reduce the number of tags that come back.
let kind_clause;
let since_clause;
let until_clause;
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(std::string::ToString::to_string).collect();
kind_clause = format!("AND kind IN ({})", str_kinds.join(", "));
} else {
debug!("mixed string/blob query");
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!(
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND ({} OR {})))",
str_clause, blob_clause
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
kind_clause = format!("");
};
if f.since.is_some() {
since_clause = format!("AND created_at > {}", f.since.unwrap());
} else {
since_clause = format!("");
};
// Query for timestamp
if f.until.is_some() {
until_clause = format!("AND created_at < {}", f.until.unwrap());
} else {
until_clause = format!("");
};
let tag_clause = format!(
"e.id IN (SELECT t.event_id FROM tag t WHERE (name=? {str_clause} {kind_clause} {since_clause} {until_clause}))"
);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are blobs as params
params.append(&mut str_vals);
filter_components.push(tag_clause);
}
}
// Query for timestamp
@@ -881,7 +841,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some(lim) = f.limit {
let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim);
let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {lim}");
} else {
query.push_str(" ORDER BY e.created_at ASC");
}
@@ -908,7 +868,7 @@ fn _query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<Stri
// encapsulate subqueries into select statements
let subqueries_selects: Vec<String> = subqueries
.iter()
.map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
.map(|s| format!("SELECT distinct content, created_at FROM ({s})"))
.collect();
let query: String = subqueries_selects.join(" UNION ");
(query, params,indexes)

View File

@@ -10,17 +10,20 @@ use rusqlite::Connection;
use std::cmp::Ordering;
use std::time::Instant;
use tracing::{debug, error, info};
use indicatif::{ProgressBar, ProgressStyle};
/// Startup DB Pragmas
pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous = NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit = 32768;
PRAGMA temp_store = 2; -- use memory, not temp files
PRAGMA main.cache_size = 20000; -- 80MB max cache size per conn
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;
/// Latest database version
pub const DB_VERSION: usize = 15;
pub const DB_VERSION: usize = 16;
/// Schema definition
const INIT_SQL: &str = formatcp!(
@@ -63,18 +66,21 @@ CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
-- Tag values are stored as either a BLOB (if they come in as a
-- hex-string), or TEXT otherwise.
-- This means that searches need to select the appropriate column.
-- We duplicate the kind/created_at to make indexes much more efficient.
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
created_at INTEGER NOT NULL, -- when the event was authored
kind INTEGER NOT NULL, -- event kind
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
-- NIP-05 User Validation
CREATE TABLE IF NOT EXISTS user_verification (
@@ -199,6 +205,9 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
if curr_version == 14 {
curr_version = mig_14_to_15(conn)?;
}
if curr_version == 15 {
curr_version = mig_15_to_16(conn)?;
}
if curr_version == DB_VERSION {
info!(
@@ -209,13 +218,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
}
// Database is current, all is good
Ordering::Equal => {
debug!("Database version was already current (v{})", DB_VERSION);
debug!("Database version was already current (v{DB_VERSION})");
}
// Database is newer than what this code understands, abort
Ordering::Greater => {
panic!(
"Database version is newer than supported by this executable (v{} > v{})",
curr_version, DB_VERSION
"Database version is newer than supported by this executable (v{curr_version} > v{DB_VERSION})",
);
}
}
@@ -651,3 +659,73 @@ PRAGMA user_version = 15;
}
Ok(15)
}
fn mig_15_to_16(conn: &mut PooledConnection) -> Result<usize> {
let count = db_event_count(conn)?;
info!("database schema needs update from 15->16 (this make take a few minutes)");
let upgrade_sql = r##"
DROP TABLE tag;
CREATE TABLE tag (
id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
created_at INTEGER NOT NULL, -- when the event was authored
kind INTEGER NOT NULL, -- event kind
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value);
CREATE INDEX IF NOT EXISTS tag_covering_index ON tag(name,kind,value,created_at,event_id);
"##;
let start = Instant::now();
let tx = conn.transaction()?;
let bar = ProgressBar::new(count.try_into().unwrap())
.with_message("rebuilding tags table");
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.white/blue} {pos:>7}/{len:7} [{percent}%] {msg}",
)
.unwrap(),
);
{
tx.execute_batch(upgrade_sql)?;
let mut stmt = tx.prepare("select id, kind, created_at, content from event order by id;")?;
let mut tag_rows = stmt.query([])?;
let mut count = 0;
while let Some(row) = tag_rows.next()? {
count += 1;
if count%10==0 {
bar.inc(10);
}
let event_id: u64 = row.get(0)?;
let kind: u64 = row.get(1)?;
let created_at: u64 = row.get(2)?;
let event_json: String = row.get(3)?;
let event: Event = serde_json::from_str(&event_json)?;
// look at each event, and each tag, creating new tag entries if appropriate.
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
let tagnamechar_opt = single_char_tagname(tagname);
if tagnamechar_opt.is_none() {
continue;
}
// safe because len was > 1
let tagval = t.get(1).unwrap();
// otherwise, insert as text
tx.execute(
"INSERT INTO tag (event_id, name, value, kind, created_at) VALUES (?1, ?2, ?3, ?4, ?5);",
params![event_id, tagname, &tagval, kind, created_at],
)?;
}
}
tx.execute("PRAGMA user_version = 16;", [])?;
}
bar.finish();
tx.commit()?;
info!("database schema upgraded v15 -> v16 in {:?}", start.elapsed());
Ok(16)
}

View File

@@ -50,6 +50,7 @@ use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;
/// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
#[allow(clippy::too_many_arguments)]
async fn handle_web_request(
mut request: Request<Body>,
repo: Arc<dyn NostrRepo>,
@@ -127,9 +128,8 @@ async fn handle_web_request(
// todo: trace, don't print...
Err(e) => println!(
"error when trying to upgrade connection \
from address {} to websocket connection. \
Error is: {}",
remote_addr, e
from address {remote_addr} to websocket connection. \
Error is: {e}",
),
}
});
@@ -139,7 +139,7 @@ async fn handle_web_request(
Err(error) => {
warn!("websocket response failed");
let mut res =
Response::new(Body::from(format!("Failed to create websocket: {}", error)));
Response::new(Body::from(format!("Failed to create websocket: {error}")));
*res.status_mut() = StatusCode::BAD_REQUEST;
return Ok(res);
}
@@ -346,7 +346,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
// give each thread a unique numeric name
static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
format!("tokio-ws-{}", id)
format!("tokio-ws-{id}")
})
// limit concurrent SQLite blocking threads
.max_blocking_threads(settings.limits.max_blocking_threads)
@@ -478,7 +478,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
.with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
// run hyper in this thread. This is why the thread does not return.
if let Err(e) = server.await {
eprintln!("server error: {}", e);
eprintln!("server error: {e}");
}
});
Ok(())
@@ -541,6 +541,7 @@ struct ClientInfo {
/// Handle new client connections. This runs through an event loop
/// for all client communication.
#[allow(clippy::too_many_arguments)]
async fn nostr_server(
repo: Arc<dyn NostrRepo>,
client_info: ClientInfo,
@@ -600,11 +601,13 @@ async fn nostr_server(
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
let unspec = "<unspecified>".to_string();
info!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
let origin = client_info.origin.as_ref().unwrap_or_else(|| &unspec);
let user_agent = client_info
.user_agent
.unwrap_or_else(|| "<unspecified>".into());
.user_agent.as_ref()
.unwrap_or_else(|| &unspec);
info!(
"cid: {}, origin: {:?}, user-agent: {:?}",
cid, origin, user_agent
@@ -639,7 +642,7 @@ async fn nostr_server(
// database informed us of a query result we asked for
let subesc = query_result.sub_id.replace('"', "");
if query_result.event == "EOSE" {
let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
let send_str = format!("[\"EOSE\",\"{subesc}\"]");
ws_stream.send(Message::Text(send_str)).await.ok();
} else {
client_received_event_count += 1;
@@ -666,7 +669,7 @@ async fn nostr_server(
// create an event response and send it
let subesc = s.replace('"', "");
metrics.sent_events.with_label_values(&["realtime"]).inc();
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
} else {
warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
}
@@ -692,7 +695,7 @@ async fn nostr_server(
},
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
ws_stream.send(
make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
continue;
},
None |
@@ -735,13 +738,13 @@ async fn nostr_server(
// check if the event is too far in the future.
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
// Write this to the database.
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone(), source_ip: conn.ip().to_string(), origin: client_info.origin.clone(), user_agent: client_info.user_agent.clone()};
event_tx.send(submit_event).await.ok();
client_published_event_count += 1;
} else {
info!("client: {} sent a far future-dated event", cid);
if let Some(fut_sec) = settings.options.reject_future_seconds {
let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
let notice = Notice::invalid(e.id, &msg);
ws_stream.send(make_notice_message(&notice)).await.ok();
}
@@ -749,7 +752,7 @@ async fn nostr_server(
},
Err(e) => {
info!("client sent an invalid event (cid: {})", cid);
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{}", e)))).await.ok();
ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
}
}
},
@@ -782,7 +785,7 @@ async fn nostr_server(
},
Err(e) => {
info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok();
ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
}
}
}

View File

@@ -70,7 +70,7 @@ impl Serialize for ReqFilter {
if let Some(tags) = &self.tags {
for (k,v) in tags {
let vals:Vec<&String> = v.iter().collect();
map.serialize_entry(&format!("#{}",k), &vals)?;
map.serialize_entry(&format!("#{k}"), &vals)?;
}
}
map.end()