feat: database reader connection pooling

Added connection pooling for queries, as well as basic configuration
options for min/max connections.
Greg Heartsfield 2022-01-25 20:39:24 -06:00
parent af453548ee
commit f1206e76f2
6 changed files with 95 additions and 15 deletions
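
For context, the change replaces per-query database opens with a shared pool of read-only SQLite connections, sized by the new `min_conn`/`max_conn` settings; each subscription query then borrows a connection from the pool. A minimal sketch of that idea, condensed from the src/db.rs changes below (the free-standing `path`, `min_conn`, and `max_conn` parameters here are a simplification standing in for the `SETTINGS` lookup the commit actually uses):

    // Sketch only; names follow the commit, parameters are illustrative.
    use r2d2_sqlite::SqliteConnectionManager;
    use rusqlite::OpenFlags;

    pub type SqlitePool = r2d2::Pool<SqliteConnectionManager>;

    /// Build a pool of read-only connections to the database file,
    /// keeping at least `min_conn` idle and capping it at `max_conn`.
    pub fn build_read_pool(path: &str, min_conn: u32, max_conn: u32) -> SqlitePool {
        let manager = SqliteConnectionManager::file(path)
            .with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY);
        r2d2::Pool::builder()
            .test_on_check_out(true)
            .min_idle(Some(min_conn))
            .max_size(max_conn)
            .build(manager)
            .expect("could not build connection pool")
    }

    // A query task then checks a connection out instead of opening the file:
    //   let conn = pool.get()?; // waits for a free reader connection, or times out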

Cargo.lock (generated)

@@ -662,6 +662,8 @@ dependencies = [
  "lazy_static",
  "log",
  "nonzero_ext",
+ "r2d2",
+ "r2d2_sqlite",
  "rusqlite",
  "secp256k1",
  "serde 1.0.131",
@@ -811,6 +813,27 @@ dependencies = [
  "proc-macro2",
 ]

+[[package]]
+name = "r2d2"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f"
+dependencies = [
+ "log",
+ "parking_lot",
+ "scheduled-thread-pool",
+]
+
+[[package]]
+name = "r2d2_sqlite"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54ca3c9468a76fc2ad724c486a59682fc362efeac7b18d1c012958bc19f34800"
+dependencies = [
+ "r2d2",
+ "rusqlite",
+]
+
 [[package]]
 name = "rand"
 version = "0.6.5"
@@ -1028,6 +1051,15 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "254df5081ce98661a883445175e52efe99d1cb2a5552891d965d2f5d0cad1c16"

+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
+dependencies = [
+ "parking_lot",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"

Cargo.toml

@@ -20,6 +20,8 @@ serde = { version = "^1.0", features = ["derive"] }
 serde_json = {version = "^1.0", features = ["preserve_order"]}
 hex = "^0.4"
 rusqlite = { version = "^0.26", features = ["limits"]}
+r2d2 = "^0.8"
+r2d2_sqlite = "^0.19"
 lazy_static = "^1.4"
 governor = "^0.4"
 nonzero_ext = "^0.3"

config.toml

@@ -22,9 +22,18 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i
 # line option.
 data_directory = "."
+# Database connection pool settings:
+# Minimum number of SQLite reader connections
+#min_conn = 4
+# Maximum number of SQLite reader connections
+#max_conn = 128
 [network]
 # Bind to this network address
 address = "0.0.0.0"
 # Listen on this port
 port = 8080
@@ -37,22 +46,22 @@ reject_future_seconds = 1800
 [limits]
 # Limit events created per second, averaged over one minute. Must be
 # an integer. If not set (or set to 0), defaults to unlimited.
-messages_per_sec = 0
+#messages_per_sec = 0
 # Limit the maximum size of an EVENT message. Defaults to 128 KB.
 # Set to 0 for unlimited.
-max_event_bytes = 131072
+#max_event_bytes = 131072
 # Maximum WebSocket message in bytes. Defaults to 128 KB.
-max_ws_message_bytes = 131072
+#max_ws_message_bytes = 131072
 # Maximum WebSocket frame size in bytes. Defaults to 128 KB.
-max_ws_frame_bytes = 131072
+#max_ws_frame_bytes = 131072
 # Broadcast buffer size, in number of events. This prevents slow
 # readers from consuming memory. Defaults to 4096.
-broadcast_buffer = 4096
+#broadcast_buffer = 4096
 # Event persistence buffer size, in number of events. This provides
 # backpressure to senders if writes are slow. Defaults to 16.
-event_persist_buffer = 16
+#event_persist_buffer = 16

src/config.rs

@@ -22,6 +22,8 @@ pub struct Info {
 #[allow(unused)]
 pub struct Database {
     pub data_directory: String,
+    pub min_conn: u32,
+    pub max_conn: u32,
 }

 #[derive(Debug, Serialize, Deserialize)]
@@ -93,6 +95,13 @@ impl Settings {
             // override with file contents
             .with_merged(config::File::with_name("config"))?
             .try_into()?;
+        // ensure connection pool size is logical
+        if settings.database.min_conn > settings.database.max_conn {
+            panic!(
+                "Database min_conn setting ({}) cannot exceed max_conn ({})",
+                settings.database.min_conn, settings.database.max_conn
+            );
+        }
         Ok(settings)
     }
 }
@@ -109,6 +118,8 @@ impl Default for Settings {
             },
             database: Database {
                 data_directory: ".".to_owned(),
+                min_conn: 4,
+                max_conn: 128,
             },
             network: Network {
                 port: 8080,

src/db.rs

@@ -1,4 +1,5 @@
 //! Event persistence and querying
+use crate::config;
 use crate::error::Result;
 use crate::event::Event;
 use crate::subscription::Subscription;
@@ -11,6 +12,8 @@ use rusqlite::Connection;
 use rusqlite::OpenFlags;
 //use std::num::NonZeroU32;
 use crate::config::SETTINGS;
+use r2d2;
+use r2d2_sqlite::SqliteConnectionManager;
 use rusqlite::limits::Limit;
 use rusqlite::types::ToSql;
 use std::path::Path;
@@ -18,6 +21,8 @@ use std::thread;
 use std::time::Instant;
 use tokio::task;

+pub type SqlitePool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
+
 /// Database file
 const DB_FILE: &str = "nostr.db";
@@ -94,6 +99,23 @@ FOREIGN KEY(event_id) REFERENCES event(id) ON UPDATE RESTRICT ON DELETE CASCADE
 CREATE INDEX IF NOT EXISTS pubkey_ref_index ON pubkey_ref(referenced_pubkey);
 "##;

+pub fn build_read_pool() -> SqlitePool {
+    info!("Build a connection pool");
+    let config = config::SETTINGS.read().unwrap();
+    let db_dir = &config.database.data_directory;
+    let full_path = Path::new(db_dir).join(DB_FILE);
+    let manager = SqliteConnectionManager::file(&full_path)
+        .with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY)
+        .with_init(|c| c.execute_batch(STARTUP_SQL));
+    let pool: SqlitePool = r2d2::Pool::builder()
+        .test_on_check_out(true) // no noticeable performance hit
+        .min_idle(Some(config.database.min_conn))
+        .max_size(config.database.max_conn)
+        .build(manager)
+        .unwrap();
+    return pool;
+}
+
 /// Upgrade DB to latest version, and execute pragma settings
 pub fn upgrade_db(conn: &mut Connection) -> Result<()> {
     // check the version.
@@ -615,22 +637,17 @@ fn query_from_sub(sub: &Subscription) -> (String, Vec<Box<dyn ToSql>>) {
 /// query is immediately aborted.
 pub async fn db_query(
     sub: Subscription,
+    conn: r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>,
     query_tx: tokio::sync::mpsc::Sender<QueryResult>,
     mut abandon_query_rx: tokio::sync::oneshot::Receiver<()>,
 ) {
     task::spawn_blocking(move || {
-        let config = SETTINGS.read().unwrap();
-        let db_dir = &config.database.data_directory;
-        let full_path = Path::new(db_dir).join(DB_FILE);
-        let conn = Connection::open_with_flags(&full_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
-        debug!("opened database for reading");
         debug!("going to query for: {:?}", sub);
         let mut row_count: usize = 0;
         let start = Instant::now();
         // generate SQL query
         let (q, p) = query_from_sub(&sub);
-        // execute the query
+        // execute the query. Don't cache, since queries vary so much.
         let mut stmt = conn.prepare(&q)?;
         let mut event_rows = stmt.query(rusqlite::params_from_iter(p))?;
         while let Some(row) = event_rows.next()? {

src/main.rs

@@ -43,6 +43,7 @@ fn db_from_args(args: Vec<String>) -> Option<String> {
 /// Handle arbitrary HTTP requests, including for WebSocket upgrades.
 async fn handle_web_request(
     mut request: Request<Body>,
+    pool: db::SqlitePool,
     remote_addr: SocketAddr,
     broadcast: Sender<Event>,
     event_tx: tokio::sync::mpsc::Sender<Event>,
@@ -83,8 +84,9 @@ async fn handle_web_request(
                         Some(config),
                     )
                     .await;
                     tokio::spawn(nostr_server(
-                        ws_stream, broadcast, event_tx, shutdown,
+                        pool, ws_stream, broadcast, event_tx, shutdown,
                     ));
                 }
                 Err(e) => println!(
@@ -188,6 +190,8 @@ fn main() -> Result<(), Error> {
     rt.block_on(async {
         let settings = config::SETTINGS.read().unwrap();
         info!("listening on: {}", socket_addr);
+        // build a connection pool for sqlite connections
+        let pool = db::build_read_pool();
         // all client-submitted valid events are broadcast to every
         // other client on this channel. This should be large enough
        // to accomodate slower readers (messages are dropped if
@@ -214,6 +218,7 @@
         // A `Service` is needed for every connection, so this
         // creates one from our `handle_request` function.
         let make_svc = make_service_fn(|conn: &AddrStream| {
+            let svc_pool = pool.clone();
             let remote_addr = conn.remote_addr();
             let bcast = bcast_tx.clone();
             let event = event_tx.clone();
@@ -223,6 +228,7 @@
             Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
                 handle_web_request(
                     request,
+                    svc_pool.clone(),
                     remote_addr,
                     bcast.clone(),
                     event.clone(),
@@ -246,6 +252,7 @@
 /// Handle new client connections. This runs through an event loop
 /// for all client communication.
 async fn nostr_server(
+    pool: db::SqlitePool,
     ws_stream: WebSocketStream<Upgraded>,
     broadcast: Sender<Event>,
     event_tx: tokio::sync::mpsc::Sender<Event>,
@@ -337,7 +344,9 @@ async fn nostr_server(
                     Ok(()) => {
                         running_queries.insert(s.id.to_owned(), abandon_query_tx);
                         // start a database query
-                        db::db_query(s, query_tx.clone(), abandon_query_rx).await;
+                        // show pool stats
+                        debug!("DB pool stats: {:?}", pool.state());
+                        db::db_query(s, pool.get().expect("could not get connection"), query_tx.clone(), abandon_query_rx).await;
                     },
                     Err(e) => {
                         info!("Subscription error: {}", e);