mirror of https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-24 17:49:07 -05:00

refactor: reformat and remove tabs

This commit is contained in:
parent 6800c2e39d
commit 6489e685ab

@@ -170,9 +170,9 @@ impl Settings {
    fn new_from_default(default: &Settings) -> Result<Self, ConfigError> {
        let builder = Config::builder();
        let config: Config = builder
            // use defaults
            .add_source(Config::try_from(default)?)
            // override with file contents
            .add_source(File::with_name("config.toml"))
            .build()?;
        let mut settings: Settings = config.try_deserialize()?;
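
Note: the hunk above is the layered-configuration idiom from the `config` crate — serialized defaults are added first, then config.toml is layered on top, so any key present in the file wins. A minimal self-contained sketch of the same precedence (the `port` field is illustrative, not the relay's actual schema):

    use config::{Config, ConfigError, File};
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    struct Example {
        port: u16, // hypothetical field, for illustration only
    }

    fn load() -> Result<Example, ConfigError> {
        let defaults = Example { port: 8080 };
        let cfg = Config::builder()
            // lowest precedence: the serialized defaults
            .add_source(Config::try_from(&defaults)?)
            // higher precedence: keys in config.toml override the defaults
            .add_source(File::with_name("config.toml").required(false))
            .build()?;
        // a `port` key in config.toml replaces 8080 here
        cfg.try_deserialize()
    }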

@@ -207,7 +207,7 @@ impl Default for Settings {
            diagnostics: Diagnostics { tracing: false },
            database: Database {
                data_directory: ".".to_owned(),
                engine: "sqlite".to_owned(),
                in_memory: false,
                min_conn: 4,
                max_conn: 8,

src/db.rs (70 lines changed)

@@ -117,7 +117,7 @@ pub async fn db_writer(
                debug!(
                    "rejecting event: {}, blacklisted kind: {}",
                    &event.get_event_id_prefix(),
                    &event.kind
                );
                notice_tx
                    .try_send(Notice::blocked(

@@ -138,51 +138,51 @@ pub async fn db_writer(
                metadata_tx.send(event.clone()).ok();
            }

            // check for NIP-05 verification
            if nip05_enabled {
                match repo.get_latest_user_verification(&event.pubkey).await {
                    Ok(uv) => {
                        if uv.is_valid(&settings.verified_users) {
                            info!(
                                "new event from verified author ({:?},{:?})",
                                uv.name.to_string(),
                                event.get_author_prefix()
                            );
                        } else {
                            info!(
                                "rejecting event, author ({:?} / {:?}) verification invalid (expired/wrong domain)",
                                uv.name.to_string(),
                                event.get_author_prefix()
                            );
                            notice_tx
                                .try_send(Notice::blocked(
                                    event.id,
                                    "NIP-05 verification is no longer valid (expired/wrong domain)",
                                ))
                                .ok();
                            continue;
                        }
                    }
                    Err(Error::SqlError(rusqlite::Error::QueryReturnedNoRows)) => {
                        debug!(
                            "no verification records found for pubkey: {:?}",
                            event.get_author_prefix()
                        );
                        notice_tx
                            .try_send(Notice::blocked(
                                event.id,
                                "NIP-05 verification needed to publish events",
                            ))
                            .ok();
                        continue;
                    }
                    Err(e) => {
                        warn!("checking nip05 verification status failed: {:?}", e);
                        continue;
                    }
                }
                // TODO: cache recent list of authors to remove a DB call.
            }
            let start = Instant::now();
            if event.kind >= 20000 && event.kind < 30000 {
                bcast_tx.send(event.clone()).ok();

@@ -203,7 +203,7 @@ pub async fn db_writer(
                info!(
                    "persisted event: {:?} (kind: {}) from: {:?} in: {:?}",
                    event.get_event_id_prefix(),
                    event.kind,
                    event.get_author_prefix(),
                    start.elapsed()
                );

src/event.rs (22 lines changed)

@@ -88,14 +88,14 @@ impl From<EventCmd> for Result<Event> {
    fn from(ec: EventCmd) -> Result<Event> {
        // ensure command is correct
        if ec.cmd == "EVENT" {
            ec.event.validate().map(|_| {
                let mut e = ec.event;
                e.build_index();
                e.update_delegation();
                e
            })
        } else {
            Err(CommandUnknownError)
        }
    }
}

@@ -122,7 +122,7 @@ impl Event {

    /// Should this event be replaced with newer timestamps from same author?
    #[must_use] pub fn is_replaceable(&self) -> bool {
        self.kind == 0 || self.kind == 3 || self.kind == 41 || (self.kind >= 10000 && self.kind < 20000)
    }

    /// Pull a NIP-05 Name out of the event, if one exists
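
Note: the kind ranges in is_replaceable follow NIP-16: kinds 0, 3, 41 and 10000-19999 are replaceable (a newer event from the same author supersedes older ones), while 20000-29999 are ephemeral — broadcast to subscribers but never persisted, which is exactly the range db_writer checks above before skipping the database. A sketch of the companion predicate, assuming the same convention:

    /// Ephemeral events (NIP-16): relayed to subscribers but never stored.
    /// Sketch only; mirrors the kind range checked in db_writer above.
    #[must_use] pub fn is_ephemeral(kind: u64) -> bool {
        (20000..30000).contains(&kind)
    }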

@@ -359,7 +359,7 @@ mod tests {
    fn empty_event_tag_match() {
        let event = Event::simple_event();
        assert!(!event
            .generic_tag_val_intersect('e', &HashSet::from(["foo".to_owned(), "bar".to_owned()])));
    }

    #[test]

@@ -506,13 +506,13 @@ mod tests {

    #[test]
    fn replaceable_event() {
        let mut event = Event::simple_event();
        event.kind=0;
        assert!(event.is_replaceable());
        event.kind=3;
        assert!(event.is_replaceable());
        event.kind=12000;
        assert!(event.is_replaceable());
    }

@@ -57,8 +57,9 @@ fn is_all_fs(s: &str) -> bool {
        } else if odd {
            // check if first char in this byte is NOT 'f'
            if b < 240 {
                // bump up the first character in this byte
                upper[byte_len] = b + 16;
                // increment done, stop iterating through the vec
                break;
            }
            // if it is 'f', reset the byte to 0 and do a carry
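
Note: the arithmetic above works on the raw bytes of a decoded hex prefix. Each byte packs two hex characters, so adding 16 (0x10) increments the byte's high nibble — the first character stored in that byte — and the b < 240 guard refuses to bump a high nibble that is already 'f' (0xf0 and up), which would overflow and instead takes the carry path named in the trailing comment. A worked example:

    // Sketch: bumping the first hex char of a byte to build an upper bound.
    let b: u8 = 0x3f;            // packs the hex chars '3' and 'f'
    assert!(b < 240);            // high nibble is not 'f', safe to increment
    assert_eq!(b + 16, 0x4f);    // '3' -> '4': prefix "3f" becomes "4f"
    // 0xf0..=0xff would overflow the high nibble, hence the carry handling.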

@@ -7,11 +7,8 @@ use std::sync::mpsc as syncmpsc;
use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender};
use std::thread;
use tracing::info;

use console_subscriber::ConsoleLayer;

/// Start running a Nostr relay server.
fn main() {
    // configure settings from config.toml

@@ -23,8 +20,8 @@ fn main() {
        // enable tracing with tokio-console
        ConsoleLayer::builder().with_default_env().init();
    } else {
        // standard logging
        tracing_subscriber::fmt::try_init().unwrap();
    }
    info!("Starting up from main");

src/nip05.rs (94 lines changed)

@@ -58,8 +58,8 @@ impl Nip05Name {
            "https://{}/.well-known/nostr.json?name={}",
            self.domain, self.local
        )
        .parse::<http::Uri>()
        .ok()
    }
}

@@ -70,7 +70,7 @@ impl std::convert::TryFrom<&str> for Nip05Name {
        // break full name at the @ boundary.
        let components: Vec<&str> = inet.split('@').collect();
        if components.len() == 2 {
            // check if local name is valid
            let local = components[0];
            let domain = components[1];
            if local.chars().all(|x| x.is_alphanumeric() || x == '_' || x == '-' || x == '.') {

@@ -93,7 +93,7 @@ impl std::convert::TryFrom<&str> for Nip05Name {
                ))
            }
        } else {
            Err(Error::CustomError("too many/few components".to_owned()))
        }
    }
}

@@ -122,7 +122,7 @@ fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes)

impl Verifier {
    pub fn new(
        repo: Arc<dyn NostrRepo>,
        metadata_rx: tokio::sync::broadcast::Receiver<Event>,
        event_tx: tokio::sync::broadcast::Sender<Event>,
        settings: crate::config::Settings,

@@ -143,7 +143,7 @@ impl Verifier {
        // duration.
        let reverify_interval = tokio::time::interval(http_wait_duration);
        Ok(Verifier {
            repo,
            metadata_rx,
            event_tx,
            settings,

@@ -343,35 +343,35 @@ impl Verifier {
                        // timestamp.
                        self.repo.update_verification_timestamp(v.rowid)
                            .await?;
                        info!("verification updated for {}", v.to_string());

                    }
                    UserWebVerificationStatus::DomainNotAllowed
                    | UserWebVerificationStatus::Unknown => {
                        // server may be offline, or temporarily
                        // blocked by the config file. Note the
                        // failure so we can process something
                        // else.

                        // have we had enough failures to give up?
                        if v.failure_count >= max_failures as u64 {
                            info!(
                                "giving up on verifying {:?} after {} failures",
                                v.name, v.failure_count
                            );
                            self.repo.delete_verification(v.rowid)
                                .await?;
                        } else {
                            // record normal failure, incrementing failure count
                            info!("verification failed for {}", v.to_string());
                            self.repo.fail_verification(v.rowid).await?;
                        }
                    }
                    UserWebVerificationStatus::Unverified => {
                        // domain has removed the verification, drop
                        // the record on our side.
                        info!("verification rescinded for {}", v.to_string());
                        self.repo.delete_verification(v.rowid)
                            .await?;
                    }
                }

@@ -405,27 +405,27 @@ impl Verifier {
        // disabled/passive, the event has already been persisted.
        let should_write_event = self.settings.verified_users.is_enabled();
        if should_write_event {
            match self.repo.write_event(event).await {
                Ok(updated) => {
                    if updated != 0 {
                        info!(
                            "persisted event (new verified pubkey): {:?} in {:?}",
                            event.get_event_id_prefix(),
                            start.elapsed()
                        );
                        self.event_tx.send(event.clone()).ok();
                    }
                }
                Err(err) => {
                    warn!("event insert failed: {:?}", err);
                    if let Error::SqlError(r) = err {
                        warn!("because: : {:?}", r);
                    }
                }
            }
        }
        // write the verification record
        self.repo.create_verification_record(&event.id, name).await?;
        Ok(())
    }
}

@@ -75,147 +75,147 @@ impl SqliteRepo {
            false,
        );

        // this is used to block new reads during critical checkpoints
        let checkpoint_in_progress = Arc::new(Mutex::new(0));
        // SQLite can only effectively write single threaded, so don't
        // block multiple worker threads unnecessarily.
        let write_in_progress = Arc::new(Mutex::new(0));

        SqliteRepo {
            read_pool,
            write_pool,
            maint_pool,
            checkpoint_in_progress,
            write_in_progress,
        }
    }

    /// Persist an event to the database, returning rows added.
    pub fn persist_event(conn: &mut PooledConnection, e: &Event) -> Result<u64> {
        // enable auto vacuum
        conn.execute_batch("pragma auto_vacuum = FULL")?;

        // start transaction
        let tx = conn.transaction()?;
        // get relevant fields from event and convert to blobs.
        let id_blob = hex::decode(&e.id).ok();
        let pubkey_blob: Option<Vec<u8>> = hex::decode(&e.pubkey).ok();
        let delegator_blob: Option<Vec<u8>> = e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok());
        let event_str = serde_json::to_string(&e).ok();
        // check for replaceable events that would hide this one; we won't even attempt to insert these.
        if e.is_replaceable() {
            let repl_count = tx.query_row(
                "SELECT e.id FROM event e INDEXED BY author_index WHERE e.author=? AND e.kind=? AND e.created_at > ? LIMIT 1;",
                params![pubkey_blob, e.kind, e.created_at], |row| row.get::<usize, usize>(0));
            if repl_count.ok().is_some() {
                return Ok(0);
            }
        }
        // ignore if the event hash is a duplicate.
        let mut ins_count = tx.execute(
            "INSERT OR IGNORE INTO event (event_hash, created_at, kind, author, delegated_by, content, first_seen, hidden) VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), FALSE);",
            params![id_blob, e.created_at, e.kind, pubkey_blob, delegator_blob, event_str]
        )? as u64;
        if ins_count == 0 {
            // if the event was a duplicate, no need to insert event or
            // pubkey references.
            tx.rollback().ok();
            return Ok(ins_count);
        }
        // remember primary key of the event most recently inserted.
        let ev_id = tx.last_insert_rowid();
        // add all tags to the tag table
        for tag in &e.tags {
            // ensure we have 2 values.
            if tag.len() >= 2 {
                let tagname = &tag[0];
                let tagval = &tag[1];
                // only single-char tags are searchable
                let tagchar_opt = single_char_tagname(tagname);
                match &tagchar_opt {
                    Some(_) => {
                        // if tagvalue is lowercase hex;
                        if is_lower_hex(tagval) && (tagval.len() % 2 == 0) {
                            tx.execute(
                                "INSERT OR IGNORE INTO tag (event_id, name, value_hex) VALUES (?1, ?2, ?3)",
                                params![ev_id, &tagname, hex::decode(tagval).ok()],
                            )?;
                        } else {
                            tx.execute(
                                "INSERT OR IGNORE INTO tag (event_id, name, value) VALUES (?1, ?2, ?3)",
                                params![ev_id, &tagname, &tagval],
                            )?;
                        }
                    }
                    None => {}
                }
            }
        }
        // if this event is replaceable update, remove other replaceable
        // event with the same kind from the same author that was issued
        // earlier than this.
        if e.is_replaceable() {
            let author = hex::decode(&e.pubkey).ok();
            // this is a backwards check - hide any events that were older.
            let update_count = tx.execute(
                "DELETE FROM event WHERE kind=? and author=? and id NOT IN (SELECT id FROM event INDEXED BY author_kind_index WHERE kind=? AND author=? ORDER BY created_at DESC LIMIT 1)",
                params![e.kind, author, e.kind, author],
            )?;
            if update_count > 0 {
                info!(
                    "removed {} older replaceable kind {} events for author: {:?}",
                    update_count,
                    e.kind,
                    e.get_author_prefix()
                );
            }
        }
        // if this event is a deletion, hide the referenced events from the same author.
        if e.kind == 5 {
            let event_candidates = e.tag_values_by_name("e");
            // first parameter will be author
            let mut params: Vec<Box<dyn ToSql>> = vec![Box::new(hex::decode(&e.pubkey)?)];
            event_candidates
                .iter()
                .filter(|x| is_hex(x) && x.len() == 64)
                .filter_map(|x| hex::decode(x).ok())
                .for_each(|x| params.push(Box::new(x)));
            let query = format!(
                "UPDATE event SET hidden=TRUE WHERE kind!=5 AND author=? AND event_hash IN ({})",
                repeat_vars(params.len() - 1)
            );
            let mut stmt = tx.prepare(&query)?;
            let update_count = stmt.execute(rusqlite::params_from_iter(params))?;
            info!(
                "hid {} deleted events for author {:?}",
                update_count,
                e.get_author_prefix()
            );
        } else {
            // check if a deletion has already been recorded for this event.
            // Only relevant for non-deletion events
            let del_count = tx.query_row(
                "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND t.name='e' AND e.kind=5 AND t.value_hex=? LIMIT 1;",
                params![pubkey_blob, id_blob], |row| row.get::<usize, usize>(0));
            // check if a the query returned a result, meaning we should
            // hid the current event
            if del_count.ok().is_some() {
                // a deletion already existed, mark original event as hidden.
                info!(
                    "hid event: {:?} due to existing deletion by author: {:?}",
                    e.get_event_id_prefix(),
                    e.get_author_prefix()
                );
                let _update_count =
                    tx.execute("UPDATE event SET hidden=TRUE WHERE id=?", params![ev_id])?;
                // event was deleted, so let caller know nothing new
                // arrived, preventing this from being sent to active
                // subscriptions
                ins_count = 0;
            }
        }
        tx.commit()?;
        Ok(ins_count)
    }
}
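
Note: persist_event splices repeat_vars(params.len() - 1) into the IN (...) clause of the deletion query; the helper's body sits outside this hunk (its signature appears in a later hunk header). The conventional shape, as in the rusqlite cookbook — a sketch, not necessarily the repo's exact body:

    // Produce "?,?,?" for n parameters, for use inside an IN (...) clause.
    fn repeat_vars(count: usize) -> String {
        assert_ne!(count, 0);
        let mut s = "?,".repeat(count);
        s.pop(); // drop the trailing comma
        s
    }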

@@ -223,25 +223,25 @@ impl SqliteRepo {
impl NostrRepo for SqliteRepo {

    async fn start(&self) -> Result<()> {
        db_checkpoint_task(self.maint_pool.clone(), Duration::from_secs(60), self.checkpoint_in_progress.clone()).await
    }

    async fn migrate_up(&self) -> Result<usize> {
        let _write_guard = self.write_in_progress.lock().await;
        let mut conn = self.write_pool.get()?;
        task::spawn_blocking(move || {
            upgrade_db(&mut conn)
        }).await?
    }
    /// Persist event to database
    async fn write_event(&self, e: &Event) -> Result<u64> {
        let _write_guard = self.write_in_progress.lock().await;
        // spawn a blocking thread
        let mut conn = self.write_pool.get()?;
        let e = e.clone();
        task::spawn_blocking(move || {
            SqliteRepo::persist_event(&mut conn, &e)
        }).await?
    }

    /// Perform a database query using a subscription.

@@ -257,28 +257,28 @@ impl NostrRepo for SqliteRepo {
        query_tx: tokio::sync::mpsc::Sender<QueryResult>,
        mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<()> {
        let pre_spawn_start = Instant::now();
        let self=self.clone();
        task::spawn_blocking(move || {
            {
                // if we are waiting on a checkpoint, stop until it is complete
                let _x = self.checkpoint_in_progress.blocking_lock();
            }
            let db_queue_time = pre_spawn_start.elapsed();
            // if the queue time was very long (>5 seconds), spare the DB and abort.
            if db_queue_time > Duration::from_secs(5) {
                info!(
                    "shedding DB query load queued for {:?} (cid: {}, sub: {:?})",
                    db_queue_time, client_id, sub.id
                );
                return Ok(());
            }
            // otherwise, report queuing time if it is slow
            else if db_queue_time > Duration::from_secs(1) {
                debug!(
                    "(slow) DB query queued for {:?} (cid: {}, sub: {:?})",
                    db_queue_time, client_id, sub.id
                );
            }
            let start = Instant::now();
            let mut row_count: usize = 0;

@@ -287,7 +287,7 @@ impl NostrRepo for SqliteRepo {
            let sql_gen_elapsed = start.elapsed();

            if sql_gen_elapsed > Duration::from_millis(10) {
                debug!("SQL (slow) generated in {:?}", start.elapsed());
            }
            // cutoff for displaying slow queries
            let slow_cutoff = Duration::from_millis(2000);

@@ -298,69 +298,69 @@ impl NostrRepo for SqliteRepo {
            let mut slow_first_event;
            let mut last_successful_send = Instant::now();
            if let Ok(mut conn) = self.read_pool.get() {
                // execute the query.
                // make the actual SQL query (with parameters inserted) available
                conn.trace(Some(|x| {trace!("SQL trace: {:?}", x)}));
                let mut stmt = conn.prepare_cached(&q)?;
                let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;

                let mut first_result = true;
                while let Some(row) = event_rows.next()? {
                    let first_event_elapsed = start.elapsed();
                    slow_first_event = first_event_elapsed >= slow_cutoff;
                    if first_result {
                        debug!(
                            "first result in {:?} (cid: {}, sub: {:?}) [used indexes: {:?}]",
                            first_event_elapsed, client_id, sub.id, idxs
                        );
                        first_result = false;
                    }
                    // logging for slow queries; show sub and SQL.
                    // to reduce logging; only show 1/16th of clients (leading 0)
                    if row_count == 0 && slow_first_event && client_id.starts_with('0') {
                        debug!(
                            "query req (slow): {:?} (cid: {}, sub: {:?})",
                            sub, client_id, sub.id
                        );
                    }
                    // check if a checkpoint is trying to run, and abort
                    if row_count % 100 == 0 {
                        {
                            if self.checkpoint_in_progress.try_lock().is_err() {
                                // lock was held, abort this query
                                debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
                                return Ok(());
                            }
                        }
                    }

                    // check if this is still active; every 100 rows
                    if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() {
                        debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id);
                        return Ok(());
                    }
                    row_count += 1;
                    let event_json = row.get(0)?;
                    loop {
                        if query_tx.capacity() != 0 {
                            // we have capacity to add another item
                            break;
                        }
                        // the queue is full
                        trace!("db reader thread is stalled");
                        if last_successful_send + abort_cutoff < Instant::now() {
                            // the queue has been full for too long, abort
                            info!("aborting database query due to slow client (cid: {}, sub: {:?})",
                                  client_id, sub.id);
                            let ok: Result<()> = Ok(());
                            return ok;
                        }
                        // check if a checkpoint is trying to run, and abort
                        if self.checkpoint_in_progress.try_lock().is_err() {
                            // lock was held, abort this query
                            debug!("query aborted due to checkpoint (cid: {}, sub: {:?})", client_id, sub.id);
                            return Ok(());
                        }
                        // give the queue a chance to clear before trying again
                        thread::sleep(Duration::from_millis(100));
                    }

@@ -368,79 +368,79 @@ impl NostrRepo for SqliteRepo {
                    // getting the query result back as part of the error
                    // result.
                    query_tx
                        .blocking_send(QueryResult {
                            sub_id: sub.get_id(),
                            event: event_json,
                        })
                        .ok();
                    last_successful_send = Instant::now();
                }
                query_tx
                    .blocking_send(QueryResult {
                        sub_id: sub.get_id(),
                        event: "EOSE".to_string(),
                    })
                    .ok();
                debug!(
                    "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})",
                    pre_spawn_start.elapsed(),
                    client_id,
                    sub.id,
                    start.elapsed(),
                    row_count
                );
            } else {
                warn!("Could not get a database connection for querying");
            }
            let ok: Result<()> = Ok(());
            ok
        });
        Ok(())
    }

    /// Perform normal maintenance
    async fn optimize_db(&self) -> Result<()> {
        let conn = self.write_pool.get()?;
        task::spawn_blocking(move || {
            let start = Instant::now();
            conn.execute_batch("PRAGMA optimize;").ok();
            info!("optimize ran in {:?}", start.elapsed());
        }).await?;
        Ok(())
    }

    /// Create a new verification record connected to a specific event
    async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> {
        let e = hex::decode(event_id).ok();
        let n = name.to_owned();
        let mut conn = self.write_pool.get()?;
        tokio::task::spawn_blocking(move || {
            let tx = conn.transaction()?;
            {
                // if we create a /new/ one, we should get rid of any old ones. or group the new ones by name and only consider the latest.
                let query = "INSERT INTO user_verification (metadata_event, name, verified_at) VALUES ((SELECT id from event WHERE event_hash=?), ?, strftime('%s','now'));";
                let mut stmt = tx.prepare(query)?;
                stmt.execute(params![e, n])?;
                // get the row ID
                let v_id = tx.last_insert_rowid();
                // delete everything else by this name
                let del_query = "DELETE FROM user_verification WHERE name = ? AND id != ?;";
                let mut del_stmt = tx.prepare(del_query)?;
                let count = del_stmt.execute(params![n,v_id])?;
                if count > 0 {
                    info!("removed {} old verification records for ({:?})", count, n);
                }
            }
            tx.commit()?;
            info!("saved new verification record for ({:?})", n);
            let ok: Result<()> = Ok(());
            ok
        }).await?
    }

    /// Update verification timestamp
    async fn update_verification_timestamp(&self, id: u64) -> Result<()> {
        let mut conn = self.write_pool.get()?;
        tokio::task::spawn_blocking(move || {
            // add some jitter to the verification to prevent everything from stacking up together.
            let verif_time = now_jitter(600);

@@ -462,8 +462,8 @@ impl NostrRepo for SqliteRepo {

    /// Update verification record as failed
    async fn fail_verification(&self, id: u64) -> Result<()> {
        let mut conn = self.write_pool.get()?;
        tokio::task::spawn_blocking(move || {
            // add some jitter to the verification to prevent everything from stacking up together.
            let fail_time = now_jitter(600);
            let tx = conn.transaction()?;
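
Note: now_jitter is not part of this diff. From its call sites and the comments above, it returns the current unix timestamp offset by a random amount up to the given number of seconds, so periodic verification work spreads out instead of stacking up at one instant. A plausible sketch (assumed, not the repo's exact code):

    use rand::Rng;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Sketch: current unix time minus 0..jitter random seconds.
    fn now_jitter(jitter: u64) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        now - rand::thread_rng().gen_range(0..jitter)
    }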

@@ -481,8 +481,8 @@ impl NostrRepo for SqliteRepo {

    /// Delete verification record
    async fn delete_verification(&self, id: u64) -> Result<()> {
        let mut conn = self.write_pool.get()?;
        tokio::task::spawn_blocking(move || {
            let tx = conn.transaction()?;
            {
                let query = "DELETE FROM user_verification WHERE id=?;";

@@ -498,78 +498,78 @@ impl NostrRepo for SqliteRepo {

    /// Get the latest verification record for a given pubkey.
    async fn get_latest_user_verification(&self, pub_key: &str) -> Result<VerificationRecord> {
        let mut conn = self.read_pool.get()?;
        let pub_key = pub_key.to_owned();
        tokio::task::spawn_blocking(move || {
            let tx = conn.transaction()?;
            let query = "SELECT v.id, v.name, e.event_hash, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v LEFT JOIN event e ON e.id=v.metadata_event WHERE e.author=? ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC LIMIT 1;";
            let mut stmt = tx.prepare_cached(query)?;
            let fields = stmt.query_row(params![hex::decode(&pub_key).ok()], |r| {
                let rowid: u64 = r.get(0)?;
                let rowname: String = r.get(1)?;
                let eventid: Vec<u8> = r.get(2)?;
                let created_at: u64 = r.get(3)?;
                // create a tuple since we can't throw non-rusqlite errors in this closure
                Ok((
                    rowid,
                    rowname,
                    eventid,
                    created_at,
                    r.get(4).ok(),
                    r.get(5).ok(),
                    r.get(6)?,
                ))
            })?;
            Ok(VerificationRecord {
                rowid: fields.0,
                name: Nip05Name::try_from(&fields.1[..])?,
                address: pub_key,
                event: hex::encode(fields.2),
                event_created: fields.3,
                last_success: fields.4,
                last_failure: fields.5,
                failure_count: fields.6,
            })
        }).await?
    }

    /// Get oldest verification before timestamp
    async fn get_oldest_user_verification(&self, before: u64) -> Result<VerificationRecord> {
        let mut conn = self.read_pool.get()?;
        tokio::task::spawn_blocking(move || {
            let tx = conn.transaction()?;
            let query = "SELECT v.id, v.name, e.event_hash, e.author, e.created_at, v.verified_at, v.failed_at, v.failure_count FROM user_verification v INNER JOIN event e ON e.id=v.metadata_event WHERE (v.verified_at < ? OR v.verified_at IS NULL) AND (v.failed_at < ? OR v.failed_at IS NULL) ORDER BY v.verified_at ASC, v.failed_at ASC LIMIT 1;";
            let mut stmt = tx.prepare_cached(query)?;
            let fields = stmt.query_row(params![before, before], |r| {
                let rowid: u64 = r.get(0)?;
                let rowname: String = r.get(1)?;
                let eventid: Vec<u8> = r.get(2)?;
                let pubkey: Vec<u8> = r.get(3)?;
                let created_at: u64 = r.get(4)?;
                // create a tuple since we can't throw non-rusqlite errors in this closure
                Ok((
                    rowid,
                    rowname,
                    eventid,
                    pubkey,
                    created_at,
                    r.get(5).ok(),
                    r.get(6).ok(),
                    r.get(7)?,
                ))
            })?;
            let vr = VerificationRecord {
                rowid: fields.0,
                name: Nip05Name::try_from(&fields.1[..])?,
                address: hex::encode(fields.3),
                event: hex::encode(fields.2),
                event_created: fields.4,
                last_success: fields.5,
                last_failure: fields.6,
                failure_count: fields.7,
            };
            Ok(vr)
        }).await?
    }
}

@@ -578,27 +578,27 @@ fn override_index(f: &ReqFilter) -> Option<String> {
    // queries for multiple kinds default to kind_index, which is
    // significantly slower than kind_created_at_index.
    if let Some(ks) = &f.kinds {
        if f.ids.is_none() &&
            ks.len() > 1 &&
            f.since.is_none() &&
            f.until.is_none() &&
            f.tags.is_none() &&
            f.authors.is_none() {
                return Some("kind_created_at_index".into());
            }
    }
    // if there is an author, it is much better to force the authors index.
    if f.authors.is_some() {
        if f.since.is_none() && f.until.is_none() {
            if f.kinds.is_none() {
                // with no use of kinds/created_at, just author
                return Some("author_index".into());
            }
            // prefer author_kind if there are kinds
            return Some("author_kind_index".into());
        }
        // finally, prefer author_created_at if time is provided
        return Some("author_created_at_index".into());
    }
    None
}
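
Note: the names returned here are SQLite INDEXED BY hints; the caller splices them into the generated SELECT, the same way the hand-written queries in persist_event use INDEXED BY author_index. A minimal sketch of that splice (f and clause are stand-ins for the surrounding query-builder state, not the repo's exact variables):

    // Turn the Option<String> hint into an INDEXED BY fragment.
    let idx_hint = override_index(f)
        .map(|i| format!(" INDEXED BY {i}"))
        .unwrap_or_default();
    let q = format!("SELECT e.content FROM event e{idx_hint} WHERE {clause}");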

@@ -654,11 +654,11 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
            }
        }
        if !authvec.is_empty() {
            let auth_clause = format!("({})", auth_searches.join(" OR "));
            filter_components.push(auth_clause);
        } else {
            filter_components.push("false".to_owned());
        }
    }
    // Query for Kind
    if let Some(ks) = &f.kinds {

@@ -692,11 +692,11 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
            }
        }
        if idvec.is_empty() {
            // if the ids list was empty, we should never return
            // any results.
            filter_components.push("false".to_owned());
        } else {
            let id_clause = format!("({})", id_searches.join(" OR "));
            filter_components.push(id_clause);
        }
    }

@@ -769,9 +769,9 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<Strin
    // for every filter in the subscription, generate a subquery
    for f in &sub.filters {
        let (f_subquery, mut f_params, index) = query_from_filter(f);
        if let Some(i) = index {
            indexes.push(i);
        }
        subqueries.push(f_subquery);
        params.append(&mut f_params);
    }

@@ -835,37 +835,37 @@ pub fn build_pool(
pub async fn db_checkpoint_task(pool: SqlitePool, frequency: Duration, checkpoint_in_progress: Arc<Mutex<u64>>) -> Result<()> {

    tokio::task::spawn(async move {
        // WAL size in pages.
        let mut current_wal_size = 0;
        // WAL threshold for more aggressive checkpointing (10,000 pages, or about 40MB)
        let wal_threshold = 1000*10;
        // default threshold for the busy timer
        let busy_wait_default = Duration::from_secs(1);
        // if the WAL file is getting too big, switch to this
        let busy_wait_default_long = Duration::from_secs(10);
        loop {
            tokio::select! {
                _ = tokio::time::sleep(frequency) => {
                    if let Ok(mut conn) = pool.get() {
                        let mut _guard:Option<MutexGuard<u64>> = None;
                        // the busy timer will block writers, so don't set
                        // this any higher than you want max latency for event
                        // writes.
                        if current_wal_size <= wal_threshold {
                            conn.busy_timeout(busy_wait_default).ok();
                        } else {
                            // if the wal size has exceeded a threshold, increase the busy timeout.
                            conn.busy_timeout(busy_wait_default_long).ok();
                            // take a lock that will prevent new readers.
                            info!("blocking new readers to perform wal_checkpoint");
                            _guard = Some(checkpoint_in_progress.lock().await);
                        }
                        debug!("running wal_checkpoint(TRUNCATE)");
                        if let Ok(new_size) = checkpoint_db(&mut conn) {
                            current_wal_size = new_size;
                        }
                    }
                }
            };
        }
    });
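
Note: checkpoint_db itself is outside this diff. Given the wal_checkpoint(TRUNCATE) log line above and the fact that its result feeds current_wal_size, a plausible sketch uses SQLite's three-column checkpoint result (busy flag, WAL pages, pages checkpointed) and reports the pages still in the WAL:

    // Sketch (assumed): run a truncating checkpoint, return remaining WAL pages.
    pub fn checkpoint_db(conn: &mut PooledConnection) -> Result<usize> {
        let (busy, log, checkpointed): (i64, i64, i64) = conn.query_row(
            "PRAGMA wal_checkpoint(TRUNCATE);",
            [],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )?;
        debug!("checkpoint: busy={}, wal={}, checkpointed={}", busy, log, checkpointed);
        Ok(log as usize)
    }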

@@ -922,8 +922,8 @@ fn repeat_vars(count: usize) -> String {
pub async fn monitor_pool(name: &str, pool: SqlitePool) {
    let sleep_dur = Duration::from_secs(60);
    loop {
        log_pool_stats(name, &pool);
        tokio::time::sleep(sleep_dur).await;
    }
}

@@ -936,7 +936,7 @@ fn log_pool_stats(name: &str, pool: &SqlitePool) {
        name,
        in_use_cxns,
        state.connections,
        pool.max_size()
    );
}

@@ -240,10 +240,10 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
    let mut stmt = tx.prepare("select id, content from event order by id;")?;
    let mut tag_rows = stmt.query([])?;
    while let Some(row) = tag_rows.next()? {
        if (events_processed as f32)/(count as f32) > percent_done {
            info!("Tag update {}% complete...", (100.0*percent_done).round());
            percent_done += update_each_percent;
        }
        // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
        let event_id: u64 = row.get(0)?;
        let event_json: String = row.get(1)?;

@@ -272,7 +272,7 @@ pub fn rebuild_tags(conn: &mut PooledConnection) -> Result<()> {
                    )?;
                }
            }
            events_processed += 1;
        }
    }
    tx.commit()?;

@@ -560,7 +560,7 @@ fn mig_11_to_12(conn: &mut PooledConnection) -> Result<usize> {
    // Lookup every replaceable event
    let mut stmt = tx.prepare("select kind,author from event where kind in (0,3,41) or (kind>=10000 and kind<20000) order by id;")?;
    let mut replaceable_rows = stmt.query([])?;
    info!("updating replaceable events; this could take awhile...");
    while let Some(row) = replaceable_rows.next()? {
        // we want to capture the event_id that had the tag, the tag name, and the tag hex value.
        let event_kind: u64 = row.get(0)?;

@@ -641,10 +641,10 @@ PRAGMA user_version = 15;
    let clear_hidden_sql = r##"DELETE FROM event WHERE HIDDEN=true;"##;
    info!("removing hidden events; this may take awhile...");
    match conn.execute_batch(clear_hidden_sql) {
        Ok(()) => {
            info!("all hidden events removed");
        },
        Err(err) => {
            error!("delete failed: {}", err);
            panic!("could not remove hidden events");
        }

src/server.rs (208 lines changed)

@@ -90,7 +90,7 @@ async fn handle_web_request(
                        tokio_tungstenite::tungstenite::protocol::Role::Server,
                        Some(config),
                    )
                    .await;
                    let origin = get_header_string("origin", request.headers());
                    let user_agent = get_header_string("user-agent", request.headers());
                    // determine the remote IP from headers if the exist

@@ -109,7 +109,7 @@ async fn handle_web_request(
                    };
                    // spawn a nostr server with our websocket
                    tokio::spawn(nostr_server(
                        repo,
                        client_info,
                        settings,
                        ws_stream,

@@ -154,26 +154,26 @@ async fn handle_web_request(
                    let rinfo = RelayInfo::from(settings.info);
                    let b = Body::from(serde_json::to_string_pretty(&rinfo).unwrap());
                    return Ok(Response::builder()
                        .status(200)
                        .header("Content-Type", "application/nostr+json")
                        .header("Access-Control-Allow-Origin", "*")
                        .body(b)
                        .unwrap());
                }
            }
            Ok(Response::builder()
                .status(200)
                .header("Content-Type", "text/plain")
                .body(Body::from("Please use a Nostr client to connect."))
                .unwrap())
        }
        (_, _) => {
            //handle any other url
            Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Body::from("Nothing here."))
                .unwrap())
        }
    }
}

@@ -191,18 +191,18 @@ async fn ctrl_c_or_signal(mut shutdown_signal: Receiver<()>) {
    loop {
        tokio::select! {
            _ = shutdown_signal.recv() => {
                info!("Shutting down webserver as requested");
                // server shutting down, exit loop
                break;
            },
            _ = tokio::signal::ctrl_c() => {
                info!("Shutting down webserver due to SIGINT");
                break;
            },
            _ = term_signal.recv() => {
                info!("Shutting down webserver due to SIGTERM");
                break;
            },
        }
    }
}

@@ -251,18 +251,18 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
    let rt = Builder::new_multi_thread()
        .enable_all()
        .thread_name_fn(|| {
            // give each thread a unique numeric name
            static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
            format!("tokio-ws-{}", id)
        })
        // limit concurrent SQLite blocking threads
        .max_blocking_threads(settings.limits.max_blocking_threads)
        .on_thread_start(|| {
            trace!("started new thread: {:?}", std::thread::current().name());
        })
        .on_thread_stop(|| {
            trace!("stopped thread: {:?}", std::thread::current().name());
        })
        .build()
        .unwrap();

@@ -293,19 +293,19 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
        // overwhelming this will drop events and won't register
        // metadata events.
        let (metadata_tx, metadata_rx) = broadcast::channel::<Event>(4096);
        // build a repository for events
        let repo = db::build_repo(&settings).await;
        // start the database writer task. Give it a channel for
        // writing events, and for publishing events that have been
        // written (to all connected clients).
        tokio::task::spawn(
            db::db_writer(
                repo.clone(),
                settings.clone(),
                event_rx,
                bcast_tx.clone(),
                metadata_tx.clone(),
                shutdown_listen,
            ));
        info!("db writer created");

@@ -327,7 +327,6 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
        let controlled_shutdown = invoke_shutdown.clone();
        tokio::spawn(async move {
            info!("control message listener started");
            // we only have good "shutdown" messages propagation from this-> controlled shutdown. Not from controlled_shutdown-> this. Which means we have a task that is stuck waiting on a sync receive. recv is blocking, and this is async.
            match shutdown_rx.recv() {
                Ok(()) => {
                    info!("control message requesting shutdown");

@@ -348,14 +347,14 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
            info!("shutting down due to SIGINT (main)");
            ctrl_c_shutdown.send(()).ok();
        });
        // spawn a task to check the pool size.
        //let pool_monitor = pool.clone();
        //tokio::spawn(async move {db::monitor_pool("reader", pool_monitor).await;});

        // A `Service` is needed for every connection, so this
        // creates one from our `handle_request` function.
        let make_svc = make_service_fn(|conn: &AddrStream| {
            let repo = repo.clone();
            let remote_addr = conn.remote_addr();
            let bcast = bcast_tx.clone();
            let event = event_tx.clone();

@@ -366,7 +365,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
            Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
                handle_web_request(
                    request,
                    repo.clone(),
                    settings.clone(),
                    remote_addr,
                    bcast.clone(),
@ -404,10 +403,10 @@ fn convert_to_msg(msg: &str, max_bytes: Option<usize>) -> Result<NostrMessage> {
    let parsed_res: Result<NostrMessage> = serde_json::from_str(msg).map_err(std::convert::Into::into);
    match parsed_res {
        Ok(m) => {
            if let NostrMessage::SubMsg(_) = m {
                // note: this only prints the first 16k of a REQ and then truncates.
                trace!("REQ: {:?}", msg);
            };
            if let NostrMessage::EventMsg(_) = m {
                if let Some(max_size) = max_bytes {
                    // check length, ensure that some max size is set.
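The size gate only applies to EVENT messages; REQs are accepted at whatever length the socket allows. In isolation the check reduces to something like this (hypothetical helper, not the relay's API):

    /// Reject a message that exceeds an optional byte limit.
    fn check_size(msg: &str, max_bytes: Option<usize>) -> Result<(), String> {
        if let Some(max) = max_bytes {
            if msg.len() > max {
                return Err(format!("message too large ({} > {})", msg.len(), max));
            }
        }
        Ok(())
    }

    fn main() {
        assert!(check_size("short", Some(16)).is_ok());
        assert!(check_size(&"x".repeat(32), Some(16)).is_err());
    }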
@@ -514,7 +513,7 @@ async fn nostr_server(
    loop {
        tokio::select! {
            _ = shutdown.recv() => {
                info!("Closing connection due to shutdown, client: {}, ip: {:?}, connected: {:?}", cid, conn.ip(), orig_start.elapsed());
                // server shutting down, exit loop
                break;
            },
@@ -552,7 +551,6 @@ async fn nostr_server(
            if !sub.interested_in_event(&global_event) {
                continue;
            }

            // TODO: serialize at broadcast time, instead of
            // once for each consumer.
            if let Ok(event_str) = serde_json::to_string(&global_event) {
@@ -575,9 +573,9 @@ async fn nostr_server(
            Some(Ok(Message::Text(m))) => {
                convert_to_msg(&m, settings.limits.max_event_bytes)
            },
            Some(Ok(Message::Binary(_))) => {
                ws_stream.send(
                    make_notice_message(&Notice::message("binary messages are not accepted".into()))).await.ok();
                continue;
            },
            Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
@@ -585,19 +583,19 @@ async fn nostr_server(
                // send responses automatically.
                continue;
            },
            Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
                ws_stream.send(
                    make_notice_message(&Notice::message(format!("message too large ({} > {})", size, max_size)))).await.ok();
                continue;
            },
            None |
            Some(Ok(Message::Close(_)) |
                 Err(WsError::AlreadyClosed | WsError::ConnectionClosed |
                     WsError::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)))
            => {
                debug!("websocket close from client (cid: {}, ip: {:?})", cid, conn.ip());
                break;
            },
            Some(Err(WsError::Io(e))) => {
                // IO errors are considered fatal
                warn!("IO error (cid: {}, ip: {:?}): {:?}", cid, conn.ip(), e);
@@ -627,14 +625,14 @@ async fn nostr_server(
                            let submit_event = SubmittedEvent { event: e.clone(), notice_tx: notice_tx.clone() };
                            event_tx.send(submit_event).await.ok();
                            client_published_event_count += 1;
                        } else {
                            info!("client: {} sent a far future-dated event", cid);
                            if let Some(fut_sec) = settings.options.reject_future_seconds {
                                let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.", fut_sec);
                                let notice = Notice::invalid(e.id, &msg);
                                ws_stream.send(make_notice_message(&notice)).await.ok();
                            }
                        }
                    },
                    Err(e) => {
                        info!("client sent an invalid event (cid: {})", cid);
@@ -649,49 +647,49 @@ async fn nostr_server(
                    // * registering the subscription so future events can be matched
                    // * making a channel to cancel the request later
                    // * sending a request for a SQL query
                    // Do nothing if the sub already exists.
                    if conn.has_subscription(&s) {
                        info!("client sent duplicate subscription, ignoring (cid: {}, sub: {:?})", cid, s.id);
                    } else {
                        if let Some(ref lim) = sub_lim_opt {
                            lim.until_ready_with_jitter(jitter).await;
                        }
                        let (abandon_query_tx, abandon_query_rx) = oneshot::channel::<()>();
                        match conn.subscribe(s.clone()) {
                            Ok(()) => {
                                // when we insert, if there was a previous query running with the same name, cancel it.
                                if let Some(previous_query) = running_queries.insert(s.id.clone(), abandon_query_tx) {
                                    previous_query.send(()).ok();
                                }
                                if s.needs_historical_events() {
                                    // start a database query. this spawns a blocking database query on a worker thread.
                                    repo.query_subscription(s, cid.clone(), query_tx.clone(), abandon_query_rx).await.ok();
                                }
                            },
                            Err(e) => {
                                info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
                                ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok();
                            }
                        }
                    }
                },
                Ok(NostrMessage::CloseMsg(cc)) => {
                    // closing a request simply removes the subscription.
                    let parsed: Result<Close> = Result::<Close>::from(cc);
                    if let Ok(c) = parsed {
                        // check if a query is currently
                        // running, and remove it if so.
                        let stop_tx = running_queries.remove(&c.id);
                        if let Some(tx) = stop_tx {
                            tx.send(()).ok();
                        }
                        // stop checking new events against
                        // the subscription
                        conn.unsubscribe(&c);
                    } else {
                        info!("invalid command ignored");
                        ws_stream.send(make_notice_message(&Notice::message("could not parse command".into()))).await.ok();
                    }
                },
                Err(Error::ConnError) => {
                    debug!("got connection close/error, disconnecting cid: {}, ip: {:?}", cid, conn.ip());
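Everything here hangs off running_queries, a map from subscription id to a oneshot sender: inserting under an existing id cancels the previous query, and CLOSE removes and fires the sender. A self-contained sketch of that cancellation pattern (tokio assumed; names are illustrative):

    use std::collections::HashMap;
    use std::time::Duration;
    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let mut running_queries: HashMap<String, oneshot::Sender<()>> = HashMap::new();
        let (abandon_tx, mut abandon_rx) = oneshot::channel::<()>();
        // a REQ reusing an id would cancel the previous query here
        if let Some(prev) = running_queries.insert("sub-1".into(), abandon_tx) {
            prev.send(()).ok();
        }
        // the "query": finishes in 1s unless abandoned first
        let task = tokio::spawn(async move {
            tokio::select! {
                _ = &mut abandon_rx => "abandoned",
                _ = tokio::time::sleep(Duration::from_secs(1)) => "finished",
            }
        });
        // CLOSE: remove the sender and fire it
        if let Some(stop_tx) = running_queries.remove("sub-1") {
            stop_tx.send(()).ok();
        }
        assert_eq!(task.await.unwrap(), "abandoned");
    }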
@@ -722,6 +720,6 @@ async fn nostr_server(
            conn.ip(),
            client_published_event_count,
            client_received_event_count,
            orig_start.elapsed()
        );
    }
src/subscription.rs

@@ -65,21 +65,21 @@ impl<'de> Deserialize<'de> for ReqFilter {
            tags: None,
            force_no_match: false,
        };
        let empty_string = "".into();
        let mut ts = None;
        // iterate through each key, and assign values that exist
        for (key, val) in filter {
            // ids
            if key == "ids" {
                let raw_ids: Option<Vec<String>> = Deserialize::deserialize(val).ok();
                if let Some(a) = raw_ids.as_ref() {
                    if a.contains(&empty_string) {
                        return Err(serde::de::Error::invalid_type(
                            Unexpected::Other("prefix matches must not be empty strings"),
                            &"a json object"));
                    }
                }
                rf.ids = raw_ids;
            } else if key == "kinds" {
                rf.kinds = Deserialize::deserialize(val).ok();
            } else if key == "since" {
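The empty-string guard matters because these fields are prefix filters, and every id matches the empty prefix; rejecting "" up front presumably avoids a filter that would match the whole database. Reduced to a standalone check (hypothetical helper):

    /// Prefix lists must not contain empty strings.
    fn validate_prefixes(prefixes: &[String]) -> Result<(), &'static str> {
        if prefixes.iter().any(|p| p.is_empty()) {
            return Err("prefix matches must not be empty strings");
        }
        Ok(())
    }

    fn main() {
        assert!(validate_prefixes(&["abc".into()]).is_ok());
        assert!(validate_prefixes(&["".into(), "aaa".into()]).is_err());
    }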
@@ -90,14 +90,14 @@ impl<'de> Deserialize<'de> for ReqFilter {
                rf.limit = Deserialize::deserialize(val).ok();
            } else if key == "authors" {
                let raw_authors: Option<Vec<String>> = Deserialize::deserialize(val).ok();
                if let Some(a) = raw_authors.as_ref() {
                    if a.contains(&empty_string) {
                        return Err(serde::de::Error::invalid_type(
                            Unexpected::Other("prefix matches must not be empty strings"),
                            &"a json object"));
                    }
                }
                rf.authors = raw_authors;
            } else if key.starts_with('#') && key.len() > 1 && val.is_array() {
                if let Some(tag_search) = tag_search_char_from_filter(key) {
                    if ts.is_none() {
@@ -107,7 +107,7 @@ impl<'de> Deserialize<'de> for ReqFilter {
                    if let Some(m) = ts.as_mut() {
                        let tag_vals: Option<Vec<String>> = Deserialize::deserialize(val).ok();
                        if let Some(v) = tag_vals {
                            let hs = v.into_iter().collect::<HashSet<_>>();
                            m.insert(tag_search.to_owned(), hs);
                        }
                    };
@@ -204,7 +204,7 @@ impl Subscription {
    /// Determine if any filter is requesting historical (database)
    /// queries. If every filter has limit:0, we do not need to query the DB.
    #[must_use] pub fn needs_historical_events(&self) -> bool {
        self.filters.iter().any(|f| f.limit != Some(0))
    }

    /// Determine if this subscription matches a given [`Event`]. Any
@@ -316,19 +316,19 @@ mod tests {
    #[test]
    fn req_empty_authors_prefix() {
        let raw_json = "[\"REQ\",\"some-id\",{\"authors\": [\"\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn req_empty_ids_prefix() {
        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }

    #[test]
    fn req_empty_ids_prefix_mixed() {
        let raw_json = "[\"REQ\",\"some-id\",{\"ids\": [\"\",\"aaa\"]}]";
        assert!(serde_json::from_str::<Subscription>(raw_json).is_err());
    }