feat: limit event publishing to NIP-05 verified users

This adds a new configurable feature to restrict event publishing to
only users with NIP-05 verified metadata.  Domains can be whitelisted
or blacklisted.  Verification expiration and schedules are
configurable.

This upgrades the database to add a table for tracking verification
records.
This commit is contained in:
Greg Heartsfield 2022-02-12 09:29:25 -06:00
parent f679fa0893
commit 234a8ba0ac
14 changed files with 1787 additions and 118 deletions

317
Cargo.lock generated
View File

@ -142,6 +142,22 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "core-foundation"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cpufeatures"
version = "0.2.1"
@ -153,9 +169,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.6"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6"
dependencies = [
"cfg-if",
"lazy_static",
@ -205,12 +221,36 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf"
dependencies = [
"instant",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.0.1"
@ -229,9 +269,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "futures"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4"
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
dependencies = [
"futures-channel",
"futures-core",
@ -244,9 +284,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
"futures-sink",
@ -254,15 +294,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "futures-executor"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a"
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
dependencies = [
"futures-core",
"futures-task",
@ -271,15 +311,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-macro"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
dependencies = [
"proc-macro2",
"quote",
@ -288,15 +328,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-timer"
@ -306,9 +346,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-channel",
"futures-core",
@ -476,6 +516,19 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "idna"
version = "0.2.3"
@ -548,9 +601,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.114"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50"
checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c"
[[package]]
name = "libsqlite3-sys"
@ -570,9 +623,9 @@ checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
dependencies = [
"scopeguard",
]
@ -629,6 +682,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "native-tls"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
@ -663,12 +734,16 @@ dependencies = [
"futures-util",
"governor",
"hex",
"http",
"hyper",
"hyper-tls",
"lazy_static",
"log",
"nonzero_ext",
"parse_duration",
"r2d2",
"r2d2_sqlite",
"rand 0.8.4",
"rusqlite",
"secp256k1",
"serde 1.0.136",
@ -689,6 +764,74 @@ dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits 0.2.14",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg 1.0.1",
"num-integer",
"num-traits 0.2.14",
]
[[package]]
name = "num-complex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
dependencies = [
"autocfg 1.0.1",
"num-traits 0.2.14",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg 1.0.1",
"num-traits 0.2.14",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg 1.0.1",
"num-integer",
"num-traits 0.2.14",
]
[[package]]
name = "num-rational"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg 1.0.1",
"num-bigint",
"num-integer",
"num-traits 0.2.14",
]
[[package]]
name = "num-traits"
version = "0.1.43"
@ -729,6 +872,39 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-sys",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [
"autocfg 1.0.1",
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -754,6 +930,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
dependencies = [
"lazy_static",
"num",
"regex",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -1029,6 +1216,15 @@ version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "rusqlite"
version = "0.26.3"
@ -1056,6 +1252,16 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
[[package]]
name = "schannel"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
dependencies = [
"lazy_static",
"winapi",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.5"
@ -1090,6 +1296,29 @@ dependencies = [
"cc",
]
[[package]]
name = "security-framework"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "0.8.23"
@ -1176,9 +1405,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi",
@ -1201,6 +1430,20 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tempfile"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [
"cfg-if",
"fastrand",
"libc",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.2"
@ -1247,9 +1490,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.15.0"
version = "1.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
dependencies = [
"bytes",
"libc",
@ -1275,6 +1518,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.16.1"
@ -1318,9 +1571,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.29"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9"
dependencies = [
"cfg-if",
"pin-project-lite",
@ -1329,9 +1582,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.21"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23"
dependencies = [
"lazy_static",
]

View File

@ -25,4 +25,8 @@ r2d2_sqlite = "^0.19"
lazy_static = "^1.4"
governor = "^0.4"
nonzero_ext = "^0.3"
hyper={ version="0.14", features=["server","http1","http2","tcp"] }
hyper = { version="0.14", features=["client", "server","http1","http2","tcp"] }
hyper-tls = "^0.5"
http = { version = "^0.2" }
parse_duration = "^2"
rand = "^0.8"

View File

@ -1,4 +1,4 @@
FROM rust:1.57 as builder
FROM rust:1.58.1 as builder
RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@ -12,11 +12,11 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo build --release
FROM debian:buster-slim
FROM debian:bullseye-20220125-slim
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \
&& apt-get install -y ca-certificates tzdata sqlite3 \
&& apt-get install -y ca-certificates tzdata sqlite3 libc6 \
&& rm -rf /var/lib/apt/lists/*
EXPOSE 8080

View File

@ -22,7 +22,7 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
# line option.
data_directory = "."
# Database connection pool settings:
# Database connection pool settings for subscribers:
# Minimum number of SQLite reader connections
#min_conn = 4
@ -59,8 +59,8 @@ reject_future_seconds = 1800
#max_ws_frame_bytes = 131072
# Broadcast buffer size, in number of events. This prevents slow
# readers from consuming memory. Defaults to 4096.
#broadcast_buffer = 4096
# readers from consuming memory.
#broadcast_buffer = 16384
# Event persistence buffer size, in number of events. This provides
# backpressure to senders if writes are slow. Defaults to 16.
@ -68,8 +68,35 @@ reject_future_seconds = 1800
[authorization]
# Pubkey addresses in this array are whitelisted for event publishing.
# Only valid events by these authors will be accepted.
# Only valid events by these authors will be accepted, if the variable
# is set.
#pubkey_whitelist = [
# "35d26e4690cbe1a898af61cc3515661eb5fa763b57bd0b42e45099c8b32fd50f",
# "887645fef0ce0c3c1218d2f5d8e6132a19304cdc57cd20281d082f38cfea0072",
#]
[verified_users]
# NIP-05 verification of users. Can be "enabled" to require NIP-05
# metadata for event authors, "passive" to perform validation but
# never block publishing, or "disabled" to do nothing.
#mode = "disabled"
# Domain names that will be prevented from publishing events.
#domain_blacklist = ["wellorder.net"]
# Domain names that are allowed to publish events. If defined, only
# events NIP-05 verified authors at these domains are persisted.
#domain_whitelist = ["example.com"]
# Consider an pubkey "verified" if we have a successful validation
# from the NIP-05 domain within this amount of time. Note, if the
# domain provides a successful response that omits the account,
# verification is immediately revoked.
#verify_expiration = "1 week"
# How long to wait between verification attempts for a specific author.
#verify_update_frequency = "24 hours"
# How many consecutive failed checks before we give up on verifying
# this author.
#max_consecutive_failures = 20

View File

@ -0,0 +1,248 @@
# Author Verification Design Document
The relay will use NIP-05 DNS-based author verification to limit which
authors can publish events to a relay. This document describes how
this feature will operate.
## Considerations
DNS-based author verification is designed to be deployed in relays that
want to prevent spam, so there should be strong protections to prevent
unauthorized authors from persisting data. This includes data needed to
verify new authors.
There should be protections in place to ensure the relay cannot be
used to spam or flood other webservers. Additionally, there should be
protections against server-side request forgery (SSRF).
## Design Overview
### Concepts
All authors are initially "unverified". Unverified authors that submit
appropriate `NIP-05` metadata events become "candidates" for
verification. A candidate author becomes verified when the relay
inspects a kind `0` metadata event for the author with a `nip05` field,
and follows the procedure in `NIP-05` to successfully associate the
author with an internet identifier.
The `NIP-05` procedure verifies an author for a fixed period of time,
configurable by the relay operator. If this "verification expiration
time" (`verify_expiration`) is exceeded without being refreshed, they
are once again unverified.
Verified authors have their status regularly and automatically updated
through scheduled polling to their verified domain, this process is
"re-verification". It is performed based on the configuration setting
`verify_update_frequency`, which defines how long the relay waits
between verification attempts (whether the result was success or
failure).
Authors may change their verification data (the internet identifier from
`NIP-05`) with a new metadata event, which then requires
re-verification. Their old verification remains valid until
expiration.
Performing candidate author verification is a best-effort activity and
may be significantly rate-limited to prevent relays being used to
attack other hosts. Candidate verification (untrusted authors) should
never impact re-verification (trusted authors).
## Operating Modes
The relay may operate in one of three modes. "Disabled" performs no
validation activities, and will never permit or deny events based on
an author's NIP-05 metadata. "Passive" performs NIP-05 validation,
but does not permit or deny events based on the validity or presence
of NIP-05 metadata. "Enabled" will require current and valid NIP-05
metadata for any events to be persisted. "Enabled" mode will
additionally consider domain whitelist/blacklist configuration data to
restrict which author's events are persisted.
## Design Details
### Data Storage
Verification is stored in a dedicated table. This tracks:
* `nip05` identifier
* most recent verification timestamp
* most recent verification failure timestamp
* reference to the metadata event (used for tracking `created_at` and
`pubkey`)
### Event Handling
All events are first validated to ensure the signature is valid.
Incoming events of kind _other_ than metadata (kind `0`) submitted by
clients will be evaluated as follows.
* If the event's author has a current verification, the event is
persisted as normal.
* If the event's author has either no verification, or the
verification is expired, the event is rejected.
If the event is a metadata event, we handle it differently.
We first determine the verification status of the event's pubkey.
* If the event author is unverified, AND the event contains a `nip05`
key, we consider this a verification candidate.
* If the event author is unverified, AND the event does not contain a
`nip05` key, this is not a candidate, and the event is dropped.
* If the event author is verified, AND the event contains a `nip05`
key that is identical to the currently stored value, no special
action is needed.
* If the event author is verified, AND the event contains a different
`nip05` than was previously verified, with a more recent timestamp,
we need to re-verify.
* If the event author is verified, AND the event is missing a `nip05`
key, and the event timestamp is more recent than what was verified,
we do nothing. The current verification will be allowed to expire.
### Candidate Verification
When a candidate verification is requested, a rate limit will be
utilized. If the rate limit is exceeded, new candidate verification
requests will be dropped. In practice, this is implemented by a
size-limited channel that drops events that exceed a threshold.
Candidates are never persisted in the database.
### Re-Verification
Re-verification is straightforward when there has been no change to
the `nip05` key. A new request to the `nip05` domain is performed,
and if successful, the verification timestamp is updated to the
current time. If the request fails due to a timeout or server error,
the failure timestamp is updated instead.
When the the `nip05` key has changed and this event is more recent, we
will create a new verification record, and delete all other records
for the same name.
Regarding creating new records vs. updating: We never update the event
reference or `nip05` identifier in a verification record. Every update
either reset the last failure or last success timestamp.
### Determining Verification Status
In determining if an event is from a verified author, the following
procedure should be used:
Join the verification table with the event table, to provide
verification data alongside the event `created_at` and `pubkey`
metadata. Find the most recent verification record for the author,
based on the `created_at` time.
Reject the record if the success timestamp is not within our
configured expiration time.
Reject records with disallowed domains, based on any whitelists or
blacklists in effect.
If a result remains, the author is treated as verified.
This does give a time window for authors transitioning their verified
status between domains. There may be a period of time in which there
are multiple valid rows in the verification table for a given author.
### Cleaning Up Inactive Verifications
After a author verification has expired, we will continue to check for
it to become valid again. After a configurable number of attempts, we
should simply forget it, and reclaim the space.
### Addition of Domain Whitelist/Blacklist
A set of whitelisted or blacklisted domains may be provided. If both
are provided, only the whitelist is used. In this context, domains
are either "allowed" (present on a whitelist and NOT present on a
blacklist), or "denied" (NOT present on a whitelist and present on a
blacklist).
The processes outlined so far are modified in the presence of these
options:
* Only authors with allowed domains can become candidates for
verification.
* Verification status queries additionally filter out any denied
domains.
* Re-verification processes only proceed with allowed domains.
### Integration
We have an existing database writer thread, which receives events and
attempts to persist them to disk. Once validated and persisted, these
events are broadcast to all subscribers.
When verification is enabled, the writer must check to ensure a valid,
unexpired verification record exists for the auther. All metadata
events (regardless of verification status) are forwarded to a verifier
module. If the verifier determines a new verification record is
needed, it is also responsible for persisting and broadcasting the
event, just as the database writer would have done.
## Threat Scenarios
Some of these mitigations are fully implemented, others are documented
simply to demonstrate a mitigation is possible.
### Domain Spamming
*Threat*: A author with a high-volume of events creates a metadata event
with a bogus domain, causing the relay to generate significant
unwanted traffic to a target.
*Mitigation*: Rate limiting for all candidate verification will limit
external requests to a reasonable amount. Currently, this is a simple
delay that slows down the HTTP task.
### Denial of Service for Legitimate Authors
*Threat*: A author with a high-volume of events creates a metadata event
with a domain that is invalid for them, _but which is used by other
legitimate authors_. This triggers rate-limiting against the legitimate
domain, and blocks authors from updating their own metadata.
*Mitigation*: Rate limiting should only apply to candidates, so any
existing verified authors have priority for re-verification. New
authors will be affected, as we can not distinguish between the threat
and a legitimate author. _(Unimplemented)_
### Denial of Service by Consuming Storage
*Threat*: A author creates a high volume of random metadata events with
unique domains, in order to cause us to store large amounts of data
for to-be-verified authors.
*Mitigation*: No data is stored for candidate authors. This makes it
harder for new authors to become verified, but is effective at
preventing this attack.
### Metadata Replay for Verified Author
*Threat*: Attacker replays out-of-date metadata event for a author, to
cause a verification to fail.
*Mitigation*: New metadata events have their signed timestamp compared
against the signed timestamp of the event that has most recently
verified them. If the metadata event is older, it is discarded.
### Server-Side Request Forgery via Metadata
*Threat*: Attacker includes malicious data in the `nip05` event, which
is used to generate HTTP requests against potentially internal
resources. Either leaking data, or invoking webservices beyond their
own privileges.
*Mitigation*: Consider detecting and dropping when the `nip05` field
is an IP address. Allow the relay operator to utilize the `blacklist`
or `whitelist` to constrain hosts that will be contacted. Most
importantly, the verification process is hardcoded to only make
requests to a known url path
(`.well-known/nostr.json?name=<LOCAL_NAME>`). The `<LOCAL_NAME>`
component is restricted to a basic ASCII subset (preventing additional
URL components).

View File

@ -2,6 +2,7 @@ use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
use std::time::Duration;
// initialize a singleton default configuration
lazy_static! {
@ -67,6 +68,60 @@ pub struct Authorization {
pub pubkey_whitelist: Option<Vec<String>>, // If present, only allow these pubkeys to publish events
}
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub enum VerifiedUsersMode {
Enabled,
Passive,
Disabled,
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct VerifiedUsers {
pub mode: VerifiedUsersMode, // Mode of operation: "enabled" (enforce) or "passive" (check only). If none, this is simply disabled.
pub domain_whitelist: Option<Vec<String>>, // If present, only allow verified users from these domains can publish events
pub domain_blacklist: Option<Vec<String>>, // If present, allow all verified users from any domain except these
pub verify_expiration: Option<String>, // how long a verification is cached for before no longer being used
pub verify_update_frequency: Option<String>, // how often to attempt to update verification
pub verify_expiration_duration: Option<Duration>, // internal result of parsing verify_expiration
pub verify_update_frequency_duration: Option<Duration>, // internal result of parsing verify_update_frequency
pub max_consecutive_failures: usize, // maximum number of verification failures in a row, before ceasing future checks
}
impl VerifiedUsers {
pub fn init(&mut self) {
self.verify_expiration_duration = self.verify_expiration_duration();
self.verify_update_frequency_duration = self.verify_update_duration();
}
pub fn is_enabled(&self) -> bool {
self.mode == VerifiedUsersMode::Enabled
}
pub fn is_active(&self) -> bool {
self.mode == VerifiedUsersMode::Enabled || self.mode == VerifiedUsersMode::Passive
}
pub fn is_passive(&self) -> bool {
self.mode == VerifiedUsersMode::Passive
}
pub fn verify_expiration_duration(&self) -> Option<Duration> {
self.verify_expiration
.as_ref()
.and_then(|x| parse_duration::parse(x).ok())
}
pub fn verify_update_duration(&self) -> Option<Duration> {
self.verify_update_frequency
.as_ref()
.and_then(|x| parse_duration::parse(x).ok())
}
pub fn is_valid(&self) -> bool {
self.verify_expiration_duration().is_some() && self.verify_update_duration().is_some()
}
}
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]
pub struct Settings {
@ -75,6 +130,7 @@ pub struct Settings {
pub network: Network,
pub limits: Limits,
pub authorization: Authorization,
pub verified_users: VerifiedUsers,
pub retention: Retention,
pub options: Options,
}
@ -96,7 +152,7 @@ impl Settings {
fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
let config: config::Config = config::Config::new();
let settings: Settings = config
let mut settings: Settings = config
// use defaults
.with_merged(config::Config::try_from(default).unwrap())?
// override with file contents
@ -109,6 +165,12 @@ impl Settings {
settings.database.min_conn, settings.database.max_conn
);
}
// ensure durations parse
if !settings.verified_users.is_valid() {
panic!("VerifiedUsers time settings could not be parsed");
}
// initialize durations for verified users
settings.verified_users.init();
Ok(settings)
}
}
@ -137,12 +199,22 @@ impl Default for Settings {
max_event_bytes: Some(2 << 17), // 128K
max_ws_message_bytes: Some(2 << 17), // 128K
max_ws_frame_bytes: Some(2 << 17), // 128K
broadcast_buffer: 4096,
event_persist_buffer: 16,
broadcast_buffer: 16384,
event_persist_buffer: 4096,
},
authorization: Authorization {
pubkey_whitelist: None, // Allow any address to publish
},
verified_users: VerifiedUsers {
mode: VerifiedUsersMode::Disabled,
domain_whitelist: None,
domain_blacklist: None,
verify_expiration: Some("1 week".to_owned()),
verify_update_frequency: Some("1 day".to_owned()),
verify_expiration_duration: None,
verify_update_frequency_duration: None,
max_consecutive_failures: 20,
},
retention: Retention {
max_events: None, // max events
max_bytes: None, // max size

225
src/db.rs
View File

@ -1,21 +1,21 @@
//! Event persistence and querying
use crate::config;
use crate::config::SETTINGS;
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use crate::nip05;
use crate::subscription::Subscription;
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use hex;
use log::*;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OpenFlags;
//use std::num::NonZeroU32;
use crate::config::SETTINGS;
use r2d2;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::limits::Limit;
use rusqlite::params;
use rusqlite::types::ToSql;
use rusqlite::Connection;
use rusqlite::OpenFlags;
use std::path::Path;
use std::thread;
use std::time::Duration;
@ -23,9 +23,9 @@ use std::time::Instant;
use tokio::task;
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
/// Database file
const DB_FILE: &str = "nostr.db";
pub const DB_FILE: &str = "nostr.db";
/// Startup DB Pragmas
const STARTUP_SQL: &str = r##"
@ -42,7 +42,7 @@ PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 3;
PRAGMA user_version = 4;
-- Event Table
CREATE TABLE IF NOT EXISTS event (
@ -98,37 +98,79 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
-- Pubkey References Index
CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
-- NIP-05 User Validation.
-- This represents the validation of a user.
-- cases;
-- we query, and find a valid result. update verified_at, and proceed.
-- we query, and get a 404/503/host down. update failed_at, and we are done.
-- we query, and get a 200, but the local part is not present with the given address. wipe out verified_at, update failed_at.
-- we need to know how often to query failing validations.
-- two cases, either we get a NIP-05 metadata event regularly that we can use to restart validation.
-- or, we simply get lots of non-metadata events, but the user fixed their NIP-05 host.
-- what should trigger a new attempt? what should trigger cleaning?
-- we will never write anything to the table if it is not valid at least once.
-- we will keep trying at frequency X to re-validate the already-valid nip05s.
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
"##;
pub fn build_read_pool() -> SqlitePool {
let config = config::SETTINGS.read().unwrap();
let db_dir = &config.database.data_directory;
// TODO: drop the pubkey_ref and event_ref tables
pub fn build_pool(
name: &str,
flags: OpenFlags,
min_size: u32,
max_size: u32,
wait_for_db: bool,
) -> SqlitePool {
let settings = SETTINGS.read().unwrap();
let db_dir = &settings.database.data_directory;
let full_path = Path::new(db_dir).join(DB_FILE);
// small hack; if the database doesn't exist yet, that means the
// writer thread hasn't finished. Give it a chance to work. This
// is only an issue with the first time we run.
while !full_path.exists() {
while !full_path.exists() && wait_for_db {
debug!("Database reader pool is waiting on the database to be created...");
thread::sleep(Duration::from_millis(500));
}
let manager = SqliteConnectionManager::file(&full_path)
.with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY)
.with_flags(flags)
.with_init(|c| c.execute_batch(STARTUP_SQL));
let pool: SqlitePool = r2d2::Pool::builder()
.test_on_check_out(true) // no noticeable performance hit
.min_idle(Some(config.database.min_conn))
.max_size(config.database.max_conn)
.min_idle(Some(min_size))
.max_size(max_size)
.build(manager)
.unwrap();
info!(
"Built a connection pool (min={}, max={})",
config.database.min_conn, config.database.max_conn
"Built a connection pool {:?} (min={}, max={})",
name, min_size, max_size
);
return pool;
pool
}
/// Build a single database connection, with provided flags
pub fn build_conn(flags: OpenFlags) -> Result<Connection> {
let settings = SETTINGS.read().unwrap();
let db_dir = &settings.database.data_directory;
let full_path = Path::new(db_dir).join(DB_FILE);
// create a connection
Ok(Connection::open_with_flags(&full_path, flags)?)
}
/// Upgrade DB to latest version, and execute pragma settings
pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
// check the version.
let mut curr_version = db_version(conn)?;
info!("DB version = {:?}", curr_version);
@ -150,8 +192,7 @@ pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
Ok(()) => {
info!("database pragma/schema initialized to v3, and ready");
//curr_version = 3;
info!("database pragma/schema initialized to v4, and ready");
}
Err(err) => {
error!("update failed: {}", err);
@ -189,15 +230,13 @@ value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
CREATE INDEX IF NOT EXISTS tag_val_hex_index ON tag(value_hex);
PRAGMA user_version = 3;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v2 -> v3");
//curr_version = 3;
curr_version = 3;
}
Err(err) => {
error!("update failed: {}", err);
@ -226,14 +265,43 @@ PRAGMA user_version = 3;
}
tx.commit()?;
info!("Upgrade complete");
} else if curr_version == 3 {
}
if curr_version == 3 {
debug!("database schema needs update from 3->4");
let upgrade_sql = r##"
-- incoming metadata events with nip05
CREATE TABLE IF NOT EXISTS user_verification (
id INTEGER PRIMARY KEY,
metadata_event INTEGER NOT NULL, -- the metadata event used for this validation.
name TEXT NOT NULL, -- the nip05 field value (user@domain).
verified_at INTEGER, -- timestamp this author/nip05 was most recently verified.
failed_at INTEGER, -- timestamp a verification attempt failed (host down).
failure_count INTEGER DEFAULT 0, -- number of consecutive failures.
FOREIGN KEY(metadata_event) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
CREATE INDEX IF NOT EXISTS user_verification_author_index ON user_verification(author);
PRAGMA user_version = 4;
"##;
// TODO: load existing refs into tag table
match conn.execute_batch(upgrade_sql) {
Ok(()) => {
info!("database schema upgraded v3 -> v4");
//curr_version = 4;
}
Err(err) => {
error!("update failed: {}", err);
panic!("database could not be upgraded");
}
}
} else if curr_version == 4 {
debug!("Database version was already current");
} else if curr_version > 3 {
panic!("Database version is newer than supported by this executable");
}
// Setup PRAGMA
conn.execute_batch(STARTUP_SQL)?;
info!("Finished pragma");
debug!("SQLite PRAGMA startup completed");
Ok(())
}
@ -241,26 +309,37 @@ PRAGMA user_version = 3;
pub async fn db_writer(
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
bcast_tx: tokio::sync::broadcast::Sender<Event>,
metadata_tx: tokio::sync::broadcast::Sender<Event>,
mut shutdown: tokio::sync::broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<Result<()>> {
let settings = SETTINGS.read().unwrap();
// are we performing NIP-05 checking?
let nip05_active = settings.verified_users.is_active();
// are we requriing NIP-05 user verification?
let nip05_enabled = settings.verified_users.is_enabled();
task::spawn_blocking(move || {
// get database configuration settings
let config = SETTINGS.read().unwrap();
let db_dir = &config.database.data_directory;
let settings = SETTINGS.read().unwrap();
let db_dir = &settings.database.data_directory;
let full_path = Path::new(db_dir).join(DB_FILE);
// create a connection
let mut conn = Connection::open_with_flags(
&full_path,
// create a connection pool
let pool = build_pool(
"event writer",
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
)?;
1,
4,
false,
);
info!("opened database {:?} for writing", full_path);
upgrade_db(&mut conn)?;
upgrade_db(&mut pool.get()?)?;
// Make a copy of the whitelist
let whitelist = &config.authorization.pubkey_whitelist.clone();
let whitelist = &settings.authorization.pubkey_whitelist.clone();
// get rate limit settings
let rps_setting = config.limits.messages_per_sec;
let rps_setting = settings.limits.messages_per_sec;
let mut most_recent_rate_limit = Instant::now();
let mut lim_opt = None;
let clock = governor::clock::QuantaClock::default();
@ -287,7 +366,7 @@ pub async fn db_writer(
// check if this event is authorized.
if let Some(allowed_addrs) = whitelist {
debug!("Checking against whitelist");
debug!("Checking against pubkey whitelist");
// if the event address is not in allowed_addrs.
if !allowed_addrs.contains(&event.pubkey) {
info!(
@ -299,15 +378,57 @@ pub async fn db_writer(
}
}
// send any metadata events to the NIP-05 verifier
if nip05_active && event.is_kind_metadata() {
// we are sending this prior to even deciding if we
// persist it. this allows the nip05 module to
// inspect it, update if necessary, or persist a new
// event and broadcast it itself.
metadata_tx.send(event.clone()).ok();
}
// check for NIP-05 verification
if nip05_enabled {
match nip05::query_latest_user_verification(pool.get()?, event.pubkey.to_owned()) {
Ok(uv) => {
if uv.is_valid() {
info!(
"new event from verified author ({:?},{:?})",
uv.name.to_string(),
event.get_author_prefix()
);
} else {
info!("rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
uv.name.to_string(),
event.get_author_prefix()
);
continue;
}
}
Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
debug!(
"no verification records found for pubkey: {:?}",
event.get_author_prefix()
);
continue;
}
Err(e) => {
warn!("checking nip05 verification status failed: {:?}", e);
continue;
}
}
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
match write_event(&mut conn, &event) {
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
debug!("ignoring duplicate event");
trace!("ignoring duplicate event");
} else {
info!(
"persisted event: {} in {:?}",
"persisted event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
@ -316,9 +437,10 @@ pub async fn db_writer(
}
}
Err(err) => {
warn!("event insert failed: {}", err);
warn!("event insert failed: {:?}", err);
}
}
// use rate limit, if defined, and if an event was actually written.
if event_write {
if let Some(ref lim) = lim_opt {
@ -327,9 +449,9 @@ pub async fn db_writer(
// check if we have recently logged rate
// limits, but print out a message only once
// per second.
if most_recent_rate_limit.elapsed().as_secs() > 1 {
if most_recent_rate_limit.elapsed().as_secs() > 10 {
warn!(
"rate limit reached for event creation (sleep for {:?})",
"rate limit reached for event creation (sleep for {:?}) (suppressing future messages for 10 seconds)",
wait_for
);
// reset last rate limit message
@ -342,7 +464,6 @@ pub async fn db_writer(
}
}
}
conn.close().ok();
info!("database connection closed");
Ok(())
})
@ -355,7 +476,7 @@ pub fn db_version(conn: &mut Connection) -> Result<usize> {
}
/// Persist an event to the database.
pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
// start transaction
let tx = conn.transaction()?;
// get relevant fields from event and convert to blobs.
@ -402,7 +523,11 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!("hid {} older metadata events", update_count);
info!(
"hid {} older metadata events for author {:?}",
update_count,
e.get_author_prefix()
);
}
}
// if this event is for a contact update, hide every other kind=3
@ -413,7 +538,11 @@ pub fn write_event(conn: &mut Connection, e: &Event) -> Result<usize> {
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!("hid {} older contact events", update_count);
info!(
"hid {} older contact events for author {:?}",
update_count,
e.get_author_prefix()
);
}
}
tx.commit()?;
@ -666,7 +795,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// query is immediately aborted.
pub async fn db_query(
sub: Subscription,
conn: r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>,
conn: PooledConnection,
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
) {

View File

@ -40,8 +40,41 @@ pub enum Error {
ConfigError(config::ConfigError),
#[error("Data directory does not exist")]
DatabaseDirError,
#[error("Database Connection Pool Error")]
DatabasePoolError(r2d2::Error),
#[error("Custom Error : {0}")]
CustomError(String),
#[error("Task join error")]
JoinError,
#[error("Hyper Client error")]
HyperError(hyper::Error),
#[error("Unknown/Undocumented")]
UnknownError,
}
//impl From<Box<dyn std::error::Error>> for Error {
// fn from(e: Box<dyn std::error::Error>) -> Self {
// Error::CustomError("error".to_owned())
// }
//}
impl From<hyper::Error> for Error {
fn from(h: hyper::Error) -> Self {
Error::HyperError(h)
}
}
impl From<r2d2::Error> for Error {
fn from(d: r2d2::Error) -> Self {
Error::DatabasePoolError(d)
}
}
impl From<tokio::task::JoinError> for Error {
/// Wrap SQL error
fn from(_j: tokio::task::JoinError) -> Self {
Error::JoinError
}
}
impl From<rusqlite::Error> for Error {

View File

@ -2,6 +2,7 @@
use crate::config;
use crate::error::Error::*;
use crate::error::Result;
use crate::nip05;
use bitcoin_hashes::{sha256, Hash};
use lazy_static::lazy_static;
use log::*;
@ -71,7 +72,7 @@ impl From<EventCmd> for Result<Event> {
}
/// Seconds since 1970
fn unix_time() -> u64 {
pub fn unix_time() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
@ -79,6 +80,25 @@ fn unix_time() -> u64 {
}
impl Event {
pub fn is_kind_metadata(&self) -> bool {
self.kind == 0
}
/// Pull a NIP-05 Name out of the event, if one exists
pub fn get_nip05_addr(&self) -> Option<nip05::Nip05Name> {
if self.is_kind_metadata() {
// very quick check if we should attempt to parse this json
if self.content.contains("\"nip05\"") {
// Parse into JSON
let md_parsed: Value = serde_json::from_str(&self.content).ok()?;
let md_map = md_parsed.as_object()?;
let nip05_str = md_map.get("nip05")?.as_str()?;
return nip05::Nip05Name::try_from(nip05_str).ok();
}
}
None
}
/// Build an event tag index
fn build_index(&mut self) {
// if there are no tags; just leave the index as None
@ -107,6 +127,9 @@ impl Event {
pub fn get_event_id_prefix(&self) -> String {
self.id.chars().take(8).collect()
}
pub fn get_author_prefix(&self) -> String {
self.pubkey.chars().take(8).collect()
}
/// Check if this event has a valid signature.
fn is_valid(&self) -> bool {
@ -133,7 +156,7 @@ impl Event {
// * serialize with no spaces/newlines
let c_opt = self.to_canonical();
if c_opt.is_none() {
info!("event could not be canonicalized");
debug!("event could not be canonicalized");
return false;
}
let c = c_opt.unwrap();
@ -142,6 +165,7 @@ impl Event {
let hex_digest = format!("{:x}", digest);
// * ensure the id matches the computed sha256sum.
if self.id != hex_digest {
debug!("event id does not match digest");
return false;
}
// * validate the message digest (sig) using the pubkey & computed sha256 message hash.
@ -152,11 +176,11 @@ impl Event {
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
matches!(verify, Ok(()))
} else {
info!("Client sent malformed pubkey");
debug!("Client sent malformed pubkey");
false
}
} else {
warn!("Error converting digest to secp256k1 message");
info!("Error converting digest to secp256k1 message");
false
}
}

View File

@ -2,7 +2,7 @@ use crate::config;
/// Relay Info
use serde::{Deserialize, Serialize};
const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
pub const CARGO_PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
#[derive(Debug, Serialize, Deserialize)]
#[allow(unused)]

View File

@ -5,6 +5,7 @@ pub mod db;
pub mod error;
pub mod event;
pub mod info;
pub mod nip05;
pub mod protostream;
pub mod subscription;
pub mod tags;

View File

@ -15,6 +15,7 @@ use nostr_rs_relay::db;
use nostr_rs_relay::error::{Error, Result};
use nostr_rs_relay::event::Event;
use nostr_rs_relay::info::RelayInfo;
use nostr_rs_relay::nip05;
use nostr_rs_relay::protostream;
use nostr_rs_relay::protostream::NostrMessage::*;
use nostr_rs_relay::protostream::NostrResponse::*;
@ -24,8 +25,7 @@ use std::env;
use std::net::SocketAddr;
use std::path::Path;
use tokio::runtime::Builder;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_tungstenite::WebSocketStream;
@ -171,21 +171,45 @@ fn main() -> Result<(), Error> {
*settings = c;
}
let config = config::SETTINGS.read().unwrap();
let settings = config::SETTINGS.read().unwrap();
trace!("Config: {:?}", settings);
// do some config validation.
if !Path::new(&config.database.data_directory).is_dir() {
if !Path::new(&settings.database.data_directory).is_dir() {
error!("Database directory does not exist");
return Err(Error::DatabaseDirError);
}
trace!("config: {:?}", config);
let addr = format!("{}:{}", config.network.address.trim(), config.network.port);
let addr = format!(
"{}:{}",
settings.network.address.trim(),
settings.network.port
);
let socket_addr = addr.parse().expect("listening address not valid");
if let Some(addr_whitelist) = &config.authorization.pubkey_whitelist {
// address whitelisting settings
if let Some(addr_whitelist) = &settings.authorization.pubkey_whitelist {
info!(
"Event publishing restricted to {} pubkey(s)",
addr_whitelist.len()
);
}
// check if NIP-05 enforced user verification is on
if settings.verified_users.is_active() {
info!(
"NIP-05 user verification mode:{:?}",
settings.verified_users.mode
);
if let Some(d) = settings.verified_users.verify_update_duration() {
info!("NIP-05 check user verification every: {:?}", d);
}
if let Some(d) = settings.verified_users.verify_expiration_duration() {
info!("NIP-05 user verification expires after: {:?}", d);
}
if let Some(wl) = &settings.verified_users.domain_whitelist {
info!("NIP-05 domain whitelist: {:?}", wl);
}
if let Some(bl) = &settings.verified_users.domain_blacklist {
info!("NIP-05 domain blacklist: {:?}", bl);
}
}
// configure tokio runtime
let rt = Builder::new_multi_thread()
.enable_all()
@ -206,12 +230,38 @@ fn main() -> Result<(), Error> {
let (event_tx, event_rx) = mpsc::channel::<Event>(settings.limits.event_persist_buffer);
// establish a channel for letting all threads now about a
// requested server shutdown.
let (invoke_shutdown, _) = broadcast::channel::<()>(1);
let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
// create a channel for sending any new metadata event. These
// will get processed relatively slowly (a potentially
// multi-second blocking HTTP call) on a single thread, so we
// buffer requests on the channel. No harm in dropping events
// here, since we are protecting against DoS. This can make
// it difficult to setup initial metadata in bulk, since
// overwhelming this will drop events and won't register
// metadata events.
let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
// start the database writer thread. Give it a channel for
// writing events, and for publishing events that have been
// written (to all connected clients).
db::db_writer(event_rx, bcast_tx.clone(), invoke_shutdown.subscribe()).await;
db::db_writer(
event_rx,
bcast_tx.clone(),
metadata_tx.clone(),
shutdown_listen,
)
.await;
info!("db writer created");
// create a nip-05 verifier thread
let verifier_opt = nip05::Verifier::new(metadata_rx, bcast_tx.clone());
if let Ok(mut v) = verifier_opt {
if settings.verified_users.is_active() {
tokio::task::spawn(async move {
info!("starting up NIP-05 verifier...");
v.run().await;
});
}
}
// // listen for ctrl-c interruupts
let ctrl_c_shutdown = invoke_shutdown.clone();
tokio::spawn(async move {
@ -220,7 +270,14 @@ fn main() -> Result<(), Error> {
ctrl_c_shutdown.send(()).ok();
});
// build a connection pool for sqlite connections
let pool = db::build_read_pool();
let pool = db::build_pool(
"client query",
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_SHARED_CACHE,
settings.database.min_conn,
settings.database.max_conn,
true,
);
// A `Service` is needed for every connection, so this
// creates one from our `handle_request` function.
let make_svc = make_service_fn(|conn: &AddrStream| {
@ -285,7 +342,7 @@ async fn nostr_server(
// and how many it received from queries.
let mut client_published_event_count: usize = 0;
let mut client_received_event_count: usize = 0;
info!("new connection for client: {}", cid);
info!("new connection for client: {:?}", cid);
loop {
tokio::select! {
_ = shutdown.recv() => {
@ -298,6 +355,7 @@ async fn nostr_server(
client_received_event_count += 1;
nostr_stream.send(res).await.ok();
},
// TODO: consider logging the LaggedRecv error
Ok(global_event) = bcast_rx.recv() => {
// an event has been broadcast to all clients
// first check if there is a subscription for this event.
@ -306,14 +364,14 @@ async fn nostr_server(
// TODO: serialize at broadcast time, instead of
// once for each consumer.
if let Ok(event_str) = serde_json::to_string(&global_event) {
debug!("sub match: client: {}, sub: {}, event: {}",
debug!("sub match: client: {:?}, sub: {:?}, event: {:?}",
cid, s,
global_event.get_event_id_prefix());
// create an event response and send it
let res = EventRes(s.to_owned(),event_str);
nostr_stream.send(res).await.ok();
} else {
warn!("could not convert event to string");
warn!("could not serialize event {:?}", global_event.get_event_id_prefix());
}
}
},
@ -327,13 +385,19 @@ async fn nostr_server(
match parsed {
Ok(e) => {
let id_prefix:String = e.id.chars().take(8).collect();
debug!("successfully parsed/validated event: {} from client: {}", id_prefix, cid);
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
// TODO: consider moving some/all
// authorization checks here, instead
// of the DB module, so we can send a
// proper NOTICE back to the client if
// they are unable to write.
// Write this to the database
event_tx.send(e.clone()).await.ok();
client_published_event_count += 1;
},
Err(_) => {
info!("client {} sent an invalid event", cid);
info!("client {:?} sent an invalid event", cid);
nostr_stream.send(NoticeRes("event was invalid".to_owned())).await.ok();
}
}
@ -385,19 +449,19 @@ async fn nostr_server(
}
},
None => {
debug!("normal websocket close from client: {}",cid);
debug!("normal websocket close from client: {:?}",cid);
break;
},
Some(Err(Error::ConnError)) => {
debug!("got connection close/error, disconnecting client: {}",cid);
debug!("got connection close/error, disconnecting client: {:?}",cid);
break;
}
Some(Err(Error::EventMaxLengthError(s))) => {
info!("client {} sent event larger ({} bytes) than max size", cid, s);
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
nostr_stream.send(NoticeRes("event exceeded max size".to_owned())).await.ok();
},
Some(Err(e)) => {
info!("got non-fatal error from client: {}, error: {:?}", cid, e);
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
},
}
},
@ -408,7 +472,7 @@ async fn nostr_server(
stop_tx.send(()).ok();
}
info!(
"stopping connection for client: {} (client sent {} event(s), received {})",
"stopping connection for client: {:?} (client sent {} event(s), received {})",
cid, client_published_event_count, client_received_event_count
);
}

812
src/nip05.rs Normal file
View File

@ -0,0 +1,812 @@
//! User Verification Through NIP-05
use crate::config::SETTINGS;
use crate::db;
use crate::error::{Error, Result};
use crate::event::{unix_time, Event};
use hyper::body::HttpBody;
use hyper::client::connect::HttpConnector;
use hyper::Client;
use hyper_tls::HttpsConnector;
use log::*;
use rand::Rng;
use rusqlite::params;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::time::Interval;
/// NIP-05 verifier
pub struct Verifier {
/// Metadata events for us to inspect
metadata_rx: tokio::sync::broadcast::Receiver<Event>,
/// Newly validated events get written and then broadcast on this channel to subscribers
event_tx: tokio::sync::broadcast::Sender<Event>,
/// SQLite read query pool
read_pool: db::SqlitePool,
/// SQLite write query pool
write_pool: db::SqlitePool,
/// HTTP client
client: hyper::Client<HttpsConnector<HttpConnector>, hyper::Body>,
/// After all accounts are updated, wait this long before checking again.
wait_after_finish: Duration,
/// Minimum amount of time between HTTP queries
http_wait_duration: Duration,
/// Interval for updating verification records
reverify_interval: Interval,
}
/// A NIP-05 identifier is a local part and domain.
#[derive(PartialEq, Debug, Clone)]
pub struct Nip05Name {
local: String,
domain: String,
}
impl Nip05Name {
/// Does this name represent the entire domain?
pub fn is_domain_only(&self) -> bool {
self.local == "_"
}
/// Determine the URL to query for verification
fn to_url(&self) -> Option<http::Uri> {
format!(
"https://{}/.well-known/nostr.json?name={}",
self.domain, self.local
)
.parse::<http::Uri>()
.ok()
}
}
// Parsing Nip05Names from strings
impl std::convert::TryFrom<&str> for Nip05Name {
type Error = Error;
fn try_from(inet: &str) -> Result<Self, Self::Error> {
// break full name at the @ boundary.
let components: Vec<&str> = inet.split('@').collect();
if components.len() != 2 {
Err(Error::CustomError("too many/few components".to_owned()))
} else {
// check if local name is valid
let local = components[0];
let domain = components[1];
if local
.chars()
.all(|x| x.is_alphanumeric() || x == '_' || x == '-' || x == '.')
{
if domain
.chars()
.all(|x| x.is_alphanumeric() || x == '-' || x == '.')
{
Ok(Nip05Name {
local: local.to_owned(),
domain: domain.to_owned(),
})
} else {
Err(Error::CustomError(
"invalid character in domain part".to_owned(),
))
}
} else {
Err(Error::CustomError(
"invalid character in local part".to_owned(),
))
}
}
}
}
impl std::fmt::Display for Nip05Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.local, self.domain)
}
}
// Current time, with a slight foward jitter in seconds
fn now_jitter(sec: u64) -> u64 {
// random time between now, and 10min in future.
let mut rng = rand::thread_rng();
let jitter_amount = rng.gen_range(0..sec);
let now = unix_time();
now.saturating_add(jitter_amount)
}
/// Check if the specified username and address are present and match in this response body
fn body_contains_user(username: &str, address: &str, bytes: hyper::body::Bytes) -> Result<bool> {
// convert the body into json
let body: serde_json::Value = serde_json::from_slice(&bytes)?;
// ensure we have a names object.
let names_map = body
.as_object()
.and_then(|x| x.get("names"))
.and_then(|x| x.as_object())
.ok_or_else(|| Error::CustomError("not a map".to_owned()))?;
// get the pubkey for the requested user
let check_name = names_map.get(username).and_then(|x| x.as_str());
// ensure the address is a match
Ok(check_name.map(|x| x == address).unwrap_or(false))
}
impl Verifier {
pub fn new(
metadata_rx: tokio::sync::broadcast::Receiver<Event>,
event_tx: tokio::sync::broadcast::Sender<Event>,
) -> Result<Self> {
info!("creating NIP-05 verifier");
// build a database connection for reading and writing.
let write_pool = db::build_pool(
"nip05 writer",
rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE,
1, // min conns
4, // max conns
true, // wait for DB
);
let read_pool = db::build_pool(
"nip05 reader",
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
1, // min conns
8, // max conns
true, // wait for DB
);
// setup hyper client
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
// After all accounts have been re-verified, don't check again
// for this long.
let wait_after_finish = Duration::from_secs(60 * 10);
// when we have an active queue of accounts to validate, we
// will wait this duration between HTTP requests.
let http_wait_duration = Duration::from_secs(1);
// setup initial interval for re-verification. If we find
// there is no work to be done, it will be reset to a longer
// duration.
let reverify_interval = tokio::time::interval(http_wait_duration);
Ok(Verifier {
metadata_rx,
event_tx,
read_pool,
write_pool,
client,
wait_after_finish,
http_wait_duration,
reverify_interval,
})
}
/// Perform web verification against a NIP-05 name and address.
pub async fn get_web_verification(
&mut self,
nip: &Nip05Name,
pubkey: &str,
) -> UserWebVerificationStatus {
self.get_web_verification_res(nip, pubkey)
.await
.unwrap_or(UserWebVerificationStatus::Unknown)
}
/// Perform web verification against an `Event` (must be metadata).
pub async fn get_web_verification_from_event(
&mut self,
e: &Event,
) -> UserWebVerificationStatus {
let nip_parse = e.get_nip05_addr();
if let Some(nip) = nip_parse {
self.get_web_verification_res(&nip, &e.pubkey)
.await
.unwrap_or(UserWebVerificationStatus::Unknown)
} else {
UserWebVerificationStatus::Unknown
}
}
/// Perform web verification, with a `Result` return.
async fn get_web_verification_res(
&mut self,
nip: &Nip05Name,
pubkey: &str,
) -> Result<UserWebVerificationStatus> {
// determine if this domain should be checked
if !is_domain_allowed(&nip.domain) {
return Ok(UserWebVerificationStatus::DomainNotAllowed);
}
let url = nip
.to_url()
.ok_or_else(|| Error::CustomError("invalid NIP-05 URL".to_owned()))?;
let req = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(url)
.header("Accept", "application/json")
.header(
"User-Agent",
format!(
"nostr-rs-relay/{} NIP-05 Verifier",
crate::info::CARGO_PKG_VERSION.unwrap()
),
)
.body(hyper::Body::empty())
.expect("request builder");
let response_fut = self.client.request(req);
// HTTP request with timeout
match tokio::time::timeout(Duration::from_secs(5), response_fut).await {
Ok(response_res) => {
let response = response_res?;
// limit size of verification document to 1MB.
const MAX_ALLOWED_RESPONSE_SIZE: u64 = 1024 * 1024;
// determine content length from response
let response_content_length = match response.body().size_hint().upper() {
Some(v) => v,
None => MAX_ALLOWED_RESPONSE_SIZE + 1, // reject missing content length
};
// TODO: test how hyper handles the client providing an inaccurate content-length.
if response_content_length <= MAX_ALLOWED_RESPONSE_SIZE {
let (parts, body) = response.into_parts();
// TODO: consider redirects
if parts.status == http::StatusCode::OK {
// parse body, determine if the username / key / address is present
let body_bytes = hyper::body::to_bytes(body).await?;
let body_matches = body_contains_user(&nip.local, pubkey, body_bytes)?;
if body_matches {
return Ok(UserWebVerificationStatus::Verified);
} else {
// successful response, parsed as a nip-05
// document, but this name/pubkey was not
// present.
return Ok(UserWebVerificationStatus::Unverified);
}
}
} else {
info!(
"content length missing or exceeded limits for account: {:?}",
nip.to_string()
);
}
}
Err(_) => {
info!("timeout verifying account {:?}", nip);
return Ok(UserWebVerificationStatus::Unknown);
}
}
Ok(UserWebVerificationStatus::Unknown)
}
/// Perform NIP-05 verifier tasks.
pub async fn run(&mut self) {
// use this to schedule periodic re-validation tasks
// run a loop, restarting on failure
loop {
let res = self.run_internal().await;
if let Err(e) = res {
info!("error in verifier: {:?}", e);
}
}
}
/// Internal select loop for performing verification
async fn run_internal(&mut self) -> Result<()> {
tokio::select! {
m = self.metadata_rx.recv() => {
match m {
Ok(e) => {
if let Some(naddr) = e.get_nip05_addr() {
info!("got metadata event for ({:?},{:?})", naddr.to_string() ,e.get_author_prefix());
// Process a new author, checking if they are verified:
let check_verified = get_latest_user_verification(self.read_pool.get().expect("could not get connection"), &e.pubkey).await;
// ensure the event we got is more recent than the one we have, otherwise we can ignore it.
if let Ok(last_check) = check_verified {
if e.created_at <= last_check.event_created {
// this metadata is from the same author as an existing verification.
// it is older than what we have, so we can ignore it.
debug!("received older metadata event for author {:?}", e.get_author_prefix());
return Ok(());
}
}
// old, or no existing record for this user. In either case, we just create a new one.
let start = Instant::now();
let v = self.get_web_verification_from_event(&e).await;
info!(
"checked name {:?}, result: {:?}, in: {:?}",
naddr.to_string(),
v,
start.elapsed()
);
// sleep to limit how frequently we make HTTP requests for new metadata events. This should limit us to 4 req/sec.
tokio::time::sleep(Duration::from_millis(250)).await;
// if this user was verified, we need to write the
// record, persist the event, and broadcast.
if let UserWebVerificationStatus::Verified = v {
self.create_new_verified_user(&naddr.to_string(), &e).await?;
}
}
},
Err(tokio::sync::broadcast::error::RecvError::Lagged(c)) => {
warn!("incoming metadata events overwhelmed buffer, {} events dropped",c);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
info!("metadata broadcast channel closed");
}
}
},
_ = self.reverify_interval.tick() => {
// check and see if there is an old account that needs
// to be reverified
self.do_reverify().await?;
},
}
Ok(())
}
/// Reverify the oldest user verification record.
async fn do_reverify(&mut self) -> Result<()> {
let reverify_setting;
let max_failures;
{
// this block prevents a read handle to settings being
// captured by the async DB call (guard is not Send)
let settings = SETTINGS.read().unwrap();
reverify_setting = settings.verified_users.verify_update_frequency_duration;
max_failures = settings.verified_users.max_consecutive_failures;
}
// get from settings, but default to 6hrs between re-checking an account
let reverify_dur = reverify_setting.unwrap_or_else(|| Duration::from_secs(60 * 60 * 6));
// find all verification records that have success or failure OLDER than the reverify_dur.
let now = SystemTime::now();
let earliest = now - reverify_dur;
let earliest_epoch = earliest
.duration_since(SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
.unwrap_or(0);
let vr = get_oldest_user_verification(self.read_pool.get()?, earliest_epoch).await;
match vr {
Ok(ref v) => {
let new_status = self.get_web_verification(&v.name, &v.address).await;
match new_status {
UserWebVerificationStatus::Verified => {
// freshly verified account, update the
// timestamp.
self.update_verification_record(self.write_pool.get()?, v)
.await?;
}
UserWebVerificationStatus::DomainNotAllowed
| UserWebVerificationStatus::Unknown => {
// server may be offline, or temporarily
// blocked by the config file. Note the
// failure so we can process something
// else.
// have we had enough failures to give up?
if v.failure_count >= max_failures as u64 {
info!(
"giving up on verifying {:?} after {} failures",
v.name, v.failure_count
);
self.delete_verification_record(self.write_pool.get()?, v)
.await?;
} else {
// record normal failure, incrementing failure count
self.fail_verification_record(self.write_pool.get()?, v)
.await?;
}
}
UserWebVerificationStatus::Unverified => {
// domain has removed the verification, drop
// the record on our side.
self.delete_verification_record(self.write_pool.get()?, v)
.await?;
}
}
}
Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
// No users need verification. Reset the interval to
// the next verification attempt.
let start = tokio::time::Instant::now() + self.wait_after_finish;
self.reverify_interval = tokio::time::interval_at(start, self.http_wait_duration);
}
Err(ref e) => {
warn!(
"Error when checking for NIP-05 verification records: {:?}",
e
);
}
}
Ok(())
}
/// Reset the verification timestamp on a VerificationRecord
pub async fn update_verification_record(
&mut self,
mut conn: db::PooledConnection,
vr: &VerificationRecord,
) -> Result<()> {
let vr_id = vr.rowid;
let vr_str = vr.to_string();
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let verif_time = now_jitter(600);
let tx = conn.transaction()?;
{
// update verification time and reset any failure count
let query =
"UPDATE user_verification SET verified_at=?, failure_count=0 WHERE id=?";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![verif_time, vr_id])?;
}
tx.commit()?;
info!("verification updated for {}", vr_str);
let ok: Result<()> = Ok(());
ok
})
.await?
}
/// Reset the failure timestamp on a VerificationRecord
pub async fn fail_verification_record(
&mut self,
mut conn: db::PooledConnection,
vr: &VerificationRecord,
) -> Result<()> {
let vr_id = vr.rowid;
let vr_str = vr.to_string();
let fail_count = vr.failure_count.saturating_add(1);
tokio::task::spawn_blocking(move || {
// add some jitter to the verification to prevent everything from stacking up together.
let fail_time = now_jitter(600);
let tx = conn.transaction()?;
{
let query = "UPDATE user_verification SET failed_at=?, failure_count=? WHERE id=?";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![fail_time, fail_count, vr_id])?;
}
tx.commit()?;
info!("verification failed for {}", vr_str);
let ok: Result<()> = Ok(());
ok
})
.await?
}
/// Delete a VerificationRecord that is no longer valid
pub async fn delete_verification_record(
&mut self,
mut conn: db::PooledConnection,
vr: &VerificationRecord,
) -> Result<()> {
let vr_id = vr.rowid;
let vr_str = vr.to_string();
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
let query = "DELETE FROM user_verification WHERE id=?;";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![vr_id])?;
}
tx.commit()?;
info!("verification rescinded for {}", vr_str);
let ok: Result<()> = Ok(());
ok
})
.await?
}
/// Persist an event, create a verification record, and broadcast.
// TODO: have more event-writing logic handled in the db module.
// Right now, these events avoid the rate limit. That is
// acceptable since as soon as the user is registered, this path
// is no longer used.
// TODO: refactor these into spawn_blocking
// calls to get them off the async executors.
async fn create_new_verified_user(&mut self, name: &str, event: &Event) -> Result<()> {
let start = Instant::now();
// we should only do this if we are enabled. if we are
// disabled/passive, the event has already been persisted.
let should_write_event;
{
let settings = SETTINGS.read().unwrap();
should_write_event = settings.verified_users.is_enabled()
}
if should_write_event {
match db::write_event(&mut self.write_pool.get()?, event) {
Ok(updated) => {
if updated != 0 {
info!(
"persisted event: {:?} in {:?}",
event.get_event_id_prefix(),
start.elapsed()
);
self.event_tx.send(event.clone()).ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
if let Error::SqlError(r) = err {
warn!("because: : {:?}", r);
}
}
}
}
// write the verification record
save_verification_record(self.write_pool.get()?, event, name).await?;
Ok(())
}
}
/// Result of checking user's verification status against DNS/HTTP.
#[derive(PartialEq, Debug, Clone)]
pub enum UserWebVerificationStatus {
Verified, // user is verified, as of now.
DomainNotAllowed, // domain blacklist or whitelist denied us from attempting a verification
Unknown, // user's status could not be determined (timeout, server error)
Unverified, // user's status is not verified (successful check, name / addr do not match)
}
/// A NIP-05 verification record.
#[derive(PartialEq, Debug, Clone)]
// Basic information for a verification event. Gives us all we need to assert a NIP-05 address is good.
pub struct VerificationRecord {
pub rowid: u64, // database row for this verification event
pub name: Nip05Name, // address being verified
pub address: String, // pubkey
pub event: String, // event ID hash providing the verification
pub event_created: u64, // when the metadata event was published
pub last_success: Option<u64>, // the most recent time a verification was provided. None if verification under this name has never succeeded.
pub last_failure: Option<u64>, // the most recent time verification was attempted, but could not be completed.
pub failure_count: u64, // how many consecutive failures have been observed.
}
/// Check with settings to determine if a given domain is allowed to
/// publish.
pub fn is_domain_allowed(domain: &str) -> bool {
let settings = SETTINGS.read().unwrap();
// if there is a whitelist, domain must be present in it.
if let Some(wl) = &settings.verified_users.domain_whitelist {
// workaround for Vec contains not accepting &str
return wl.iter().any(|x| x == domain);
}
// otherwise, check that user is not in the blacklist
if let Some(bl) = &settings.verified_users.domain_blacklist {
return !bl.iter().any(|x| x == domain);
}
true
}
impl VerificationRecord {
/// Check if the record is recent enough to be considered valid,
/// and the domain is allowed.
pub fn is_valid(&self) -> bool {
let settings = SETTINGS.read().unwrap();
// how long a verification record is good for
let nip05_expiration = &settings.verified_users.verify_expiration_duration;
if let Some(e) = nip05_expiration {
if !self.is_current(e) {
return false;
}
}
// check domains
is_domain_allowed(&self.name.domain)
}
/// Check if this record has been validated since the given
/// duration.
fn is_current(&self, d: &Duration) -> bool {
match self.last_success {
Some(s) => {
// current time - duration
let now = SystemTime::now();
let cutoff = now - *d;
let cutoff_epoch = cutoff
.duration_since(SystemTime::UNIX_EPOCH)
.map(|x| x.as_secs())
.unwrap_or(0);
s > cutoff_epoch
}
None => false,
}
}
}
impl std::fmt::Display for VerificationRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"({:?},{:?})",
self.name.to_string(),
self.address.chars().take(8).collect::<String>()
)
}
}
/// Create a new verification record based on an event
pub async fn save_verification_record(
mut conn: db::PooledConnection,
event: &Event,
name: &str,
) -> Result<()> {
let e = hex::decode(&event.id).ok();
let n = name.to_owned();
let a_prefix = event.get_author_prefix();
tokio::task::spawn_blocking(move || {
let tx = conn.transaction()?;
{
// if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest.
let query = "INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now'));";
let mut stmt = tx.prepare(query)?;
stmt.execute(params![e, n])?;
// get the row ID
let v_id = tx.last_insert_rowid();
// delete everything else by this name
let del_query = "DELETE FROM user_verification WHERE name = ? AND id != ?;";
let mut del_stmt = tx.prepare(del_query)?;
let count = del_stmt.execute(params![n,v_id])?;
if count > 0 {
info!("removed {} old verification records for ({:?},{:?})", count, n, a_prefix);
}
}
tx.commit()?;
info!("saved new verification record for ({:?},{:?})", n, a_prefix);
let ok: Result<()> = Ok(());
ok
}).await?
}
/// Retrieve the most recent verification record for a given pubkey
// Important, this is the most recent verification /of the most recent metadata event/.
pub async fn get_latest_user_verification(
conn: db::PooledConnection,
pubkey: &str,
) -> Result<VerificationRecord> {
let p = pubkey.to_owned();
tokio::task::spawn_blocking(move || query_latest_user_verification(conn, p)).await?
}
pub fn query_latest_user_verification(
mut conn: db::PooledConnection,
pubkey: String,
) -> Result<VerificationRecord> {
let tx = conn.transaction()?;
let query = "SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![hex::decode(&pubkey).ok()], |r| {
let rowid: u64 = r.get(0)?;
let rowname: String = r.get(1)?;
let eventid: Vec<u8> = r.get(2)?;
let created_at: u64 = r.get(3)?;
// create a tuple since we can't throw non-rusqlite errors in this closure
Ok((
rowid,
rowname,
eventid,
created_at,
r.get(4).ok(),
r.get(5).ok(),
r.get(6)?,
))
})?;
Ok(VerificationRecord {
rowid: fields.0,
name: Nip05Name::try_from(&fields.1[..])?,
address: pubkey,
event: hex::encode(fields.2),
event_created: fields.3,
last_success: fields.4,
last_failure: fields.5,
failure_count: fields.6,
})
}
/// Retrieve the oldest user verification (async)
pub async fn get_oldest_user_verification(
conn: db::PooledConnection,
earliest: u64,
) -> Result<VerificationRecord> {
let res =
tokio::task::spawn_blocking(move || query_oldest_user_verification(conn, earliest)).await?;
res
}
pub fn query_oldest_user_verification(
mut conn: db::PooledConnection,
earliest: u64,
) -> Result<VerificationRecord> {
let tx = conn.transaction()?;
let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
let mut stmt = tx.prepare_cached(query)?;
let fields = stmt.query_row(params![earliest, earliest], |r| {
let rowid: u64 = r.get(0)?;
let rowname: String = r.get(1)?;
let eventid: Vec<u8> = r.get(2)?;
let pubkey: Vec<u8> = r.get(3)?;
let created_at: u64 = r.get(4)?;
// create a tuple since we can't throw non-rusqlite errors in this closure
Ok((
rowid,
rowname,
eventid,
pubkey,
created_at,
r.get(5).ok(),
r.get(6).ok(),
r.get(7)?,
))
})?;
let vr = VerificationRecord {
rowid: fields.0,
name: Nip05Name::try_from(&fields.1[..])?,
address: hex::encode(fields.3),
event: hex::encode(fields.2),
event_created: fields.4,
last_success: fields.5,
last_failure: fields.6,
failure_count: fields.7,
};
Ok(vr)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn local_from_inet() {
let addr = "bob@example.com";
let parsed = Nip05Name::try_from(addr);
assert!(!parsed.is_err());
let v = parsed.unwrap();
assert_eq!(v.local, "bob");
assert_eq!(v.domain, "example.com");
}
#[test]
fn not_enough_sep() {
let addr = "bob_example.com";
let parsed = Nip05Name::try_from(addr);
assert!(parsed.is_err());
}
#[test]
fn too_many_sep() {
let addr = "foo@bob@example.com";
let parsed = Nip05Name::try_from(addr);
assert!(parsed.is_err());
}
#[test]
fn invalid_local_name() {
// non-permitted ascii chars
assert!(Nip05Name::try_from("foo!@example.com").is_err());
assert!(Nip05Name::try_from("foo @example.com").is_err());
assert!(Nip05Name::try_from(" foo@example.com").is_err());
assert!(Nip05Name::try_from("f oo@example.com").is_err());
assert!(Nip05Name::try_from("foo<@example.com").is_err());
// unicode dash
assert!(Nip05Name::try_from("foobar@example.com").is_err());
// emoji
assert!(Nip05Name::try_from("foo😭bar@example.com").is_err());
}
#[test]
fn invalid_domain_name() {
// non-permitted ascii chars
assert!(Nip05Name::try_from("foo@examp!e.com").is_err());
assert!(Nip05Name::try_from("foo@ example.com").is_err());
assert!(Nip05Name::try_from("foo@exa mple.com").is_err());
assert!(Nip05Name::try_from("foo@example .com").is_err());
assert!(Nip05Name::try_from("foo@exa<mple.com").is_err());
// unicode dash
assert!(Nip05Name::try_from("foobar@example.com").is_err());
// emoji
assert!(Nip05Name::try_from("foobar@ex😭ample.com").is_err());
}
#[test]
fn to_url() {
let nip = Nip05Name::try_from("foobar@example.com").unwrap();
assert_eq!(
nip.to_url(),
Some(
"https://example.com/.well-known/nostr.json?name=foobar"
.parse()
.unwrap()
)
);
}
}

View File

@ -81,9 +81,11 @@ impl Stream for NostrStream {
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(v)) => match v {
Ok(Message::Text(vs)) => Poll::Ready(Some(convert(vs))),
Ok(Message::Ping(_x)) => {
debug!("client ping");
Ok(Message::Ping(x)) => {
debug!("client ping ({:?})", x);
//Pin::new(&mut self.ws_stream).start_send(Message::Pong(x));
// TODO: restructure this so that Pongs work
//Pin::new(&mut self.ws_stream).write_pending();
//info!("sent pong");
Poll::Pending
}