Mirror of https://github.com/scsibug/nostr-rs-relay.git (synced 2025-09-01 03:40:46 -04:00)

Compare commits (159 commits)
Commit SHA1 prefixes included in this compare:

cf3e67500f 1d19442cfd 13cc24b5cd f543957b34 7021f102e8 fddbf321bc
3e7f2e21df 9d9c6c78d1 703b2efe6e 0db6487ce3 ba987d3212 73f4f60cc7
d06d227ebe 3519488c4e fbd3315110 3d3d1bde53 ed336111bb 8aed572989
62e8da689d 807d1aa384 66a55b55b9 76c77c3e56 50daab8a6f ffd4e6f997
bbd716963e ca95e8cf22 e9d2a2cbd0 39a945b493 9a84dc19e9 20c4bb42eb
0e519f6b77 3dd0f2c9c6 b7c8737166 c0b112c094 cb283ac316 2c6ac69bfd
d929ae2752 14fe9f9ee1 7774db8c47 104ef2b9e1 c06139ec99 19ec89593d
27902bc5f4 d2adddaee4 b23b3ce8ec 5f9fe1ce59 6a8c4ed1b5 966c853700
65fd0ed08b 0b51675b38 2e22334631 cb2ac4bf0f 38dc7789dc ce0e00ffb3
3e4ae4aeec c6a8807485 8137b6211c 29effaae23 e5074f2e46 4fd7643907
1e1ec69175 e08647867c ae0f7171ed 4f1a912f36 95748647f0 25480e837f
b80b54cd9d 8ea732cbe5 0f68c4e5c2 dab2cd5792 f411aa6fc2 d31bbda087
5917bc53b2 91177c61a1 53c2a8051c 168cf513ac ea204761c9 c270ae1434
64bd983cb6 1c153bc784 dc11d9a619 cd1557787b 86bb7aeb9a ce37fc1a2d
2cfd384339 8c013107f9 64a4466d30 1596c23eb4 129badd4e1 6f7c080180
af92561ef6 d833a3e40d 462eb46642 cf144d503d fb8375aef2 88ac31b549
677b7d39e9 b24d2f9aaa 7a3899d852 818108b793 d10348f7e1 8598e443d8
43222d44e5 7c1516c4fb 0c72053a49 3f32ff67ab 0b9778d6ca 9be04120c7
cc06167e06 b6e33f044f 1b2c6f9fca 0d8d39ad22 0e851d4f71 3c880b2f49
7a4c9266ec e8557d421b 7ca9c864f2 838aafd079 e554b10ac2 b0bfaa48fc
2e9b1b6ba7 4d9012d94c ffe7aac066 f9695bd0a9 7c4bf5cc8f e2de162931
4f606615eb 84a58ebbcd c48e45686d bbe359364a 9e9c494367 5fa24bc9f1
4de7490d97 d0f63dc66e 06078648c8 cc0fcc5d66 dfb2096653 486508d192
84b43c144b 110500bb46 83f6b11de7 6d1244434b 5a91419d34 7adc5c9af7
9dd4571bee 9db5a26b9c ac345b5744 675662c7fb 505b0cb71f e8aa450802
5a8860bb09 11e43eccf9 50577b2dfa a6cb6f8486 ae5bf98d87 1cf9d719f0
311f4b5283 14b5a51e3a 8ecce3f566
@@ -11,6 +11,6 @@ repos:
 - repo: https://github.com/doublify/pre-commit-rust
   rev: v1.0
   hooks:
-  - id: fmt
+  # - id: fmt
   - id: cargo-check
   - id: clippy
Cargo.lock (generated; 521 lines changed)
File diff suppressed because it is too large.
Cargo.toml (13 lines changed)
@@ -1,9 +1,18 @@
 [package]
 name = "nostr-rs-relay"
-version = "0.7.0"
+version = "0.7.17"
 edition = "2021"
+authors = ["Greg Heartsfield <scsibug@imap.cc>"]
+description = "A relay implementation for the Nostr protocol"
+readme = "README.md"
+homepage = "https://sr.ht/~gheartsfield/nostr-rs-relay/"
+repository = "https://git.sr.ht/~gheartsfield/nostr-rs-relay"
+license = "MIT"
+keywords = ["nostr", "server"]
+categories = ["network-programming", "web-programming"]

 [dependencies]
+clap = { version = "4.0.32", features = ["env", "default", "derive"]}
 tracing = "0.1.36"
 tracing-subscriber = "0.2.0"
 tokio = { version = "1", features = ["full", "tracing", "signal"] }
@@ -20,7 +29,7 @@ secp256k1 = {version = "0.21", features = ["rand", "rand-std", "serde", "bitcoin
 serde = { version = "1.0", features = ["derive"] }
 serde_json = {version = "1.0", features = ["preserve_order"]}
 hex = "0.4"
-rusqlite = { version = "0.26", features = ["limits","bundled"]}
+rusqlite = { version = "0.26", features = ["limits","bundled","modern_sqlite", "trace"]}
 r2d2 = "0.8"
 r2d2_sqlite = "0.19"
 lazy_static = "1.4"
Dockerfile (14 lines changed)
@@ -1,18 +1,24 @@
-FROM docker.io/library/rust:1.64.0@sha256:5cf09a76cb9baf4990d121221bbad64927cc5690ee54f246487e302ddc2ba300 as builder
+FROM docker.io/library/rust:1.66.0 as builder

+RUN USER=root cargo install cargo-auditable
 RUN USER=root cargo new --bin nostr-rs-relay
 WORKDIR ./nostr-rs-relay
 COPY ./Cargo.toml ./Cargo.toml
 COPY ./Cargo.lock ./Cargo.lock
-RUN cargo build --release
+# build dependencies only (caching)
+RUN cargo auditable build --release --locked
+# get rid of starter project code
 RUN rm src/*.rs

+# copy project source code
 COPY ./src ./src

+# build auditable release using locked deps
 RUN rm ./target/release/deps/nostr*relay*
-RUN cargo build --release
+RUN cargo auditable build --release --locked

-FROM docker.io/library/debian:bullseye-slim
+FROM docker.io/library/debian:bullseye-20221004-slim@sha256:8b702518a671c926b5ece4efe386a476eb4777646a36d996d4bd50944f2f11a2
 ARG APP=/usr/src/app
 ARG APP_DATA=/usr/src/app/db
 RUN apt-get update \
README.md (76 lines changed)
@@ -1,8 +1,8 @@
 # [nostr-rs-relay](https://git.sr.ht/~gheartsfield/nostr-rs-relay)

-This is a [nostr](https://github.com/nostr-protocol/nostr) relay, written in
-Rust. It currently supports the entire relay protocol, and has a
-SQLite persistence layer.
+This is a [nostr](https://github.com/nostr-protocol/nostr) relay,
+written in Rust. It currently supports the entire relay protocol, and
+persists data with SQLite.

 The project master repository is available on
 [sourcehut](https://sr.ht/~gheartsfield/nostr-rs-relay/), and is
@@ -26,8 +26,10 @@ mirrored on [GitHub](https://github.com/scsibug/nostr-rs-relay).
 - [x] NIP-12: [Generic Tag Queries](https://github.com/nostr-protocol/nips/blob/master/12.md)
 - [x] NIP-15: [End of Stored Events Notice](https://github.com/nostr-protocol/nips/blob/master/15.md)
 - [x] NIP-16: [Event Treatment](https://github.com/nostr-protocol/nips/blob/master/16.md)
+- [x] NIP-20: [Command Results](https://github.com/nostr-protocol/nips/blob/master/20.md)
+- [x] NIP-22: [Event `created_at` limits](https://github.com/nostr-protocol/nips/blob/master/22.md) (_future-dated events only_)
-- [x] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md)
+- [ ] NIP-26: [Event Delegation](https://github.com/nostr-protocol/nips/blob/master/26.md) (_implemented, but currently disabled_)
 - [x] NIP-28: [Public Chat](https://github.com/nostr-protocol/nips/blob/master/28.md)

 ## Quick Start

@@ -36,15 +38,32 @@ application. Use a bind mount to store the SQLite database outside of
 the container image, and map the container's 8080 port to a host port
 (7000 in the example below).

+The examples below start a rootless podman container, mapping a local
+data directory and config file.
+
 ```console
-$ docker build -t nostr-rs-relay .
+$ podman build -t nostr-rs-relay .

-$ docker run -it -p 7000:8080 \
-  --mount src=$(pwd)/data,target=/usr/src/app/db,type=bind nostr-rs-relay
+$ mkdir data

-[2021-12-31T19:58:31Z INFO nostr_rs_relay] listening on: 0.0.0.0:8080
-[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] opened database "/usr/src/app/db/nostr.db" for writing
-[2021-12-31T19:58:31Z INFO nostr_rs_relay::db] DB version = 2
+$ podman unshare chown 100:100 data
+
+$ podman run -it --rm -p 7000:8080 \
+  --user=100:100 \
+  -v $(pwd)/data:/usr/src/app/db:Z \
+  -v $(pwd)/config.toml:/usr/src/app/config.toml:ro,Z \
+  --name nostr-relay nostr-rs-relay:latest
+
+Nov 19 15:31:15.013 INFO nostr_rs_relay: Starting up from main
+Nov 19 15:31:15.017 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
+Nov 19 15:31:15.019 INFO nostr_rs_relay::server: db writer created
+Nov 19 15:31:15.019 INFO nostr_rs_relay::server: control message listener started
+Nov 19 15:31:15.019 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=4)
+Nov 19 15:31:15.019 INFO nostr_rs_relay::db: opened database "/usr/src/app/db/nostr.db" for writing
+Nov 19 15:31:15.019 INFO nostr_rs_relay::schema: DB version = 0
+Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: database pragma/schema initialized to v7, and ready
+Nov 19 15:31:15.054 INFO nostr_rs_relay::schema: All migration scripts completed successfully. Welcome to v7.
+Nov 19 15:31:15.521 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=128)
 ```

 Use a `nostr` client such as
@@ -63,6 +82,38 @@ Text Note [81cf...2652] from 296a...9b92 5 seconds ago
 A pre-built container is also available on DockerHub:
 https://hub.docker.com/r/scsibug/nostr-rs-relay

+## Build and Run (without Docker)
+
+Building `nostr-rs-relay` requires an installation of Cargo & Rust: https://www.rust-lang.org/tools/install
+
+Clone this repository, and then build a release version of the relay:
+
+```console
+$ git clone -q https://git.sr.ht/\~gheartsfield/nostr-rs-relay
+$ cd nostr-rs-relay
+$ cargo build -q -r
+```
+
+The relay executable is now located in
+`target/release/nostr-rs-relay`. In order to run it with logging
+enabled, execute it with the `RUST_LOG` variable set:
+
+```console
+$ RUST_LOG=warn,nostr_rs_relay=info ./target/release/nostr-rs-relay
+Dec 26 10:31:56.455 INFO nostr_rs_relay: Starting up from main
+Dec 26 10:31:56.464 INFO nostr_rs_relay::server: listening on: 0.0.0.0:8080
+Dec 26 10:31:56.466 INFO nostr_rs_relay::server: db writer created
+Dec 26 10:31:56.466 INFO nostr_rs_relay::db: Built a connection pool "event writer" (min=1, max=2)
+Dec 26 10:31:56.466 INFO nostr_rs_relay::db: opened database "./nostr.db" for writing
+Dec 26 10:31:56.466 INFO nostr_rs_relay::schema: DB version = 11
+Dec 26 10:31:56.467 INFO nostr_rs_relay::db: Built a connection pool "maintenance writer" (min=1, max=2)
+Dec 26 10:31:56.467 INFO nostr_rs_relay::server: control message listener started
+Dec 26 10:31:56.468 INFO nostr_rs_relay::db: Built a connection pool "client query" (min=4, max=8)
+```
+
+You now have a running relay, on port `8080`. Use a `nostr` client or
+`websocat` to connect and send/query for events.
+
 ## Configuration

 The sample [`config.toml`](config.toml) file demonstrates the
@@ -97,3 +148,8 @@ To chat about `nostr-rs-relay` on `nostr` itself; visit our channel on [anigma](
 License
 ---
 This project is MIT licensed.
+
+External Documentation and Links
+---
+
+* [BlockChainCaffe's Nostr Relay Setup Guide](https://github.com/BlockChainCaffe/Nostr-Relay-Setup-Guide)
config.toml (40 lines changed)
@@ -18,7 +18,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i

 [diagnostics]
 # Enable tokio tracing (for use with tokio-console)
-#tracing = true
+#tracing = false

 [database]
 # Directory for SQLite files. Defaults to the current directory. Can
@@ -36,8 +36,9 @@ data_directory = "."
 # Minimum number of SQLite reader connections
 #min_conn = 4

-# Maximum number of SQLite reader connections
-#max_conn = 128
+# Maximum number of SQLite reader connections. Recommend setting this
+# to approx the number of cores.
+#max_conn = 8

 [network]
 # Bind to this network address
@@ -46,6 +47,14 @@ address = "0.0.0.0"
 # Listen on this port
 port = 8080

+# If present, read this HTTP header for logging client IP addresses.
+# Examples for common proxies, cloudflare:
+#remote_ip_header = "x-forwarded-for"
+#remote_ip_header = "cf-connecting-ip"
+
+# Websocket ping interval in seconds, defaults to 5 minutes
+#ping_interval = 300
+
 [options]
 # Reject events that have timestamps greater than this many seconds in
 # the future. Recommended to reject anything greater than 30 minutes
@@ -54,8 +63,24 @@ reject_future_seconds = 1800

 [limits]
 # Limit events created per second, averaged over one minute. Must be
-# an integer. If not set (or set to 0), defaults to unlimited.
-#messages_per_sec = 0
+# an integer. If not set (or set to 0), defaults to unlimited. Note:
+# this is for the server as a whole, not per-connection.
+# messages_per_sec = 0
+
+# Limit client subscriptions created per second, averaged over one
+# minute. Must be an integer. If not set (or set to 0), defaults to
+# unlimited.
+#subscriptions_per_min = 0
+
+# UNIMPLEMENTED...
 # Limit how many concurrent database connections a client can have.
 # This prevents a single client from starting too many expensive
 # database queries. Must be an integer. If not set (or set to 0),
 # defaults to unlimited (subject to subscription limits).
 #db_conns_per_client = 0

+# Limit blocking threads used for database connections. Defaults to 16.
+#max_blocking_threads = 16
+
 # Limit the maximum size of an EVENT message. Defaults to 128 KB.
 # Set to 0 for unlimited.
@@ -75,6 +100,11 @@ reject_future_seconds = 1800
 # backpressure to senders if writes are slow.
 #event_persist_buffer = 4096

+# Event kind blacklist. Events with these kinds will be discarded.
+#event_kind_blacklist = [
+#  70202,
+#]
+
 [authorization]
 # Pubkey addresses in this array are whitelisted for event publishing.
 # Only valid events by these authors will be accepted, if the variable
docs/database-maintenance.md (new file, 125 lines)

# Database Maintenance

`nostr-rs-relay` uses the SQLite embedded database to minimize
dependencies and the overall footprint of running a relay. If traffic
is light, the relay should run with very little need for
intervention. For heavily trafficked relays, there are a number of
steps that the operator may need to take to maintain performance and
limit disk usage.

This maintenance guide is current as of version `0.7.14`. Future
versions may incorporate and automate some of these steps.

## Backing Up the Database

To prevent data loss, the database should be backed up regularly. The
recommended method is to use the `sqlite3` command to perform an
"Online Backup". This can be done while the relay is running; queries
can still run and events will be persisted during the backup.

The following commands will perform a backup of the database to a
dated file, and then compress it to minimize size:

```console
BACKUP_FILE=/var/backups/nostr/`date +%Y%m%d_%H%M`.db
sqlite3 -readonly /apps/nostr-relay/nostr.db ".backup $BACKUP_FILE"
sqlite3 $BACKUP_FILE "vacuum;"
bzip2 -9 $BACKUP_FILE
```

Nostr events are very compressible. Expect a compression ratio on the
order of 4:1, resulting in a 75% space saving.
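For relays that embed their own tooling, the same online backup can be driven from Rust. The following is a minimal sketch, assuming rusqlite with its optional `backup` feature enabled; the file paths are illustrative and not part of the guide above:

```rust
use rusqlite::{backup::Backup, Connection};
use std::time::Duration;

fn main() -> rusqlite::Result<()> {
    // source: the live relay database; destination: the dated backup file
    let src = Connection::open("nostr.db")?;
    let mut dst = Connection::open("nostr-backup.db")?;
    let backup = Backup::new(&src, &mut dst)?;
    // copy 100 pages per step, pausing briefly between steps so that
    // concurrent writers are not starved while the backup runs
    backup.run_to_completion(100, Duration::from_millis(250), None)?;
    Ok(())
}
```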
## Vacuuming the Database

As the database is updated, it can become fragmented. Performing a
full `vacuum` will rebuild the entire database file, which may reduce
its size, especially if a large amount of data was recently updated
or deleted.

```console
vacuum;
```

## Clearing Hidden Events

When events are deleted, either through deletion events, metadata or
follower updates, or a replaceable event kind, the event is not
actually removed from the database. Instead, a flag `HIDDEN` is set
to true for the event, which excludes it from search results. The
original intent was to ensure that subsequent rebroadcasts of the
event would be easily detected as having been deleted, and would not
need to be stored again. In practice, this decision causes excessive
growth of the `tags` table, since all the previous followers are
retained for those `HIDDEN` events.

The `event` and especially the `tag` table can be significantly
reduced in size by running these commands:

```console
PRAGMA foreign_keys = ON;
delete from event where HIDDEN=true;
```
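The same cleanup can also be scripted. A hedged sketch with rusqlite (the database path is illustrative), enabling foreign-key enforcement on the connection so that `tag` rows are removed along with their events:

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open("nostr.db")?;
    // must be set per-connection, or tag rows will be orphaned
    conn.execute_batch("PRAGMA foreign_keys = ON;")?;
    let removed = conn.execute("DELETE FROM event WHERE hidden = TRUE;", [])?;
    println!("removed {removed} hidden events");
    Ok(())
}
```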
## Manually Removing Events

For a variety of reasons, an operator may wish to remove some events
from the database. The only way of achieving this today is with
manually run SQL commands.

It is recommended to have a good backup prior to manually running SQL
commands!

In all cases, it is mandatory to enable foreign keys, and this must be
done for every connection. Otherwise, you will likely orphan rows in
the `tag` table.

### Deleting a Specific Event

```console
PRAGMA foreign_keys = ON;
delete from event where event_hash=x'00000000000c1271675dc86e3e1dd1336827bccabb90dc4c9d3b4465efefe00e';
```

### Deleting All Events for a Pubkey

```console
PRAGMA foreign_keys = ON;
delete from event where author=x'000000000002c7831d9c5a99f183afc2813a6f69a16edda7f6fc0ed8110566e6';
```

### Deleting All Events of a Kind

```console
PRAGMA foreign_keys = ON;
delete from event where kind=70202;
```

### Deleting Old Events

In this scenario, we wish to delete any event that has been stored by
our relay for more than 1 month. Crucially, this is based on when the
event was stored, not when the event says it was created. If an event
has a `created_at` field of 2 years ago, but was first sent to our
relay yesterday, it would not be deleted in this scenario. Keep in
mind, we do not track anything for re-broadcast events that we
already have, so this is not a very effective way of implementing a
"least recently seen" policy.

```console
PRAGMA foreign_keys = ON;
TODO!
```

### Delete Profile Events with No Recent Events

Many users create profiles, post a "hello world" event, and then never
appear again (likely using an ephemeral keypair that was lost in the
browser cache). We can find these accounts and remove them after some
time.

```console
PRAGMA foreign_keys = ON;
TODO!
```
@@ -1,3 +0,0 @@
-#!/usr/bin/env bash
-sed -E 's/@sha256:[[:alnum:]]+//g' Dockerfile > Dockerfile.any-platform
-echo "Created platform-agnostic Dockerfile in 'Dockerfile.any-platform'"
@@ -68,8 +68,25 @@ http {
         server_name relay.example.com;
         ssl_certificate /etc/letsencrypt/live/relay.example.com/fullchain.pem;
         ssl_certificate_key /etc/letsencrypt/live/relay.example.com/privkey.pem;
-        ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
-        ssl_ciphers HIGH:!aNULL:!MD5;
+        ssl_protocols TLSv1.3 TLSv1.2;
+        ssl_prefer_server_ciphers on;
+        ssl_ecdh_curve secp521r1:secp384r1;
+        ssl_ciphers EECDH+AESGCM:EECDH+AES256;
+
+        # Optional Diffie-Hellman parameters
+        # Generate with openssl dhparam -out /etc/ssl/certs/dhparam.pem 4096
+        #ssl_dhparam /etc/ssl/certs/dhparam.pem;
+
+        ssl_session_cache shared:TLS:2m;
+        ssl_buffer_size 4k;
+
+        # OCSP stapling
+        ssl_stapling on;
+        ssl_stapling_verify on;
+        resolver 1.1.1.1 1.0.0.1 [2606:4700:4700::1111] [2606:4700:4700::1001]; # Cloudflare
+
+        # Set HSTS to 365 days
+        add_header Strict-Transport-Security 'max-age=31536000; includeSubDomains; preload' always;
         keepalive_timeout 70;

         location / {
@@ -1 +1,4 @@
 edition = "2021"
+#max_width = 140
+#chain_width = 100
+#fn_call_width = 100
src/bin/bulkloader.rs (new file, 176 lines)

use std::io;
use std::path::Path;
use nostr_rs_relay::utils::is_lower_hex;
use tracing::*;
use nostr_rs_relay::config;
use nostr_rs_relay::event::{Event, single_char_tagname};
use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::db::build_pool;
use nostr_rs_relay::schema::{curr_db_version, DB_VERSION};
use rusqlite::{OpenFlags, Transaction};
use nostr_rs_relay::db::PooledConnection;
use std::sync::mpsc;
use std::thread;
use rusqlite::params;

/// Bulk load JSONL data from STDIN to the database specified in config.toml (or ./nostr.db as a default).
/// The database must already exist, this will not create a new one.
/// Tested against schema v13.
pub fn main() -> Result<()> {
    let _trace_sub = tracing_subscriber::fmt::try_init();
    println!("Nostr-rs-relay Bulk Loader");
    // check for a database file, or create one.
    let settings = config::Settings::new();
    if !Path::new(&settings.database.data_directory).is_dir() {
        info!("Database directory does not exist");
        return Err(Error::DatabaseDirError);
    }
    // Get a database pool
    let pool = build_pool(
        "bulk-loader",
        &settings,
        OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
        1,
        4,
        false,
    );
    {
        // check for database schema version
        let mut conn: PooledConnection = pool.get()?;
        let version = curr_db_version(&mut conn)?;
        info!("current version is: {:?}", version);
        // ensure the schema version is current.
        if version != DB_VERSION {
            info!("version is not current, exiting");
            panic!("cannot write to schema other than v{}", DB_VERSION);
        }
    }
    // this channel will contain parsed events ready to be inserted
    let (event_tx, event_rx) = mpsc::sync_channel(100_000);
    // Thread for reading events
    let _stdin_reader_handler = thread::spawn(move || {
        let stdin = io::stdin();
        for readline in stdin.lines() {
            if let Ok(line) = readline {
                // try to parse a nostr event
                let eres: Result<Event, serde_json::Error> = serde_json::from_str(&line);
                if let Ok(mut e) = eres {
                    if let Ok(()) = e.validate() {
                        e.build_index();
                        //debug!("Event: {:?}", e);
                        event_tx.send(Some(e)).ok();
                    } else {
                        info!("could not validate event");
                    }
                } else {
                    info!("error reading event: {:?}", eres);
                }
            } else {
                // error reading
                info!("error reading: {:?}", readline);
            }
        }
        info!("finished parsing events");
        event_tx.send(None).ok();
        let ok: Result<()> = Ok(());
        ok
    });
    let mut conn: PooledConnection = pool.get()?;
    let mut events_read = 0;
    let event_batch_size = 50_000;
    let mut new_events = 0;
    let mut has_more_events = true;
    while has_more_events {
        // begin a transaction
        let tx = conn.transaction()?;
        // read in batch_size events and commit
        for _ in 0..event_batch_size {
            match event_rx.recv() {
                Ok(Some(e)) => {
                    events_read += 1;
                    // ignore ephemeral events
                    if !(e.kind >= 20000 && e.kind < 30000) {
                        match write_event(&tx, e) {
                            Ok(c) => {
                                new_events += c;
                            }
                            Err(e) => {
                                info!("error inserting event: {:?}", e);
                            }
                        }
                    }
                }
                Ok(None) => {
                    // signal that the sender will never produce more events
                    has_more_events = false;
                    break;
                }
                Err(_) => {
                    // sender is done
                    info!("sender is closed");
                }
            }
        }
        info!("committed {} events...", new_events);
        tx.commit()?;
        conn.execute_batch("pragma wal_checkpoint(truncate)")?;
    }
    info!("processed {} events", events_read);
    info!("stored {} new events", new_events);
    info!("finished reading input");
    Ok(())
}

/// Write an event and update the tag table.
/// Assumes the event has its index built.
fn write_event(tx: &Transaction, e: Event) -> Result<usize> {
    let id_blob = hex::decode(&e.id).ok();
    let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
    let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
    let event_str = serde_json::to_string(&e).ok();
    // ignore if the event hash is a duplicate.
    let ins_count = tx.execute(
        "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
        params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
    )?;
    if ins_count == 0 {
        return Ok(0);
    }
    // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
    let event_id = tx.last_insert_rowid();
    // look at each event, and each tag, creating new tag entries if appropriate.
    for t in e.tags.iter().filter(|x| x.len() > 1) {
        let tagname = t.get(0).unwrap();
        let tagnamechar_opt = single_char_tagname(tagname);
        if tagnamechar_opt.is_none() {
            continue;
        }
        // safe because len was > 1
        let tagval = t.get(1).unwrap();
        // insert as BLOB if we can restore it losslessly.
        // this means it needs to be even length and lowercase.
        if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
            tx.execute(
                "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
                params![event_id, tagname, hex::decode(tagval).ok()],
            )?;
        } else {
            // otherwise, insert as text
            tx.execute(
                "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
                params![event_id, tagname, &tagval],
            )?;
        }
    }
    if e.is_replaceable() {
        //let query = "SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1;";
        //let count: usize = tx.query_row(query, params![e.kind, pubkey_blob], |row| row.get(0))?;
        //info!("found {} rows that /would/ be preserved", count);
        match tx.execute(
            "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1);",
            params![e.kind, pubkey_blob, e.kind, pubkey_blob],
        ) {
            Ok(_) => {}
            Err(x) => {
                info!("error deleting replaceable event: {:?}", x);
            }
        }
    }
    Ok(ins_count)
}
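Since the loader reads JSONL from standard input, it can presumably be invoked with something like `cargo run --release --bin bulkloader < events.jsonl` (the input filename here is hypothetical). Batching 50,000 events per transaction and checkpointing the WAL after each commit keeps both transaction overhead and WAL growth bounded during large imports.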
src/cli.rs (new file, 14 lines)

use clap::Parser;

#[derive(Parser)]
#[command(about = "A nostr relay written in Rust", author = env!("CARGO_PKG_AUTHORS"), version = env!("CARGO_PKG_VERSION"))]
pub struct CLIArgs {
    #[arg(
        short,
        long,
        help = "Use the <directory> as the location of the database",
        default_value = ".",
        required = false
    )]
    pub db: String,
}
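A standalone sketch of how a binary might consume these arguments with clap's derive API. The `main` wiring below is illustrative, since the diff does not show how the relay's entry point uses `CLIArgs`:

```rust
use clap::Parser;

// Simplified copy of the struct above, for demonstration only.
#[derive(Parser)]
#[command(about = "A nostr relay written in Rust")]
struct CLIArgs {
    /// Use the <directory> as the location of the database
    #[arg(short, long, default_value = ".")]
    db: String,
}

fn main() {
    // parses e.g. `nostr-rs-relay --db /var/lib/nostr` or `-d /var/lib/nostr`
    let args = CLIArgs::parse();
    println!("database directory: {}", args.db);
}
```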
@@ -28,9 +28,10 @@ pub struct Database {
 pub struct Network {
     pub port: u16,
     pub address: String,
+    pub remote_ip_header: Option<String>, // retrieve client IP from this HTTP header if present
+    pub ping_interval_seconds: u32,
 }

 //
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[allow(unused)]
 pub struct Options {
@@ -51,11 +52,15 @@ pub struct Retention {
 #[allow(unused)]
 pub struct Limits {
     pub messages_per_sec: Option<u32>, // Artificially slow down event writing to limit disk consumption (averaged over 1 minute)
+    pub subscriptions_per_min: Option<u32>, // Artificially slow down request (db query) creation to prevent abuse (averaged over 1 minute)
     pub db_conns_per_client: Option<u32>, // How many concurrent database queries (not subscriptions) may a client have?
+    pub max_blocking_threads: usize,
     pub max_event_bytes: Option<usize>, // Maximum size of an EVENT message
     pub max_ws_message_bytes: Option<usize>,
     pub max_ws_frame_bytes: Option<usize>,
     pub broadcast_buffer: usize, // events to buffer for subscribers (prevents slow readers from consuming memory)
     pub event_persist_buffer: usize, // events to buffer for database commits (block senders if database writes are too slow)
+    pub event_kind_blacklist: Option<Vec<u64>>
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -203,19 +208,25 @@ impl Default for Settings {
                 data_directory: ".".to_owned(),
                 in_memory: false,
                 min_conn: 4,
-                max_conn: 128,
+                max_conn: 8,
             },
             network: Network {
                 port: 8080,
+                ping_interval_seconds: 300,
                 address: "0.0.0.0".to_owned(),
+                remote_ip_header: None,
             },
             limits: Limits {
                 messages_per_sec: None,
+                subscriptions_per_min: None,
                 db_conns_per_client: None,
+                max_blocking_threads: 16,
                 max_event_bytes: Some(2 << 17), // 128K
                 max_ws_message_bytes: Some(2 << 17), // 128K
                 max_ws_frame_bytes: Some(2 << 17), // 128K
                 broadcast_buffer: 16384,
                 event_persist_buffer: 4096,
+                event_kind_blacklist: None,
             },
             authorization: Authorization {
                 pubkey_whitelist: None, // Allow any address to publish
src/conn.rs (53 lines changed)
@@ -2,11 +2,10 @@
 use crate::close::Close;
 use crate::error::Error;
 use crate::error::Result;
 use crate::event::Event;
-
 use crate::subscription::Subscription;
 use std::collections::HashMap;
-use tracing::{debug, info};
+use tracing::{debug, trace};
 use uuid::Uuid;

 /// A subscription identifier has a maximum length
@@ -14,6 +13,8 @@ const MAX_SUBSCRIPTION_ID_LEN: usize = 256;

 /// State for a client connection
 pub struct ClientConn {
+    /// Client IP (either from socket, or configured proxy header)
+    client_ip: String,
     /// Unique client identifier generated at connection time
     client_id: Uuid,
     /// The current set of active client subscriptions
@@ -24,22 +25,32 @@ pub struct ClientConn {

 impl Default for ClientConn {
     fn default() -> Self {
-        Self::new()
+        Self::new("unknown".to_owned())
     }
 }

 impl ClientConn {
     /// Create a new, empty connection state.
     #[must_use]
-    pub fn new() -> Self {
+    pub fn new(client_ip: String) -> Self {
         let client_id = Uuid::new_v4();
         ClientConn {
+            client_ip,
             client_id,
             subscriptions: HashMap::new(),
             max_subs: 32,
         }
     }

+    pub fn subscriptions(&self) -> &HashMap<String, Subscription> {
+        &self.subscriptions
+    }
+
+    /// Check if the given subscription already exists
+    pub fn has_subscription(&self, sub: &Subscription) -> bool {
+        self.subscriptions.values().any(|x| x == sub)
+    }
+
     /// Get a short prefix of the client's unique identifier, suitable
     /// for logging.
     #[must_use]
@@ -47,16 +58,9 @@ impl ClientConn {
         self.client_id.to_string().chars().take(8).collect()
     }

-    /// Find all matching subscriptions.
-    #[must_use]
-    pub fn get_matching_subscriptions(&self, e: &Event) -> Vec<&str> {
-        let mut v: Vec<&str> = vec![];
-        for (id, sub) in &self.subscriptions {
-            if sub.interested_in_event(e) {
-                v.push(id);
-            }
-        }
-        v
+    pub fn ip(&self) -> &str {
+        &self.client_ip
     }

     /// Add a new subscription for this connection.
@@ -70,7 +74,7 @@ impl ClientConn {
         // prevent arbitrarily long subscription identifiers from
         // being used.
         if sub_id_len > MAX_SUBSCRIPTION_ID_LEN {
-            info!(
+            debug!(
                 "ignoring sub request with excessive length: ({})",
                 sub_id_len
             );
@@ -79,8 +83,12 @@ impl ClientConn {
         // check if an existing subscription exists, and replace if so
         if self.subscriptions.contains_key(&k) {
             self.subscriptions.remove(&k);
-            self.subscriptions.insert(k, s);
-            debug!("replaced existing subscription");
+            self.subscriptions.insert(k, s.clone());
+            trace!(
+                "replaced existing subscription (cid: {}, sub: {:?})",
+                self.get_client_prefix(),
+                s.get_id()
+            );
             return Ok(());
         }

@@ -90,9 +98,10 @@ impl ClientConn {
         }
         // add subscription
         self.subscriptions.insert(k, s);
-        debug!(
-            "registered new subscription, currently have {} active subs",
-            self.subscriptions.len()
+        trace!(
+            "registered new subscription, currently have {} active subs (cid: {})",
+            self.subscriptions.len(),
+            self.get_client_prefix(),
         );
         Ok(())
     }
@@ -101,10 +110,10 @@ impl ClientConn {
     pub fn unsubscribe(&mut self, c: &Close) {
         // TODO: return notice if subscription did not exist.
         self.subscriptions.remove(&c.id);
-        debug!(
-            "removed subscription, currently have {} active subs (cid={})",
+        trace!(
+            "removed subscription, currently have {} active subs (cid: {})",
             self.subscriptions.len(),
-            self.client_id
+            self.get_client_prefix(),
         );
     }
 }
src/db.rs (455 lines changed)
@@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event};
 use crate::hexrange::hex_range;
 use crate::hexrange::HexSearch;
 use crate::nip05;
+use crate::notice::Notice;
 use crate::schema::{upgrade_db, STARTUP_SQL};
 use crate::subscription::ReqFilter;
 use crate::subscription::Subscription;
@@ -18,8 +19,10 @@ use r2d2_sqlite::SqliteConnectionManager;
 use rusqlite::params;
 use rusqlite::types::ToSql;
 use rusqlite::OpenFlags;
+use tokio::sync::{Mutex, MutexGuard};
 use std::fmt::Write as _;
 use std::path::Path;
+use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 use std::time::Instant;
@@ -32,12 +35,15 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
 /// Events submitted from a client, with a return channel for notices
 pub struct SubmittedEvent {
     pub event: Event,
-    pub notice_tx: tokio::sync::mpsc::Sender<String>,
+    pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
 }

 /// Database file
 pub const DB_FILE: &str = "nostr.db";

+/// How frequently to attempt checkpointing
+pub const CHECKPOINT_FREQ_SEC: u64 = 60;
+
 /// Build a database connection pool.
 /// # Panics
 ///
@@ -75,6 +81,7 @@ pub fn build_pool(
         .test_on_check_out(true) // no noticeable performance hit
         .min_idle(Some(min_size))
         .max_size(max_size)
+        .max_lifetime(Some(Duration::from_secs(30)))
         .build(manager)
         .unwrap();
     info!(
@@ -84,6 +91,56 @@ pub fn build_pool(
     pool
 }

+/// Display database pool stats every 1 minute
+pub async fn monitor_pool(name: &str, pool: SqlitePool) {
+    let sleep_dur = Duration::from_secs(60);
+    loop {
+        log_pool_stats(name, &pool);
+        tokio::time::sleep(sleep_dur).await;
+    }
+}
+
+/// Perform normal maintenance
+pub fn optimize_db(conn: &mut PooledConnection) -> Result<()> {
+    let start = Instant::now();
+    conn.execute_batch("PRAGMA optimize;")?;
+    info!("optimize ran in {:?}", start.elapsed());
+    Ok(())
+}
+
+#[derive(Debug)]
+enum SqliteStatus {
+    Ok,
+    Busy,
+    Error,
+    Other(u64),
+}
+
+/// Checkpoint/Truncate WAL. Returns the number of WAL pages remaining.
+pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
+    let query = "PRAGMA wal_checkpoint(TRUNCATE);";
+    let start = Instant::now();
+    let (cp_result, wal_size, _frames_checkpointed) = conn.query_row(query, [], |row| {
+        let checkpoint_result: u64 = row.get(0)?;
+        let wal_size: u64 = row.get(1)?;
+        let frames_checkpointed: u64 = row.get(2)?;
+        Ok((checkpoint_result, wal_size, frames_checkpointed))
+    })?;
+    let result = match cp_result {
+        0 => SqliteStatus::Ok,
+        1 => SqliteStatus::Busy,
+        2 => SqliteStatus::Error,
+        x => SqliteStatus::Other(x),
+    };
+    info!(
+        "checkpoint ran in {:?} (result: {:?}, WAL size: {})",
+        start.elapsed(),
+        result,
+        wal_size
+    );
+    Ok(wal_size as usize)
+}
+
 /// Spawn a database writer that persists events to the SQLite store.
 pub async fn db_writer(
     settings: Settings,
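The `PRAGMA wal_checkpoint(TRUNCATE)` used by `checkpoint_db` returns a single row of three integers: a busy flag, the WAL size in pages, and the number of pages checkpointed. A minimal standalone sketch of the same call, assuming a WAL-mode database at an illustrative path:

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open("nostr.db")?;
    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
    // (busy, wal_pages, pages_checkpointed), per SQLite's documented result row;
    // i64 is used because SQLite reports -1 when no WAL file exists
    let (busy, wal_pages, checkpointed): (i64, i64, i64) =
        conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?))
        })?;
    println!("busy={busy} wal_pages={wal_pages} checkpointed={checkpointed}");
    Ok(())
}
```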
@@ -106,7 +163,7 @@
         &settings,
         OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
         1,
-        4,
+        2,
         false,
     );
     if settings.database.in_memory {
@@ -153,12 +210,34 @@
             // TODO: incorporate delegated pubkeys
             // if the event address is not in allowed_addrs.
             if !allowed_addrs.contains(&event.pubkey) {
-                info!(
-                    "Rejecting event {}, unauthorized author",
+                debug!(
+                    "rejecting event: {}, unauthorized author",
                     event.get_event_id_prefix()
                 );
                 notice_tx
-                    .try_send("pubkey is not allowed to publish to this relay".to_owned())
+                    .try_send(Notice::blocked(
+                        event.id,
+                        "pubkey is not allowed to publish to this relay",
+                    ))
                     .ok();
                 continue;
             }
         }

+        // Check that event kind isn't blacklisted
+        let kinds_blacklist = &settings.limits.event_kind_blacklist.clone();
+        if let Some(event_kind_blacklist) = kinds_blacklist {
+            if event_kind_blacklist.contains(&event.kind) {
+                debug!(
+                    "rejecting event: {}, blacklisted kind: {}",
+                    &event.get_event_id_prefix(),
+                    &event.kind
+                );
+                notice_tx
+                    .try_send(Notice::blocked(
+                        event.id,
+                        "event kind is blocked by relay"
+                    ))
+                    .ok();
+                continue;
+            }
+        }
@@ -184,15 +263,16 @@
                         event.get_author_prefix()
                     );
                 } else {
-                    info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
-                          uv.name.to_string(),
-                          event.get_author_prefix()
+                    info!(
+                        "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
+                        uv.name.to_string(),
+                        event.get_author_prefix()
                     );
                     notice_tx
-                        .try_send(
-                            "NIP-05 verification is no longer valid (expired/wrong domain)"
-                                .to_owned(),
-                        )
+                        .try_send(Notice::blocked(
+                            event.id,
+                            "NIP-05 verification is no longer valid (expired/wrong domain)",
+                        ))
                         .ok();
                     continue;
                 }
@@ -203,7 +283,10 @@
                         event.get_author_prefix()
                     );
                     notice_tx
-                        .try_send("NIP-05 verification needed to publish events".to_owned())
+                        .try_send(Notice::blocked(
+                            event.id,
+                            "NIP-05 verification needed to publish events",
+                        ))
                         .ok();
                     continue;
                 }
@@ -216,39 +299,38 @@
         // TODO: cache recent list of authors to remove a DB call.
         let start = Instant::now();
         if event.kind >= 20000 && event.kind < 30000 {
-            bcast_tx.send(event.clone()).ok();
             info!(
-                "published ephemeral event {:?} from {:?} in {:?}",
+                "published ephemeral event: {:?} from: {:?} in: {:?}",
                 event.get_event_id_prefix(),
                 event.get_author_prefix(),
                 start.elapsed()
             );
+            bcast_tx.send(event.clone()).ok();
             event_write = true
         } else {
             match write_event(&mut pool.get()?, &event) {
                 Ok(updated) => {
                     if updated == 0 {
                         trace!("ignoring duplicate or deleted event");
+                        notice_tx.try_send(Notice::duplicate(event.id)).ok();
                     } else {
                         info!(
-                            "persisted event {:?} from {:?} in {:?}",
+                            "persisted event: {:?} (kind: {}) from: {:?} in: {:?}",
                             event.get_event_id_prefix(),
+                            event.kind,
                             event.get_author_prefix(),
                             start.elapsed()
                         );
                         event_write = true;
                         // send this out to all clients
                         bcast_tx.send(event.clone()).ok();
                         notice_tx.try_send(Notice::saved(event.id)).ok();
                     }
                 }
                 Err(err) => {
                     warn!("event insert failed: {:?}", err);
-                    notice_tx
-                        .try_send(
-                            "relay experienced an error trying to publish the latest event"
-                                .to_owned(),
-                        )
-                        .ok();
+                    let msg = "relay experienced an error trying to publish the latest event";
+                    notice_tx.try_send(Notice::error(event.id, msg)).ok();
                 }
             }
         }
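The notice channel now carries a structured `Notice` instead of a bare `String`, which is what lets the relay emit NIP-20 command results (`["OK", <event id>, <true|false>, <message>]`). The crate's actual definition is not shown in this diff; a hedged sketch of what such a type could look like, reconstructed only from the constructors used above:

```rust
// Hypothetical reconstruction based on Notice::saved / duplicate / blocked / error.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum Notice {
    Message(String),
    EventResult { id: String, status: bool, msg: String },
}

impl Notice {
    fn saved(id: String) -> Notice {
        Notice::EventResult { id, status: true, msg: String::new() }
    }
    fn duplicate(id: String) -> Notice {
        Notice::EventResult { id, status: true, msg: "duplicate:".into() }
    }
    fn blocked(id: String, msg: &str) -> Notice {
        Notice::EventResult { id, status: false, msg: format!("blocked: {msg}") }
    }
    fn error(id: String, msg: &str) -> Notice {
        Notice::EventResult { id, status: false, msg: format!("error: {msg}") }
    }
}

fn main() {
    let n = Notice::blocked("00ab..".into(), "pubkey is not allowed to publish to this relay");
    println!("{n:?}");
}
```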
@@ -283,6 +365,9 @@
 /// Persist an event to the database, returning rows added.
 pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
+    // enable auto vacuum
+    conn.execute_batch("pragma auto_vacuum = FULL")?;
+
     // start transaction
     let tx = conn.transaction()?;
     // get relevant fields from event and convert to blobs.
@@ -290,6 +375,15 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
     let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
     let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
     let event_str = serde_json::to_string(&e).ok();
+    // check for replaceable events that would hide this one; we won't even attempt to insert these.
+    if e.is_replaceable() {
+        let repl_count = tx.query_row(
+            "SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at > ? LIMIT 1;",
+            params![pubkey_blob, e.kind, e.created_at], |row| row.get::<usize, usize>(0));
+        if repl_count.ok().is_some() {
+            return Ok(0);
+        }
+    }
     // ignore if the event hash is a duplicate.
     let mut ins_count = tx.execute(
         "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
@@ -297,7 +391,8 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
     )?;
     if ins_count == 0 {
         // if the event was a duplicate, no need to insert event or
-        // pubkey references. This will abort the txn.
+        // pubkey references.
+        tx.rollback().ok();
         return Ok(ins_count);
     }
     // remember primary key of the event most recently inserted.
@@ -315,9 +410,9 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
         // if tagvalue is lowercase hex;
         if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
             tx.execute(
-                "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
-                params![ev_id, &tagname, hex::decode(&tagval).ok()],
-            )?;
+                "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
+                params![ev_id, &tagname, hex::decode(tagval).ok()],
+            )?;
         } else {
             tx.execute(
                 "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
@@ -329,17 +424,19 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
         }
     }
-    // if this event is replaceable update, hide every other replaceable
+    // if this event is replaceable update, remove other replaceable
     // event with the same kind from the same author that was issued
     // earlier than this.
-    if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
-        // this is a backwards check - hide any events that were older.
+    if e.is_replaceable() {
+        let author = hex::decode(&e.pubkey).ok();
         let update_count = tx.execute(
-            "UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
-            params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
+            "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)",
+            params![e.kind, author, e.kind, author],
         )?;
         if update_count > 0 {
             info!(
-                "hid {} older replaceable kind {} events for author: {:?}",
+                "removed {} older replaceable kind {} events for author: {:?}",
                 update_count,
                 e.kind,
                 e.get_author_prefix()
@@ -414,8 +511,40 @@
     s
 }

-/// Create a dynamic SQL subquery and params from a subscription filter.
-fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
+/// Decide if there is an index that should be used explicitly
+fn override_index(f: &ReqFilter) -> Option<String> {
+    // queries for multiple kinds default to kind_index, which is
+    // significantly slower than kind_created_at_index.
+    if let Some(ks) = &f.kinds {
+        if f.ids.is_none() &&
+            ks.len() > 1 &&
+            f.since.is_none() &&
+            f.until.is_none() &&
+            f.tags.is_none() &&
+            f.authors.is_none() {
+            return Some("kind_created_at_index".into());
+        }
+    }
+    // if there is an author, it is much better to force the authors index.
+    if let Some(_) = &f.authors {
+        if f.since.is_none() && f.until.is_none() {
+            if f.kinds.is_none() {
+                // with no use of kinds/created_at, just author
+                return Some("author_index".into());
+            } else {
+                // prefer author_kind if there are kinds
+                return Some("author_kind_index".into());
+            }
+        } else {
+            // finally, prefer author_created_at if time is provided
+            return Some("author_created_at_index".into());
+        }
+    }
+    None
+}
+
+/// Create a dynamic SQL subquery and params from a subscription filter (and optional explicit index used)
+fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<String>) {
     // build a dynamic SQL query. all user-input is either an integer
     // (sqli-safe), or a string that is filtered to only contain
     // hexadecimal characters. Strings that require escaping (tag
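The effect of `override_index` is to splice an SQLite `INDEXED BY` clause into the generated SELECT. A small self-contained sketch of that splice in isolation, simplified from the code above:

```rust
fn base_query(idx_name: Option<&str>) -> String {
    // mirror the map_or_else splice used by query_from_filter
    let idx_stmt = idx_name.map_or_else(String::new, |i| format!("INDEXED BY {i}"));
    format!("SELECT e.content, e.created_at FROM event e {idx_stmt}")
}

fn main() {
    // an authors+kinds filter would force the author_kind_index
    println!("{}", base_query(Some("author_kind_index")));
    // no override: let the SQLite query planner choose
    println!("{}", base_query(None));
}
```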
@@ -423,14 +552,16 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {

     // if the filter is malformed, don't return anything.
     if f.force_no_match {
-        let empty_query =
-            "SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned();
+        let empty_query = "SELECT e.content, e.created_at FROM event e WHERE 1=0".to_owned();
         // query parameters for SQLite
         let empty_params: Vec<Box<dyn ToSql>> = vec![];
-        return (empty_query, empty_params);
+        return (empty_query, empty_params, None);
     }

-    let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned();
+    // check if the index needs to be overriden
+    let idx_name = override_index(f);
+    let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}", i));
+    let mut query = format!("SELECT e.content, e.created_at FROM event e {}", idx_stmt);
     // query parameters for SQLite
     let mut params: Vec<Box<dyn ToSql>> = vec![];

@@ -443,22 +574,18 @@
         for auth in authvec {
             match hex_range(auth) {
                 Some(HexSearch::Exact(ex)) => {
-                    auth_searches.push("author=? OR delegated_by=?".to_owned());
-                    params.push(Box::new(ex.clone()));
+                    auth_searches.push("author=?".to_owned());
+                    params.push(Box::new(ex));
                 }
                 Some(HexSearch::Range(lower, upper)) => {
-                    auth_searches.push(
-                        "(author>? AND author<?) OR (delegated_by>? AND delegated_by<?)".to_owned(),
-                    );
-                    params.push(Box::new(lower.clone()));
-                    params.push(Box::new(upper.clone()));
+                    auth_searches.push("(author>? AND author<?)".to_owned());
+                    params.push(Box::new(lower));
+                    params.push(Box::new(upper));
                 }
                 Some(HexSearch::LowerOnly(lower)) => {
-                    auth_searches.push("author>? OR delegated_by>?".to_owned());
-                    params.push(Box::new(lower.clone()));
+                    auth_searches.push("author>?".to_owned());
+                    params.push(Box::new(lower));
                 }
                 None => {
@@ -466,8 +593,12 @@
                 }
             }
         }
-        let authors_clause = format!("({})", auth_searches.join(" OR "));
-        filter_components.push(authors_clause);
+        if !authvec.is_empty() {
+            let auth_clause = format!("({})", auth_searches.join(" OR "));
+            filter_components.push(auth_clause);
+        } else {
+            filter_components.push("false".to_owned());
+        }
     }
     // Query for Kind
     if let Some(ks) = &f.kinds {
@@ -500,8 +631,14 @@
             }
         }
-        let id_clause = format!("({})", id_searches.join(" OR "));
-        filter_components.push(id_clause);
+        if !idvec.is_empty() {
+            let id_clause = format!("({})", id_searches.join(" OR "));
+            filter_components.push(id_clause);
+        } else {
+            // if the ids list was empty, we should never return
+            // any results.
+            filter_components.push("false".to_owned());
+        }
     }
     // Query for tags
     if let Some(map) = &f.tags {
@@ -510,7 +647,7 @@
             let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
             for v in val {
                 if (v.len() % 2 == 0) && is_lower_hex(v) {
-                    if let Ok(h) = hex::decode(&v) {
+                    if let Ok(h) = hex::decode(v) {
                         blob_vals.push(Box::new(h));
                     }
                 } else {
@@ -521,7 +658,10 @@
             let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
             let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
             // find evidence of the target tag name/value existing for this event.
-            let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
+            let tag_clause = format!(
+                "e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))",
+                str_clause, blob_clause
+            );
             // add the tag name as the first parameter
             params.push(Box::new(key.to_string()));
             // add all tag values that are plain strings as params
@@ -555,30 +695,111 @@
     } else {
         query.push_str(" ORDER BY e.created_at ASC")
     }
-    (query, params)
+    (query, params, idx_name)
 }

 /// Create a dynamic SQL query string and params from a subscription.
-fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
+fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<String>) {
     // build a dynamic SQL query for an entire subscription, based on
     // SQL subqueries for filters.
     let mut subqueries: Vec<String> = Vec::new();
+    let mut indexes = vec![];
     // subquery params
     let mut params: Vec<Box<dyn ToSql>> = vec![];
     // for every filter in the subscription, generate a subquery
     for f in sub.filters.iter() {
-        let (f_subquery, mut f_params) = query_from_filter(f);
+        let (f_subquery, mut f_params, index) = query_from_filter(f);
+        if let Some(i) = index {
+            indexes.push(i);
+        }
         subqueries.push(f_subquery);
         params.append(&mut f_params);
     }
     // encapsulate subqueries into select statements
     let subqueries_selects: Vec<String> = subqueries
         .iter()
-        .map(|s| format!("SELECT content, created_at FROM ({})", s))
+        .map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
         .collect();
     let query: String = subqueries_selects.join(" UNION ");
     debug!("final query string: {}", query);
-    (query, params)
+    (query, params, indexes)
 }
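Both `query_from_filter` and its tag subquery rely on building `?` placeholder lists (via `repeat_vars`) and binding a heterogeneous parameter vector at the end with `params_from_iter`. A self-contained sketch of that pattern with rusqlite, using an in-memory database and illustrative values:

```rust
use rusqlite::{types::ToSql, Connection};

/// Build "?,?,?" for use inside an IN (...) clause.
fn repeat_vars(count: usize) -> String {
    let mut s = "?,".repeat(count);
    s.pop(); // drop the trailing comma
    s
}

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE event (kind INTEGER);
         INSERT INTO event VALUES (0),(1),(7);",
    )?;
    let kinds: Vec<u64> = vec![0, 1];
    // placeholders are generated; values are bound, never spliced into SQL
    let sql = format!("SELECT count(*) FROM event WHERE kind IN ({})", repeat_vars(kinds.len()));
    let params: Vec<Box<dyn ToSql>> = kinds.iter().map(|k| Box::new(*k) as Box<dyn ToSql>).collect();
    let n: i64 = conn.query_row(&sql, rusqlite::params_from_iter(params), |row| row.get(0))?;
    println!("matched {n} events"); // matched 2 events
    Ok(())
}
```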
|
||||
/// Check if the pool is fully utilized
|
||||
fn _pool_at_capacity(pool: &SqlitePool) -> bool {
|
||||
let state: r2d2::State = pool.state();
|
||||
state.idle_connections == 0
|
||||
}
|
||||
|
||||
/// Log pool stats
|
||||
fn log_pool_stats(name: &str, pool: &SqlitePool) {
|
||||
let state: r2d2::State = pool.state();
|
||||
let in_use_cxns = state.connections - state.idle_connections;
|
||||
debug!(
|
||||
"DB pool {:?} usage (in_use: {}, available: {}, max: {})",
|
||||
name,
|
||||
in_use_cxns,
|
||||
state.connections,
|
||||
pool.max_size()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
/// Perform database maintenance on a regular basis
|
||||
pub async fn db_optimize_task(pool: SqlitePool) {
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(60*60)) => {
|
||||
if let Ok(mut conn) = pool.get() {
|
||||
// the busy timer will block writers, so don't set
|
||||
// this any higher than you want max latency for event
|
||||
// writes.
|
||||
info!("running database optimizer");
|
||||
optimize_db(&mut conn).ok();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|

/// Perform database WAL checkpoint on a regular basis
pub async fn db_checkpoint_task(pool: SqlitePool, safe_to_read: Arc<Mutex<u64>>) {
    tokio::task::spawn(async move {
        // WAL size in pages.
        let mut current_wal_size = 0;
        // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
        let wal_threshold = 1000*10;
        // default threshold for the busy timer
        let busy_wait_default = Duration::from_secs(1);
        // if the WAL file is getting too big, switch to this
        let busy_wait_default_long = Duration::from_secs(10);
        loop {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(CHECKPOINT_FREQ_SEC)) => {
                    if let Ok(mut conn) = pool.get() {
                        let mut _guard: Option<MutexGuard<u64>> = None;
                        // the busy timer will block writers, so don't set
                        // this any higher than you want max latency for event
                        // writes.
                        if current_wal_size <= wal_threshold {
                            conn.busy_timeout(busy_wait_default).ok();
                        } else {
                            // if the wal size has exceeded a threshold, increase the busy timeout.
                            conn.busy_timeout(busy_wait_default_long).ok();
                            // take a lock that will prevent new readers.
                            info!("blocking new readers to perform wal_checkpoint");
                            _guard = Some(safe_to_read.lock().await);
                        }
                        debug!("running wal_checkpoint(TRUNCATE)");
                        if let Ok(new_size) = checkpoint_db(&mut conn) {
                            current_wal_size = new_size;
                        }
                    }
                }
            };
        }
    });
}
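checkpoint_db is also outside this hunk. A sketch under the assumption that it wraps SQLite's wal_checkpoint pragma, which returns a (busy, log, checkpointed) row where log is the WAL length in pages — matching the page count that db_checkpoint_task tracks above:

    fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
        // TRUNCATE waits for readers, writes the WAL back into the main
        // db file, and then truncates the WAL.
        let (busy, log, checkpointed): (i64, i64, i64) = conn.query_row(
            "PRAGMA wal_checkpoint(TRUNCATE);",
            [],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )?;
        debug!("checkpoint: busy={}, log={}, checkpointed={}", busy, log, checkpointed);
        // remaining WAL pages (zero after a fully successful truncate)
        Ok(log as usize)
    }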

/// Perform a database query using a subscription.
@@ -593,46 +814,125 @@ pub async fn db_query(
    pool: SqlitePool,
    query_tx: tokio::sync::mpsc::Sender<QueryResult>,
    mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
    safe_to_read: Arc<Mutex<u64>>,
) {
    let pre_spawn_start = Instant::now();
    task::spawn_blocking(move || {
        debug!("going to query for: {:?}", sub);
        {
            // if we are waiting on a checkpoint, stop until it is complete
            let _ = safe_to_read.blocking_lock();
        }
        let db_queue_time = pre_spawn_start.elapsed();
        // if the queue time was very long (>5 seconds), spare the DB and abort.
        if db_queue_time > Duration::from_secs(5) {
            info!(
                "shedding DB query load queued for {:?} (cid: {}, sub: {:?})",
                db_queue_time, client_id, sub.id
            );
            return Ok(());
        }
        // otherwise, report queuing time if it is slow
        else if db_queue_time > Duration::from_secs(1) {
            debug!(
                "(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
                db_queue_time, client_id, sub.id
            );
        }
        let start = Instant::now();
        let mut row_count: usize = 0;
        let start = Instant::now();
        // generate SQL query
        let (q, p) = query_from_sub(&sub);
        debug!("SQL generated in {:?}", start.elapsed());
        // show pool stats
        debug!("DB pool stats: {:?}", pool.state());
        let (q, p, idxs) = query_from_sub(&sub);
        let sql_gen_elapsed = start.elapsed();

        if sql_gen_elapsed > Duration::from_millis(10) {
            debug!("SQL (slow) generated in {:?}", start.elapsed());
        }
        // cutoff for displaying slow queries
        let slow_cutoff = Duration::from_millis(2000);
        // any client that doesn't cause us to generate new rows in 5
        // seconds gets dropped.
        let abort_cutoff = Duration::from_secs(5);
        let start = Instant::now();
        if let Ok(conn) = pool.get() {
            // execute the query. Don't cache, since queries vary so much.
            let mut stmt = conn.prepare(&q)?;
        let mut slow_first_event;
        let mut last_successful_send = Instant::now();
        if let Ok(mut conn) = pool.get() {
            // execute the query.
            // make the actual SQL query (with parameters inserted) available
            conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
            let mut stmt = conn.prepare_cached(&q)?;
            let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;

            let mut first_result = true;
            while let Some(row) = event_rows.next()? {
                let first_event_elapsed = start.elapsed();
                slow_first_event = first_event_elapsed >= slow_cutoff;
                if first_result {
                    debug!(
                        "time to first result: {:?} (cid={}, sub={:?})",
                        start.elapsed(),
                        client_id,
                        sub.id
                        "first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}]",
                        first_event_elapsed, client_id, sub.id, idxs
                    );
                    first_result = false;
                }
                // check if this is still active
                // TODO: check every N rows
                if abandon_query_rx.try_recv().is_ok() {
                    debug!("query aborted (sub={:?})", sub.id);
                // logging for slow queries; show sub and SQL.
                // to reduce logging; only show 1/16th of clients (leading 0)
                if row_count == 0 && slow_first_event && client_id.starts_with('0') {
                    debug!(
                        "query req (slow): {:?} (cid: {}, sub: {:?})",
                        sub, client_id, sub.id
                    );
                }
                // check if a checkpoint is trying to run, and abort
                if row_count % 100 == 0 {
                    {
                        if safe_to_read.try_lock().is_err() {
                            // lock was held, abort this query
                            debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
                            return Ok(());
                        }
                    }
                }

                // check if this is still active; every 100 rows
                if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
                    debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
                    return Ok(());
                }
                row_count += 1;
                let event_json = row.get(0)?;
                loop {
                    if query_tx.capacity() != 0 {
                        // we have capacity to add another item
                        break;
                    } else {
                        // the queue is full
                        trace!("db reader thread is stalled");
                        if last_successful_send + abort_cutoff < Instant::now() {
                            // the queue has been full for too long, abort
                            info!("aborting database query due to slow client (cid: {}, sub: {:?})",
                                  client_id, sub.id);
                            let ok: Result<()> = Ok(());
                            return ok;
                        }
                        // check if a checkpoint is trying to run, and abort
                        if safe_to_read.try_lock().is_err() {
                            // lock was held, abort this query
                            debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
                            return Ok(());
                        }
                        // give the queue a chance to clear before trying again
                        thread::sleep(Duration::from_millis(100));
                    }
                }
                // TODO: we could use try_send, but we'd have to juggle
                // getting the query result back as part of the error
                // result.
                query_tx
                    .blocking_send(QueryResult {
                        sub_id: sub.get_id(),
                        event: event_json,
                    })
                    .ok();
                last_successful_send = Instant::now();
            }
            query_tx
                .blocking_send(QueryResult {
@@ -641,11 +941,12 @@ pub async fn db_query(
                })
                .ok();
            debug!(
                "query completed ({} rows) in {:?} (cid={}, sub={:?})",
                row_count,
                start.elapsed(),
                "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
                pre_spawn_start.elapsed(),
                client_id,
                sub.id
                sub.id,
                start.elapsed(),
                row_count
            );
        } else {
            warn!("Could not get a database connection for querying");
src/delegation.rs
@@ -80,7 +80,7 @@ impl FromStr for Operator {

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct ConditionQuery {
    pub(crate) conditions: Vec<Condition>,
    pub conditions: Vec<Condition>,
}

impl ConditionQuery {
@@ -137,9 +137,9 @@ pub fn validate_delegation(
/// An example complex condition would be: kind=1,2,3&created_at<1665265999
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Condition {
    pub(crate) field: Field,
    pub(crate) operator: Operator,
    pub(crate) values: Vec<u64>,
    pub field: Field,
    pub operator: Operator,
    pub values: Vec<u64>,
}
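Tying this to the doc comment above: the example string kind=1,2,3&created_at<1665265999 would parse into two conditions of the following shape. The Field enum is not included in this comparison, so the variant names here are assumed:

    let cq = ConditionQuery {
        conditions: vec![
            Condition {
                field: Field::Kind,      // assumed variant name
                operator: Operator::Equals,
                values: vec![1, 2, 3],
            },
            Condition {
                field: Field::CreatedAt, // assumed variant name
                operator: Operator::LessThan,
                values: vec![1665265999],
            },
        ],
    };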

impl Condition {
@@ -332,19 +332,6 @@ mod tests {
        assert_eq!(parsed, cq);
        Ok(())
    }
    fn simple_event() -> Event {
        Event {
            id: "0".to_owned(),
            pubkey: "0".to_owned(),
            delegated_by: None,
            created_at: 0,
            kind: 0,
            tags: vec![],
            content: "".to_owned(),
            sig: "0".to_owned(),
            tagidx: None,
        }
    }
    // Check for condition logic on event w/ empty values
    #[test]
    fn condition_with_empty_values() {
@@ -353,7 +340,7 @@ mod tests {
            operator: Operator::GreaterThan,
            values: vec![],
        };
        let e = simple_event();
        let e = Event::simple_event();
        assert!(!c.allows_event(&e));
        c.operator = Operator::LessThan;
        assert!(!c.allows_event(&e));
@@ -373,7 +360,7 @@ mod tests {
            operator: Operator::GreaterThan,
            values: vec![10],
        };
        let mut e = simple_event();
        let mut e = Event::simple_event();
        // kind is not greater than 10, not allowed
        e.kind = 1;
        assert!(!c.allows_event(&e));
@@ -392,7 +379,7 @@ mod tests {
            operator: Operator::Equals,
            values: vec![0, 10, 20],
        };
        let mut e = simple_event();
        let mut e = Event::simple_event();
        // Allow if event kind is in list for Equals
        e.kind = 10;
        assert!(c.allows_event(&e));
12
src/error.rs
@@ -17,10 +17,16 @@ pub enum Error {
    ConnWriteError,
    #[error("EVENT parse failed")]
    EventParseFailed,
    #[error("ClOSE message parse failed")]
    #[error("CLOSE message parse failed")]
    CloseParseFailed,
    #[error("Event validation failed")]
    EventInvalid,
    #[error("Event invalid signature")]
    EventInvalidSignature,
    #[error("Event invalid id")]
    EventInvalidId,
    #[error("Event malformed pubkey")]
    EventMalformedPubkey,
    #[error("Event could not canonicalize")]
    EventCouldNotCanonicalize,
    #[error("Event too large")]
    EventMaxLengthError(usize),
    #[error("Subscription identifier max length exceeded")]
110
src/event.rs
@@ -27,23 +27,29 @@ pub struct EventCmd {
    event: Event,
}

impl EventCmd {
    pub fn event_id(&self) -> &str {
        &self.event.id
    }
}

/// Parsed nostr event.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Event {
    pub id: String,
    pub(crate) pubkey: String,
    pub pubkey: String,
    #[serde(skip)]
    pub(crate) delegated_by: Option<String>,
    pub(crate) created_at: u64,
    pub(crate) kind: u64,
    pub delegated_by: Option<String>,
    pub created_at: u64,
    pub kind: u64,
    #[serde(deserialize_with = "tag_from_string")]
    // NOTE: array-of-arrays may need to be more general than a string container
    pub(crate) tags: Vec<Vec<String>>,
    pub(crate) content: String,
    pub(crate) sig: String,
    pub tags: Vec<Vec<String>>,
    pub content: String,
    pub sig: String,
    // Optimization for tag search, built on demand.
    #[serde(skip)]
    pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
    pub tagidx: Option<HashMap<char, HashSet<String>>>,
}

/// Simple tag type for array of array of strings.
@@ -83,22 +89,42 @@ impl From<EventCmd> for Result<Event> {
        // ensure command is correct
        if ec.cmd != "EVENT" {
            Err(CommandUnknownError)
        } else if ec.event.is_valid() {
            let mut e = ec.event;
            e.build_index();
            e.update_delegation();
            Ok(e)
        } else {
            Err(EventInvalid)
            ec.event.validate().map(|_| {
                let mut e = ec.event;
                e.build_index();
                e.update_delegation();
                e
            })
        }
    }
}

impl Event {
    #[cfg(test)]
    pub fn simple_event() -> Event {
        Event {
            id: "0".to_owned(),
            pubkey: "0".to_owned(),
            delegated_by: None,
            created_at: 0,
            kind: 0,
            tags: vec![],
            content: "".to_owned(),
            sig: "0".to_owned(),
            tagidx: None,
        }
    }

    pub fn is_kind_metadata(&self) -> bool {
        self.kind == 0
    }

    /// Should this event be replaced with newer timestamps from same author?
    pub fn is_replaceable(&self) -> bool {
        self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000)
    }

    /// Pull a NIP-05 Name out of the event, if one exists
    pub fn get_nip05_addr(&self) -> Option<nip05::Nip05Name> {
        if self.is_kind_metadata() {
@@ -155,11 +181,11 @@ impl Event {
    }

    /// Update delegation status
    fn update_delegation(&mut self) {
    pub fn update_delegation(&mut self) {
        self.delegated_by = self.delegated_author();
    }
    /// Build an event tag index
    fn build_index(&mut self) {
    pub fn build_index(&mut self) {
        // if there are no tags; just leave the index as None
        if self.tags.is_empty() {
            return;
@@ -220,7 +246,7 @@ impl Event {
    }

    /// Check if this event has a valid signature.
    fn is_valid(&self) -> bool {
    pub fn validate(&self) -> Result<()> {
        // TODO: return a Result with a reason for invalid events
        // validation is performed by:
        // * parsing JSON string into event fields
@@ -229,8 +255,8 @@ impl Event {
        // * serialize with no spaces/newlines
        let c_opt = self.to_canonical();
        if c_opt.is_none() {
            debug!("event could not be canonicalized");
            return false;
            debug!("could not canonicalize");
            return Err(EventCouldNotCanonicalize);
        }
        let c = c_opt.unwrap();
        // * compute the sha256sum.
@@ -239,21 +265,21 @@ impl Event {
        // * ensure the id matches the computed sha256sum.
        if self.id != hex_digest {
            debug!("event id does not match digest");
            return false;
            return Err(EventInvalidId);
        }
        // * validate the message digest (sig) using the pubkey & computed sha256 message hash.
        let sig = schnorr::Signature::from_str(&self.sig).unwrap();
        if let Ok(msg) = secp256k1::Message::from_slice(digest.as_ref()) {
            if let Ok(pubkey) = XOnlyPublicKey::from_str(&self.pubkey) {
                let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
                matches!(verify, Ok(()))
                SECP.verify_schnorr(&sig, &msg, &pubkey)
                    .map_err(|_| EventInvalidSignature)
            } else {
                debug!("client sent malformed pubkey");
                false
                Err(EventMalformedPubkey)
            }
        } else {
            info!("error converting digest to secp256k1 message");
            false
            Err(EventInvalidSignature)
        }
    }

@@ -313,31 +339,18 @@ impl Event {
#[cfg(test)]
mod tests {
    use super::*;
    fn simple_event() -> Event {
        Event {
            id: "0".to_owned(),
            pubkey: "0".to_owned(),
            delegated_by: None,
            created_at: 0,
            kind: 0,
            tags: vec![],
            content: "".to_owned(),
            sig: "0".to_owned(),
            tagidx: None,
        }
    }

    #[test]
    fn event_creation() {
        // create an event
        let event = simple_event();
        let event = Event::simple_event();
        assert_eq!(event.id, "0");
    }

    #[test]
    fn event_serialize() -> Result<()> {
        // serialize an event to JSON string
        let event = simple_event();
        let event = Event::simple_event();
        let j = serde_json::to_string(&event)?;
        assert_eq!(j, "{\"id\":\"0\",\"pubkey\":\"0\",\"created_at\":0,\"kind\":0,\"tags\":[],\"content\":\"\",\"sig\":\"0\"}");
        Ok(())
@@ -345,14 +358,14 @@ mod tests {

    #[test]
    fn empty_event_tag_match() {
        let event = simple_event();
        let event = Event::simple_event();
        assert!(!event
            .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
    }

    #[test]
    fn single_event_tag_match() {
        let mut event = simple_event();
        let mut event = Event::simple_event();
        event.tags = vec![vec!["e".to_owned(), "foo".to_owned()]];
        event.build_index();
        assert_eq!(
@@ -367,7 +380,7 @@ mod tests {
    #[test]
    fn event_tags_serialize() -> Result<()> {
        // serialize an event with tags to JSON string
        let mut event = simple_event();
        let mut event = Event::simple_event();
        event.tags = vec![
            vec![
                "e".to_owned(),
@@ -491,4 +504,17 @@ mod tests {
        let expected = Some(expected_json.to_owned());
        assert_eq!(c, expected);
    }

    #[test]
    fn replaceable_event() {
        let mut event = Event::simple_event();
        event.kind=0;
        assert!(event.is_replaceable());
        event.kind=3;
        assert!(event.is_replaceable());
        event.kind=12000;
        assert!(event.is_replaceable());
    }
}
src/hexrange.rs
@@ -3,7 +3,7 @@ use crate::utils::is_hex;
use hex;

/// Types of hexadecimal queries.
#[derive(PartialEq, Eq, Debug, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub enum HexSearch {
    // when no range is needed, exact 32-byte
    Exact(Vec<u8>),
src/info.rs
@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
        description: i.description,
        pubkey: i.pubkey,
        contact: i.contact,
        supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 22, 26]),
        supported_nips: Some(vec![1, 2, 9, 11, 12, 15, 16, 20, 22]),
        software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
        version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
    }
src/lib.rs
@@ -1,3 +1,4 @@
pub mod cli;
pub mod close;
pub mod config;
pub mod conn;
@@ -8,6 +9,7 @@ pub mod event;
pub mod hexrange;
pub mod info;
pub mod nip05;
pub mod notice;
pub mod schema;
pub mod subscription;
pub mod utils;
33
src/main.rs
@@ -1,44 +1,43 @@
//! Server process

use clap::Parser;
use nostr_rs_relay::cli::*;
use nostr_rs_relay::config;
use nostr_rs_relay::server::start_server;
use std::env;
use std::sync::mpsc as syncmpsc;
use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender};
use std::thread;
use tracing::info;

use console_subscriber::ConsoleLayer;

/// Return a requested DB name from command line arguments.
fn db_from_args(args: &[String]) -> Option<String> {
    if args.len() == 3 && args.get(1) == Some(&"--db".to_owned()) {
        return args.get(2).map(std::clone::Clone::clone);
    }
    None
}

/// Start running a Nostr relay server.
fn main() {
    // setup tracing
    let _trace_sub = tracing_subscriber::fmt::try_init();
    info!("Starting up from main");
    // get database directory from args
    let args: Vec<String> = env::args().collect();
    let db_dir: Option<String> = db_from_args(&args);
    // configure settings from config.toml
    // replace default settings with those read from config.toml
    let mut settings = config::Settings::new();

    // setup tracing
    if settings.diagnostics.tracing {
        // enable tracing with tokio-console
        ConsoleLayer::builder().with_default_env().init();
    } else {
        // standard logging
        tracing_subscriber::fmt::try_init().unwrap();
    }
    info!("Starting up from main");

    let args = CLIArgs::parse();

    // get database directory from args
    let db_dir = args.db;

    // update with database location
    if let Some(db) = db_dir {
        settings.database.data_directory = db;
    if db_dir.len() > 0 {
        settings.database.data_directory = db_dir;
    }

    let (_, ctrl_rx): (MpscSender<()>, MpscReceiver<()>) = syncmpsc::channel();
    // run this in a new thread
    let handle = thread::spawn(|| {
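src/cli.rs is not included in this comparison; a rough sketch of what CLIArgs likely looks like, assuming clap's v4-style derive API and a plain String for --db (main.rs above tests db_dir.len() rather than matching an Option):

    use clap::Parser;

    // Hypothetical definition; only the `db: String` field is implied
    // by the main.rs changes above.
    #[derive(Parser)]
    #[command(version, about = "A nostr relay written in Rust.")]
    pub struct CLIArgs {
        #[arg(short, long, default_value = "", help = "database directory")]
        pub db: String,
    }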
src/nip05.rs
@@ -517,7 +517,7 @@ impl Verifier {
    Ok(updated) => {
        if updated != 0 {
            info!(
                "persisted event: {:?} in {:?}",
                "persisted event (new verified pubkey): {:?} in {:?}",
                event.get_event_id_prefix(),
                start.elapsed()
            );
@@ -721,7 +721,7 @@ pub fn query_oldest_user_verification(
    earliest: u64,
) -> Result<VerificationRecord> {
    let tx = conn.transaction()?;
    let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
    let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
    let mut stmt = tx.prepare_cached(query)?;
    let fields = stmt.query_row(params![earliest, earliest], |r| {
        let rowid: u64 = r.get(0)?;
86
src/notice.rs
Normal file
@@ -0,0 +1,86 @@
pub enum EventResultStatus {
    Saved,
    Duplicate,
    Invalid,
    Blocked,
    RateLimited,
    Error,
}

pub struct EventResult {
    pub id: String,
    pub msg: String,
    pub status: EventResultStatus,
}

pub enum Notice {
    Message(String),
    EventResult(EventResult),
}

impl EventResultStatus {
    pub fn to_bool(&self) -> bool {
        match self {
            Self::Saved => true,
            Self::Duplicate => true,
            Self::Invalid => false,
            Self::Blocked => false,
            Self::RateLimited => false,
            Self::Error => false,
        }
    }

    pub fn prefix(&self) -> &'static str {
        match self {
            Self::Saved => "saved",
            Self::Duplicate => "duplicate",
            Self::Invalid => "invalid",
            Self::Blocked => "blocked",
            Self::RateLimited => "rate-limited",
            Self::Error => "error",
        }
    }
}

impl Notice {
    //pub fn err(err: error::Error, id: String) -> Notice {
    //    Notice::err_msg(format!("{}", err), id)
    //}

    pub fn message(msg: String) -> Notice {
        Notice::Message(msg)
    }

    fn prefixed(id: String, msg: &str, status: EventResultStatus) -> Notice {
        let msg = format!("{}: {}", status.prefix(), msg);
        Notice::EventResult(EventResult { id, msg, status })
    }

    pub fn invalid(id: String, msg: &str) -> Notice {
        Notice::prefixed(id, msg, EventResultStatus::Invalid)
    }

    pub fn blocked(id: String, msg: &str) -> Notice {
        Notice::prefixed(id, msg, EventResultStatus::Blocked)
    }

    pub fn rate_limited(id: String, msg: &str) -> Notice {
        Notice::prefixed(id, msg, EventResultStatus::RateLimited)
    }

    pub fn duplicate(id: String) -> Notice {
        Notice::prefixed(id, "", EventResultStatus::Duplicate)
    }

    pub fn error(id: String, msg: &str) -> Notice {
        Notice::prefixed(id, msg, EventResultStatus::Error)
    }

    pub fn saved(id: String) -> Notice {
        Notice::EventResult(EventResult {
            id,
            msg: "".into(),
            status: EventResultStatus::Saved,
        })
    }
}
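These constructors pair with make_notice_message in src/server.rs below, which serializes them as NIP-20 command results; for example (illustrative values):

    let n = Notice::saved("a1b2c3".into());
    //   -> ["OK","a1b2c3",true,""]
    let n = Notice::blocked("a1b2c3".into(), "pubkey is not allowed");
    //   -> ["OK","a1b2c3",false,"blocked: pubkey is not allowed"]
    let n = Notice::message("restarting soon".into());
    //   -> ["NOTICE","restarting soon"]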
306
src/schema.rs
@@ -13,21 +13,22 @@ use tracing::{debug, error, info};

/// Startup DB Pragmas
pub const STARTUP_SQL: &str = r##"
PRAGMA main.synchronous=NORMAL;
PRAGMA main.synchronous = NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA journal_size_limit=32768;
pragma mmap_size = 536870912; -- 512MB of mmap
PRAGMA journal_size_limit = 32768;
pragma mmap_size = 17179869184; -- cap mmap at 16GB
"##;

/// Latest database version
pub const DB_VERSION: usize = 7;
pub const DB_VERSION: usize = 15;

/// Schema definition
const INIT_SQL: &str = formatcp!(
r##"
-- Database settings
PRAGMA encoding = "UTF-8";
PRAGMA journal_mode=WAL;
PRAGMA journal_mode = WAL;
PRAGMA auto_vacuum = FULL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
@@ -48,10 +49,15 @@ content TEXT NOT NULL -- serialized json of event object

-- Event Indexes
CREATE UNIQUE INDEX IF NOT EXISTS event_hash_index ON event(event_hash);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS author_index ON event(author);
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at);
CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at);
CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);

-- Tag Table
-- Tag values are stored as either a BLOB (if they come in as a
@@ -67,6 +73,8 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);

-- NIP-05 User Validation
CREATE TABLE IF NOT EXISTS user_verification (
@@ -91,6 +99,20 @@ pub fn curr_db_version(conn: &mut Connection) -> Result<usize> {
    Ok(curr_version)
}

/// Determine event count
pub fn db_event_count(conn: &mut Connection) -> Result<usize> {
    let query = "SELECT count(*) FROM event;";
    let count = conn.query_row(query, [], |row| row.get(0))?;
    Ok(count)
}

/// Determine tag count
pub fn db_tag_count(conn: &mut Connection) -> Result<usize> {
    let query = "SELECT count(*) FROM tag;";
    let count = conn.query_row(query, [], |row| row.get(0))?;
    Ok(count)
}

fn mig_init(conn: &mut PooledConnection) -> Result<usize> {
    match conn.execute_batch(INIT_SQL) {
        Ok(()) => {
@@ -138,25 +160,46 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
    if curr_version == 1 {
        curr_version = mig_1_to_2(conn)?;
    }

    if curr_version == 2 {
        curr_version = mig_2_to_3(conn)?;
    }

    if curr_version == 3 {
        curr_version = mig_3_to_4(conn)?;
    }

    if curr_version == 4 {
        curr_version = mig_4_to_5(conn)?;
    }

    if curr_version == 5 {
        curr_version = mig_5_to_6(conn)?;
    }
    if curr_version == 6 {
        curr_version = mig_6_to_7(conn)?;
    }
    if curr_version == 7 {
        curr_version = mig_7_to_8(conn)?;
    }
    if curr_version == 8 {
        curr_version = mig_8_to_9(conn)?;
    }
    if curr_version == 9 {
        curr_version = mig_9_to_10(conn)?;
    }
    if curr_version == 10 {
        curr_version = mig_10_to_11(conn)?;
    }
    if curr_version == 11 {
        curr_version = mig_11_to_12(conn)?;
    }
    if curr_version == 12 {
        curr_version = mig_12_to_13(conn)?;
    }
    if curr_version == 13 {
        curr_version = mig_13_to_14(conn)?;
    }
    if curr_version == 14 {
        curr_version = mig_14_to_15(conn)?;
    }

    if curr_version == DB_VERSION {
        info!(
            "All migration scripts completed successfully. Welcome to v{}.",
@@ -183,6 +226,62 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
    Ok(())
}

pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
    // Check how many events we have to process
    let count = db_event_count(conn)?;
    let update_each_percent = 0.05;
    let mut percent_done = 0.0;
    let mut events_processed = 0;
    let start = Instant::now();
    let tx = conn.transaction()?;
    {
        // Clear out table
        tx.execute("DELETE FROM tag;", [])?;
        let mut stmt = tx.prepare("select id, content from event order by id;")?;
        let mut tag_rows = stmt.query([])?;
        while let Some(row) = tag_rows.next()? {
            if (events_processed as f32)/(count as f32) > percent_done {
                info!("Tag update {}% complete...", (100.0*percent_done).round());
                percent_done += update_each_percent;
            }
            // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
            let event_id: u64 = row.get(0)?;
            let event_json: String = row.get(1)?;
            let event: Event = serde_json::from_str(&event_json)?;
            // look at each event, and each tag, creating new tag entries if appropriate.
            for t in event.tags.iter().filter(|x| x.len() > 1) {
                let tagname = t.get(0).unwrap();
                let tagnamechar_opt = single_char_tagname(tagname);
                if tagnamechar_opt.is_none() {
                    continue;
                }
                // safe because len was > 1
                let tagval = t.get(1).unwrap();
                // insert as BLOB if we can restore it losslessly.
                // this means it needs to be even length and lowercase.
                if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
                    tx.execute(
                        "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
                        params![event_id, tagname, hex::decode(tagval).ok()],
                    )?;
                } else {
                    // otherwise, insert as text
                    tx.execute(
                        "INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
                        params![event_id, tagname, &tagval],
                    )?;
                }
            }
            events_processed += 1;
        }
    }
    tx.commit()?;
    info!("rebuilt tags in {:?}", start.elapsed());
    Ok(())
}
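The BLOB-vs-TEXT decision above is worth an example; under the stated rule, only even-length lowercase hex is stored losslessly as a BLOB:

    // Illustrative tag values and where rebuild_tags would store them:
    //   "affe12"            -> value_hex BLOB 0xAFFE12 (even-length lower hex)
    //   "AFFE12"            -> value TEXT (uppercase would not round-trip)
    //   "wss://example.com" -> value TEXT (not hex at all)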

//// Migration Scripts

fn mig_1_to_2(conn: &mut PooledConnection) -> Result<usize> {
@@ -314,7 +413,6 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
        let mut stmt = tx.prepare("select id, content from event order by id;")?;
        let mut tag_rows = stmt.query([])?;
        while let Some(row) = tag_rows.next()? {
            // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
            let event_id: u64 = row.get(0)?;
            let event_json: String = row.get(1)?;
            let event: Event = serde_json::from_str(&event_json)?;
@@ -332,7 +430,7 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {
            if (tagval.len() % 2 == 0) && is_lower_hex(tagval) {
                tx.execute(
                    "INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
                    params![event_id, tagname, hex::decode(&tagval).ok()],
                    params![event_id, tagname, hex::decode(tagval).ok()],
                )?;
            } else {
                // otherwise, insert as text
@@ -356,7 +454,6 @@ fn mig_5_to_6(conn: &mut PooledConnection) -> Result<usize> {

fn mig_6_to_7(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 6->7");
    // only change is adding a hidden column to events.
    let upgrade_sql = r##"
ALTER TABLE event ADD delegated_by BLOB;
CREATE INDEX IF NOT EXISTS delegated_by_index ON event(delegated_by);
@@ -373,3 +470,184 @@ PRAGMA user_version = 7;
    }
    Ok(7)
}

fn mig_7_to_8(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 7->8");
    // Remove redundant indexes, and add a better multi-column index.
    let upgrade_sql = r##"
DROP INDEX IF EXISTS created_at_index;
DROP INDEX IF EXISTS kind_index;
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
PRAGMA user_version = 8;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v7 -> v8");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(8)
}

fn mig_8_to_9(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 8->9");
    // Those old indexes were actually helpful...
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS created_at_index ON event(created_at);
CREATE INDEX IF NOT EXISTS event_composite_index ON event(kind,created_at);
PRAGMA user_version = 9;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v8 -> v9");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(9)
}

fn mig_9_to_10(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 9->10");
    // Those old indexes were actually helpful...
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_composite_index ON tag(event_id,name,value_hex,value);
PRAGMA user_version = 10;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v9 -> v10");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(10)
}

fn mig_10_to_11(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 10->11");
    // Those old indexes were actually helpful...
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS tag_name_eid_index ON tag(name,event_id,value_hex);
reindex;
pragma optimize;
PRAGMA user_version = 11;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v10 -> v11");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(11)
}

fn mig_11_to_12(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 11->12");
    let start = Instant::now();
    let tx = conn.transaction()?;
    {
        // Lookup every replaceable event
        let mut stmt = tx.prepare("select kind,author from event where kind in (0,3,41) or (kind>=10000 and kind<20000) order by id;")?;
        let mut replaceable_rows = stmt.query([])?;
        info!("updating replaceable events; this could take awhile...");
        while let Some(row) = replaceable_rows.next()? {
            // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
            let event_kind: u64 = row.get(0)?;
            let event_author: Vec<u8> = row.get(1)?;
            tx.execute(
                "UPDATE event SET hidden=TRUE WHERE hidden!=TRUE and kind=? and author=? and id NOT IN (SELECT id FROM event WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)",
                params![event_kind, event_author, event_kind, event_author],
            )?;
        }
        tx.execute("PRAGMA user_version = 12;", [])?;
    }
    tx.commit()?;
    info!("database schema upgraded v11 -> v12 in {:?}", start.elapsed());
    // vacuum after large table modification
    let start = Instant::now();
    conn.execute("VACUUM;", [])?;
    info!("vacuumed DB after hidden event cleanup in {:?}", start.elapsed());
    Ok(12)
}

fn mig_12_to_13(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 12->13");
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS kind_author_index ON event(kind,author);
reindex;
pragma optimize;
PRAGMA user_version = 13;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v12 -> v13");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(13)
}

fn mig_13_to_14(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 13->14");
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
CREATE INDEX IF NOT EXISTS kind_created_at_index ON event(kind,created_at);
pragma optimize;
PRAGMA user_version = 14;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v13 -> v14");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    Ok(14)
}

fn mig_14_to_15(conn: &mut PooledConnection) -> Result<usize> {
    info!("database schema needs update from 14->15");
    let upgrade_sql = r##"
CREATE INDEX IF NOT EXISTS author_created_at_index ON event(author,created_at);
CREATE INDEX IF NOT EXISTS author_kind_index ON event(author,kind);
PRAGMA user_version = 15;
"##;
    match conn.execute_batch(upgrade_sql) {
        Ok(()) => {
            info!("database schema upgraded v14 -> v15");
        }
        Err(err) => {
            error!("update failed: {}", err);
            panic!("database could not be upgraded");
        }
    }
    // clear out hidden events
    let clear_hidden_sql = r##"DELETE FROM event WHERE HIDDEN=true;"##;
    info!("removing hidden events; this may take awhile...");
    match conn.execute_batch(clear_hidden_sql) {
        Ok(()) => {
            info!("all hidden events removed");
        },
        Err(err) => {
            error!("delete failed: {}", err);
            panic!("could not remove hidden events");
        }
    }
    Ok(15)
}
250
src/server.rs
250
src/server.rs
@@ -10,21 +10,28 @@ use crate::event::Event;
|
||||
use crate::event::EventCmd;
|
||||
use crate::info::RelayInfo;
|
||||
use crate::nip05;
|
||||
use crate::notice::Notice;
|
||||
use crate::subscription::Subscription;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use governor::{Jitter, Quota, RateLimiter};
|
||||
use http::header::HeaderMap;
|
||||
use hyper::header::ACCEPT;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::upgrade::Upgraded;
|
||||
use hyper::{
|
||||
header, server::conn::AddrStream, upgrade, Body, Request, Response, Server, StatusCode,
|
||||
};
|
||||
use rusqlite::OpenFlags;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::mpsc::Receiver as MpscReceiver;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -49,6 +56,7 @@ async fn handle_web_request(
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
|
||||
shutdown: Receiver<()>,
|
||||
safe_to_read: Arc<Mutex<u64>>,
|
||||
) -> Result<Response<Body>, Infallible> {
|
||||
match (
|
||||
request.uri().path(),
|
||||
@@ -71,6 +79,7 @@ async fn handle_web_request(
|
||||
Ok(upgraded) => {
|
||||
// set WebSocket configuration options
|
||||
let config = WebSocketConfig {
|
||||
max_send_queue: Some(1024),
|
||||
max_message_size: settings.limits.max_ws_message_bytes,
|
||||
max_frame_size: settings.limits.max_ws_frame_bytes,
|
||||
..Default::default()
|
||||
@@ -84,11 +93,35 @@ async fn handle_web_request(
|
||||
Some(config),
|
||||
)
|
||||
.await;
|
||||
|
||||
let origin = get_header_string("origin", request.headers());
|
||||
let user_agent = get_header_string("user-agent", request.headers());
|
||||
// determine the remote IP from headers if the exist
|
||||
let header_ip = settings
|
||||
.network
|
||||
.remote_ip_header
|
||||
.as_ref()
|
||||
.and_then(|x| get_header_string(x, request.headers()));
|
||||
// use the socket addr as a backup
|
||||
let remote_ip =
|
||||
header_ip.unwrap_or_else(|| remote_addr.ip().to_string());
|
||||
let client_info = ClientInfo {
|
||||
remote_ip,
|
||||
user_agent,
|
||||
origin,
|
||||
};
|
||||
// spawn a nostr server with our websocket
|
||||
tokio::spawn(nostr_server(
|
||||
pool, settings, ws_stream, broadcast, event_tx, shutdown,
|
||||
pool,
|
||||
client_info,
|
||||
settings,
|
||||
ws_stream,
|
||||
broadcast,
|
||||
event_tx,
|
||||
shutdown,
|
||||
safe_to_read,
|
||||
));
|
||||
}
|
||||
// todo: trace, don't print...
|
||||
Err(e) => println!(
|
||||
"error when trying to upgrade connection \
|
||||
from address {} to websocket connection. \
|
||||
@@ -148,6 +181,12 @@ async fn handle_web_request(
|
||||
}
|
||||
}
|
||||
|
||||
fn get_header_string(header: &str, headers: &HeaderMap) -> Option<String> {
|
||||
headers
|
||||
.get(header)
|
||||
.and_then(|x| x.to_str().ok().map(|x| x.to_string()))
|
||||
}
|
||||
|
||||
// return on a control-c or internally requested shutdown signal
|
||||
async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
|
||||
let mut term_signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
@@ -167,7 +206,6 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
|
||||
info!("Shutting down webserver due to SIGTERM");
|
||||
break;
|
||||
},
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -215,7 +253,20 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
// configure tokio runtime
|
||||
let rt = Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("tokio-ws")
|
||||
.thread_name_fn(|| {
|
||||
// give each thread a unique numeric name
|
||||
static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
|
||||
let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
|
||||
format!("tokio-ws-{}", id)
|
||||
})
|
||||
// limit concurrent SQLite blocking threads
|
||||
.max_blocking_threads(settings.limits.max_blocking_threads)
|
||||
.on_thread_start(|| {
|
||||
trace!("started new thread: {:?}", std::thread::current().name());
|
||||
})
|
||||
.on_thread_stop(|| {
|
||||
trace!("stopped thread: {:?}", std::thread::current().name());
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
// start tokio
|
||||
@@ -273,6 +324,23 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
}
|
||||
}
|
||||
}
|
||||
// build a connection pool for DB maintenance
|
||||
let maintenance_pool = db::build_pool(
|
||||
"maintenance writer",
|
||||
&settings,
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
|
||||
1,
|
||||
2,
|
||||
false,
|
||||
);
|
||||
|
||||
// Create a mutex that will block readers, so that a
|
||||
// checkpoint can be performed quickly.
|
||||
let safe_to_read = Arc::new(Mutex::new(0));
|
||||
|
||||
db::db_optimize_task(maintenance_pool.clone()).await;
|
||||
db::db_checkpoint_task(maintenance_pool, safe_to_read.clone()).await;
|
||||
|
||||
// listen for (external to tokio) shutdown request
|
||||
let controlled_shutdown = invoke_shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
@@ -283,6 +351,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
controlled_shutdown.send(()).ok();
|
||||
}
|
||||
Err(std::sync::mpsc::RecvError) => {
|
||||
// FIXME: spurious error on startup?
|
||||
debug!("shutdown requestor is disconnected");
|
||||
}
|
||||
};
|
||||
@@ -301,12 +370,15 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let pool = db::build_pool(
|
||||
"client query",
|
||||
&settings,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
|
||||
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
|
||||
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
|
||||
db_min_conn,
|
||||
db_max_conn,
|
||||
true,
|
||||
);
|
||||
// spawn a task to check the pool size.
|
||||
let pool_monitor = pool.clone();
|
||||
tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});
|
||||
|
||||
// A `Service` is needed for every connection, so this
|
||||
// creates one from our `handle_request` function.
|
||||
let make_svc = make_service_fn(|conn: &AddrStream| {
|
||||
@@ -316,6 +388,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
let event = event_tx.clone();
|
||||
let stop = invoke_shutdown.clone();
|
||||
let settings = settings.clone();
|
||||
let safe_to_read = safe_to_read.clone();
|
||||
async move {
|
||||
// service_fn converts our function into a `Service`
|
||||
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
|
||||
@@ -327,6 +400,7 @@ pub fn start_server(settings: Settings, shutdown_rx: MpscReceiver<()>) -> Result
|
||||
bcast.clone(),
|
||||
event.clone(),
|
||||
stop.subscribe(),
|
||||
safe_to_read.clone(),
|
||||
)
|
||||
}))
|
||||
}
|
||||
@@ -359,6 +433,10 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage>
|
||||
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
|
||||
match parsed_res {
|
||||
Ok(m) => {
|
||||
if let NostrMessage::SubMsg(_) = m {
|
||||
// note; this only prints the first 16k of a REQ and then truncates.
|
||||
trace!("REQ: {:?}",msg);
|
||||
};
|
||||
if let NostrMessage::EventMsg(_) = m {
|
||||
if let Some(max_size) = max_bytes {
|
||||
// check length, ensure that some max size is set.
|
||||
@@ -370,44 +448,75 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage>
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("proto parse error: {:?}", e);
|
||||
debug!("parse error on message: {}", msg.trim());
|
||||
trace!("proto parse error: {:?}", e);
|
||||
trace!("parse error on message: {:?}", msg.trim());
|
||||
Err(Error::ProtoParseError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Turn a string into a NOTICE message ready to send over a WebSocket
|
||||
fn make_notice_message(msg: &str) -> Message {
|
||||
Message::text(json!(["NOTICE", msg]).to_string())
|
||||
fn make_notice_message(notice: Notice) -> Message {
|
||||
let json = match notice {
|
||||
Notice::Message(ref msg) => json!(["NOTICE", msg]),
|
||||
Notice::EventResult(ref res) => json!(["OK", res.id, res.status.to_bool(), res.msg]),
|
||||
};
|
||||
|
||||
Message::text(json.to_string())
|
||||
}
|
||||
|
||||
struct ClientInfo {
|
||||
remote_ip: String,
|
||||
user_agent: Option<String>,
|
||||
origin: Option<String>,
|
||||
}
|
||||
|
||||
/// Handle new client connections. This runs through an event loop
|
||||
/// for all client communication.
|
||||
async fn nostr_server(
|
||||
pool: db::SqlitePool,
|
||||
client_info: ClientInfo,
|
||||
settings: Settings,
|
||||
mut ws_stream: WebSocketStream<Upgraded>,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: mpsc::Sender<SubmittedEvent>,
|
||||
mut shutdown: Receiver<()>,
|
||||
safe_to_read: Arc<Mutex<u64>>,
|
||||
) {
|
||||
// the time this websocket nostr server started
|
||||
let orig_start = Instant::now();
|
||||
// get a broadcast channel for clients to communicate on
|
||||
let mut bcast_rx = broadcast.subscribe();
|
||||
// Track internal client state
|
||||
let mut conn = conn::ClientConn::new();
|
||||
let mut conn = conn::ClientConn::new(client_info.remote_ip);
|
||||
// subscription creation rate limiting
|
||||
let mut sub_lim_opt = None;
|
||||
// 100ms jitter when the rate limiter returns
|
||||
let jitter = Jitter::up_to(Duration::from_millis(100));
|
||||
let sub_per_min_setting = settings.limits.subscriptions_per_min;
|
||||
if let Some(sub_per_min) = sub_per_min_setting {
|
||||
if sub_per_min > 0 {
|
||||
trace!("Rate limits for sub creation ({}/min)", sub_per_min);
|
||||
let quota_time = core::num::NonZeroU32::new(sub_per_min).unwrap();
|
||||
let quota = Quota::per_minute(quota_time);
|
||||
sub_lim_opt = Some(RateLimiter::direct(quota));
|
||||
}
|
||||
}
|
||||
// Use the remote IP as the client identifier
|
||||
let cid = conn.get_client_prefix();
|
||||
// Create a channel for receiving query results from the database.
|
||||
// we will send out the tx handle to any query we generate.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
|
||||
// this has capacity for some of the larger requests we see, which
|
||||
// should allow the DB thread to release the handle earlier.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(20000);
|
||||
// Create channel for receiving NOTICEs
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(128);
|
||||
|
||||
// last time this client sent data (message, ping, etc.)
|
||||
let mut last_message_time = Instant::now();
|
||||
|
||||
// ping interval (every 5 minutes)
|
||||
let default_ping_dur = Duration::from_secs(300);
|
||||
let default_ping_dur = Duration::from_secs(settings.network.ping_interval_seconds.into());
|
||||
|
||||
// disconnect after 20 minutes without a ping response or event.
|
||||
let max_quiet_time = Duration::from_secs(60 * 20);
|
||||
@@ -419,16 +528,23 @@ async fn nostr_server(
|
||||
// when these subscriptions are cancelled, make a message
|
||||
// available to the executing query so it knows to stop.
|
||||
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
|
||||
|
||||
// for stats, keep track of how many events the client published,
|
||||
// and how many it received from queries.
|
||||
let mut client_published_event_count: usize = 0;
|
||||
let mut client_received_event_count: usize = 0;
|
||||
info!("new connection for client: {:?}", cid);
|
||||
debug!("new client connection (cid: {}, ip: {:?})", cid, conn.ip());
|
||||
let origin = client_info.origin.unwrap_or_else(|| "<unspecified>".into());
|
||||
let user_agent = client_info
|
||||
.user_agent
|
||||
.unwrap_or_else(|| "<unspecified>".into());
|
||||
debug!(
|
||||
"cid: {}, origin: {:?}, user-agent: {:?}",
|
||||
cid, origin, user_agent
|
||||
);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.recv() => {
|
||||
info!("Shutting client connection down due to shutdown: {:?}", cid);
|
||||
info!("Close connection down due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
|
||||
// server shutting down, exit loop
|
||||
break;
|
||||
},
|
||||
@@ -443,7 +559,7 @@ async fn nostr_server(
|
||||
ws_stream.send(Message::Ping(Vec::new())).await.ok();
|
||||
},
|
||||
Some(notice_msg) = notice_rx.recv() => {
|
||||
ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
|
||||
ws_stream.send(make_notice_message(notice_msg)).await.ok();
|
||||
},
|
||||
Some(query_result) = query_rx.recv() => {
|
||||
// database informed us of a query result we asked for
|
||||
@@ -462,12 +578,15 @@ async fn nostr_server(
|
||||
Ok(global_event) = bcast_rx.recv() => {
|
||||
// an event has been broadcast to all clients
|
||||
// first check if there is a subscription for this event.
|
||||
let matching_subs = conn.get_matching_subscriptions(&global_event);
|
||||
for s in matching_subs {
|
||||
for (s, sub) in conn.subscriptions() {
|
||||
if !sub.interested_in_event(&global_event) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: serialize at broadcast time, instead of
|
||||
// once for each consumer.
|
||||
if let Ok(event_str) = serde_json::to_string(&global_event) {
|
||||
debug!("sub match for client: {:?}, sub: {:?}, event: {:?}",
|
||||
trace!("sub match for client: {}, sub: {:?}, event: {:?}",
|
||||
cid, s,
|
||||
global_event.get_event_id_prefix());
|
||||
// create an event response and send it
|
||||
@@ -488,7 +607,7 @@ async fn nostr_server(
|
||||
},
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message("binary messages are not accepted")).await.ok();
|
||||
make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok();
|
||||
continue;
|
||||
},
|
||||
Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
|
||||
@@ -498,8 +617,7 @@ async fn nostr_server(
|
||||
},
|
||||
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message(
|
||||
&format!("message too large ({} > {})",size, max_size))).await.ok();
|
||||
make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
|
||||
continue;
|
||||
},
|
||||
None |
|
||||
@@ -507,17 +625,17 @@ async fn nostr_server(
|
||||
Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
|
||||
WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
|
||||
=> {
|
||||
debug!("websocket close from client: {:?}",cid);
|
||||
debug!("websocket close from client (cid: {}, ip: {:?})",cid, conn.ip());
|
||||
break;
|
||||
},
|
||||
Some(Err(WsError::Io(e))) => {
|
||||
// IO errors are considered fatal
|
||||
warn!("IO error (client: {:?}): {:?}", cid, e);
|
||||
warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
|
||||
break;
|
||||
}
|
||||
x => {
|
||||
// default condition on error is to close the client connection
|
||||
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
|
||||
info!("unknown error (cid: {}, ip: {:?}): {:?} (closing conn)", cid, conn.ip(), x);
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -527,11 +645,12 @@ async fn nostr_server(
|
||||
Ok(NostrMessage::EventMsg(ec)) => {
|
||||
// An EventCmd needs to be validated to be converted into an Event
|
||||
// handle each type of message
|
||||
let evid = ec.event_id().to_owned();
|
||||
let parsed : Result<Event> = Result::<Event>::from(ec);
|
||||
match parsed {
|
||||
Ok(e) => {
|
||||
let id_prefix:String = e.id.chars().take(8).collect();
|
||||
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
|
||||
debug!("successfully parsed/validated event: {:?} (cid: {})", id_prefix, cid);
|
||||
// check if the event is too far in the future.
|
||||
if e.is_valid_timestamp(settings.options.reject_future_seconds) {
|
||||
// Write this to the database.
|
||||
@@ -539,39 +658,52 @@ async fn nostr_server(
                                 event_tx.send(submit_event).await.ok();
                                 client_published_event_count += 1;
                             } else {
-                                info!("client {:?} sent a far future-dated event", cid);
+                                info!("client: {} sent a far future-dated event", cid);
                                 if let Some(fut_sec) = settings.options.reject_future_seconds {
-                                    ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok();
+                                    let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
+                                    let notice = Notice::invalid(e.id, &msg);
+                                    ws_stream.send(make_notice_message(notice)).await.ok();
                                 }
                             }
                         },
-                        Err(_) => {
-                            info!("client {:?} sent an invalid event", cid);
-                            ws_stream.send(make_notice_message("event was invalid")).await.ok();
+                        Err(e) => {
+                            info!("client sent an invalid event (cid: {})", cid);
+                            ws_stream.send(make_notice_message(Notice::invalid(evid, &format!("{}", e)))).await.ok();
                         }
                     }
                 },
                 Ok(NostrMessage::SubMsg(s)) => {
-                    debug!("client {} requesting a subscription", cid);
+                    debug!("subscription requested (cid: {}, sub: {:?})", cid, s.id);
                     // subscription handling consists of:
+                    // * check for rate limits
                     // * registering the subscription so future events can be matched
                     // * making a channel to cancel to request later
                     // * sending a request for a SQL query
-                    let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
-                    match conn.subscribe(s.clone()) {
-                        Ok(()) => {
-                            // when we insert, if there was a previous query running with the same name, cancel it.
-                            if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
-                                previous_query.send(()).ok();
-                            }
-                            // start a database query
-                            db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
-                        },
-                        Err(e) => {
-                            info!("Subscription error: {}", e);
-                            ws_stream.send(make_notice_message(&e.to_string())).await.ok();
-                        }
-                    }
+                    // Do nothing if the sub already exists.
+                    if !conn.has_subscription(&s) {
+                        if let Some(ref lim) = sub_lim_opt {
+                            lim.until_ready_with_jitter(jitter).await;
+                        }
+                        let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
+                        match conn.subscribe(s.clone()) {
+                            Ok(()) => {
+                                // when we insert, if there was a previous query running with the same name, cancel it.
+                                if let Some(previous_query) = running_queries.insert(s.id.to_owned(), abandon_query_tx) {
+                                    previous_query.send(()).ok();
+                                }
+                                if s.needs_historical_events() {
+                                    // start a database query.  this spawns a blocking database query on a worker thread.
+                                    db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx, safe_to_read.clone()).await;
+                                }
+                            },
+                            Err(e) => {
+                                info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
+                                ws_stream.send(make_notice_message(Notice::message(format!("Subscription error: {}", e)))).await.ok();
+                            }
+                        }
+                    } else {
+                        info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
+                    }
                 },
                 Ok(NostrMessage::CloseMsg(cc)) => {
                     // closing a request simply removes the subscription.
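Note: `sub_lim_opt` and `jitter` are used but not defined in this hunk; they appear to be a per-connection subscription rate limiter and retry jitter, and `until_ready_with_jitter` matches the `governor` crate's API. A minimal sketch of how such a gate could be constructed, assuming a hypothetical per-minute quota (the real value would come from relay settings):

    use std::num::NonZeroU32;
    use std::time::Duration;
    use governor::{Jitter, Quota, RateLimiter};

    async fn subscription_gate() {
        // Hypothetical quota: at most 10 new subscriptions per minute.
        let quota = Quota::per_minute(NonZeroU32::new(10).unwrap());
        // A "direct" limiter is scoped to this one connection, not keyed by client.
        let sub_lim_opt = Some(RateLimiter::direct(quota));
        // Jitter spreads wake-ups so throttled clients do not retry in lockstep.
        let jitter = Jitter::up_to(Duration::from_millis(100));
        if let Some(ref lim) = sub_lim_opt {
            // Suspends this task until the limiter permits another subscription.
            lim.until_ready_with_jitter(jitter).await;
        }
    }

Because the limiter is awaited before `conn.subscribe`, an abusive client slows only its own connection task rather than being disconnected outright.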
@@ -588,23 +720,23 @@ async fn nostr_server(
                     conn.unsubscribe(&c);
                 } else {
                     info!("invalid command ignored");
-                    ws_stream.send(make_notice_message("could not parse command")).await.ok();
+                    ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
                 }
             },
             Err(Error::ConnError) => {
-                debug!("got connection close/error, disconnecting client: {:?}",cid);
+                debug!("got connection close/error, disconnecting cid: {}, ip: {:?}",cid, conn.ip());
                 break;
             }
             Err(Error::EventMaxLengthError(s)) => {
-                info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
-                ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
+                info!("client sent event larger ({} bytes) than max size (cid: {})", s, cid);
+                ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok();
             },
             Err(Error::ProtoParseError) => {
-                info!("client {:?} sent event that could not be parsed", cid);
-                ws_stream.send(make_notice_message("could not parse command")).await.ok();
+                info!("client sent event that could not be parsed (cid: {})", cid);
+                ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
             },
             Err(e) => {
-                info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
+                info!("got non-fatal error from client (cid: {}, error: {:?}", cid, e);
             },
         }
     },
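The `Notice::message` and `Notice::invalid` constructors used throughout these hunks are defined elsewhere in the crate; this diff only shows call sites. An illustrative sketch of a shape consistent with those calls, in the spirit of NIP-20 per-event command results (the variant and field names here are assumptions, not the crate's actual definition):

    /// Illustrative sketch of the Notice type implied by the call sites above.
    pub enum Notice {
        /// A free-form relay notice, delivered as ["NOTICE", msg].
        Message(String),
        /// A per-event result, delivered as ["OK", event_id, false, reason].
        EventResult { id: String, msg: String, status: bool },
    }

    impl Notice {
        pub fn message(msg: String) -> Notice {
            Notice::Message(msg)
        }
        pub fn invalid(id: String, msg: &str) -> Notice {
            Notice::EventResult {
                id,
                msg: format!("invalid: {}", msg),
                status: false,
            }
        }
    }

The practical difference from the old bare-string notices is that rejections now carry the offending event id, so clients can tell which publish failed.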
@@ -615,7 +747,11 @@ async fn nostr_server(
         stop_tx.send(()).ok();
     }
     info!(
-        "stopping connection for client: {:?} (client sent {} event(s), received {})",
-        cid, client_published_event_count, client_received_event_count
+        "stopping client connection (cid: {}, ip: {:?}, sent: {} events, recv: {} events, connected: {:?})",
+        cid,
+        conn.ip(),
+        client_published_event_count,
+        client_received_event_count,
+        orig_start.elapsed()
     );
 }

src/subscription.rs
@@ -37,6 +37,9 @@ pub struct ReqFilter {
     #[serde(skip)]
     pub tags: Option<HashMap<char, HashSet<String>>>,
     /// Force no matches due to malformed data
+    // we can't represent it in the req filter, so we don't want to
+    // erroneously match.  This basically indicates the req tried to
+    // do something invalid.
     pub force_no_match: bool,
 }
 
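To see why `force_no_match` exists: a filter that parsed but asked for something the relay cannot represent should match nothing rather than silently match everything. A sketch of how a matcher might consult the flag, assuming the crate's `ReqFilter` and `Event` types (the remaining checks are elided):

    // Sketch only: a filter flagged force_no_match matches no events at all.
    fn filter_matches(filter: &ReqFilter, event: &Event) -> bool {
        if filter.force_no_match {
            return false;
        }
        // ... the usual id/author/kind/since/until/tag checks go here ...
        true
    }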
@@ -62,12 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter {
             tags: None,
             force_no_match: false,
         };
+        let empty_string = "".into();
         let mut ts = None;
         // iterate through each key, and assign values that exist
         for (key, val) in filter.into_iter() {
             // ids
             if key == "ids" {
-                rf.ids = Deserialize::deserialize(val).ok();
+                let raw_ids: Option<Vec<String>> = Deserialize::deserialize(val).ok();
+                if let Some(a) = raw_ids.as_ref() {
+                    if a.contains(&empty_string) {
+                        return Err(serde::de::Error::invalid_type(
+                            Unexpected::Other("prefix matches must not be empty strings"),
+                            &"a json object"));
+                    }
+                }
+                rf.ids = raw_ids;
             } else if key == "kinds" {
                 rf.kinds = Deserialize::deserialize(val).ok();
             } else if key == "since" {
@@ -77,7 +89,15 @@ impl<'de> Deserialize<'de> for ReqFilter {
             } else if key == "limit" {
                 rf.limit = Deserialize::deserialize(val).ok();
             } else if key == "authors" {
-                rf.authors = Deserialize::deserialize(val).ok();
+                let raw_authors: Option<Vec<String>> = Deserialize::deserialize(val).ok();
+                if let Some(a) = raw_authors.as_ref() {
+                    if a.contains(&empty_string) {
+                        return Err(serde::de::Error::invalid_type(
+                            Unexpected::Other("prefix matches must not be empty strings"),
+                            &"a json object"));
+                    }
+                }
+                rf.authors = raw_authors;
             } else if key.starts_with('#') && key.len() > 1 && val.is_array() {
                 if let Some(tag_search) = tag_search_char_from_filter(key) {
                     if ts.is_none() {
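The empty-prefix guard is now duplicated verbatim between the `ids` and `authors` branches; a small helper could centralize it. A sketch, assuming both fields remain `Option<Vec<String>>` (the helper name is invented here):

    use serde::de::{Error, Unexpected};

    /// Reject any prefix list containing an empty string: an empty
    /// prefix would otherwise match every event in the database.
    fn validate_prefixes<E: Error>(prefixes: &Option<Vec<String>>) -> Result<(), E> {
        if let Some(list) = prefixes {
            if list.iter().any(|s| s.is_empty()) {
                return Err(E::invalid_type(
                    Unexpected::Other("prefix matches must not be empty strings"),
                    &"a json object",
                ));
            }
        }
        Ok(())
    }

Called as `validate_prefixes(&raw_ids)?` inside the visitor, the error type is inferred from the deserializer, and returning `Err` aborts deserialization of the entire REQ, which is exactly what the new tests below assert.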
@@ -180,6 +200,13 @@ impl Subscription {
     pub fn get_id(&self) -> String {
         self.id.clone()
     }
+
+    /// Determine if any filter is requesting historical (database)
+    /// queries.  If every filter has limit:0, we do not need to query the DB.
+    pub fn needs_historical_events(&self) -> bool {
+        self.filters.iter().any(|f| f.limit != Some(0))
+    }
+
     /// Determine if this subscription matches a given [`Event`]. Any
     /// individual filter match is sufficient.
     pub fn interested_in_event(&self, event: &Event) -> bool {
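The `limit: 0` convention gives clients a way to subscribe to live events only, skipping the database entirely. A hypothetical unit test, not part of this diff, illustrating the semantics:

    #[test]
    fn needs_historical_events_limit_zero() {
        // limit:0 on every filter means "live events only", so no DB query.
        let live_only: Subscription =
            serde_json::from_str("[\"REQ\",\"some-id\",{\"limit\": 0}]").unwrap();
        assert!(!live_only.needs_historical_events());

        // Any filter without limit:0 still requires a historical query.
        let mixed: Subscription =
            serde_json::from_str("[\"REQ\",\"some-id\",{\"limit\": 0},{\"kinds\": [1]}]").unwrap();
        assert!(mixed.needs_historical_events());
    }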
@@ -291,6 +318,24 @@ mod tests {
         assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
     }
 
+    #[test]
+    fn req_empty_authors_prefix() {
+        let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]";
+        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
+    }
+
+    #[test]
+    fn req_empty_ids_prefix() {
+        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]";
+        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
+    }
+
+    #[test]
+    fn req_empty_ids_prefix_mixed() {
+        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]";
+        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
+    }
+
     #[test]
     fn legacy_filter() {
         // legacy field in filter
tests/cli.rs (Normal file, 10 lines)
@@ -0,0 +1,10 @@
+#[cfg(test)]
+mod tests {
+    use nostr_rs_relay::cli::CLIArgs;
+
+    #[test]
+    fn cli_tests() {
+        use clap::CommandFactory;
+        CLIArgs::command().debug_assert();
+    }
+}
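For reference, `debug_assert` here is clap's built-in self-check: `CommandFactory::command()` materializes the derived argument definition, and `Command::debug_assert()` panics if that definition is internally inconsistent (conflicting argument names, impossible defaults, and the like), so a misconfigured CLI fails under `cargo test` rather than at relay startup.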