mirror of https://github.com/scsibug/nostr-rs-relay.git
synced 2024-11-24 17:49:07 -05:00

refactor: Fix clippy warnings

This commit is contained in:
parent bc4b45d4b8
commit bd07a11f50
@@ -35,7 +35,7 @@ pub fn main() -> Result<()> {
         // ensure the schema version is current.
         if version != DB_VERSION {
             info!("version is not current, exiting");
-            panic!("cannot write to schema other than v{}", DB_VERSION);
+            panic!("cannot write to schema other than v{DB_VERSION}");
         }
     }
     // this channel will contain parsed events ready to be inserted
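Note: most hunks in this commit apply the same clippy lint, uninlined_format_args — variables are captured directly inside the format string instead of being passed as positional arguments. A minimal, standalone sketch of the pattern (the variable name is illustrative, not taken from the relay source):

fn main() {
    let db_version = 18;
    // positional argument, flagged by clippy::uninlined_format_args
    let old = format!("cannot write to schema other than v{}", db_version);
    // inlined capture, available since Rust 2021
    let new = format!("cannot write to schema other than v{db_version}");
    assert_eq!(old, new); // both render the identical string
}

Both forms produce the same output; the inlined form simply keeps each value next to its placeholder.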
@@ -108,7 +108,7 @@ impl ConditionQuery {
     sigstr: &str,
 ) -> Option<ConditionQuery> {
     // form the token
-    let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
+    let tok = format!("nostr:delegation:{delegatee}:{cond_query}");
     // form SHA256 hash
     let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
     let sig = schnorr::Signature::from_str(sigstr).unwrap();
@@ -143,9 +143,9 @@ impl Event {
             let default = "".to_string();
             let dvals:Vec<&String> = self.tags
                 .iter()
-                .filter(|x| x.len() >= 1)
+                .filter(|x| !x.is_empty())
                 .filter(|x| x.get(0).unwrap() == "d")
-                .map(|x| x.get(1).unwrap_or_else(|| &default)).take(1)
+                .map(|x| x.get(1).unwrap_or(&default)).take(1)
                 .collect();
             let dval_first = dvals.get(0);
             match dval_first {
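Note: this hunk also covers two smaller cleanups — a length comparison is replaced with is_empty(), and unwrap_or_else with a closure that only returns a reference becomes plain unwrap_or. A reduced sketch with made-up tag data (not the relay's Event type):

fn first_d_value(tags: &[Vec<String>]) -> Option<String> {
    let default = "".to_string();
    tags.iter()
        .filter(|t| !t.is_empty()) // clearer than t.len() >= 1
        .filter(|t| t.first().unwrap() == "d")
        .map(|t| t.get(1).unwrap_or(&default)) // no closure needed for a plain reference
        .take(1)
        .next()
        .cloned()
}

fn main() {
    let tags = vec![vec!["d".to_string(), "profile".to_string()]];
    assert_eq!(first_d_value(&tags), Some("profile".to_string()));
}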
@@ -292,7 +292,7 @@ impl Event {
         let c = c_opt.unwrap();
         // * compute the sha256sum.
         let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
-        let hex_digest = format!("{:x}", digest);
+        let hex_digest = format!("{digest:x}");
         // * ensure the id matches the computed sha256sum.
         if self.id != hex_digest {
             debug!("event id does not match digest");
@@ -107,7 +107,7 @@ impl std::fmt::Display for Nip05Name {
 /// Check if the specified username and address are present and match in this response body
 fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes) -> Result<bool> {
     // convert the body into json
-    let body: serde_json::Value = serde_json::from_slice(&bytes)?;
+    let body: serde_json::Value = serde_json::from_slice(bytes)?;
     // ensure we have a names object.
     let names_map = body
         .as_object()
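Note: dropping the & in serde_json::from_slice(&bytes) (and later in the .bind(&tagname) calls) removes a borrow of something that is already a reference, which clippy reports as needless_borrow; deref coercion makes both forms equivalent. A tiny illustration with a plain byte slice (not the relay's hyper types):

fn byte_len(data: &[u8]) -> usize {
    data.len()
}

fn main() {
    let payload: Vec<u8> = b"{\"names\":{}}".to_vec();
    let slice: &[u8] = &payload;
    // passing `&slice` (a &&[u8]) also compiles via deref coercion,
    // but the extra borrow is what clippy::needless_borrow warns about
    assert_eq!(byte_len(slice), 12);
}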
@@ -77,26 +77,25 @@ impl NostrRepo for PostgresRepo {
             }
         }
         if let Some(d_tag) = e.distinct_param() {
-            let repl_count:i64;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                repl_count = sqlx::query_scalar(
+            let repl_count:i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                sqlx::query_scalar(
                     "SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value_hex=$3 AND e.created_at >= $4 LIMIT 1;")
                     .bind(hex::decode(&e.pubkey).ok())
                     .bind(e.kind as i64)
                     .bind(hex::decode(d_tag).ok())
                     .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
                     .fetch_one(&mut tx)
-                    .await?;
+                    .await?
             } else {
-                repl_count = sqlx::query_scalar(
+                sqlx::query_scalar(
                     "SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value=$3 AND e.created_at >= $4 LIMIT 1;")
                     .bind(hex::decode(&e.pubkey).ok())
                     .bind(e.kind as i64)
                     .bind(d_tag.as_bytes())
                     .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
                     .fetch_one(&mut tx)
-                    .await?;
-            }
+                    .await?
+            };
             // if any rows were returned, then some newer event with
             // the same author/kind/tag value exist, and we can ignore
             // this event.
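Note: this hunk, and the matching ones in the SQLite repo further down, fold a "declare now, assign in every branch" pattern into a single let bound to the whole if/else expression (clippy's needless_late_init); the semicolons move off the branch tails and onto the closing };. A reduced sketch of the shape of the change, with a plain function standing in for the SQL calls:

fn replacement_count(is_hex: bool) -> i64 {
    // before: `let repl_count: i64;` plus an assignment in each branch
    // after: bind the if/else expression directly
    let repl_count: i64 = if is_hex {
        40 // branches end in expressions, no trailing semicolon
    } else {
        2
    };
    repl_count
}

fn main() {
    assert_eq!(replacement_count(true), 40);
    assert_eq!(replacement_count(false), 2);
}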
@@ -178,22 +177,21 @@ ON CONFLICT (id) DO NOTHING"#,
         // parameterized replaceable events
         // check for parameterized replaceable events that would be hidden; don't insert these either.
         if let Some(d_tag) = e.distinct_param() {
-            let update_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
+            let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
                     .bind(e.kind as i64)
                     .bind(hex::decode(&e.pubkey).ok())
                     .bind(hex::decode(d_tag).ok())
                     .execute(&mut tx)
-                    .await?.rows_affected();
+                    .await?.rows_affected()
             } else {
-                update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
+                sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
                     .bind(e.kind as i64)
                     .bind(hex::decode(&e.pubkey).ok())
                     .bind(d_tag.as_bytes())
                     .execute(&mut tx)
-                    .await?.rows_affected();
-            }
+                    .await?.rows_affected()
+            };
             if update_count > 0 {
                 info!(
                     "removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -81,7 +81,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) -> Migratio
         .unwrap();

     transaction.commit().await.unwrap();
-    return MigrationResult::Upgraded;
+    MigrationResult::Upgraded
 }

 mod m001 {
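Note: the migration helper now ends with a bare tail expression instead of return ...;, per clippy's needless_return — the last expression of a block is already its value. A minimal sketch; the enum here is re-declared only for the example:

#[derive(Debug, PartialEq)]
enum MigrationResult {
    Upgraded,
    NotNeeded,
}

fn run_migration(needed: bool) -> MigrationResult {
    if !needed {
        return MigrationResult::NotNeeded; // early exits still use `return`
    }
    // ... perform the schema changes ...
    MigrationResult::Upgraded // tail expression, no `return` keyword
}

fn main() {
    assert_eq!(run_migration(true), MigrationResult::Upgraded);
    assert_eq!(run_migration(false), MigrationResult::NotNeeded);
}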
@@ -216,7 +216,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
                 let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
                 sqlx::query(q)
                     .bind(&event_id)
-                    .bind(&tagname)
+                    .bind(tagname)
                     .bind(hex::decode(tagval).ok())
                     .execute(&mut update_tx)
                     .await?;
@@ -224,7 +224,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
                 let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
                 sqlx::query(q)
                     .bind(&event_id)
-                    .bind(&tagname)
+                    .bind(tagname)
                     .bind(tagval.as_bytes())
                     .execute(&mut update_tx)
                     .await?;
@@ -123,16 +123,15 @@ impl SqliteRepo {
         }
         // check for parameterized replaceable events that would be hidden; don't insert these either.
         if let Some(d_tag) = e.distinct_param() {
-            let repl_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                repl_count = tx.query_row(
+            let repl_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                tx.query_row(
                     "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value_hex=? AND e.created_at >= ? LIMIT 1;",
-                    params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::<usize, usize>(0));
+                    params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::<usize, usize>(0))
             } else {
-                repl_count = tx.query_row(
+                tx.query_row(
                     "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
-                    params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0));
-            }
+                    params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::<usize, usize>(0))
+            };
             // if any rows were returned, then some newer event with
             // the same author/kind/tag value exist, and we can ignore
             // this event.
@@ -201,16 +200,15 @@ impl SqliteRepo {
         }
         // if this event is parameterized replaceable, remove other events.
         if let Some(d_tag) = e.distinct_param() {
-            let update_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                update_count = tx.execute(
+            let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                tx.execute(
                     "DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value_hex=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
-                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?;
+                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?
             } else {
-                update_count = tx.execute(
+                tx.execute(
                     "DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
-                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
-            }
+                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?
+            };
             if update_count > 0 {
                 info!(
                     "removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -365,7 +363,7 @@ impl NostrRepo for SqliteRepo {
             let filter_start = Instant::now();
             filter_count += 1;
             let sql_gen_elapsed = start.elapsed();
-            let (q, p, idx) = query_from_filter(&filter);
+            let (q, p, idx) = query_from_filter(filter);
             if sql_gen_elapsed > Duration::from_millis(10) {
                 debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
             }
@@ -719,8 +717,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri

     // check if the index needs to be overriden
     let idx_name = override_index(f);
-    let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {}",i));
-    let mut query = format!("SELECT e.content FROM event e {}", idx_stmt);
+    let idx_stmt = idx_name.as_ref().map_or_else(|| "".to_owned(), |i| format!("INDEXED BY {i}"));
+    let mut query = format!("SELECT e.content FROM event e {idx_stmt}");
     // query parameters for SQLite
     let mut params: Vec<Box<dyn ToSql>> = vec![];

@@ -814,26 +812,24 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
                 }
             }
             // do not mix value and value_hex; this is a temporary special case.
-            if str_vals.len() == 0 {
+            if str_vals.is_empty() {
                 // create clauses with "?" params for each tag value being searched
                 let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
                 // find evidence of the target tag name/value existing for this event.
                 let tag_clause = format!(
-                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {}))",
-                    blob_clause
+                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {blob_clause}))",
                 );
                 // add the tag name as the first parameter
                 params.push(Box::new(key.to_string()));
                 // add all tag values that are blobs as params
                 params.append(&mut blob_vals);
                 filter_components.push(tag_clause);
-            } else if blob_vals.len() == 0 {
+            } else if blob_vals.is_empty() {
                 // create clauses with "?" params for each tag value being searched
                 let str_clause = format!("value IN ({})", repeat_vars(str_vals.len()));
                 // find evidence of the target tag name/value existing for this event.
                 let tag_clause = format!(
-                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {}))",
-                    str_clause
+                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND {str_clause}))",
                 );
                 // add the tag name as the first parameter
                 params.push(Box::new(key.to_string()));
@@ -847,8 +843,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
                 let blob_clause = format!("value_hex IN ({})", repeat_vars(blob_vals.len()));
                 // find evidence of the target tag name/value existing for this event.
                 let tag_clause = format!(
-                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND ({} OR {})))",
-                    str_clause, blob_clause
+                    "e.id IN (SELECT t.event_id FROM tag t WHERE (name=? AND ({str_clause} OR {blob_clause})))",
                 );
                 // add the tag name as the first parameter
                 params.push(Box::new(key.to_string()));
@@ -880,7 +875,7 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec<Box<dyn ToSql>>, Option<Stri
     // Apply per-filter limit to this subquery.
     // The use of a LIMIT implies a DESC order, to capture only the most recent events.
     if let Some(lim) = f.limit {
-        let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {}", lim);
+        let _ = write!(query, " ORDER BY e.created_at DESC LIMIT {lim}");
     } else {
         query.push_str(" ORDER BY e.created_at ASC");
     }
@@ -907,7 +902,7 @@ fn _query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>, Vec<Stri
     // encapsulate subqueries into select statements
     let subqueries_selects: Vec<String> = subqueries
         .iter()
-        .map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
+        .map(|s| format!("SELECT distinct content, created_at FROM ({s})"))
         .collect();
     let query: String = subqueries_selects.join(" UNION ");
     (query, params,indexes)
@@ -211,13 +211,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result<usize> {
         }
         // Database is current, all is good
         Ordering::Equal => {
-            debug!("Database version was already current (v{})", DB_VERSION);
+            debug!("Database version was already current (v{DB_VERSION})");
         }
         // Database is newer than what this code understands, abort
         Ordering::Greater => {
             panic!(
-                "Database version is newer than supported by this executable (v{} > v{})",
-                curr_version, DB_VERSION
+                "Database version is newer than supported by this executable (v{curr_version} > v{DB_VERSION})",
             );
         }
     }
@@ -50,6 +50,7 @@ use tungstenite::protocol::Message;
 use tungstenite::protocol::WebSocketConfig;

 /// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
+#[allow(clippy::too_many_arguments)]
 async fn handle_web_request(
     mut request: Request<Body>,
     repo: Arc<dyn NostrRepo>,
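Note: instead of refactoring the two large server entry points, the commit silences clippy's too_many_arguments lint (by default it warns once a function takes more than seven parameters) with an #[allow] scoped to just those functions. A sketch of how such a scoped allow behaves; the function below is invented for illustration:

// the allow applies only to this item; the lint stays active elsewhere
#[allow(clippy::too_many_arguments)]
fn connect(
    host: &str,
    port: u16,
    user: &str,
    password: &str,
    database: &str,
    tls: bool,
    timeout_secs: u64,
    retries: u32,
) -> String {
    let _ = password; // unused here; present only to reach eight parameters
    format!("{user}@{host}:{port}/{database} tls={tls} timeout={timeout_secs}s retries={retries}")
}

fn main() {
    println!("{}", connect("localhost", 5432, "relay", "secret", "nostr", true, 30, 3));
}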
@@ -127,9 +128,8 @@ async fn handle_web_request(
                         // todo: trace, don't print...
                         Err(e) => println!(
                             "error when trying to upgrade connection \
-                             from address {} to websocket connection. \
-                             Error is: {}",
-                            remote_addr, e
+                             from address {remote_addr} to websocket connection. \
+                             Error is: {e}",
                         ),
                     }
                 });
@@ -139,7 +139,7 @@ async fn handle_web_request(
         Err(error) => {
             warn!("websocket response failed");
             let mut res =
-                Response::new(Body::from(format!("Failed to create websocket: {}", error)));
+                Response::new(Body::from(format!("Failed to create websocket: {error}")));
             *res.status_mut() = StatusCode::BAD_REQUEST;
             return Ok(res);
         }
@@ -346,7 +346,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
             // give each thread a unique numeric name
             static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
             let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
-            format!("tokio-ws-{}", id)
+            format!("tokio-ws-{id}")
         })
         // limit concurrent SQLite blocking threads
         .max_blocking_threads(settings.limits.max_blocking_threads)
@@ -478,7 +478,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
             .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
         // run hyper in this thread. This is why the thread does not return.
         if let Err(e) = server.await {
-            eprintln!("server error: {}", e);
+            eprintln!("server error: {e}");
         }
     });
     Ok(())
@@ -541,6 +541,7 @@ struct ClientInfo {

 /// Handle new client connections. This runs through an event loop
 /// for all client communication.
+#[allow(clippy::too_many_arguments)]
 async fn nostr_server(
     repo: Arc<dyn NostrRepo>,
     client_info: ClientInfo,
@@ -639,7 +640,7 @@ async fn nostr_server(
                 // database informed us of a query result we asked for
                 let subesc = query_result.sub_id.replace('"', "");
                 if query_result.event == "EOSE" {
-                    let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
+                    let send_str = format!("[\"EOSE\",\"{subesc}\"]");
                     ws_stream.send(Message::Text(send_str)).await.ok();
                 } else {
                     client_received_event_count += 1;
@@ -666,7 +667,7 @@ async fn nostr_server(
                         // create an event response and send it
                         let subesc = s.replace('"', "");
                         metrics.sent_events.with_label_values(&["realtime"]).inc();
-                        ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
+                        ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
                     } else {
                         warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
                     }
@@ -692,7 +693,7 @@ async fn nostr_server(
                 },
                 Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
                     ws_stream.send(
-                        make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
+                        make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
                     continue;
                 },
                 None |
@@ -741,7 +742,7 @@ async fn nostr_server(
                         } else {
                             info!("client: {} sent a far future-dated event", cid);
                             if let Some(fut_sec) = settings.options.reject_future_seconds {
-                                let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
+                                let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
                                 let notice = Notice::invalid(e.id, &msg);
                                 ws_stream.send(make_notice_message(&notice)).await.ok();
                             }
@@ -749,7 +750,7 @@ async fn nostr_server(
                     },
                     Err(e) => {
                         info!("client sent an invalid event (cid: {})", cid);
-                        ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{}", e)))).await.ok();
+                        ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
                     }
                 }
             },
@@ -782,7 +783,7 @@ async fn nostr_server(
                     },
                     Err(e) => {
                         info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
-                        ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok();
+                        ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
                     }
                 }
             }
@@ -70,7 +70,7 @@ impl Serialize for ReqFilter {
         if let Some(tags) = &self.tags {
             for (k,v) in tags {
                 let vals:Vec<&String> = v.iter().collect();
-                map.serialize_entry(&format!("#{}",k), &vals)?;
+                map.serialize_entry(&format!("#{k}"), &vals)?;
             }
         }
         map.end()