mirror of
https://github.com/scsibug/nostr-rs-relay.git
synced 2025-09-01 03:40:46 -04:00
Compare commits
46 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
8da6f6555a | ||
|
5bcc63bd56 | ||
|
035cf34673 | ||
|
be8170342e | ||
|
0a3b15f41f | ||
|
2b4b17dbda | ||
|
5058d98ad6 | ||
|
f4ecd43708 | ||
|
a8f465fdc8 | ||
|
1c14adc766 | ||
|
e894a86566 | ||
|
bedc378624 | ||
|
e1c2a6b758 | ||
|
990bb656e8 | ||
|
168cfc3b26 | ||
|
a36ad378f6 | ||
|
538d139ebf | ||
|
23f7730fea | ||
|
8aa1256254 | ||
|
9ed3391b46 | ||
|
4ad483090e | ||
|
9b351aab9b | ||
|
597749890e | ||
|
1d499cf12b | ||
|
ed3a6b9692 | ||
|
048199e30b | ||
|
414e83f696 | ||
|
225c8f762e | ||
|
887fc28ab2 | ||
|
294d3b99c3 | ||
|
53990672ae | ||
|
9c1b21cbfe | ||
|
2f63417646 | ||
|
3b25160852 | ||
|
34ad549cde | ||
|
f8b1fe5035 | ||
|
f2001dc34a | ||
|
b593001229 | ||
|
5913b9f87a | ||
|
77f35f9f43 | ||
|
9e06cc9482 | ||
|
e66fa4ac42 | ||
|
99e117f620 | ||
|
8250e00f05 | ||
|
c9f87ec563 | ||
|
ceaa01e8b4 |
772
Cargo.lock
generated
772
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
12
Cargo.toml
12
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-rs-relay"
|
||||
version = "0.5.0"
|
||||
version = "0.6.2"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
@@ -9,17 +9,17 @@ env_logger = "^0.9"
|
||||
tokio = { version = "^1.16", features = ["full"] }
|
||||
futures = "^0.3"
|
||||
futures-util = "^0.3"
|
||||
tokio-tungstenite = "^0.16"
|
||||
tungstenite = "^0.16"
|
||||
tokio-tungstenite = "^0.17"
|
||||
tungstenite = "^0.17"
|
||||
thiserror = "^1"
|
||||
uuid = { version = "^0.8", features = ["v4"] }
|
||||
config = { version = "0.11", features = ["toml"] }
|
||||
bitcoin_hashes = { version = "^0.9", features = ["serde"] }
|
||||
config = { version = "^0.12", features = ["toml"] }
|
||||
bitcoin_hashes = { version = "^0.10", features = ["serde"] }
|
||||
secp256k1 = {version = "^0.21", features = ["rand", "rand-std", "serde", "bitcoin_hashes"] }
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = {version = "^1.0", features = ["preserve_order"]}
|
||||
hex = "^0.4"
|
||||
rusqlite = { version = "^0.26", features = ["limits"]}
|
||||
rusqlite = { version = "^0.26", features = ["limits","bundled"]}
|
||||
r2d2 = "^0.8"
|
||||
r2d2_sqlite = "^0.19"
|
||||
lazy_static = "^1.4"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.58.1 as builder
|
||||
FROM docker.io/library/rust:1.63.0@sha256:d7e3f69edcdcd03b145d8d9361765b816656755e49c1c1fe28224a4505f91b0a as builder
|
||||
|
||||
RUN USER=root cargo new --bin nostr-rs-relay
|
||||
WORKDIR ./nostr-rs-relay
|
||||
@@ -12,7 +12,7 @@ COPY ./src ./src
|
||||
RUN rm ./target/release/deps/nostr*relay*
|
||||
RUN cargo build --release
|
||||
|
||||
FROM debian:bullseye-20220125-slim
|
||||
FROM docker.io/library/debian:bullseye-20220801-slim@sha256:139a42fa3bde3e5bad6ae912aaaf2103565558a7a73afe6ce6ceed6e46a6e519
|
||||
ARG APP=/usr/src/app
|
||||
ARG APP_DATA=/usr/src/app/db
|
||||
RUN apt-get update \
|
||||
|
12
README.md
12
README.md
@@ -17,10 +17,12 @@ NIPs with a relay-specific implementation are listed here.
|
||||
- [x] NIP-01: Id/Author prefix search (_experimental_)
|
||||
- [x] NIP-02: Hide old contact list events
|
||||
- [ ] NIP-03: OpenTimestamps
|
||||
- [ ] NIP-05: Mapping Nostr keys to DNS identifiers
|
||||
- [ ] NIP-09: Event deletion
|
||||
- [x] NIP-05: Mapping Nostr keys to DNS identifiers
|
||||
- [x] NIP-09: Event deletion
|
||||
- [x] NIP-11: Relay information document
|
||||
- [x] NIP-12: Generic tag search (_experimental_)
|
||||
- [x] NIP-15: End of stored events notice
|
||||
- [x] NIP-16: Replaceable and ephemeral events
|
||||
|
||||
## Quick Start
|
||||
|
||||
@@ -79,8 +81,10 @@ termination, load balancing, and other features), see [Reverse
|
||||
Proxy](reverse-proxy.md).
|
||||
|
||||
## Dev Channel
|
||||
The current dev discussions for this project is happening at https://discord.gg/ufG6fH52Vk.
|
||||
Drop in to query any development related questions.
|
||||
|
||||
For development discussions, please feel free to use the [sourcehut
|
||||
mailing list](https://lists.sr.ht/~gheartsfield/nostr-rs-relay-devel).
|
||||
Or, drop by the [Nostr Telegram Channel](https://t.me/nostr_protocol).
|
||||
|
||||
License
|
||||
---
|
||||
|
@@ -63,8 +63,8 @@ reject_future_seconds = 1800
|
||||
#broadcast_buffer = 16384
|
||||
|
||||
# Event persistence buffer size, in number of events. This provides
|
||||
# backpressure to senders if writes are slow. Defaults to 16.
|
||||
#event_persist_buffer = 16
|
||||
# backpressure to senders if writes are slow.
|
||||
#event_persist_buffer = 4096
|
||||
|
||||
[authorization]
|
||||
# Pubkey addresses in this array are whitelisted for event publishing.
|
||||
|
@@ -1,4 +1,5 @@
|
||||
//! Configuration file and settings management
|
||||
use config::{Config, ConfigError, File};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -138,27 +139,29 @@ pub struct Settings {
|
||||
|
||||
impl Settings {
|
||||
pub fn new() -> Self {
|
||||
let d = Self::default();
|
||||
let default_settings = Self::default();
|
||||
// attempt to construct settings with file
|
||||
// Self::new_from_default(&d).unwrap_or(d)
|
||||
let from_file = Self::new_from_default(&d);
|
||||
let from_file = Self::new_from_default(&default_settings);
|
||||
match from_file {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
warn!("Error reading config file ({:?})", e);
|
||||
d
|
||||
default_settings
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_from_default(default: &Settings) -> Result<Self, config::ConfigError> {
|
||||
let config: config::Config = config::Config::new();
|
||||
let mut settings: Settings = config
|
||||
fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
|
||||
let builder = Config::builder();
|
||||
let config: Config = builder
|
||||
// use defaults
|
||||
.with_merged(config::Config::try_from(default).unwrap())?
|
||||
.add_source(Config::try_from(default)?)
|
||||
// override with file contents
|
||||
.with_merged(config::File::with_name("config"))?
|
||||
.try_into()?;
|
||||
.add_source(File::with_name("config"))
|
||||
.build()?
|
||||
.try_into()
|
||||
.unwrap();
|
||||
let mut settings: Settings = config.try_deserialize()?;
|
||||
// ensure connection pool size is logical
|
||||
if settings.database.min_conn > settings.database.max_conn {
|
||||
panic!(
|
||||
|
483
src/db.rs
483
src/db.rs
@@ -2,13 +2,14 @@
|
||||
use crate::config::SETTINGS;
|
||||
use crate::error::Error;
|
||||
use crate::error::Result;
|
||||
use crate::event::Event;
|
||||
use crate::event::{single_char_tagname, Event};
|
||||
use crate::hexrange::hex_range;
|
||||
use crate::hexrange::HexSearch;
|
||||
use crate::nip05;
|
||||
use crate::schema::{upgrade_db, STARTUP_SQL};
|
||||
use crate::subscription::ReqFilter;
|
||||
use crate::subscription::Subscription;
|
||||
use crate::utils::is_hex;
|
||||
use crate::utils::{is_hex, is_lower_hex};
|
||||
use governor::clock::Clock;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use hex;
|
||||
@@ -27,6 +28,13 @@ use tokio::task;
|
||||
|
||||
pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
|
||||
pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
|
||||
|
||||
/// Events submitted from a client, with a return channel for notices
|
||||
pub struct SubmittedEvent {
|
||||
pub event: Event,
|
||||
pub notice_tx: tokio::sync::mpsc::Sender<String>,
|
||||
}
|
||||
|
||||
/// Database file
|
||||
pub const DB_FILE: &str = "nostr.db";
|
||||
|
||||
@@ -76,7 +84,7 @@ pub fn build_conn(flags: OpenFlags) -> Result<Connection> {
|
||||
|
||||
/// Spawn a database writer that persists events to the SQLite store.
|
||||
pub async fn db_writer(
|
||||
mut event_rx: tokio::sync::mpsc::Receiver<Event>,
|
||||
mut event_rx: tokio::sync::mpsc::Receiver<SubmittedEvent>,
|
||||
bcast_tx: tokio::sync::broadcast::Sender<Event>,
|
||||
metadata_tx: tokio::sync::broadcast::Sender<Event>,
|
||||
mut shutdown: tokio::sync::broadcast::Receiver<()>,
|
||||
@@ -131,18 +139,20 @@ pub async fn db_writer(
|
||||
break;
|
||||
}
|
||||
let mut event_write = false;
|
||||
let event = next_event.unwrap();
|
||||
|
||||
let subm_event = next_event.unwrap();
|
||||
let event = subm_event.event;
|
||||
let notice_tx = subm_event.notice_tx;
|
||||
// check if this event is authorized.
|
||||
if let Some(allowed_addrs) = whitelist {
|
||||
debug!("Checking against pubkey whitelist");
|
||||
// if the event address is not in allowed_addrs.
|
||||
if !allowed_addrs.contains(&event.pubkey) {
|
||||
info!(
|
||||
"Rejecting event {}, unauthorized author",
|
||||
event.get_event_id_prefix()
|
||||
);
|
||||
// TODO: define a channel that can send NOTICEs back to the client.
|
||||
notice_tx
|
||||
.try_send("pubkey is not allowed to publish to this relay".to_owned())
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -171,6 +181,12 @@ pub async fn db_writer(
|
||||
uv.name.to_string(),
|
||||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send(
|
||||
"NIP-05 verification is no longer valid (expired/wrong domain)"
|
||||
.to_owned(),
|
||||
)
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -179,6 +195,9 @@ pub async fn db_writer(
|
||||
"no verification records found for pubkey: {:?}",
|
||||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send("NIP-05 verification needed to publish events".to_owned())
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -189,24 +208,41 @@ pub async fn db_writer(
|
||||
}
|
||||
// TODO: cache recent list of authors to remove a DB call.
|
||||
let start = Instant::now();
|
||||
match write_event(&mut pool.get()?, &event) {
|
||||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
trace!("ignoring duplicate event");
|
||||
} else {
|
||||
info!(
|
||||
"persisted event {:?} from {:?} in {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
event.get_author_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
event_write = true;
|
||||
// send this out to all clients
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
if event.kind >= 20000 && event.kind < 30000 {
|
||||
info!(
|
||||
"published ephemeral event {:?} from {:?} in {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
event.get_author_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
event_write = true
|
||||
} else {
|
||||
match write_event(&mut pool.get()?, &event) {
|
||||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
trace!("ignoring duplicate event");
|
||||
} else {
|
||||
info!(
|
||||
"persisted event {:?} from {:?} in {:?}",
|
||||
event.get_event_id_prefix(),
|
||||
event.get_author_prefix(),
|
||||
start.elapsed()
|
||||
);
|
||||
event_write = true;
|
||||
// send this out to all clients
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("event insert failed: {:?}", err);
|
||||
notice_tx
|
||||
.try_send(
|
||||
"relay experienced an error trying to publish the latest event"
|
||||
.to_owned(),
|
||||
)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("event insert failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,49 +300,66 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
|
||||
if tag.len() >= 2 {
|
||||
let tagname = &tag[0];
|
||||
let tagval = &tag[1];
|
||||
// if tagvalue is hex;
|
||||
if is_hex(tagval) {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(&tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, &tagval],
|
||||
)?;
|
||||
// only single-char tags are searchable
|
||||
let tagchar_opt = single_char_tagname(tagname);
|
||||
match &tagchar_opt {
|
||||
Some(_) => {
|
||||
// if tagvalue is lowercase hex;
|
||||
if is_lower_hex(&tagval) && (tagval.len() % 2 == 0) {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, hex::decode(&tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
tx.execute(
|
||||
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
|
||||
params![ev_id, &tagname, &tagval],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
// if this event is for a metadata update, hide every other kind=0
|
||||
// event from the same author that was issued earlier than this.
|
||||
if e.kind == 0 {
|
||||
// if this event is replaceable update, hide every other replaceable
|
||||
// event with the same kind from the same author that was issued
|
||||
// earlier than this.
|
||||
if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
|
||||
let update_count = tx.execute(
|
||||
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
|
||||
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
|
||||
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
|
||||
params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
|
||||
)?;
|
||||
if update_count > 0 {
|
||||
info!(
|
||||
"hid {} older metadata events for author {:?}",
|
||||
"hid {} older replaceable kind {} events for author {:?}",
|
||||
update_count,
|
||||
e.kind,
|
||||
e.get_author_prefix()
|
||||
);
|
||||
}
|
||||
}
|
||||
// if this event is for a contact update, hide every other kind=3
|
||||
// event from the same author that was issued earlier than this.
|
||||
if e.kind == 3 {
|
||||
let update_count = tx.execute(
|
||||
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
|
||||
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
|
||||
)?;
|
||||
if update_count > 0 {
|
||||
info!(
|
||||
"hid {} older contact events for author {:?}",
|
||||
update_count,
|
||||
e.get_author_prefix()
|
||||
);
|
||||
}
|
||||
// if this event is a deletion, hide the referenced events from the same author.
|
||||
if e.kind == 5 {
|
||||
let event_candidates = e.tag_values_by_name("e");
|
||||
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
||||
// first parameter will be author
|
||||
params.push(Box::new(hex::decode(&e.pubkey)?));
|
||||
event_candidates
|
||||
.iter()
|
||||
.filter(|x| is_hex(x) && x.len() == 64)
|
||||
.filter_map(|x| hex::decode(x).ok())
|
||||
.for_each(|x| params.push(Box::new(x)));
|
||||
let query = format!(
|
||||
"UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
|
||||
repeat_vars(params.len() - 1)
|
||||
);
|
||||
let mut stmt = tx.prepare(&query)?;
|
||||
let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
|
||||
info!(
|
||||
"hid {} deleted events for author {:?}",
|
||||
update_count,
|
||||
e.get_author_prefix()
|
||||
);
|
||||
}
|
||||
tx.commit()?;
|
||||
Ok(ins_count)
|
||||
@@ -332,142 +385,166 @@ fn repeat_vars(count: usize) -> String {
|
||||
s
|
||||
}
|
||||
|
||||
/// Create a dynamic SQL query string and params from a subscription.
|
||||
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
/// Create a dynamic SQL subquery and params from a subscription filter.
|
||||
fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
// build a dynamic SQL query. all user-input is either an integer
|
||||
// (sqli-safe), or a string that is filtered to only contain
|
||||
// hexadecimal characters. Strings that require escaping (tag
|
||||
// names/values) use parameters.
|
||||
let mut query =
|
||||
"SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
|
||||
// parameters
|
||||
|
||||
// if the filter is malformed, don't return anything.
|
||||
if f.force_no_match {
|
||||
let empty_query =
|
||||
"SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned();
|
||||
// query parameters for SQLite
|
||||
let empty_params: Vec<Box<dyn ToSql>> = vec![];
|
||||
return (empty_query, empty_params);
|
||||
}
|
||||
|
||||
let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned();
|
||||
// query parameters for SQLite
|
||||
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
||||
|
||||
// for every filter in the subscription, generate a where clause
|
||||
let mut filter_clauses: Vec<String> = Vec::new();
|
||||
for f in sub.filters.iter() {
|
||||
// individual filter components
|
||||
let mut filter_components: Vec<String> = Vec::new();
|
||||
// Query for "authors", allowing prefix matches
|
||||
if let Some(authvec) = &f.authors {
|
||||
// take each author and convert to a hexsearch
|
||||
let mut auth_searches: Vec<String> = vec![];
|
||||
for auth in authvec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
auth_searches.push("author=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
auth_searches.push("(author>? AND author<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
auth_searches.push("author>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from author {:?}", auth);
|
||||
}
|
||||
// individual filter components (single conditions such as an author or event ID)
|
||||
let mut filter_components: Vec<String> = Vec::new();
|
||||
// Query for "authors", allowing prefix matches
|
||||
if let Some(authvec) = &f.authors {
|
||||
// take each author and convert to a hexsearch
|
||||
let mut auth_searches: Vec<String> = vec![];
|
||||
for auth in authvec {
|
||||
match hex_range(auth) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
auth_searches.push("author=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
auth_searches.push("(author>? AND author<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
auth_searches.push("author>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from author {:?}", auth);
|
||||
}
|
||||
}
|
||||
let authors_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(authors_clause);
|
||||
}
|
||||
// Query for Kind
|
||||
if let Some(ks) = &f.kinds {
|
||||
// kind is number, no escaping needed
|
||||
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
|
||||
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
|
||||
filter_components.push(kind_clause);
|
||||
}
|
||||
// Query for event, allowing prefix matches
|
||||
if let Some(idvec) = &f.ids {
|
||||
// take each author and convert to a hexsearch
|
||||
let mut id_searches: Vec<String> = vec![];
|
||||
for id in idvec {
|
||||
match hex_range(id) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
id_searches.push("event_hash=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
id_searches.push("event_hash>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from id {:?}", id);
|
||||
}
|
||||
let authors_clause = format!("({})", auth_searches.join(" OR "));
|
||||
filter_components.push(authors_clause);
|
||||
}
|
||||
// Query for Kind
|
||||
if let Some(ks) = &f.kinds {
|
||||
// kind is number, no escaping needed
|
||||
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
|
||||
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
|
||||
filter_components.push(kind_clause);
|
||||
}
|
||||
// Query for event, allowing prefix matches
|
||||
if let Some(idvec) = &f.ids {
|
||||
// take each author and convert to a hexsearch
|
||||
let mut id_searches: Vec<String> = vec![];
|
||||
for id in idvec {
|
||||
match hex_range(id) {
|
||||
Some(HexSearch::Exact(ex)) => {
|
||||
id_searches.push("event_hash=?".to_owned());
|
||||
params.push(Box::new(ex));
|
||||
}
|
||||
Some(HexSearch::Range(lower, upper)) => {
|
||||
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
params.push(Box::new(upper));
|
||||
}
|
||||
Some(HexSearch::LowerOnly(lower)) => {
|
||||
id_searches.push("event_hash>?".to_owned());
|
||||
params.push(Box::new(lower));
|
||||
}
|
||||
None => {
|
||||
info!("Could not parse hex range from id {:?}", id);
|
||||
}
|
||||
}
|
||||
let id_clause = format!("({})", id_searches.join(" OR "));
|
||||
filter_components.push(id_clause);
|
||||
}
|
||||
// Query for tags
|
||||
if let Some(map) = &f.tags {
|
||||
for (key, val) in map.iter() {
|
||||
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
for v in val {
|
||||
if is_hex(v) {
|
||||
if let Ok(h) = hex::decode(&v) {
|
||||
blob_vals.push(Box::new(h));
|
||||
}
|
||||
} else {
|
||||
str_vals.push(Box::new(v.to_owned()));
|
||||
let id_clause = format!("({})", id_searches.join(" OR "));
|
||||
filter_components.push(id_clause);
|
||||
}
|
||||
// Query for tags
|
||||
if let Some(map) = &f.tags {
|
||||
for (key, val) in map.iter() {
|
||||
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
|
||||
for v in val {
|
||||
if (v.len()%2==0) && is_lower_hex(v) {
|
||||
if let Ok(h) = hex::decode(&v) {
|
||||
blob_vals.push(Box::new(h));
|
||||
}
|
||||
} else {
|
||||
str_vals.push(Box::new(v.to_owned()));
|
||||
}
|
||||
// create clauses with "?" params for each tag value being searched
|
||||
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
|
||||
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
|
||||
let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_owned()));
|
||||
// add all tag values that are plain strings as params
|
||||
params.append(&mut str_vals);
|
||||
// add all tag values that are blobs as params
|
||||
params.append(&mut blob_vals);
|
||||
filter_components.push(tag_clause);
|
||||
}
|
||||
}
|
||||
// Query for timestamp
|
||||
if f.since.is_some() {
|
||||
let created_clause = format!("created_at > {}", f.since.unwrap());
|
||||
filter_components.push(created_clause);
|
||||
}
|
||||
// Query for timestamp
|
||||
if f.until.is_some() {
|
||||
let until_clause = format!("created_at < {}", f.until.unwrap());
|
||||
filter_components.push(until_clause);
|
||||
}
|
||||
|
||||
// combine all clauses, and add to filter_clauses
|
||||
if !filter_components.is_empty() {
|
||||
let mut fc = "( ".to_owned();
|
||||
fc.push_str(&filter_components.join(" AND "));
|
||||
fc.push_str(" )");
|
||||
filter_clauses.push(fc);
|
||||
// create clauses with "?" params for each tag value being searched
|
||||
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
|
||||
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
|
||||
// find evidence of the target tag name/value existing for this event.
|
||||
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
|
||||
// add the tag name as the first parameter
|
||||
params.push(Box::new(key.to_string()));
|
||||
// add all tag values that are plain strings as params
|
||||
params.append(&mut str_vals);
|
||||
// add all tag values that are blobs as params
|
||||
params.append(&mut blob_vals);
|
||||
filter_components.push(tag_clause);
|
||||
}
|
||||
}
|
||||
|
||||
// Query for timestamp
|
||||
if f.since.is_some() {
|
||||
let created_clause = format!("created_at > {}", f.since.unwrap());
|
||||
filter_components.push(created_clause);
|
||||
}
|
||||
// Query for timestamp
|
||||
if f.until.is_some() {
|
||||
let until_clause = format!("created_at < {}", f.until.unwrap());
|
||||
filter_components.push(until_clause);
|
||||
}
|
||||
// never display hidden events
|
||||
query.push_str(" WHERE hidden!=TRUE ");
|
||||
|
||||
// combine all filters with OR clauses, if any exist
|
||||
if !filter_clauses.is_empty() {
|
||||
query.push_str(" AND (");
|
||||
query.push_str(&filter_clauses.join(" OR "));
|
||||
query.push_str(") ");
|
||||
query.push_str(" WHERE hidden!=TRUE");
|
||||
// build filter component conditions
|
||||
if !filter_components.is_empty() {
|
||||
query.push_str(" AND ");
|
||||
query.push_str(&filter_components.join(" AND "));
|
||||
}
|
||||
// add order clause
|
||||
query.push_str(" ORDER BY created_at ASC");
|
||||
debug!("query string: {}", query);
|
||||
// Apply per-filter limit to this subquery.
|
||||
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
|
||||
if let Some(lim) = f.limit {
|
||||
query.push_str(&format!(" ORDER BY e.created_at DESC LIMIT {}", lim))
|
||||
} else {
|
||||
query.push_str(" ORDER BY e.created_at ASC")
|
||||
}
|
||||
(query, params)
|
||||
}
|
||||
|
||||
/// Create a dynamic SQL query string and params from a subscription.
|
||||
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
// build a dynamic SQL query for an entire subscription, based on
|
||||
// SQL subqueries for filters.
|
||||
let mut subqueries: Vec<String> = Vec::new();
|
||||
// subquery params
|
||||
let mut params: Vec<Box<dyn ToSql>> = vec![];
|
||||
// for every filter in the subscription, generate a subquery
|
||||
for f in sub.filters.iter() {
|
||||
let (f_subquery, mut f_params) = query_from_filter(&f);
|
||||
subqueries.push(f_subquery);
|
||||
params.append(&mut f_params);
|
||||
}
|
||||
// encapsulate subqueries into select statements
|
||||
let subqueries_selects: Vec<String> = subqueries
|
||||
.iter()
|
||||
.map(|s| {
|
||||
return format!("SELECT content, created_at FROM ({})", s);
|
||||
})
|
||||
.collect();
|
||||
let query: String = subqueries_selects.join(" UNION ");
|
||||
info!("final query string: {}", query);
|
||||
(query, params)
|
||||
}
|
||||
|
||||
@@ -479,7 +556,8 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
|
||||
/// query is immediately aborted.
|
||||
pub async fn db_query(
|
||||
sub: Subscription,
|
||||
conn: PooledConnection,
|
||||
client_id: String,
|
||||
pool: SqlitePool,
|
||||
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
|
||||
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) {
|
||||
@@ -489,29 +567,56 @@ pub async fn db_query(
|
||||
let start = Instant::now();
|
||||
// generate SQL query
|
||||
let (q, p) = query_from_sub(&sub);
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
||||
while let Some(row) = event_rows.next()? {
|
||||
// check if this is still active (we could do this every N rows)
|
||||
if abandon_query_rx.try_recv().is_ok() {
|
||||
debug!("query aborted");
|
||||
return Ok(());
|
||||
debug!("SQL generated in {:?}", start.elapsed());
|
||||
// show pool stats
|
||||
debug!("DB pool stats: {:?}", pool.state());
|
||||
let start = Instant::now();
|
||||
if let Ok(conn) = pool.get() {
|
||||
// execute the query. Don't cache, since queries vary so much.
|
||||
let mut stmt = conn.prepare(&q)?;
|
||||
let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
|
||||
let mut first_result = true;
|
||||
while let Some(row) = event_rows.next()? {
|
||||
if first_result {
|
||||
debug!(
|
||||
"time to first result: {:?} (cid={}, sub={:?})",
|
||||
start.elapsed(),
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
first_result = false;
|
||||
}
|
||||
// check if this is still active
|
||||
// TODO: check every N rows
|
||||
if abandon_query_rx.try_recv().is_ok() {
|
||||
debug!("query aborted (sub={:?})", sub.id);
|
||||
return Ok(());
|
||||
}
|
||||
row_count += 1;
|
||||
let event_json = row.get(0)?;
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
sub_id: sub.get_id(),
|
||||
event: event_json,
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
row_count += 1;
|
||||
let event_json = row.get(0)?;
|
||||
query_tx
|
||||
.blocking_send(QueryResult {
|
||||
sub_id: sub.get_id(),
|
||||
event: event_json,
|
||||
event: "EOSE".to_string(),
|
||||
})
|
||||
.ok();
|
||||
debug!(
|
||||
"query completed ({} rows) in {:?} (cid={}, sub={:?})",
|
||||
row_count,
|
||||
start.elapsed(),
|
||||
client_id,
|
||||
sub.id
|
||||
);
|
||||
} else {
|
||||
warn!("Could not get a database connection for querying");
|
||||
}
|
||||
debug!(
|
||||
"query completed ({} rows) in {:?}",
|
||||
row_count,
|
||||
start.elapsed()
|
||||
);
|
||||
let ok: Result<()> = Ok(());
|
||||
ok
|
||||
});
|
||||
|
@@ -48,6 +48,8 @@ pub enum Error {
|
||||
JoinError,
|
||||
#[error("Hyper Client error")]
|
||||
HyperError(hyper::Error),
|
||||
#[error("Hex encoding error")]
|
||||
HexError(hex::FromHexError),
|
||||
#[error("Unknown/Undocumented")]
|
||||
UnknownError,
|
||||
}
|
||||
@@ -58,6 +60,12 @@ pub enum Error {
|
||||
// }
|
||||
//}
|
||||
|
||||
impl From<hex::FromHexError> for Error {
|
||||
fn from(h: hex::FromHexError) -> Self {
|
||||
Error::HexError(h)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::Error> for Error {
|
||||
fn from(h: hyper::Error) -> Self {
|
||||
Error::HyperError(h)
|
||||
|
87
src/event.rs
87
src/event.rs
@@ -39,9 +39,9 @@ pub struct Event {
|
||||
pub(crate) tags: Vec<Vec<String>>,
|
||||
pub(crate) content: String,
|
||||
pub(crate) sig: String,
|
||||
// Optimization for tag search, built on demand
|
||||
// Optimization for tag search, built on demand.
|
||||
#[serde(skip)]
|
||||
pub(crate) tagidx: Option<HashMap<String, HashSet<String>>>,
|
||||
pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
|
||||
}
|
||||
|
||||
/// Simple tag type for array of array of strings.
|
||||
@@ -56,6 +56,25 @@ where
|
||||
Ok(opt.unwrap_or_else(Vec::new))
|
||||
}
|
||||
|
||||
/// Attempt to form a single-char tag name.
|
||||
pub fn single_char_tagname(tagname: &str) -> Option<char> {
|
||||
// We return the tag character if and only if the tagname consists
|
||||
// of a single char.
|
||||
let mut tagnamechars = tagname.chars();
|
||||
let firstchar = tagnamechars.next();
|
||||
return match firstchar {
|
||||
Some(_) => {
|
||||
// check second char
|
||||
if tagnamechars.next().is_none() {
|
||||
firstchar
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
}
|
||||
|
||||
/// Convert network event to parsed/validated event.
|
||||
impl From<EventCmd> for Result<Event> {
|
||||
fn from(ec: EventCmd) -> Result<Event> {
|
||||
@@ -99,17 +118,22 @@ impl Event {
|
||||
return;
|
||||
}
|
||||
// otherwise, build an index
|
||||
let mut idx: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
let mut idx: HashMap<char, HashSet<String>> = HashMap::new();
|
||||
// iterate over tags that have at least 2 elements
|
||||
for t in self.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
}
|
||||
let tagnamechar = tagnamechar_opt.unwrap();
|
||||
let tagval = t.get(1).unwrap();
|
||||
// ensure a vector exists for this tag
|
||||
if !idx.contains_key(tagname) {
|
||||
idx.insert(tagname.clone(), HashSet::new());
|
||||
if !idx.contains_key(&tagnamechar) {
|
||||
idx.insert(tagnamechar.clone(), HashSet::new());
|
||||
}
|
||||
// get the tag vec and insert entry
|
||||
let tidx = idx.get_mut(tagname).expect("could not get tag vector");
|
||||
let tidx = idx.get_mut(&tagnamechar).expect("could not get tag vector");
|
||||
tidx.insert(tagval.clone());
|
||||
}
|
||||
// save the tag structure
|
||||
@@ -124,6 +148,16 @@ impl Event {
|
||||
self.pubkey.chars().take(8).collect()
|
||||
}
|
||||
|
||||
/// Retrieve tag values
|
||||
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
|
||||
self.tags
|
||||
.iter()
|
||||
.filter(|x| x.len() > 1)
|
||||
.filter(|x| x.get(0).unwrap() == tag_name)
|
||||
.map(|x| x.get(1).unwrap().to_owned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Check if this event has a valid signature.
|
||||
fn is_valid(&self) -> bool {
|
||||
// TODO: return a Result with a reason for invalid events
|
||||
@@ -136,7 +170,7 @@ impl Event {
|
||||
if curr_time + (allowable_future as u64) < self.created_at {
|
||||
let delta = self.created_at - curr_time;
|
||||
debug!(
|
||||
"Event is too far in the future ({} seconds), rejecting",
|
||||
"event is too far in the future ({} seconds), rejecting",
|
||||
delta
|
||||
);
|
||||
return false;
|
||||
@@ -169,11 +203,11 @@ impl Event {
|
||||
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
|
||||
matches!(verify, Ok(()))
|
||||
} else {
|
||||
debug!("Client sent malformed pubkey");
|
||||
debug!("client sent malformed pubkey");
|
||||
false
|
||||
}
|
||||
} else {
|
||||
info!("Error converting digest to secp256k1 message");
|
||||
info!("error converting digest to secp256k1 message");
|
||||
false
|
||||
}
|
||||
}
|
||||
@@ -216,9 +250,10 @@ impl Event {
|
||||
}
|
||||
|
||||
/// Determine if the given tag and value set intersect with tags in this event.
|
||||
pub fn generic_tag_val_intersect(&self, tagname: &str, check: &HashSet<String>) -> bool {
|
||||
pub fn generic_tag_val_intersect(&self, tagname: char, check: &HashSet<String>) -> bool {
|
||||
match &self.tagidx {
|
||||
Some(idx) => match idx.get(tagname) {
|
||||
// check if this is indexable tagname
|
||||
Some(idx) => match idx.get(&tagname) {
|
||||
Some(valset) => {
|
||||
let common = valset.intersection(check);
|
||||
common.count() > 0
|
||||
@@ -266,7 +301,7 @@ mod tests {
|
||||
fn empty_event_tag_match() -> Result<()> {
|
||||
let event = simple_event();
|
||||
assert!(!event
|
||||
.generic_tag_val_intersect("e", &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
|
||||
.generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -277,7 +312,7 @@ mod tests {
|
||||
event.build_index();
|
||||
assert_eq!(
|
||||
event.generic_tag_val_intersect(
|
||||
"e",
|
||||
'e',
|
||||
&HashSet::from(["foo".to_owned(), "bar".to_owned()])
|
||||
),
|
||||
true
|
||||
@@ -332,6 +367,32 @@ mod tests {
|
||||
assert_eq!(c, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_tag_select() {
|
||||
let e = Event {
|
||||
id: "999".to_owned(),
|
||||
pubkey: "012345".to_owned(),
|
||||
created_at: 501234,
|
||||
kind: 1,
|
||||
tags: vec![
|
||||
vec!["j".to_owned(), "abc".to_owned()],
|
||||
vec!["e".to_owned(), "foo".to_owned()],
|
||||
vec!["e".to_owned(), "bar".to_owned()],
|
||||
vec!["e".to_owned(), "baz".to_owned()],
|
||||
vec![
|
||||
"p".to_owned(),
|
||||
"aaaa".to_owned(),
|
||||
"ws://example.com".to_owned(),
|
||||
],
|
||||
],
|
||||
content: "this is a test".to_owned(),
|
||||
sig: "abcde".to_owned(),
|
||||
tagidx: None,
|
||||
};
|
||||
let v = e.tag_values_by_name("e");
|
||||
assert_eq!(v, vec!["foo", "bar", "baz"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_canonical_with_tags() {
|
||||
let e = Event {
|
||||
|
@@ -80,6 +80,7 @@ pub fn hex_range(s: &str) -> Option<HexSearch> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::error::Result;
|
||||
|
||||
#[test]
|
||||
fn hex_range_exact() -> Result<()> {
|
||||
|
@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
|
||||
description: i.description,
|
||||
pubkey: i.pubkey,
|
||||
contact: i.contact,
|
||||
supported_nips: Some(vec![1, 2, 11]),
|
||||
supported_nips: Some(vec![1, 2, 11, 15, 16]),
|
||||
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
|
||||
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
|
||||
}
|
||||
|
@@ -2,12 +2,11 @@ pub mod close;
|
||||
pub mod config;
|
||||
pub mod conn;
|
||||
pub mod db;
|
||||
pub mod schema;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod hexrange;
|
||||
pub mod info;
|
||||
pub mod nip05;
|
||||
pub mod protostream;
|
||||
pub mod schema;
|
||||
pub mod subscription;
|
||||
pub mod utils;
|
||||
|
213
src/main.rs
213
src/main.rs
@@ -9,27 +9,34 @@ use hyper::{
|
||||
};
|
||||
use log::*;
|
||||
use nostr_rs_relay::close::Close;
|
||||
use nostr_rs_relay::close::CloseCmd;
|
||||
use nostr_rs_relay::config;
|
||||
use nostr_rs_relay::conn;
|
||||
use nostr_rs_relay::db;
|
||||
use nostr_rs_relay::db::SubmittedEvent;
|
||||
use nostr_rs_relay::error::{Error, Result};
|
||||
use nostr_rs_relay::event::Event;
|
||||
use nostr_rs_relay::event::EventCmd;
|
||||
use nostr_rs_relay::info::RelayInfo;
|
||||
use nostr_rs_relay::nip05;
|
||||
use nostr_rs_relay::protostream;
|
||||
use nostr_rs_relay::protostream::NostrMessage::*;
|
||||
use nostr_rs_relay::protostream::NostrResponse::*;
|
||||
use nostr_rs_relay::subscription::Subscription;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::runtime::Builder;
|
||||
use tokio::sync::broadcast::{self, Receiver, Sender};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tungstenite::error::Error as WsError;
|
||||
use tungstenite::handshake;
|
||||
use tungstenite::protocol::Message;
|
||||
use tungstenite::protocol::WebSocketConfig;
|
||||
|
||||
/// Return a requested DB name from command line arguments.
|
||||
@@ -46,7 +53,7 @@ async fn handle_web_request(
|
||||
pool: db::SqlitePool,
|
||||
remote_addr: SocketAddr,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<SubmittedEvent>,
|
||||
shutdown: Receiver<()>,
|
||||
) -> Result<Response<Body>, Infallible> {
|
||||
match (
|
||||
@@ -55,7 +62,7 @@ async fn handle_web_request(
|
||||
) {
|
||||
// Request for / as websocket
|
||||
("/", true) => {
|
||||
debug!("websocket with upgrade request");
|
||||
trace!("websocket with upgrade request");
|
||||
//assume request is a handshake, so create the handshake response
|
||||
let response = match handshake::server::create_response_with_body(&request, || {
|
||||
Body::empty()
|
||||
@@ -127,14 +134,17 @@ async fn handle_web_request(
|
||||
return Ok(Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "application/nostr+json")
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.body(b)
|
||||
.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Response::new(Body::from(
|
||||
"Please use a Nostr client to connect.",
|
||||
)))
|
||||
Ok(Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(Body::from("Please use a Nostr client to connect."))
|
||||
.unwrap())
|
||||
}
|
||||
(_, _) => {
|
||||
//handle any other url
|
||||
@@ -227,7 +237,8 @@ fn main() -> Result<(), Error> {
|
||||
let (bcast_tx, _) = broadcast::channel::<Event>(settings.limits.broadcast_buffer);
|
||||
// validated events that need to be persisted are sent to the
|
||||
// database on via this channel.
|
||||
let (event_tx, event_rx) = mpsc::channel::<Event>(settings.limits.event_persist_buffer);
|
||||
let (event_tx, event_rx) =
|
||||
mpsc::channel::<SubmittedEvent>(settings.limits.event_persist_buffer);
|
||||
// establish a channel for letting all threads now about a
|
||||
// requested server shutdown.
|
||||
let (invoke_shutdown, shutdown_listen) = broadcast::channel::<()>(1);
|
||||
@@ -312,32 +323,84 @@ fn main() -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Nostr protocol messages from a client
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
|
||||
#[serde(untagged)]
|
||||
pub enum NostrMessage {
|
||||
/// An `EVENT` message
|
||||
EventMsg(EventCmd),
|
||||
/// A `REQ` message
|
||||
SubMsg(Subscription),
|
||||
/// A `CLOSE` message
|
||||
CloseMsg(CloseCmd),
|
||||
}
|
||||
|
||||
/// Convert Message to NostrMessage
|
||||
fn convert_to_msg(msg: String) -> Result<NostrMessage> {
|
||||
let config = config::SETTINGS.read().unwrap();
|
||||
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
|
||||
match parsed_res {
|
||||
Ok(m) => {
|
||||
if let NostrMessage::EventMsg(_) = m {
|
||||
if let Some(max_size) = config.limits.max_event_bytes {
|
||||
// check length, ensure that some max size is set.
|
||||
if msg.len() > max_size && max_size > 0 {
|
||||
return Err(Error::EventMaxLengthError(msg.len()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("proto parse error: {:?}", e);
|
||||
debug!("parse error on message: {}", msg.trim());
|
||||
Err(Error::ProtoParseError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Turn a string into a NOTICE message ready to send over a WebSocket
|
||||
fn make_notice_message(msg: &str) -> Message {
|
||||
Message::text(json!(["NOTICE", msg]).to_string())
|
||||
}
|
||||
|
||||
/// Handle new client connections. This runs through an event loop
|
||||
/// for all client communication.
|
||||
async fn nostr_server(
|
||||
pool: db::SqlitePool,
|
||||
ws_stream: WebSocketStream<Upgraded>,
|
||||
mut ws_stream: WebSocketStream<Upgraded>,
|
||||
broadcast: Sender<Event>,
|
||||
event_tx: tokio::sync::mpsc::Sender<Event>,
|
||||
event_tx: mpsc::Sender<SubmittedEvent>,
|
||||
mut shutdown: Receiver<()>,
|
||||
) {
|
||||
// get a broadcast channel for clients to communicate on
|
||||
let mut bcast_rx = broadcast.subscribe();
|
||||
// upgrade the TCP connection to WebSocket
|
||||
//let conn = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await;
|
||||
//let ws_stream = conn.expect("websocket handshake error");
|
||||
// wrap websocket into a stream & sink of Nostr protocol messages
|
||||
let mut nostr_stream = protostream::wrap_ws_in_nostr(ws_stream);
|
||||
// Track internal client state
|
||||
let mut conn = conn::ClientConn::new();
|
||||
let cid = conn.get_client_prefix();
|
||||
// Create a channel for receiving query results from the database.
|
||||
// we will send out the tx handle to any query we generate.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
|
||||
// Create channel for receiving NOTICEs
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
|
||||
|
||||
// last time this client sent data (message, ping, etc.)
|
||||
let mut last_message_time = Instant::now();
|
||||
|
||||
// ping interval (every 5 minutes)
|
||||
let default_ping_dur = Duration::from_secs(300);
|
||||
|
||||
// disconnect after 20 minutes without a ping response or event.
|
||||
let max_quiet_time = Duration::from_secs(60 * 20);
|
||||
|
||||
let start = tokio::time::Instant::now() + default_ping_dur;
|
||||
let mut ping_interval = tokio::time::interval_at(start, default_ping_dur);
|
||||
|
||||
// maintain a hashmap of a oneshot channel for active subscriptions.
|
||||
// when these subscriptions are cancelled, make a message
|
||||
// available to the executing query so it knows to stop.
|
||||
let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
|
||||
|
||||
// for stats, keep track of how many events the client published,
|
||||
// and how many it received from queries.
|
||||
let mut client_published_event_count: usize = 0;
|
||||
@@ -349,11 +412,31 @@ async fn nostr_server(
|
||||
// server shutting down, exit loop
|
||||
break;
|
||||
},
|
||||
_ = ping_interval.tick() => {
|
||||
// check how long since we talked to client
|
||||
// if it has been too long, disconnect
|
||||
if last_message_time.elapsed() > max_quiet_time {
|
||||
debug!("ending connection due to lack of client ping response");
|
||||
break;
|
||||
}
|
||||
// Send a ping
|
||||
ws_stream.send(Message::Ping(Vec::new())).await.ok();
|
||||
},
|
||||
Some(notice_msg) = notice_rx.recv() => {
|
||||
ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
|
||||
},
|
||||
Some(query_result) = query_rx.recv() => {
|
||||
// database informed us of a query result we asked for
|
||||
let res = EventRes(query_result.sub_id,query_result.event);
|
||||
client_received_event_count += 1;
|
||||
nostr_stream.send(res).await.ok();
|
||||
let subesc = query_result.sub_id.replace("\"", "");
|
||||
if query_result.event == "EOSE" {
|
||||
let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
|
||||
ws_stream.send(Message::Text(send_str)).await.ok();
|
||||
} else {
|
||||
client_received_event_count += 1;
|
||||
// send a result
|
||||
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
|
||||
ws_stream.send(Message::Text(send_str)).await.ok();
|
||||
}
|
||||
},
|
||||
// TODO: consider logging the LaggedRecv error
|
||||
Ok(global_event) = bcast_rx.recv() => {
|
||||
@@ -368,17 +451,55 @@ async fn nostr_server(
|
||||
cid, s,
|
||||
global_event.get_event_id_prefix());
|
||||
// create an event response and send it
|
||||
let res = EventRes(s.to_owned(),event_str);
|
||||
nostr_stream.send(res).await.ok();
|
||||
let subesc = s.replace("\"", "");
|
||||
ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
|
||||
//nostr_stream.send(res).await.ok();
|
||||
} else {
|
||||
warn!("could not serialize event {:?}", global_event.get_event_id_prefix());
|
||||
}
|
||||
}
|
||||
},
|
||||
// check if this client has a subscription
|
||||
proto_next = nostr_stream.next() => {
|
||||
match proto_next {
|
||||
Some(Ok(EventMsg(ec))) => {
|
||||
ws_next = ws_stream.next() => {
|
||||
// update most recent message time for client
|
||||
last_message_time = Instant::now();
|
||||
// Consume text messages from the client, parse into Nostr messages.
|
||||
let nostr_msg = match ws_next {
|
||||
Some(Ok(Message::Text(m))) => {
|
||||
convert_to_msg(m)
|
||||
},
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
ws_stream.send(make_notice_message("binary messages are not accepted")).await.ok();
|
||||
continue;
|
||||
},
|
||||
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {
|
||||
// get a ping/pong, ignore. tungstenite will
|
||||
// send responses automatically.
|
||||
continue;
|
||||
},
|
||||
None |
|
||||
Some(Ok(Message::Close(_))) |
|
||||
Some(Err(WsError::AlreadyClosed)) |
|
||||
Some(Err(WsError::ConnectionClosed)) |
|
||||
Some(Err(WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
|
||||
=> {
|
||||
debug!("websocket close from client: {:?}",cid);
|
||||
break;
|
||||
},
|
||||
Some(Err(WsError::Io(e))) => {
|
||||
// IO errors are considered fatal
|
||||
warn!("IO error (client: {:?}): {:?}", cid, e);
|
||||
break;
|
||||
}
|
||||
x => {
|
||||
// default condition on error is to close the client connection
|
||||
info!("unknown error (client: {:?}): {:?} (closing conn)", cid, x);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// convert ws_next into proto_next
|
||||
match nostr_msg {
|
||||
Ok(NostrMessage::EventMsg(ec)) => {
|
||||
// An EventCmd needs to be validated to be converted into an Event
|
||||
// handle each type of message
|
||||
let parsed : Result<Event> = Result::<Event>::from(ec);
|
||||
@@ -386,23 +507,18 @@ async fn nostr_server(
|
||||
Ok(e) => {
|
||||
let id_prefix:String = e.id.chars().take(8).collect();
|
||||
debug!("successfully parsed/validated event: {:?} from client: {:?}", id_prefix, cid);
|
||||
// TODO: consider moving some/all
|
||||
// authorization checks here, instead
|
||||
// of the DB module, so we can send a
|
||||
// proper NOTICE back to the client if
|
||||
// they are unable to write.
|
||||
|
||||
// Write this to the database
|
||||
event_tx.send(e.clone()).await.ok();
|
||||
// Write this to the database.
|
||||
let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
|
||||
event_tx.send(submit_event).await.ok();
|
||||
client_published_event_count += 1;
|
||||
},
|
||||
Err(_) => {
|
||||
info!("client {:?} sent an invalid event", cid);
|
||||
nostr_stream.send(NoticeRes("event was invalid".to_owned())).await.ok();
|
||||
ws_stream.send(make_notice_message("event was invalid")).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
Some(Ok(SubMsg(s))) => {
|
||||
Ok(NostrMessage::SubMsg(s)) => {
|
||||
debug!("client {} requesting a subscription", cid);
|
||||
// subscription handling consists of:
|
||||
// * registering the subscription so future events can be matched
|
||||
@@ -416,18 +532,15 @@ async fn nostr_server(
|
||||
previous_query.send(()).ok();
|
||||
}
|
||||
// start a database query
|
||||
// show pool stats
|
||||
debug!("DB pool stats: {:?}", pool.state());
|
||||
db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await;
|
||||
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {}", e);
|
||||
nostr_stream.send(NoticeRes(format!("{}",e))).await.ok();
|
||||
|
||||
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
Some(Ok(CloseMsg(cc))) => {
|
||||
Ok(NostrMessage::CloseMsg(cc)) => {
|
||||
// closing a request simply removes the subscription.
|
||||
let parsed : Result<Close> = Result::<Close>::from(cc);
|
||||
match parsed {
|
||||
@@ -444,23 +557,23 @@ async fn nostr_server(
|
||||
},
|
||||
Err(_) => {
|
||||
info!("invalid command ignored");
|
||||
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
debug!("normal websocket close from client: {:?}",cid);
|
||||
break;
|
||||
},
|
||||
Some(Err(Error::ConnError)) => {
|
||||
Err(Error::ConnError) => {
|
||||
debug!("got connection close/error, disconnecting client: {:?}",cid);
|
||||
break;
|
||||
}
|
||||
Some(Err(Error::EventMaxLengthError(s))) => {
|
||||
Err(Error::EventMaxLengthError(s)) => {
|
||||
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
|
||||
nostr_stream.send(NoticeRes("event exceeded max size".to_owned())).await.ok();
|
||||
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
Err(Error::ProtoParseError) => {
|
||||
info!("client {:?} sent event that could not be parsed", cid);
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
},
|
||||
Err(e) => {
|
||||
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
|
||||
},
|
||||
}
|
||||
|
@@ -1,141 +0,0 @@
|
||||
//! Nostr protocol layered over WebSocket
|
||||
use crate::close::CloseCmd;
|
||||
use crate::config;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::event::EventCmd;
|
||||
use crate::subscription::Subscription;
|
||||
use core::pin::Pin;
|
||||
use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use futures::task::Context;
|
||||
use futures::task::Poll;
|
||||
use hyper::upgrade::Upgraded;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tungstenite::error::Error as WsError;
|
||||
use tungstenite::protocol::Message;
|
||||
|
||||
/// Nostr protocol messages from a client
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
|
||||
#[serde(untagged)]
|
||||
pub enum NostrMessage {
|
||||
/// An `EVENT` message
|
||||
EventMsg(EventCmd),
|
||||
/// A `REQ` message
|
||||
SubMsg(Subscription),
|
||||
/// A `CLOSE` message
|
||||
CloseMsg(CloseCmd),
|
||||
}
|
||||
|
||||
/// Nostr protocol messages from a relay/server
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
|
||||
pub enum NostrResponse {
|
||||
/// A `NOTICE` response
|
||||
NoticeRes(String),
|
||||
/// An `EVENT` response, composed of the subscription identifier,
|
||||
/// and serialized event JSON
|
||||
EventRes(String, String),
|
||||
}
|
||||
|
||||
/// A Nostr protocol stream is layered on top of a Websocket stream.
|
||||
pub struct NostrStream {
|
||||
ws_stream: WebSocketStream<Upgraded>,
|
||||
}
|
||||
|
||||
/// Given a websocket, return a protocol stream wrapper.
|
||||
pub fn wrap_ws_in_nostr(ws: WebSocketStream<Upgraded>) -> NostrStream {
|
||||
NostrStream { ws_stream: ws }
|
||||
}
|
||||
|
||||
/// Implement the [`Stream`] interface to produce Nostr messages.
|
||||
impl Stream for NostrStream {
|
||||
type Item = Result<NostrMessage>;
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// get the configuration
|
||||
/// Convert Message to NostrMessage
|
||||
fn convert(msg: String) -> Result<NostrMessage> {
|
||||
let config = config::SETTINGS.read().unwrap();
|
||||
let parsed_res: Result<NostrMessage> = serde_json::from_str(&msg).map_err(|e| e.into());
|
||||
match parsed_res {
|
||||
Ok(m) => {
|
||||
if let NostrMessage::EventMsg(_) = m {
|
||||
if let Some(max_size) = config.limits.max_event_bytes {
|
||||
// check length, ensure that some max size is set.
|
||||
if msg.len() > max_size && max_size > 0 {
|
||||
return Err(Error::EventMaxLengthError(msg.len()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("proto parse error: {:?}", e);
|
||||
debug!("parse error on message: {}", msg.trim());
|
||||
Err(Error::ProtoParseError)
|
||||
}
|
||||
}
|
||||
}
|
||||
match Pin::new(&mut self.ws_stream).poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some(v)) => match v {
|
||||
Ok(Message::Text(vs)) => Poll::Ready(Some(convert(vs))),
|
||||
Ok(Message::Ping(x)) => {
|
||||
debug!("client ping ({:?})", x);
|
||||
//Pin::new(&mut self.ws_stream).start_send(Message::Pong(x));
|
||||
// TODO: restructure this so that Pongs work
|
||||
//Pin::new(&mut self.ws_stream).write_pending();
|
||||
//info!("sent pong");
|
||||
Poll::Pending
|
||||
}
|
||||
Ok(Message::Binary(_)) => Poll::Ready(Some(Err(Error::ProtoParseError))),
|
||||
Ok(Message::Pong(_)) => Poll::Pending,
|
||||
Ok(Message::Close(_)) => Poll::Ready(None),
|
||||
Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None),
|
||||
Err(_) => Poll::Ready(Some(Err(Error::ConnError))),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implement the [`Sink`] interface to produce Nostr responses.
|
||||
impl Sink<NostrResponse> for NostrStream {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// map the error type
|
||||
match Pin::new(&mut self.ws_stream).poll_ready(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::ConnWriteError)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: NostrResponse) -> Result<(), Self::Error> {
|
||||
// TODO: do real escaping for these - at least on NOTICE,
|
||||
// which surely has some problems if arbitrary text is sent.
|
||||
let send_str = match item {
|
||||
NostrResponse::NoticeRes(msg) => {
|
||||
let s = msg.replace("\"", "");
|
||||
format!("[\"NOTICE\",\"{}\"]", s)
|
||||
}
|
||||
NostrResponse::EventRes(sub, eventstr) => {
|
||||
let subesc = sub.replace("\"", "");
|
||||
format!("[\"EVENT\",\"{}\",{}]", subesc, eventstr)
|
||||
}
|
||||
};
|
||||
match Pin::new(&mut self.ws_stream).start_send(Message::Text(send_str)) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => Err(Error::ConnWriteError),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
//! Database schema and migrations
|
||||
use crate::db::PooledConnection;
|
||||
use crate::error::Result;
|
||||
use crate::utils::is_hex;
|
||||
use crate::event::{single_char_tagname, Event};
|
||||
use crate::utils::is_lower_hex;
|
||||
use log::*;
|
||||
use rusqlite::limits::Limit;
|
||||
use rusqlite::params;
|
||||
use rusqlite::Connection;
|
||||
|
||||
// TODO: drop the pubkey_ref and event_ref tables
|
||||
use std::time::Instant;
|
||||
|
||||
/// Startup DB Pragmas
|
||||
pub const STARTUP_SQL: &str = r##"
|
||||
@@ -24,7 +24,7 @@ PRAGMA journal_mode=WAL;
|
||||
PRAGMA main.synchronous=NORMAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA application_id = 1654008667;
|
||||
PRAGMA user_version = 5;
|
||||
PRAGMA user_version = 6;
|
||||
|
||||
-- Event Table
|
||||
CREATE TABLE IF NOT EXISTS event (
|
||||
@@ -53,7 +53,7 @@ id INTEGER PRIMARY KEY,
|
||||
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
|
||||
name TEXT, -- the tag name ("p", "e", whatever)
|
||||
value TEXT, -- the tag value, if not hex.
|
||||
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
|
||||
value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
|
||||
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
|
||||
@@ -103,7 +103,7 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
|
||||
if curr_version == 0 {
|
||||
match conn.execute_batch(INIT_SQL) {
|
||||
Ok(()) => {
|
||||
info!("database pragma/schema initialized to v4, and ready");
|
||||
info!("database pragma/schema initialized to v6, and ready");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("update failed: {}", err);
|
||||
@@ -154,7 +154,6 @@ PRAGMA user_version = 3;
|
||||
panic!("database could not be upgraded");
|
||||
}
|
||||
}
|
||||
info!("Starting transaction");
|
||||
// iterate over every event/pubkey tag
|
||||
let tx = conn.transaction()?;
|
||||
{
|
||||
@@ -166,7 +165,7 @@ PRAGMA user_version = 3;
|
||||
let tag_name: String = row.get(1)?;
|
||||
let tag_value: String = row.get(2)?;
|
||||
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
|
||||
if is_hex(&tag_value) {
|
||||
if is_lower_hex(&tag_value) {
|
||||
tx.execute(
|
||||
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
||||
params![event_id, tag_name, hex::decode(&tag_value).ok()],
|
||||
@@ -225,9 +224,63 @@ PRAGMA user_version=5;
|
||||
}
|
||||
}
|
||||
} else if curr_version == 5 {
|
||||
debug!("Database version was already current");
|
||||
} else if curr_version > 5 {
|
||||
panic!("Database version is newer than supported by this executable");
|
||||
info!("database schema needs update from 5->6");
|
||||
// We need to rebuild the tags table. iterate through the
|
||||
// event table. build event from json, insert tags into a
|
||||
// fresh tag table. This was needed due to a logic error in
|
||||
// how hex-like tags got indexed.
|
||||
let start = Instant::now();
|
||||
let tx = conn.transaction()?;
|
||||
{
|
||||
// Clear out table
|
||||
tx.execute("DELETE FROM tag;", [])?;
|
||||
let mut stmt = tx.prepare("select id, content from event order by id;")?;
|
||||
let mut tag_rows = stmt.query([])?;
|
||||
while let Some(row) = tag_rows.next()? {
|
||||
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
|
||||
let event_id: u64 = row.get(0)?;
|
||||
let event_json: String = row.get(1)?;
|
||||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
// look at each event, and each tag, creating new tag entries if appropriate.
|
||||
for t in event.tags.iter().filter(|x| x.len() > 1) {
|
||||
let tagname = t.get(0).unwrap();
|
||||
let tagnamechar_opt = single_char_tagname(tagname);
|
||||
if tagnamechar_opt.is_none() {
|
||||
continue;
|
||||
}
|
||||
// safe because len was > 1
|
||||
let tagval = t.get(1).unwrap();
|
||||
// insert as BLOB if we can restore it losslessly.
|
||||
// this means it needs to be even length and lowercase.
|
||||
if (tagval.len() % 2 == 0) && is_lower_hex(&tagval) {
|
||||
tx.execute(
|
||||
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
|
||||
params![event_id, tagname, hex::decode(&tagval).ok()],
|
||||
)?;
|
||||
} else {
|
||||
// otherwise, insert as text
|
||||
tx.execute(
|
||||
"INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
|
||||
params![event_id, tagname, &tagval],
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.execute("PRAGMA user_version = 6;", [])?;
|
||||
}
|
||||
tx.commit()?;
|
||||
info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed());
|
||||
// vacuum after large table modification
|
||||
let start = Instant::now();
|
||||
conn.execute("VACUUM;", [])?;
|
||||
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
|
||||
} else if curr_version == 6 {
|
||||
debug!("Database version was already current (v6)");
|
||||
} else if curr_version > 7 {
|
||||
panic!(
|
||||
"Database version is newer than supported by this executable (v{})",
|
||||
curr_version
|
||||
);
|
||||
}
|
||||
|
||||
// Setup PRAGMA
|
||||
|
@@ -31,9 +31,13 @@ pub struct ReqFilter {
|
||||
pub until: Option<u64>,
|
||||
/// List of author public keys
|
||||
pub authors: Option<Vec<String>>,
|
||||
/// Limit number of results
|
||||
pub limit: Option<u64>,
|
||||
/// Set of tags
|
||||
#[serde(skip)]
|
||||
pub tags: Option<HashMap<String, HashSet<String>>>,
|
||||
pub tags: Option<HashMap<char, HashSet<String>>>,
|
||||
/// Force no matches due to malformed data
|
||||
pub force_no_match: bool,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ReqFilter {
|
||||
@@ -54,7 +58,9 @@ impl<'de> Deserialize<'de> for ReqFilter {
|
||||
since: None,
|
||||
until: None,
|
||||
authors: None,
|
||||
limit: None,
|
||||
tags: None,
|
||||
force_no_match: false,
|
||||
};
|
||||
let mut ts = None;
|
||||
// iterate through each key, and assign values that exist
|
||||
@@ -68,22 +74,28 @@ impl<'de> Deserialize<'de> for ReqFilter {
|
||||
rf.since = Deserialize::deserialize(val).ok();
|
||||
} else if key == "until" {
|
||||
rf.until = Deserialize::deserialize(val).ok();
|
||||
} else if key == "limit" {
|
||||
rf.limit = Deserialize::deserialize(val).ok();
|
||||
} else if key == "authors" {
|
||||
rf.authors = Deserialize::deserialize(val).ok();
|
||||
} else if key.starts_with('#') && key.len() > 1 && val.is_array() {
|
||||
// remove the prefix
|
||||
let tagname = &key[1..];
|
||||
if ts.is_none() {
|
||||
// Initialize the tag if necessary
|
||||
ts = Some(HashMap::new());
|
||||
}
|
||||
if let Some(m) = ts.as_mut() {
|
||||
let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
|
||||
if let Some(v) = tag_vals {
|
||||
let hs = HashSet::from_iter(v.into_iter());
|
||||
m.insert(tagname.to_owned(), hs);
|
||||
if let Some(tag_search) = tag_search_char_from_filter(key) {
|
||||
if ts.is_none() {
|
||||
// Initialize the tag if necessary
|
||||
ts = Some(HashMap::new());
|
||||
}
|
||||
};
|
||||
if let Some(m) = ts.as_mut() {
|
||||
let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
|
||||
if let Some(v) = tag_vals {
|
||||
let hs = HashSet::from_iter(v.into_iter());
|
||||
m.insert(tag_search.to_owned(), hs);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// tag search that is multi-character, don't add to subscription
|
||||
rf.force_no_match = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
rf.tags = ts;
|
||||
@@ -91,6 +103,26 @@ impl<'de> Deserialize<'de> for ReqFilter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to form a single-char identifier from a tag search filter
|
||||
fn tag_search_char_from_filter(tagname: &str) -> Option<char> {
|
||||
let tagname_nohash = &tagname[1..];
|
||||
// We return the tag character if and only if the tagname consists
|
||||
// of a single char.
|
||||
let mut tagnamechars = tagname_nohash.chars();
|
||||
let firstchar = tagnamechars.next();
|
||||
return match firstchar {
|
||||
Some(_) => {
|
||||
// check second char
|
||||
if tagnamechars.next().is_none() {
|
||||
firstchar
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Subscription {
|
||||
/// Custom deserializer for subscriptions, which have a more
|
||||
/// complex structure than the other message types.
|
||||
@@ -189,7 +221,7 @@ impl ReqFilter {
|
||||
// get the hashset from the filter.
|
||||
if let Some(map) = &self.tags {
|
||||
for (key, val) in map.iter() {
|
||||
let tag_match = event.generic_tag_val_intersect(key, val);
|
||||
let tag_match = event.generic_tag_val_intersect(*key, val);
|
||||
// if there is no match for this tag, the match fails.
|
||||
if !tag_match {
|
||||
return false;
|
||||
@@ -214,9 +246,11 @@ impl ReqFilter {
|
||||
// self.id.as_ref().map(|v| v == &event.id).unwrap_or(true)
|
||||
self.ids_match(event)
|
||||
&& self.since.map(|t| event.created_at > t).unwrap_or(true)
|
||||
&& self.until.map(|t| event.created_at < t).unwrap_or(true)
|
||||
&& self.kind_match(event.kind)
|
||||
&& self.authors_match(event)
|
||||
&& self.tag_match(event)
|
||||
&& !self.force_no_match
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,6 +355,50 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interest_until() -> Result<()> {
|
||||
// subscription with a filter for ID and time
|
||||
let s: Subscription =
|
||||
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "until": 1000}]"#)?;
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
created_at: 50,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
content: "".to_owned(),
|
||||
sig: "".to_owned(),
|
||||
tagidx: None,
|
||||
};
|
||||
assert!(s.interested_in_event(&e));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interest_range() -> Result<()> {
|
||||
// subscription with a filter for ID and time
|
||||
let s_in: Subscription =
|
||||
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 200}]"#)?;
|
||||
let s_before: Subscription =
|
||||
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 100, "until": 140}]"#)?;
|
||||
let s_after: Subscription =
|
||||
serde_json::from_str(r#"["REQ","xyz",{"ids": ["abc"], "since": 160, "until": 200}]"#)?;
|
||||
let e = Event {
|
||||
id: "abc".to_owned(),
|
||||
pubkey: "".to_owned(),
|
||||
created_at: 150,
|
||||
kind: 0,
|
||||
tags: Vec::new(),
|
||||
content: "".to_owned(),
|
||||
sig: "".to_owned(),
|
||||
tagidx: None,
|
||||
};
|
||||
assert!(s_in.interested_in_event(&e));
|
||||
assert!(!s_before.interested_in_event(&e));
|
||||
assert!(!s_after.interested_in_event(&e));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interest_time_and_id() -> Result<()> {
|
||||
// subscription with a filter for ID and time
|
||||
|
18
src/utils.rs
18
src/utils.rs
@@ -13,3 +13,21 @@ pub fn unix_time() -> u64 {
|
||||
pub fn is_hex(s: &str) -> bool {
|
||||
s.chars().all(|x| char::is_ascii_hexdigit(&x))
|
||||
}
|
||||
|
||||
/// Check if a string contains only lower-case hex chars.
|
||||
pub fn is_lower_hex(s: &str) -> bool {
|
||||
s.chars().all(|x| {
|
||||
(char::is_ascii_lowercase(&x) || char::is_ascii_digit(&x)) && char::is_ascii_hexdigit(&x)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn lower_hex() {
|
||||
let hexstr = "abcd0123";
|
||||
assert_eq!(is_lower_hex(hexstr), true);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user