From f1206e76f2b94a6ef0833c5b7d278418705e470c Mon Sep 17 00:00:00 2001 From: Greg Heartsfield Date: Tue, 25 Jan 2022 20:39:24 -0600 Subject: [PATCH] feat: database reader connection pooling Added connection pooling for queries, as well as basic configuration options for min/max connections. --- Cargo.lock | 32 ++++++++++++++++++++++++++++++++ Cargo.toml | 2 ++ config.toml | 21 +++++++++++++++------ src/config.rs | 11 +++++++++++ src/db.rs | 31 ++++++++++++++++++++++++------- src/main.rs | 13 +++++++++++-- 6 files changed, 95 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1529561..1402cb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -662,6 +662,8 @@ dependencies = [ "lazy_static", "log", "nonzero_ext", + "r2d2", + "r2d2_sqlite", "rusqlite", "secp256k1", "serde 1.0.131", @@ -811,6 +813,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54ca3c9468a76fc2ad724c486a59682fc362efeac7b18d1c012958bc19f34800" +dependencies = [ + "r2d2", + "rusqlite", +] + [[package]] name = "rand" version = "0.6.5" @@ -1028,6 +1051,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "254df5081ce98661a883445175e52efe99d1cb2a5552891d965d2f5d0cad1c16" +[[package]] +name = "scheduled-thread-pool" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" +dependencies = [ + "parking_lot", +] + [[package]] name = "scopeguard" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7b8e6c9..51ba1e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ serde = { version = "^1.0", features = ["derive"] } serde_json = {version = "^1.0", features = ["preserve_order"]} hex = "^0.4" rusqlite = { version = "^0.26", features = ["limits"]} +r2d2 = "^0.8" +r2d2_sqlite = "^0.19" lazy_static = "^1.4" governor = "^0.4" nonzero_ext = "^0.3" diff --git a/config.toml b/config.toml index 5a2b324..c7880d5 100644 --- a/config.toml +++ b/config.toml @@ -22,9 +22,18 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i # line option. data_directory = "." +# Database connection pool settings: + +# Minimum number of SQLite reader connections +#min_conn = 4 + +# Maximum number of SQLite reader connections +#max_conn = 128 + [network] # Bind to this network address address = "0.0.0.0" + # Listen on this port port = 8080 @@ -37,22 +46,22 @@ reject_future_seconds = 1800 [limits] # Limit events created per second, averaged over one minute. Must be # an integer. If not set (or set to 0), defaults to unlimited. -messages_per_sec = 0 +#messages_per_sec = 0 # Limit the maximum size of an EVENT message. Defaults to 128 KB. # Set to 0 for unlimited. -max_event_bytes = 131072 +#max_event_bytes = 131072 # Maximum WebSocket message in bytes. Defaults to 128 KB. -max_ws_message_bytes = 131072 +#max_ws_message_bytes = 131072 # Maximum WebSocket frame size in bytes. Defaults to 128 KB. -max_ws_frame_bytes = 131072 +#max_ws_frame_bytes = 131072 # Broadcast buffer size, in number of events. This prevents slow # readers from consuming memory. Defaults to 4096. -broadcast_buffer = 4096 +#broadcast_buffer = 4096 # Event persistence buffer size, in number of events. This provides # backpressure to senders if writes are slow. Defaults to 16. -event_persist_buffer = 16 +#event_persist_buffer = 16 diff --git a/src/config.rs b/src/config.rs index e2cdbde..23048df 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,6 +22,8 @@ pub struct Info { #[allow(unused)] pub struct Database { pub data_directory: String, + pub min_conn: u32, + pub max_conn: u32, } #[derive(Debug, Serialize, Deserialize)] @@ -93,6 +95,13 @@ impl Settings { // override with file contents .with_merged(config::File::with_name("config"))? .try_into()?; + // ensure connection pool size is logical + if settings.database.min_conn > settings.database.max_conn { + panic!( + "Database min_conn setting ({}) cannot exceed max_conn ({})", + settings.database.min_conn, settings.database.max_conn + ); + } Ok(settings) } } @@ -109,6 +118,8 @@ impl Default for Settings { }, database: Database { data_directory: ".".to_owned(), + min_conn: 4, + max_conn: 128, }, network: Network { port: 8080, diff --git a/src/db.rs b/src/db.rs index 0d07843..0b0103f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,4 +1,5 @@ //! Event persistence and querying +use crate::config; use crate::error::Result; use crate::event::Event; use crate::subscription::Subscription; @@ -11,6 +12,8 @@ use rusqlite::Connection; use rusqlite::OpenFlags; //use std::num::NonZeroU32; use crate::config::SETTINGS; +use r2d2; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::limits::Limit; use rusqlite::types::ToSql; use std::path::Path; @@ -18,6 +21,8 @@ use std::thread; use std::time::Instant; use tokio::task; +pub type SqlitePool = r2d2::Pool; + /// Database file const DB_FILE: &str = "nostr.db"; @@ -94,6 +99,23 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey); "##; +pub fn build_read_pool() -> SqlitePool { + info!("Build a connection pool"); + let config = config::SETTINGS.read().unwrap(); + let db_dir = &config.database.data_directory; + let full_path = Path::new(db_dir).join(DB_FILE); + let manager = SqliteConnectionManager::file(&full_path) + .with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY) + .with_init(|c| c.execute_batch(STARTUP_SQL)); + let pool: SqlitePool = r2d2::Pool::builder() + .test_on_check_out(true) // no noticeable performance hit + .min_idle(Some(config.database.min_conn)) + .max_size(config.database.max_conn) + .build(manager) + .unwrap(); + return pool; +} + /// Upgrade DB to latest version, and execute pragma settings pub fn upgrade_db(conn: &mut Connection) -> Result<()> { // check the version. @@ -615,22 +637,17 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec>) { /// query is immediately aborted. pub async fn db_query( sub: Subscription, + conn: r2d2::PooledConnection, query_tx: tokio::sync::mpsc::Sender, mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>, ) { task::spawn_blocking(move || { - let config = SETTINGS.read().unwrap(); - let db_dir = &config.database.data_directory; - let full_path = Path::new(db_dir).join(DB_FILE); - - let conn = Connection::open_with_flags(&full_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?; - debug!("opened database for reading"); debug!("going to query for: {:?}", sub); let mut row_count: usize = 0; let start = Instant::now(); // generate SQL query let (q, p) = query_from_sub(&sub); - // execute the query + // execute the query. Don't cache, since queries vary so much. let mut stmt = conn.prepare(&q)?; let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?; while let Some(row) = event_rows.next()? { diff --git a/src/main.rs b/src/main.rs index bea58ba..3a8d6ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,7 @@ fn db_from_args(args: Vec) -> Option { /// Handle arbitrary HTTP requests, including for WebSocket upgrades. async fn handle_web_request( mut request: Request, + pool: db::SqlitePool, remote_addr: SocketAddr, broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, @@ -83,8 +84,9 @@ async fn handle_web_request( Some(config), ) .await; + tokio::spawn(nostr_server( - ws_stream, broadcast, event_tx, shutdown, + pool, ws_stream, broadcast, event_tx, shutdown, )); } Err(e) => println!( @@ -188,6 +190,8 @@ fn main() -> Result<(), Error> { rt.block_on(async { let settings = config::SETTINGS.read().unwrap(); info!("listening on: {}", socket_addr); + // build a connection pool for sqlite connections + let pool = db::build_read_pool(); // all client-submitted valid events are broadcast to every // other client on this channel. This should be large enough // to accomodate slower readers (messages are dropped if @@ -214,6 +218,7 @@ fn main() -> Result<(), Error> { // A `Service` is needed for every connection, so this // creates one from our `handle_request` function. let make_svc = make_service_fn(|conn: &AddrStream| { + let svc_pool = pool.clone(); let remote_addr = conn.remote_addr(); let bcast = bcast_tx.clone(); let event = event_tx.clone(); @@ -223,6 +228,7 @@ fn main() -> Result<(), Error> { Ok::<_, Infallible>(service_fn(move |request: Request| { handle_web_request( request, + svc_pool.clone(), remote_addr, bcast.clone(), event.clone(), @@ -246,6 +252,7 @@ fn main() -> Result<(), Error> { /// Handle new client connections. This runs through an event loop /// for all client communication. async fn nostr_server( + pool: db::SqlitePool, ws_stream: WebSocketStream, broadcast: Sender, event_tx: tokio::sync::mpsc::Sender, @@ -337,7 +344,9 @@ async fn nostr_server( Ok(()) => { running_queries.insert(s.id.to_owned(), abandon_query_tx); // start a database query - db::db_query(s, query_tx.clone(), abandon_query_rx).await; + // show pool stats + debug!("DB pool stats: {:?}", pool.state()); + db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await; }, Err(e) => { info!("Subscription error: {}", e);