diff --git a/src/bin/bulkloader.rs b/src/bin/bulkloader.rs
index 59a3bd8..723c86e 100644
--- a/src/bin/bulkloader.rs
+++ b/src/bin/bulkloader.rs
@@ -35,7 +35,7 @@ pub fn main() -> Result<()> {
         // ensure the schema version is current.
         if version != DB_VERSION {
             info!("version is not current, exiting");
-            panic!("cannot write to schema other than v{}", DB_VERSION);
+            panic!("cannot write to schema other than v{DB_VERSION}");
         }
     }
     // this channel will contain parsed events ready to be inserted
diff --git a/src/delegation.rs b/src/delegation.rs
index 640cb00..8d06a74 100644
--- a/src/delegation.rs
+++ b/src/delegation.rs
@@ -108,7 +108,7 @@ impl ConditionQuery {
     sigstr: &str,
 ) -> Option {
     // form the token
-    let tok = format!("nostr:delegation:{}:{}", delegatee, cond_query);
+    let tok = format!("nostr:delegation:{delegatee}:{cond_query}");
     // form SHA256 hash
     let digest: sha256::Hash = sha256::Hash::hash(tok.as_bytes());
     let sig = schnorr::Signature::from_str(sigstr).unwrap();
diff --git a/src/event.rs b/src/event.rs
index 8b3529a..06be1e2 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -143,9 +143,9 @@ impl Event {
         let default = "".to_string();
         let dvals:Vec<&String> = self.tags
             .iter()
-            .filter(|x| x.len() >= 1)
+            .filter(|x| !x.is_empty())
             .filter(|x| x.get(0).unwrap() == "d")
-            .map(|x| x.get(1).unwrap_or_else(|| &default)).take(1)
+            .map(|x| x.get(1).unwrap_or(&default)).take(1)
             .collect();
         let dval_first = dvals.get(0);
         match dval_first {
@@ -292,7 +292,7 @@ impl Event {
         let c = c_opt.unwrap();
         // * compute the sha256sum.
         let digest: sha256::Hash = sha256::Hash::hash(c.as_bytes());
-        let hex_digest = format!("{:x}", digest);
+        let hex_digest = format!("{digest:x}");
         // * ensure the id matches the computed sha256sum.
         if self.id != hex_digest {
             debug!("event id does not match digest");
diff --git a/src/nip05.rs b/src/nip05.rs
index d5a1dd4..4ba9763 100644
--- a/src/nip05.rs
+++ b/src/nip05.rs
@@ -107,7 +107,7 @@ impl std::fmt::Display for Nip05Name {
 /// Check if the specified username and address are present and match in this response body
 fn body_contains_user(username: &str, address: &str, bytes: &hyper::body::Bytes) -> Result {
     // convert the body into json
-    let body: serde_json::Value = serde_json::from_slice(&bytes)?;
+    let body: serde_json::Value = serde_json::from_slice(bytes)?;
     // ensure we have a names object.
     let names_map = body
         .as_object()
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index 346670d..457060b 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -77,26 +77,25 @@ impl NostrRepo for PostgresRepo {
             }
         }
         if let Some(d_tag) = e.distinct_param() {
-            let repl_count:i64;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                repl_count = sqlx::query_scalar(
+            let repl_count:i64 = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                sqlx::query_scalar(
                 "SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value_hex=$3 AND e.created_at >= $4 LIMIT 1;")
                 .bind(hex::decode(&e.pubkey).ok())
                 .bind(e.kind as i64)
                 .bind(hex::decode(d_tag).ok())
                 .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
                 .fetch_one(&mut tx)
-                .await?;
+                .await?
             } else {
-                repl_count = sqlx::query_scalar(
+                sqlx::query_scalar(
                 "SELECT count(*) AS count FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.pub_key=$1 AND e.kind=$2 AND t.name='d' AND t.value=$3 AND e.created_at >= $4 LIMIT 1;")
                 .bind(hex::decode(&e.pubkey).ok())
                 .bind(e.kind as i64)
                 .bind(d_tag.as_bytes())
                 .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap())
                 .fetch_one(&mut tx)
-                .await?;
-            }
+                .await?
+            };
             // if any rows were returned, then some newer event with
             // the same author/kind/tag value exist, and we can ignore
             // this event.
@@ -178,22 +177,21 @@ ON CONFLICT (id) DO NOTHING"#,
         // parameterized replaceable events
         // check for parameterized replaceable events that would be hidden; don't insert these either.
         if let Some(d_tag) = e.distinct_param() {
-            let update_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
+            let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value_hex=$3 ORDER BY created_at DESC OFFSET 1);")
                 .bind(e.kind as i64)
                 .bind(hex::decode(&e.pubkey).ok())
                 .bind(hex::decode(d_tag).ok())
                 .execute(&mut tx)
-                .await?.rows_affected();
+                .await?.rows_affected()
             } else {
-                update_count = sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
+                sqlx::query("DELETE FROM event WHERE kind=$1 AND pub_key=$2 AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=$1 AND e.pub_key=$2 AND t.name='d' AND t.value=$3 ORDER BY created_at DESC OFFSET 1);")
                 .bind(e.kind as i64)
                 .bind(hex::decode(&e.pubkey).ok())
                 .bind(d_tag.as_bytes())
                 .execute(&mut tx)
-                .await?.rows_affected();
-            }
+                .await?.rows_affected()
+            };
             if update_count > 0 {
                 info!(
                     "removed {} older parameterized replaceable kind {} events for author: {:?}",
diff --git a/src/repo/postgres_migration.rs b/src/repo/postgres_migration.rs
index 605f3ab..c27ece8 100644
--- a/src/repo/postgres_migration.rs
+++ b/src/repo/postgres_migration.rs
@@ -81,7 +81,7 @@ async fn run_migration(migration: impl Migration, db: &PostgresPool) -> Migratio
         .unwrap();

     transaction.commit().await.unwrap();
-    return MigrationResult::Upgraded;
+    MigrationResult::Upgraded
 }

 mod m001 {
@@ -216,7 +216,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
                     let q = "INSERT INTO tag (event_id, \"name\", value_hex) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
                     sqlx::query(q)
                         .bind(&event_id)
-                        .bind(&tagname)
+                        .bind(tagname)
                         .bind(hex::decode(tagval).ok())
                         .execute(&mut update_tx)
                         .await?;
@@ -224,7 +224,7 @@ CREATE INDEX tag_value_hex_idx ON tag USING btree (value_hex);
                     let q = "INSERT INTO tag (event_id, \"name\", value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;";
                     sqlx::query(q)
                         .bind(&event_id)
-                        .bind(&tagname)
+                        .bind(tagname)
                         .bind(tagval.as_bytes())
                         .execute(&mut update_tx)
                         .await?;
diff --git a/src/repo/sqlite.rs b/src/repo/sqlite.rs
index daefa46..55e1daf 100644
--- a/src/repo/sqlite.rs
+++ b/src/repo/sqlite.rs
@@ -123,16 +123,15 @@ impl SqliteRepo {
         }
         // check for parameterized replaceable events that would be hidden; don't insert these either.
         if let Some(d_tag) = e.distinct_param() {
-            let repl_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                repl_count = tx.query_row(
+            let repl_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                tx.query_row(
                     "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value_hex=? AND e.created_at >= ? LIMIT 1;",
-                    params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::(0));
+                    params![pubkey_blob, e.kind, hex::decode(d_tag).ok(), e.created_at],|row| row.get::(0))
             } else {
-                repl_count = tx.query_row(
+                tx.query_row(
                     "SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.author=? AND e.kind=? AND t.name='d' AND t.value=? AND e.created_at >= ? LIMIT 1;",
-                    params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::(0));
-            }
+                    params![pubkey_blob, e.kind, d_tag, e.created_at],|row| row.get::(0))
+            };
             // if any rows were returned, then some newer event with
             // the same author/kind/tag value exist, and we can ignore
             // this event.
@@ -201,16 +200,15 @@ impl SqliteRepo {
         }
         // if this event is parameterized replaceable, remove other events.
         if let Some(d_tag) = e.distinct_param() {
-            let update_count;
-            if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
-                update_count = tx.execute(
+            let update_count = if is_lower_hex(&d_tag) && (d_tag.len() % 2 == 0) {
+                tx.execute(
                     "DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value_hex=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
-                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?;
+                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, hex::decode(d_tag).ok()])?
             } else {
-                update_count = tx.execute(
+                tx.execute(
                     "DELETE FROM event WHERE kind=? AND author=? AND id IN (SELECT e.id FROM event e LEFT JOIN tag t ON e.id=t.event_id WHERE e.kind=? AND e.author=? AND t.name='d' AND t.value=? ORDER BY created_at DESC LIMIT -1 OFFSET 1);",
-                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?;
-            }
+                    params![e.kind, pubkey_blob, e.kind, pubkey_blob, d_tag])?
+            };
             if update_count > 0 {
                 info!(
                     "removed {} older parameterized replaceable kind {} events for author: {:?}",
@@ -365,7 +363,7 @@ impl NostrRepo for SqliteRepo {
                 let filter_start = Instant::now();
                 filter_count += 1;
                 let sql_gen_elapsed = start.elapsed();
-                let (q, p, idx) = query_from_filter(&filter);
+                let (q, p, idx) = query_from_filter(filter);
                 if sql_gen_elapsed > Duration::from_millis(10) {
                     debug!("SQL (slow) generated in {:?}", filter_start.elapsed());
                 }
@@ -719,8 +717,8 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option> = vec![];
@@ -814,26 +812,24 @@ fn query_from_filter(f: &ReqFilter) -> (String, Vec>, Option
 (String, Vec>, Option
 (String, Vec>, Option
 (String, Vec>, Vec = subqueries
         .iter()
-        .map(|s| format!("SELECT distinct content, created_at FROM ({})", s))
+        .map(|s| format!("SELECT distinct content, created_at FROM ({s})"))
         .collect();
     let query: String = subqueries_selects.join(" UNION ");
     (query, params,indexes)
diff --git a/src/repo/sqlite_migration.rs b/src/repo/sqlite_migration.rs
index 6990106..f38b2ec 100644
--- a/src/repo/sqlite_migration.rs
+++ b/src/repo/sqlite_migration.rs
@@ -211,13 +211,12 @@ pub fn upgrade_db(conn: &mut PooledConnection) -> Result {
         }
         // Database is current, all is good
         Ordering::Equal => {
-            debug!("Database version was already current (v{})", DB_VERSION);
+            debug!("Database version was already current (v{DB_VERSION})");
         }
         // Database is newer than what this code understands, abort
         Ordering::Greater => {
             panic!(
-                "Database version is newer than supported by this executable (v{} > v{})",
-                curr_version, DB_VERSION
+                "Database version is newer than supported by this executable (v{curr_version} > v{DB_VERSION})",
             );
         }
     }
diff --git a/src/server.rs b/src/server.rs
index fa53ba1..a9173d9 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -50,6 +50,7 @@ use tungstenite::protocol::Message;
 use tungstenite::protocol::WebSocketConfig;

 /// Handle arbitrary HTTP requests, including for `WebSocket` upgrades.
+#[allow(clippy::too_many_arguments)]
 async fn handle_web_request(
     mut request: Request,
     repo: Arc,
@@ -127,9 +128,8 @@ async fn handle_web_request(
                         // todo: trace, don't print...
                         Err(e) => println!(
                             "error when trying to upgrade connection \
-                             from address {} to websocket connection. \
-                             Error is: {}",
-                            remote_addr, e
+                             from address {remote_addr} to websocket connection. \
+                             Error is: {e}",
                         ),
                     }
                 });
@@ -139,7 +139,7 @@ async fn handle_web_request(
                 Err(error) => {
                     warn!("websocket response failed");
                     let mut res =
-                        Response::new(Body::from(format!("Failed to create websocket: {}", error)));
+                        Response::new(Body::from(format!("Failed to create websocket: {error}")));
                     *res.status_mut() = StatusCode::BAD_REQUEST;
                     return Ok(res);
                 }
@@ -346,7 +346,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
             // give each thread a unique numeric name
             static ATOMIC_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
             let id = ATOMIC_ID.fetch_add(1,Ordering::SeqCst);
-            format!("tokio-ws-{}", id)
+            format!("tokio-ws-{id}")
         })
         // limit concurrent SQLite blocking threads
         .max_blocking_threads(settings.limits.max_blocking_threads)
@@ -478,7 +478,7 @@ pub fn start_server(settings: &Settings, shutdown_rx: MpscReceiver<()>) -> Resul
             .with_graceful_shutdown(ctrl_c_or_signal(webserver_shutdown_listen));
         // run hyper in this thread. This is why the thread does not return.
         if let Err(e) = server.await {
-            eprintln!("server error: {}", e);
+            eprintln!("server error: {e}");
         }
     });
     Ok(())
@@ -541,6 +541,7 @@ struct ClientInfo {

 /// Handle new client connections. This runs through an event loop
 /// for all client communication.
+#[allow(clippy::too_many_arguments)]
 async fn nostr_server(
     repo: Arc,
     client_info: ClientInfo,
@@ -639,7 +640,7 @@ async fn nostr_server(
                 // database informed us of a query result we asked for
                 let subesc = query_result.sub_id.replace('"', "");
                 if query_result.event == "EOSE" {
-                    let send_str = format!("[\"EOSE\",\"{}\"]", subesc);
+                    let send_str = format!("[\"EOSE\",\"{subesc}\"]");
                     ws_stream.send(Message::Text(send_str)).await.ok();
                 } else {
                     client_received_event_count += 1;
@@ -666,7 +667,7 @@ async fn nostr_server(
                         // create an event response and send it
                         let subesc = s.replace('"', "");
                         metrics.sent_events.with_label_values(&["realtime"]).inc();
-                        ws_stream.send(Message::Text(format!("[\"EVENT\",\"{}\",{}]", subesc, event_str))).await.ok();
+                        ws_stream.send(Message::Text(format!("[\"EVENT\",\"{subesc}\",{event_str}]"))).await.ok();
                     } else {
                         warn!("could not serialize event: {:?}", global_event.get_event_id_prefix());
                     }
@@ -692,7 +693,7 @@ async fn nostr_server(
                 },
                 Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
                     ws_stream.send(
-                        make_notice_message(&Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
+                        make_notice_message(&Notice::message(format!("message too large ({size} > {max_size})")))).await.ok();
                     continue;
                 },
                 None |
@@ -741,7 +742,7 @@ async fn nostr_server(
                             } else {
                                 info!("client: {} sent a far future-dated event", cid);
                                 if let Some(fut_sec) = settings.options.reject_future_seconds {
-                                    let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay.",fut_sec);
+                                    let msg = format!("The event created_at field is out of the acceptable range (+{fut_sec}sec) for this relay.");
                                     let notice = Notice::invalid(e.id, &msg);
                                     ws_stream.send(make_notice_message(&notice)).await.ok();
                                 }
@@ -749,7 +750,7 @@ async fn nostr_server(
                         },
                         Err(e) => {
                             info!("client sent an invalid event (cid: {})", cid);
-                            ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{}", e)))).await.ok();
+                            ws_stream.send(make_notice_message(&Notice::invalid(evid, &format!("{e}")))).await.ok();
                         }
                     }
                 },
@@ -782,7 +783,7 @@ async fn nostr_server(
                     },
                     Err(e) => {
                         info!("Subscription error: {} (cid: {}, sub: {:?})", e, cid, s.id);
-                        ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {}", e)))).await.ok();
+                        ws_stream.send(make_notice_message(&Notice::message(format!("Subscription error: {e}")))).await.ok();
                     }
                 }
             }
diff --git a/src/subscription.rs b/src/subscription.rs
index 34a4a48..55dd757 100644
--- a/src/subscription.rs
+++ b/src/subscription.rs
@@ -70,7 +70,7 @@ impl Serialize for ReqFilter {
         if let Some(tags) = &self.tags {
             for (k,v) in tags {
                 let vals:Vec<&String> = v.iter().collect();
-                map.serialize_entry(&format!("#{}",k), &vals)?;
+                map.serialize_entry(&format!("#{k}"), &vals)?;
             }
         }
         map.end()