Mirror of https://github.com/scsibug/nostr-rs-relay.git (synced 2025-09-01 03:40:46 -04:00)

Compare commits

25 Commits
8da6f6555a
5bcc63bd56
035cf34673
be8170342e
0a3b15f41f
2b4b17dbda
5058d98ad6
f4ecd43708
a8f465fdc8
1c14adc766
e894a86566
bedc378624
e1c2a6b758
990bb656e8
168cfc3b26
a36ad378f6
538d139ebf
23f7730fea
8aa1256254
9ed3391b46
4ad483090e
9b351aab9b
597749890e
1d499cf12b
ed3a6b9692
Cargo.lock (generated, 600 changed lines)
File diff suppressed because it is too large.
Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "nostr-rs-relay"
version = "0.5.2"
version = "0.6.2"
edition = "2021"

[dependencies]
@@ -19,7 +19,7 @@ secp256k1 = {version = "^0.21", features = ["rand", "rand-std", "serde", "bitcoi
serde = { version = "^1.0", features = ["derive"] }
serde_json = {version = "^1.0", features = ["preserve_order"]}
hex = "^0.4"
rusqlite = { version = "^0.26", features = ["limits"]}
rusqlite = { version = "^0.26", features = ["limits","bundled"]}
r2d2 = "^0.8"
r2d2_sqlite = "^0.19"
lazy_static = "^1.4"
Dockerfile
@@ -1,4 +1,4 @@
FROM rust:1.58.1 as builder
FROM docker.io/library/rust:1.63.0@sha256:d7e3f69edcdcd03b145d8d9361765b816656755e49c1c1fe28224a4505f91b0a as builder

RUN USER=root cargo new --bin nostr-rs-relay
WORKDIR ./nostr-rs-relay
@@ -12,7 +12,7 @@ COPY ./src ./src
RUN rm ./target/release/deps/nostr*relay*
RUN cargo build --release

FROM debian:bullseye-20220125-slim
FROM docker.io/library/debian:bullseye-20220801-slim@sha256:139a42fa3bde3e5bad6ae912aaaf2103565558a7a73afe6ce6ceed6e46a6e519
ARG APP=/usr/src/app
ARG APP_DATA=/usr/src/app/db
RUN apt-get update \
README.md (10 changed lines)
@@ -18,9 +18,11 @@ NIPs with a relay-specific implementation are listed here.
- [x] NIP-02: Hide old contact list events
- [ ] NIP-03: OpenTimestamps
- [x] NIP-05: Mapping Nostr keys to DNS identifiers
- [ ] NIP-09: Event deletion
- [x] NIP-09: Event deletion
- [x] NIP-11: Relay information document
- [x] NIP-12: Generic tag search (_experimental_)
- [x] NIP-15: End of stored events notice
- [x] NIP-16: Replaceable and ephemeral events

## Quick Start

@@ -79,8 +81,10 @@ termination, load balancing, and other features), see [Reverse
Proxy](reverse-proxy.md).

## Dev Channel
The current dev discussions for this project are happening at https://discord.gg/ufG6fH52Vk.
Drop in to ask any development-related questions.

For development discussions, please feel free to use the [sourcehut
mailing list](https://lists.sr.ht/~gheartsfield/nostr-rs-relay-devel).
Or, drop by the [Nostr Telegram Channel](https://t.me/nostr_protocol).

License
---
src/db.rs (421 changed lines)
@@ -2,13 +2,14 @@
use crate::config::SETTINGS;
use crate::error::Error;
use crate::error::Result;
use crate::event::Event;
use crate::event::{single_char_tagname, Event};
use crate::hexrange::hex_range;
use crate::hexrange::HexSearch;
use crate::nip05;
use crate::schema::{upgrade_db, STARTUP_SQL};
use crate::subscription::ReqFilter;
use crate::subscription::Subscription;
use crate::utils::is_hex;
use crate::utils::{is_hex, is_lower_hex};
use governor::clock::Clock;
use governor::{Quota, RateLimiter};
use hex;
@@ -207,30 +208,41 @@ pub async fn db_writer(
}
// TODO: cache recent list of authors to remove a DB call.
let start = Instant::now();
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate event");
} else {
info!(
"persisted event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
if event.kind >= 20000 && event.kind < 30000 {
info!(
"published ephemeral event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
bcast_tx.send(event.clone()).ok();
event_write = true
} else {
match write_event(&mut pool.get()?, &event) {
Ok(updated) => {
if updated == 0 {
trace!("ignoring duplicate event");
} else {
info!(
"persisted event {:?} from {:?} in {:?}",
event.get_event_id_prefix(),
event.get_author_prefix(),
start.elapsed()
);
event_write = true;
// send this out to all clients
bcast_tx.send(event.clone()).ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
notice_tx
.try_send(
"relay experienced an error trying to publish the latest event"
.to_owned(),
)
.ok();
}
}
Err(err) => {
warn!("event insert failed: {:?}", err);
notice_tx
.try_send(
"relay experienced an error trying to publish the latest event"
.to_owned(),
)
.ok();
}
}

@@ -288,49 +300,66 @@ pub fn write_event(conn: &mut PooledConnection, e: &Event) -> Result<usize> {
if tag.len() >= 2 {
let tagname = &tag[0];
let tagval = &tag[1];
// if tagvalue is hex;
if is_hex(tagval) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(&tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, &tagval],
)?;
// only single-char tags are searchable
let tagchar_opt = single_char_tagname(tagname);
match &tagchar_opt {
Some(_) => {
// if tagvalue is lowercase hex;
if is_lower_hex(&tagval) && (tagval.len() % 2 == 0) {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, hex::decode(&tagval).ok()],
)?;
} else {
tx.execute(
"INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
params![ev_id, &tagname, &tagval],
)?;
}
}
None => {}
}
}
}
// if this event is for a metadata update, hide every other kind=0
// event from the same author that was issued earlier than this.
if e.kind == 0 {
// if this event is replaceable update, hide every other replaceable
// event with the same kind from the same author that was issued
// earlier than this.
if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=0 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=? AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, e.kind, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!(
"hid {} older metadata events for author {:?}",
"hid {} older replaceable kind {} events for author {:?}",
update_count,
e.kind,
e.get_author_prefix()
);
}
}
// if this event is for a contact update, hide every other kind=3
// event from the same author that was issued earlier than this.
if e.kind == 3 {
let update_count = tx.execute(
"UPDATE event SET hidden=TRUE WHERE id!=? AND kind=3 AND author=? AND created_at <= ? and hidden!=TRUE",
params![ev_id, hex::decode(&e.pubkey).ok(), e.created_at],
)?;
if update_count > 0 {
info!(
"hid {} older contact events for author {:?}",
update_count,
e.get_author_prefix()
);
}
// if this event is a deletion, hide the referenced events from the same author.
if e.kind == 5 {
let event_candidates = e.tag_values_by_name("e");
let mut params: Vec<Box<dyn ToSql>> = vec![];
// first parameter will be author
params.push(Box::new(hex::decode(&e.pubkey)?));
event_candidates
.iter()
.filter(|x| is_hex(x) && x.len() == 64)
.filter_map(|x| hex::decode(x).ok())
.for_each(|x| params.push(Box::new(x)));
let query = format!(
"UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
repeat_vars(params.len() - 1)
);
let mut stmt = tx.prepare(&query)?;
let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
info!(
"hid {} deleted events for author {:?}",
update_count,
e.get_author_prefix()
);
}
tx.commit()?;
Ok(ins_count)
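The kind-5 (NIP-09) branch above hides every referenced event with a single parameterized UPDATE whose IN clause is sized by `repeat_vars`, a helper this diff only shows in passing. A minimal sketch, assuming `repeat_vars` simply joins `?` placeholders with commas (the relay's actual formatting may differ):

```rust
// Sketch only: a repeat_vars-style helper and the deletion UPDATE built above.
// The query text mirrors the diff; the helper body is an assumption.
fn repeat_vars(count: usize) -> String {
    // e.g. repeat_vars(3) == "?,?,?"
    std::iter::repeat("?").take(count).collect::<Vec<_>>().join(",")
}

fn main() {
    // Hypothetical deletion event referencing three event hashes ("e" tags);
    // the first bound parameter is the author, the rest are the hashes.
    let referenced = 3;
    let query = format!(
        "UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN ({})",
        repeat_vars(referenced)
    );
    assert_eq!(
        query,
        "UPDATE event SET hidden=TRUE WHERE author=? AND event_hash IN (?,?,?)"
    );
    println!("{}", query);
}
```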
@@ -356,142 +385,166 @@ fn repeat_vars(count: usize) -> String {
s
}

/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// Create a dynamic SQL subquery and params from a subscription filter.
fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>) {
// build a dynamic SQL query. all user-input is either an integer
// (sqli-safe), or a string that is filtered to only contain
// hexadecimal characters. Strings that require escaping (tag
// names/values) use parameters.
let mut query =
"SELECT DISTINCT(e.content) FROM event e LEFT JOIN tag t ON e.id=t.event_id ".to_owned();
// parameters

// if the filter is malformed, don't return anything.
if f.force_no_match {
let empty_query =
"SELECT DISTINCT(e.content), e.created_at FROM event e WHERE 1=0".to_owned();
// query parameters for SQLite
let empty_params: Vec<Box<dyn ToSql>> = vec![];
return (empty_query, empty_params);
}

let mut query = "SELECT DISTINCT(e.content), e.created_at FROM event e ".to_owned();
// query parameters for SQLite
let mut params: Vec<Box<dyn ToSql>> = vec![];

// for every filter in the subscription, generate a where clause
let mut filter_clauses: Vec<String> = Vec::new();
for f in sub.filters.iter() {
// individual filter components
let mut filter_components: Vec<String> = Vec::new();
// Query for "authors", allowing prefix matches
if let Some(authvec) = &f.authors {
// take each author and convert to a hexsearch
let mut auth_searches: Vec<String> = vec![];
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from author {:?}", auth);
}
// individual filter components (single conditions such as an author or event ID)
let mut filter_components: Vec<String> = Vec::new();
// Query for "authors", allowing prefix matches
if let Some(authvec) = &f.authors {
// take each author and convert to a hexsearch
let mut auth_searches: Vec<String> = vec![];
for auth in authvec {
match hex_range(auth) {
Some(HexSearch::Exact(ex)) => {
auth_searches.push("author=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
auth_searches.push("(author>? AND author<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
auth_searches.push("author>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from author {:?}", auth);
}
}
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
}
// Query for Kind
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
filter_components.push(kind_clause);
}
// Query for event, allowing prefix matches
if let Some(idvec) = &f.ids {
// take each author and convert to a hexsearch
let mut id_searches: Vec<String> = vec![];
for id in idvec {
match hex_range(id) {
Some(HexSearch::Exact(ex)) => {
id_searches.push("event_hash=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
id_searches.push("event_hash>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from id {:?}", id);
}
let authors_clause = format!("({})", auth_searches.join(" OR "));
filter_components.push(authors_clause);
}
// Query for Kind
if let Some(ks) = &f.kinds {
// kind is number, no escaping needed
let str_kinds: Vec<String> = ks.iter().map(|x| x.to_string()).collect();
let kind_clause = format!("kind IN ({})", str_kinds.join(", "));
filter_components.push(kind_clause);
}
// Query for event, allowing prefix matches
if let Some(idvec) = &f.ids {
// take each author and convert to a hexsearch
let mut id_searches: Vec<String> = vec![];
for id in idvec {
match hex_range(id) {
Some(HexSearch::Exact(ex)) => {
id_searches.push("event_hash=?".to_owned());
params.push(Box::new(ex));
}
Some(HexSearch::Range(lower, upper)) => {
id_searches.push("(event_hash>? AND event_hash<?)".to_owned());
params.push(Box::new(lower));
params.push(Box::new(upper));
}
Some(HexSearch::LowerOnly(lower)) => {
id_searches.push("event_hash>?".to_owned());
params.push(Box::new(lower));
}
None => {
info!("Could not parse hex range from id {:?}", id);
}
}
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
}
// Query for tags
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if is_hex(v) {
if let Ok(h) = hex::decode(&v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.to_owned()));
let id_clause = format!("({})", id_searches.join(" OR "));
filter_components.push(id_clause);
}
// Query for tags
if let Some(map) = &f.tags {
for (key, val) in map.iter() {
let mut str_vals: Vec<Box<dyn ToSql>> = vec![];
let mut blob_vals: Vec<Box<dyn ToSql>> = vec![];
for v in val {
if (v.len()%2==0) && is_lower_hex(v) {
if let Ok(h) = hex::decode(&v) {
blob_vals.push(Box::new(h));
}
} else {
str_vals.push(Box::new(v.to_owned()));
}
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
let tag_clause = format!("(name=? AND ({} OR {}))", str_clause, blob_clause);
// add the tag name as the first parameter
params.push(Box::new(key.to_owned()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
}
// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
filter_components.push(created_clause);
}
// Query for timestamp
if f.until.is_some() {
let until_clause = format!("created_at < {}", f.until.unwrap());
filter_components.push(until_clause);
}

// combine all clauses, and add to filter_clauses
if !filter_components.is_empty() {
let mut fc = "( ".to_owned();
fc.push_str(&filter_components.join(" AND "));
fc.push_str(" )");
filter_clauses.push(fc);
// create clauses with "?" params for each tag value being searched
let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
// find evidence of the target tag name/value existing for this event.
let tag_clause = format!("e.id IN (SELECT e.id FROM event e LEFT JOIN tag t on e.id=t.event_id WHERE hidden!=TRUE and (name=? AND ({} OR {})))", str_clause, blob_clause);
// add the tag name as the first parameter
params.push(Box::new(key.to_string()));
// add all tag values that are plain strings as params
params.append(&mut str_vals);
// add all tag values that are blobs as params
params.append(&mut blob_vals);
filter_components.push(tag_clause);
}
}

// Query for timestamp
if f.since.is_some() {
let created_clause = format!("created_at > {}", f.since.unwrap());
filter_components.push(created_clause);
}
// Query for timestamp
if f.until.is_some() {
let until_clause = format!("created_at < {}", f.until.unwrap());
filter_components.push(until_clause);
}
// never display hidden events
query.push_str(" WHERE hidden!=TRUE ");

// combine all filters with OR clauses, if any exist
if !filter_clauses.is_empty() {
query.push_str(" AND (");
query.push_str(&filter_clauses.join(" OR "));
query.push_str(") ");
query.push_str(" WHERE hidden!=TRUE");
// build filter component conditions
if !filter_components.is_empty() {
query.push_str(" AND ");
query.push_str(&filter_components.join(" AND "));
}
// add order clause
query.push_str(" ORDER BY created_at ASC");
debug!("query string: {}", query);
// Apply per-filter limit to this subquery.
// The use of a LIMIT implies a DESC order, to capture only the most recent events.
if let Some(lim) = f.limit {
query.push_str(&format!(" ORDER BY e.created_at DESC LIMIT {}", lim))
} else {
query.push_str(" ORDER BY e.created_at ASC")
}
(query, params)
}

/// Create a dynamic SQL query string and params from a subscription.
fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
// build a dynamic SQL query for an entire subscription, based on
// SQL subqueries for filters.
let mut subqueries: Vec<String> = Vec::new();
// subquery params
let mut params: Vec<Box<dyn ToSql>> = vec![];
// for every filter in the subscription, generate a subquery
for f in sub.filters.iter() {
let (f_subquery, mut f_params) = query_from_filter(&f);
subqueries.push(f_subquery);
params.append(&mut f_params);
}
// encapsulate subqueries into select statements
let subqueries_selects: Vec<String> = subqueries
.iter()
.map(|s| {
return format!("SELECT content, created_at FROM ({})", s);
})
.collect();
let query: String = subqueries_selects.join(" UNION ");
info!("final query string: {}", query);
(query, params)
}

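To make the subquery assembly above concrete: `query_from_sub` wraps each per-filter subquery in `SELECT content, created_at FROM (...)` and joins them with `UNION`. The sketch below shows roughly what that produces for a hypothetical two-filter subscription; the subquery strings are illustrative stand-ins, not captured relay output.

```rust
// Illustrative assembly of a subscription query from two made-up filter
// subqueries, following the shape of query_from_filter()/query_from_sub().
fn main() {
    let filter_subqueries = vec![
        // filter 1: one exact author
        "SELECT DISTINCT(e.content), e.created_at FROM event e \
         WHERE hidden!=TRUE AND (author=?) ORDER BY e.created_at ASC",
        // filter 2: kinds 0 and 1, newest 20 events only
        "SELECT DISTINCT(e.content), e.created_at FROM event e \
         WHERE hidden!=TRUE AND kind IN (0, 1) ORDER BY e.created_at DESC LIMIT 20",
    ];
    // each subquery becomes its own SELECT, and the results are merged with UNION
    let query = filter_subqueries
        .iter()
        .map(|s| format!("SELECT content, created_at FROM ({})", s))
        .collect::<Vec<_>>()
        .join(" UNION ");
    println!("{}", query);
}
```

Keeping each filter in its own subquery is what lets the per-filter LIMIT, with its implied DESC ordering, apply before the results are merged.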
@@ -503,6 +556,7 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
/// query is immediately aborted.
pub async fn db_query(
sub: Subscription,
client_id: String,
pool: SqlitePool,
query_tx: tokio::sync::mpsc::Sender<QueryResult>,
mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
@@ -524,13 +578,18 @@ pub async fn db_query(
let mut first_result = true;
while let Some(row) = event_rows.next()? {
if first_result {
debug!("time to first result: {:?}", start.elapsed());
debug!(
"time to first result: {:?} (cid={}, sub={:?})",
start.elapsed(),
client_id,
sub.id
);
first_result = false;
}
// check if this is still active
// TODO: check every N rows
if abandon_query_rx.try_recv().is_ok() {
debug!("query aborted");
debug!("query aborted (sub={:?})", sub.id);
return Ok(());
}
row_count += 1;
@@ -542,10 +601,18 @@ pub async fn db_query(
})
.ok();
}
query_tx
.blocking_send(QueryResult {
sub_id: sub.get_id(),
event: "EOSE".to_string(),
})
.ok();
debug!(
"query completed ({} rows) in {:?}",
"query completed ({} rows) in {:?} (cid={}, sub={:?})",
row_count,
start.elapsed()
start.elapsed(),
client_id,
sub.id
);
} else {
warn!("Could not get a database connection for querying");
src/error.rs
@@ -48,6 +48,8 @@ pub enum Error {
JoinError,
#[error("Hyper Client error")]
HyperError(hyper::Error),
#[error("Hex encoding error")]
HexError(hex::FromHexError),
#[error("Unknown/Undocumented")]
UnknownError,
}
@@ -58,6 +60,12 @@ pub enum Error {
// }
//}

impl From<hex::FromHexError> for Error {
fn from(h: hex::FromHexError) -> Self {
Error::HexError(h)
}
}

impl From<hyper::Error> for Error {
fn from(h: hyper::Error) -> Self {
Error::HyperError(h)
src/event.rs (90 changed lines)
@@ -39,9 +39,9 @@ pub struct Event {
pub(crate) tags: Vec<Vec<String>>,
pub(crate) content: String,
pub(crate) sig: String,
// Optimization for tag search, built on demand
// Optimization for tag search, built on demand.
#[serde(skip)]
pub(crate) tagidx: Option<HashMap<String, HashSet<String>>>,
pub(crate) tagidx: Option<HashMap<char, HashSet<String>>>,
}

/// Simple tag type for array of array of strings.
@@ -56,6 +56,25 @@
Ok(opt.unwrap_or_else(Vec::new))
}

/// Attempt to form a single-char tag name.
pub fn single_char_tagname(tagname: &str) -> Option<char> {
// We return the tag character if and only if the tagname consists
// of a single char.
let mut tagnamechars = tagname.chars();
let firstchar = tagnamechars.next();
return match firstchar {
Some(_) => {
// check second char
if tagnamechars.next().is_none() {
firstchar
} else {
None
}
}
None => None,
};
}

/// Convert network event to parsed/validated event.
impl From<EventCmd> for Result<Event> {
fn from(ec: EventCmd) -> Result<Event> {
@@ -99,17 +118,22 @@ impl Event {
return;
}
// otherwise, build an index
let mut idx: HashMap<String, HashSet<String>> = HashMap::new();
let mut idx: HashMap<char, HashSet<String>> = HashMap::new();
// iterate over tags that have at least 2 elements
for t in self.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
let tagnamechar_opt = single_char_tagname(tagname);
if tagnamechar_opt.is_none() {
continue;
}
let tagnamechar = tagnamechar_opt.unwrap();
let tagval = t.get(1).unwrap();
// ensure a vector exists for this tag
if !idx.contains_key(tagname) {
idx.insert(tagname.clone(), HashSet::new());
if !idx.contains_key(&tagnamechar) {
idx.insert(tagnamechar.clone(), HashSet::new());
}
// get the tag vec and insert entry
let tidx = idx.get_mut(tagname).expect("could not get tag vector");
let tidx = idx.get_mut(&tagnamechar).expect("could not get tag vector");
tidx.insert(tagval.clone());
}
// save the tag structure
@@ -124,6 +148,16 @@
self.pubkey.chars().take(8).collect()
}

/// Retrieve tag values
pub fn tag_values_by_name(&self, tag_name: &str) -> Vec<String> {
self.tags
.iter()
.filter(|x| x.len() > 1)
.filter(|x| x.get(0).unwrap() == tag_name)
.map(|x| x.get(1).unwrap().to_owned())
.collect()
}

/// Check if this event has a valid signature.
fn is_valid(&self) -> bool {
// TODO: return a Result with a reason for invalid events
@@ -136,7 +170,7 @@
if curr_time + (allowable_future as u64) < self.created_at {
let delta = self.created_at - curr_time;
debug!(
"Event is too far in the future ({} seconds), rejecting",
"event is too far in the future ({} seconds), rejecting",
delta
);
return false;
@@ -148,8 +182,6 @@
// ** [0, pubkey-hex-string, created-at-num, kind-num, tags-array-of-arrays, content-string]
// * serialize with no spaces/newlines
let c_opt = self.to_canonical();
debug!("Canonical: {:?}", &c_opt);
debug!("Canonical: {}", c_opt.as_ref().unwrap());
if c_opt.is_none() {
debug!("event could not be canonicalized");
return false;
@@ -158,7 +190,6 @@
// * compute the sha256sum.
let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
let hex_digest = format!("{:x}", digest);
debug!("hex is: {}", hex_digest);
// * ensure the id matches the computed sha256sum.
if self.id != hex_digest {
debug!("event id does not match digest");
@@ -172,11 +203,11 @@
let verify = SECP.verify_schnorr(&sig, &msg, &pubkey);
matches!(verify, Ok(()))
} else {
debug!("Client sent malformed pubkey");
debug!("client sent malformed pubkey");
false
}
} else {
info!("Error converting digest to secp256k1 message");
info!("error converting digest to secp256k1 message");
false
}
}
@@ -219,9 +250,10 @@
}

/// Determine if the given tag and value set intersect with tags in this event.
pub fn generic_tag_val_intersect(&self, tagname: &str, check: &HashSet<String>) -> bool {
pub fn generic_tag_val_intersect(&self, tagname: char, check: &HashSet<String>) -> bool {
match &self.tagidx {
Some(idx) => match idx.get(tagname) {
// check if this is indexable tagname
Some(idx) => match idx.get(&tagname) {
Some(valset) => {
let common = valset.intersection(check);
common.count() > 0
@@ -269,7 +301,7 @@ mod tests {
fn empty_event_tag_match() -> Result<()> {
let event = simple_event();
assert!(!event
.generic_tag_val_intersect("e", &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
.generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
Ok(())
}

@@ -280,7 +312,7 @@ mod tests {
event.build_index();
assert_eq!(
event.generic_tag_val_intersect(
"e",
'e',
&HashSet::from(["foo".to_owned(), "bar".to_owned()])
),
true
@@ -335,6 +367,32 @@ mod tests {
assert_eq!(c, expected);
}

#[test]
fn event_tag_select() {
let e = Event {
id: "999".to_owned(),
pubkey: "012345".to_owned(),
created_at: 501234,
kind: 1,
tags: vec![
vec!["j".to_owned(), "abc".to_owned()],
vec!["e".to_owned(), "foo".to_owned()],
vec!["e".to_owned(), "bar".to_owned()],
vec!["e".to_owned(), "baz".to_owned()],
vec![
"p".to_owned(),
"aaaa".to_owned(),
"ws://example.com".to_owned(),
],
],
content: "this is a test".to_owned(),
sig: "abcde".to_owned(),
tagidx: None,
};
let v = e.tag_values_by_name("e");
assert_eq!(v, vec!["foo", "bar", "baz"]);
}

#[test]
fn event_canonical_with_tags() {
let e = Event {
src/info.rs
@@ -35,7 +35,7 @@ impl From<config::Info> for RelayInfo {
description: i.description,
pubkey: i.pubkey,
contact: i.contact,
supported_nips: Some(vec![1, 2, 11]),
supported_nips: Some(vec![1, 2, 11, 15, 16]),
software: Some("https://git.sr.ht/~gheartsfield/nostr-rs-relay".to_owned()),
version: CARGO_PKG_VERSION.map(|x| x.to_owned()),
}
src/main.rs (48 changed lines)
@@ -21,6 +21,7 @@ use nostr_rs_relay::info::RelayInfo;
use nostr_rs_relay::nip05;
use nostr_rs_relay::subscription::Subscription;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::env;
@@ -61,7 +62,7 @@ async fn handle_web_request(
) {
// Request for / as websocket
("/", true) => {
debug!("websocket with upgrade request");
trace!("websocket with upgrade request");
//assume request is a handshake, so create the handshake response
let response = match handshake::server::create_response_with_body(&request, || {
Body::empty()
@@ -133,14 +134,17 @@ async fn handle_web_request(
return Ok(Response::builder()
.status(200)
.header("Content-Type", "application/nostr+json")
.header("Access-Control-Allow-Origin", "*")
.body(b)
.unwrap());
}
}
}
Ok(Response::new(Body::from(
"Please use a Nostr client to connect.",
)))
Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.body(Body::from("Please use a Nostr client to connect."))
.unwrap())
}
(_, _) => {
//handle any other url
@@ -355,6 +359,11 @@ fn convert_to_msg(msg: String) -> Result<NostrMessage> {
}
}

/// Turn a string into a NOTICE message ready to send over a WebSocket
fn make_notice_message(msg: &str) -> Message {
Message::text(json!(["NOTICE", msg]).to_string())
}

/// Handle new client connections. This runs through an event loop
/// for all client communication.
async fn nostr_server(
@@ -414,16 +423,20 @@ async fn nostr_server(
ws_stream.send(Message::Ping(Vec::new())).await.ok();
},
Some(notice_msg) = notice_rx.recv() => {
let n = notice_msg.to_string().replace("\"", "");
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", n))).await.ok();
ws_stream.send(make_notice_message(&notice_msg)).await.ok();
},
Some(query_result) = query_rx.recv() => {
// database informed us of a query result we asked for
client_received_event_count += 1;
// send a result
let subesc = query_result.sub_id.replace("\"", "");
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
ws_stream.send(Message::Text(send_str)).await.ok();
if query_result.event == "EOSE" {
let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
ws_stream.send(Message::Text(send_str)).await.ok();
} else {
client_received_event_count += 1;
// send a result
let send_str = format!("[\"EVENT\",\"{}\",{}]", subesc, &query_result.event);
ws_stream.send(Message::Text(send_str)).await.ok();
}
},
// TODO: consider logging the LaggedRecv error
Ok(global_event) = bcast_rx.recv() => {
@@ -455,7 +468,7 @@ async fn nostr_server(
convert_to_msg(m)
},
Some(Ok(Message::Binary(_))) => {
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "binary messages are not accepted"))).await.ok();
ws_stream.send(make_notice_message("binary messages are not accepted")).await.ok();
continue;
},
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {
@@ -501,7 +514,7 @@ async fn nostr_server(
},
Err(_) => {
info!("client {:?} sent an invalid event", cid);
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "event was invalid"))).await.ok();
ws_stream.send(make_notice_message("event was invalid")).await.ok();
}
}
},
@@ -519,12 +532,11 @@ async fn nostr_server(
previous_query.send(()).ok();
}
// start a database query
db::db_query(s, pool.clone(), query_tx.clone(), abandon_query_rx).await;
db::db_query(s, cid.to_owned(), pool.clone(), query_tx.clone(), abandon_query_rx).await;
},
Err(e) => {
info!("Subscription error: {}", e);
let s = e.to_string().replace("\"", "");
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", s))).await.ok();
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
}
}
},
@@ -545,7 +557,7 @@ async fn nostr_server(
},
Err(_) => {
info!("invalid command ignored");
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "could not parse command"))).await.ok();
ws_stream.send(make_notice_message("could not parse command")).await.ok();
}
}
},
@@ -555,11 +567,11 @@ async fn nostr_server(
}
Err(Error::EventMaxLengthError(s)) => {
info!("client {:?} sent event larger ({} bytes) than max size", cid, s);
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "event exceeded max size"))).await.ok();
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
},
Err(Error::ProtoParseError) => {
info!("client {:?} sent event that could not be parsed", cid);
ws_stream.send(Message::Text(format!("[\"NOTICE\",\"{}\"]", "could not parse command"))).await.ok();
ws_stream.send(make_notice_message("could not parse command")).await.ok();
},
Err(e) => {
info!("got non-fatal error from client: {:?}, error: {:?}", cid, e);
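The handlers above emit three kinds of frames to a client: NOTICE (built by `make_notice_message`), EVENT for each query result, and EOSE once the stored-event query is finished. A small sketch of those frames using the same `serde_json::json!` macro; the subscription id and event body are placeholders, not values from the diff.

```rust
// Sketch of the relay-to-client frames assembled in nostr_server() above.
use serde_json::json;

fn main() {
    // NOTICE frame, as make_notice_message() builds it
    let notice = json!(["NOTICE", "binary messages are not accepted"]).to_string();
    // EVENT frame for a query result on a hypothetical subscription "sub1"
    let event = format!(
        "[\"EVENT\",\"{}\",{}]",
        "sub1",
        json!({"kind": 1, "content": "hi"})
    );
    // EOSE frame sent when the stored-event query for "sub1" is done (NIP-15)
    let eose = format!("[\"EOSE\",\"{}\"]", "sub1");
    println!("{}\n{}\n{}", notice, event, eose);
}
```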
src/schema.rs
@@ -1,13 +1,13 @@
//! Database schema and migrations
use crate::db::PooledConnection;
use crate::error::Result;
use crate::utils::is_hex;
use crate::event::{single_char_tagname, Event};
use crate::utils::is_lower_hex;
use log::*;
use rusqlite::limits::Limit;
use rusqlite::params;
use rusqlite::Connection;

// TODO: drop the pubkey_ref and event_ref tables
use std::time::Instant;

/// Startup DB Pragmas
pub const STARTUP_SQL: &str = r##"
@@ -24,7 +24,7 @@ PRAGMA journal_mode=WAL;
PRAGMA main.synchronous=NORMAL;
PRAGMA foreign_keys = ON;
PRAGMA application_id = 1654008667;
PRAGMA user_version = 5;
PRAGMA user_version = 6;

-- Event Table
CREATE TABLE IF NOT EXISTS event (
@@ -53,7 +53,7 @@ id INTEGER PRIMARY KEY,
event_id INTEGER NOT NULL, -- an event ID that contains a tag.
name TEXT, -- the tag name ("p", "e", whatever)
value TEXT, -- the tag value, if not hex.
value_hex BLOB, -- the tag value, if it can be interpreted as a hex string.
value_hex BLOB, -- the tag value, if it can be interpreted as a lowercase hex string.
FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS tag_val_index ON tag(value);
@@ -103,7 +103,7 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<()> {
if curr_version == 0 {
match conn.execute_batch(INIT_SQL) {
Ok(()) => {
info!("database pragma/schema initialized to v4, and ready");
info!("database pragma/schema initialized to v6, and ready");
}
Err(err) => {
error!("update failed: {}", err);
@@ -154,7 +154,6 @@ PRAGMA user_version = 3;
panic!("database could not be upgraded");
}
}
info!("Starting transaction");
// iterate over every event/pubkey tag
let tx = conn.transaction()?;
{
@@ -166,7 +165,7 @@ PRAGMA user_version = 3;
let tag_name: String = row.get(1)?;
let tag_value: String = row.get(2)?;
// this will leave behind p/e tags that were non-hex, but they are invalid anyways.
if is_hex(&tag_value) {
if is_lower_hex(&tag_value) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tag_name, hex::decode(&tag_value).ok()],
@@ -225,9 +224,63 @@ PRAGMA user_version=5;
}
}
} else if curr_version == 5 {
debug!("Database version was already current");
} else if curr_version > 5 {
panic!("Database version is newer than supported by this executable");
info!("database schema needs update from 5->6");
// We need to rebuild the tags table. iterate through the
// event table. build event from json, insert tags into a
// fresh tag table. This was needed due to a logic error in
// how hex-like tags got indexed.
let start = Instant::now();
let tx = conn.transaction()?;
{
// Clear out table
tx.execute("DELETE FROM tag;", [])?;
let mut stmt = tx.prepare("select id, content from event order by id;")?;
let mut tag_rows = stmt.query([])?;
while let Some(row) = tag_rows.next()? {
// we want to capture the event_id that had the tag, the tag name, and the tag hex value.
let event_id: u64 = row.get(0)?;
let event_json: String = row.get(1)?;
let event: Event = serde_json::from_str(&event_json)?;
// look at each event, and each tag, creating new tag entries if appropriate.
for t in event.tags.iter().filter(|x| x.len() > 1) {
let tagname = t.get(0).unwrap();
let tagnamechar_opt = single_char_tagname(tagname);
if tagnamechar_opt.is_none() {
continue;
}
// safe because len was > 1
let tagval = t.get(1).unwrap();
// insert as BLOB if we can restore it losslessly.
// this means it needs to be even length and lowercase.
if (tagval.len() % 2 == 0) && is_lower_hex(&tagval) {
tx.execute(
"INSERT INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3);",
params![event_id, tagname, hex::decode(&tagval).ok()],
)?;
} else {
// otherwise, insert as text
tx.execute(
"INSERT INTO tag (event_id, name, value) VALUES (?1, ?2, ?3);",
params![event_id, tagname, &tagval],
)?;
}
}
}
tx.execute("PRAGMA user_version = 6;", [])?;
}
tx.commit()?;
info!("database schema upgraded v5 -> v6 in {:?}", start.elapsed());
// vacuum after large table modification
let start = Instant::now();
conn.execute("VACUUM;", [])?;
info!("vacuumed DB after tags rebuild in {:?}", start.elapsed());
} else if curr_version == 6 {
debug!("Database version was already current (v6)");
} else if curr_version > 7 {
panic!(
"Database version is newer than supported by this executable (v{})",
curr_version
);
}

// Setup PRAGMA
src/subscription.rs
@@ -31,9 +31,13 @@ pub struct ReqFilter {
pub until: Option<u64>,
/// List of author public keys
pub authors: Option<Vec<String>>,
/// Limit number of results
pub limit: Option<u64>,
/// Set of tags
#[serde(skip)]
pub tags: Option<HashMap<String, HashSet<String>>>,
pub tags: Option<HashMap<char, HashSet<String>>>,
/// Force no matches due to malformed data
pub force_no_match: bool,
}

impl<'de> Deserialize<'de> for ReqFilter {
@@ -54,7 +58,9 @@ impl<'de> Deserialize<'de> for ReqFilter {
since: None,
until: None,
authors: None,
limit: None,
tags: None,
force_no_match: false,
};
let mut ts = None;
// iterate through each key, and assign values that exist
@@ -68,22 +74,28 @@ impl<'de> Deserialize<'de> for ReqFilter {
rf.since = Deserialize::deserialize(val).ok();
} else if key == "until" {
rf.until = Deserialize::deserialize(val).ok();
} else if key == "limit" {
rf.limit = Deserialize::deserialize(val).ok();
} else if key == "authors" {
rf.authors = Deserialize::deserialize(val).ok();
} else if key.starts_with('#') && key.len() > 1 && val.is_array() {
// remove the prefix
let tagname = &key[1..];
if ts.is_none() {
// Initialize the tag if necessary
ts = Some(HashMap::new());
}
if let Some(m) = ts.as_mut() {
let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
if let Some(v) = tag_vals {
let hs = HashSet::from_iter(v.into_iter());
m.insert(tagname.to_owned(), hs);
if let Some(tag_search) = tag_search_char_from_filter(key) {
if ts.is_none() {
// Initialize the tag if necessary
ts = Some(HashMap::new());
}
};
if let Some(m) = ts.as_mut() {
let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
if let Some(v) = tag_vals {
let hs = HashSet::from_iter(v.into_iter());
m.insert(tag_search.to_owned(), hs);
}
};
} else {
// tag search that is multi-character, don't add to subscription
rf.force_no_match = true;
continue;
}
}
}
rf.tags = ts;
@@ -91,6 +103,26 @@ impl<'de> Deserialize<'de> for ReqFilter {
}
}

/// Attempt to form a single-char identifier from a tag search filter
fn tag_search_char_from_filter(tagname: &str) -> Option<char> {
let tagname_nohash = &tagname[1..];
// We return the tag character if and only if the tagname consists
// of a single char.
let mut tagnamechars = tagname_nohash.chars();
let firstchar = tagnamechars.next();
return match firstchar {
Some(_) => {
// check second char
if tagnamechars.next().is_none() {
firstchar
} else {
None
}
}
None => None,
};
}

impl<'de> Deserialize<'de> for Subscription {
/// Custom deserializer for subscriptions, which have a more
/// complex structure than the other message types.
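As an illustration of the tag-search handling above, here is a hypothetical REQ filter and how its keys map onto ReqFilter; none of these values come from the diff. A single-character key such as "#e" becomes the 'e' entry of the tags map, while a multi-character key such as "#delegation" would set force_no_match so the filter matches nothing.

```rust
// Hypothetical filter JSON for the deserializer above (all values are made up).
use serde_json::json;

fn main() {
    let filter = json!({
        "authors": ["0123abcd"],    // author prefix match via hex_range
        "kinds": [0, 1],            // becomes kind IN (0, 1)
        "#e": ["abc123", "def456"], // single-char tag search, stored under 'e'
        "since": 1660000000,        // created_at > since
        "limit": 25                 // per-filter LIMIT, newest events first
    });
    println!("{}", filter);
}
```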
@@ -189,7 +221,7 @@ impl ReqFilter {
// get the hashset from the filter.
if let Some(map) = &self.tags {
for (key, val) in map.iter() {
let tag_match = event.generic_tag_val_intersect(key, val);
let tag_match = event.generic_tag_val_intersect(*key, val);
// if there is no match for this tag, the match fails.
if !tag_match {
return false;
@@ -218,6 +250,7 @@ impl ReqFilter {
&& self.kind_match(event.kind)
&& self.authors_match(event)
&& self.tag_match(event)
&& !self.force_no_match
}
}
src/utils.rs (18 changed lines)
@@ -13,3 +13,21 @@ pub fn unix_time() -> u64 {
pub fn is_hex(s: &str) -> bool {
s.chars().all(|x| char::is_ascii_hexdigit(&x))
}

/// Check if a string contains only lower-case hex chars.
pub fn is_lower_hex(s: &str) -> bool {
s.chars().all(|x| {
(char::is_ascii_lowercase(&x) || char::is_ascii_digit(&x)) && char::is_ascii_hexdigit(&x)
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn lower_hex() {
let hexstr = "abcd0123";
assert_eq!(is_lower_hex(hexstr), true);
}
}
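The lowercase check above pairs with the even-length check used in write_event and in the v5 to v6 migration: a tag value is stored in the value_hex BLOB column only when it can be restored losslessly from the decoded bytes. A small sketch with the `hex` crate (the example strings are arbitrary) showing why both conditions matter:

```rust
// Why "lowercase and even length" gates BLOB storage: only such strings
// survive a hex decode/encode round trip unchanged.
fn main() {
    let lossless = "abcd0123";
    let decoded = hex::decode(lossless).unwrap();
    assert_eq!(hex::encode(&decoded), lossless); // round-trips exactly

    // uppercase hex decodes fine but re-encodes as lowercase: not lossless
    assert_eq!(hex::encode(hex::decode("ABCD").unwrap()), "abcd");

    // odd-length strings cannot be decoded at all
    assert!(hex::decode("abc").is_err());
    println!("ok");
}
```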