From 16f6e974c8da536a155f74f77440e2a484dd9693 Mon Sep 17 00:00:00 2001 From: Kieran Date: Sun, 22 Jan 2023 16:26:54 -0600 Subject: [PATCH] feat: add support for PostgreSQL as a backend repository --- Cargo.lock | 692 ++++++++++++++++++++++++++++++++- Cargo.toml | 3 + config.toml | 10 +- src/config.rs | 2 + src/db.rs | 25 +- src/error.rs | 10 + src/repo/mod.rs | 2 + src/repo/postgres.rs | 692 +++++++++++++++++++++++++++++++++ src/repo/postgres_migration.rs | 120 ++++++ 9 files changed, 1539 insertions(+), 17 deletions(-) create mode 100644 src/repo/postgres.rs create mode 100644 src/repo/postgres_migration.rs diff --git a/Cargo.lock b/Cargo.lock index cfb382a..856bd8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,102 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17adb73da160dfb475c183343c8cccd80721ea5a605d3eb57125f0a7b7a92d0b" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c374dda1ed3e7d8f0d9ba58715f924862c63eae6849c92d3a18e7fbde9e2794" +dependencies = [ + "async-lock", + "autocfg 1.1.0", + "concurrent-queue", + "futures-lite", + "libc", + "log", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "windows-sys", +] + +[[package]] +name = "async-lock" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +dependencies = [ + "event-listener", + "futures-lite", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -79,6 +175,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" + [[package]] name = "async-trait" version = "0.1.61" @@ -90,6 +192,21 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" +dependencies = [ + "num-traits", +] + +[[package]] +name = "atomic-waker" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599" + [[package]] name = "autocfg" version = "0.1.8" @@ -157,6 +274,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" + [[package]] name = "bitcoin_hashes" version = "0.10.0" @@ -181,6 +304,20 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -218,8 +355,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ "iana-time-zone", + "js-sys", "num-integer", "num-traits", + "time", + "wasm-bindgen", "winapi", ] @@ -279,6 +419,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.12.0" @@ -379,6 +528,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" + [[package]] name = "crc32fast" version = "1.3.2" @@ -398,6 +562,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -417,6 +591,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "cxx" version = "1.0.86" @@ -471,7 +655,7 @@ dependencies = [ "hashbrown 0.12.3", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.6", ] [[package]] @@ -482,6 +666,27 @@ checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ "block-buffer", "crypto-common", + "subtle", +] + +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", ] [[package]] @@ -493,6 +698,12 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "dotenvy" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" + [[package]] name = "either" version = "1.8.0" @@ -520,6 +731,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -629,12 +846,38 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.25" @@ -703,6 +946,18 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "governor" version = "0.4.2" @@ -714,7 +969,7 @@ dependencies = [ "futures-timer", "no-std-compat", "nonzero_ext", - "parking_lot", + "parking_lot 0.12.1", "quanta", "rand 0.8.5", "smallvec", @@ -762,6 +1017,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.6", +] [[package]] name = "hashlink" @@ -772,13 +1030,22 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "hashlink" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "hdrhistogram" version = "7.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ - "base64", + "base64 0.13.1", "byteorder", "flate2", "nom", @@ -790,6 +1057,9 @@ name = "heck" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -806,6 +1076,24 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.8" @@ -1011,6 +1299,15 @@ dependencies = [ "serde", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1072,6 +1369,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", + "value-bag", ] [[package]] @@ -1107,6 +1405,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" +[[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest", +] + [[package]] name = "memchr" version = "2.5.0" @@ -1191,8 +1498,10 @@ name = "nostr-rs-relay" version = "0.7.17" dependencies = [ "anyhow", + "async-std", "async-trait", "bitcoin_hashes", + "chrono", "clap", "config", "console-subscriber", @@ -1216,6 +1525,7 @@ dependencies = [ "secp256k1", "serde", "serde_json", + "sqlx", "thiserror", "tokio", "tokio-tungstenite", @@ -1379,6 +1689,23 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1386,7 +1713,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.6", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", ] [[package]] @@ -1413,6 +1754,12 @@ dependencies = [ "regex", ] +[[package]] +name = "paste" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" + [[package]] name = "pathdiff" version = "0.2.1" @@ -1507,6 +1854,20 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +[[package]] +name = "polling" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6" +dependencies = [ + "autocfg 1.1.0", + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "windows-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1556,7 +1917,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot", + "parking_lot 0.12.1", "protobuf", "thiserror", ] @@ -1611,7 +1972,7 @@ dependencies = [ "mach", "once_cell", "raw-cpuid", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi 0.10.0+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -1632,7 +1993,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" dependencies = [ "log", - "parking_lot", + "parking_lot 0.12.1", "scheduled-thread-pool", ] @@ -1809,6 +2170,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom", + "redox_syscall", + "thiserror", +] + [[package]] name = "regex" version = "1.7.1" @@ -1844,13 +2216,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "ron" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" dependencies = [ - "base64", + "base64 0.13.1", "bitflags", "serde", ] @@ -1864,7 +2251,7 @@ dependencies = [ "bitflags", "fallible-iterator", "fallible-streaming-iterator", - "hashlink", + "hashlink 0.7.0", "libsqlite3-sys", "memchr", "smallvec", @@ -1894,6 +2281,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", +] + [[package]] name = "rustversion" version = "1.0.11" @@ -1921,7 +2329,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" dependencies = [ - "parking_lot", + "parking_lot 0.12.1", ] [[package]] @@ -1936,6 +2344,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secp256k1" version = "0.21.3" @@ -2023,6 +2441,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.6" @@ -2077,12 +2506,139 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + +[[package]] +name = "sqlformat" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" +dependencies = [ + "itertools", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105" +dependencies = [ + "ahash 0.7.6", + "atoi", + "base64 0.13.1", + "bitflags", + "byteorder", + "bytes", + "chrono", + "crc", + "crossbeam-queue", + "dirs", + "dotenvy", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-util", + "hashlink 0.8.1", + "hex", + "hkdf", + "hmac", + "indexmap", + "itoa", + "libc", + "log", + "md-5", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "rand 0.8.5", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "tokio-stream", + "url", + "webpki-roots", + "whoami", +] + +[[package]] +name = "sqlx-macros" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9" +dependencies = [ + "dotenvy", + "either", + "heck", + "once_cell", + "proc-macro2", + "quote", + "sha2", + "sqlx-core", + "sqlx-rt", + "syn", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396" +dependencies = [ + "once_cell", + "tokio", + "tokio-rustls", +] + +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.107" @@ -2152,6 +2708,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2179,7 +2746,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2219,6 +2786,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-stream" version = "0.1.11" @@ -2274,7 +2852,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.13.1", "bytes", "futures-core", "futures-util", @@ -2462,7 +3040,7 @@ version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ - "base64", + "base64 0.13.1", "byteorder", "bytes", "http", @@ -2508,6 +3086,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" + [[package]] name = "unicode-width" version = "0.1.10" @@ -2520,6 +3104,18 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.3.1" @@ -2552,6 +3148,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" +dependencies = [ + "ctor", + "version_check", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -2564,6 +3170,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.0" @@ -2576,9 +3188,9 @@ dependencies = [ [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasi" @@ -2611,6 +3223,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.83" @@ -2650,6 +3274,44 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki", +] + +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + +[[package]] +name = "whoami" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45dbc71f0cdca27dc261a9bd37ddec174e4a0af2b900b890f378460f745426e3" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 3b2164b..c267fe7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,9 @@ rand = "0.8" const_format = "0.2.28" regex = "1" async-trait = "0.1.60" +async-std = "1.12.0" +sqlx = { version ="0.6.2", features=["runtime-tokio-rustls", "postgres", "chrono"]} +chrono = "0.4.23" prometheus = "0.13.3" [dev-dependencies] diff --git a/config.toml b/config.toml index 9777280..f6ff68f 100644 --- a/config.toml +++ b/config.toml @@ -21,13 +21,17 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i #tracing = false [database] +# Database engine (sqlite/postgres). Defaults to sqlite. +# Support for postgres is currently experimental. +#engine = "sqlite" + # Directory for SQLite files. Defaults to the current directory. Can # also be specified (and overriden) with the "--db dirname" command # line option. #data_directory = "." - # Use an in-memory database instead of 'nostr.db'. +# Requires sqlite engine. # Caution; this will not survive a process restart! #in_memory = false @@ -40,6 +44,10 @@ description = "A newly created nostr-rs-relay.\n\nCustomize this with your own i # to approx the number of cores. #max_conn = 8 +# Database connection string. Required for postgres; not used for +# sqlite. +#connection = "postgresql://postgres:nostr@localhost:7500/nostr" + [network] # Bind to this network address address = "0.0.0.0" diff --git a/src/config.rs b/src/config.rs index f2e5530..dec643f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,6 +22,7 @@ pub struct Database { pub in_memory: bool, pub min_conn: u32, pub max_conn: u32, + pub connection: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -211,6 +212,7 @@ impl Default for Settings { in_memory: false, min_conn: 4, max_conn: 8, + connection: "".to_owned(), }, network: Network { port: 8080, diff --git a/src/db.rs b/src/db.rs index 1bdd03c..4a64b21 100644 --- a/src/db.rs +++ b/src/db.rs @@ -9,9 +9,14 @@ use governor::{Quota, RateLimiter}; use r2d2; use std::sync::Arc; use std::thread; +use sqlx::pool::PoolOptions; +use sqlx::postgres::PgConnectOptions; +use sqlx::ConnectOptions; use crate::repo::sqlite::SqliteRepo; +use crate::repo::postgres::{PostgresRepo,PostgresPool,PostgresRepoSettings}; use crate::repo::NostrRepo; -use std::time::Instant; +use std::time::{Instant, Duration}; +use tracing::log::LevelFilter; use tracing::{debug, info, trace, warn}; pub type SqlitePool = r2d2::Pool; @@ -33,6 +38,7 @@ pub const DB_FILE: &str = "nostr.db"; pub async fn build_repo(settings: &Settings, metrics: NostrMetrics) -> Arc { match settings.database.engine.as_str() { "sqlite" => {Arc::new(build_sqlite_pool(settings, metrics).await)}, + "postgres" => {Arc::new(build_postgres_pool(settings, metrics).await)}, _ => panic!("Unknown database engine"), } } @@ -44,6 +50,23 @@ async fn build_sqlite_pool(settings: &Settings, metrics: NostrMetrics) -> Sqlite repo } +async fn build_postgres_pool(settings: &Settings, metrics: NostrMetrics) -> PostgresRepo { + let mut options: PgConnectOptions = settings.database.connection.as_str().parse().unwrap(); + options.log_statements(LevelFilter::Debug); + options.log_slow_statements(LevelFilter::Warn, Duration::from_secs(60)); + + let pool: PostgresPool = PoolOptions::new() + .max_connections(settings.database.max_conn) + .min_connections(settings.database.min_conn) + .idle_timeout(Duration::from_secs(60)) + .connect_with(options) + .await + .unwrap(); + PostgresRepo::new(pool, metrics, PostgresRepoSettings { + cleanup_contact_list: true + }) +} + /// Spawn a database writer that persists events to the `SQLite` store. pub async fn db_writer( repo: Arc, diff --git a/src/error.rs b/src/error.rs index 525c459..0f87060 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,6 +48,10 @@ pub enum Error { DatabaseDirError, #[error("Database Connection Pool Error")] DatabasePoolError(r2d2::Error), + #[error("SQL error")] + SqlxError(sqlx::Error), + #[error("Database Connection Pool Error")] + SqlxDatabasePoolError(sqlx::Error), #[error("Custom Error : {0}")] CustomError(String), #[error("Task join error")] @@ -100,6 +104,12 @@ impl From for Error { } } +impl From for Error { + fn from(d: sqlx::Error) -> Self { + Error::SqlxDatabasePoolError(d) + } +} + impl From for Error { /// Wrap JSON error fn from(r: serde_json::Error) -> Self { diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 191409f..a74c665 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -9,6 +9,8 @@ use rand::Rng; pub mod sqlite; pub mod sqlite_migration; +pub mod postgres; +pub mod postgres_migration; #[async_trait] pub trait NostrRepo: Send + Sync { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs new file mode 100644 index 0000000..add7856 --- /dev/null +++ b/src/repo/postgres.rs @@ -0,0 +1,692 @@ +use crate::db::QueryResult; +use crate::error::Result; +use crate::event::{single_char_tagname, Event}; +use crate::nip05::{Nip05Name, VerificationRecord}; +use crate::repo::{now_jitter, NostrRepo}; +use crate::subscription::{ReqFilter, Subscription}; +use async_std::stream::StreamExt; +use async_trait::async_trait; +use chrono::{DateTime, TimeZone, Utc}; +use sqlx::postgres::PgRow; +use sqlx::{Error, Execute, FromRow, Postgres, QueryBuilder, Row}; +use std::time::{Duration, Instant}; +use sqlx::Error::RowNotFound; + +use crate::hexrange::{hex_range, HexSearch}; +use crate::repo::postgres_migration::run_migrations; +use crate::server::NostrMetrics; +use crate::utils::{is_hex, is_lower_hex}; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot::Receiver; +use tracing::log::trace; +use tracing::{debug, error, info}; +use crate::error; + +pub type PostgresPool = sqlx::pool::Pool; + +pub struct PostgresRepo { + conn: PostgresPool, + metrics: NostrMetrics, + settings: PostgresRepoSettings +} + +pub struct PostgresRepoSettings { + pub cleanup_contact_list: bool +} + +impl PostgresRepo { + pub fn new(c: PostgresPool, m: NostrMetrics, s: PostgresRepoSettings) -> PostgresRepo { + PostgresRepo { + conn: c, + metrics: m, + settings: s, + } + } +} + +#[async_trait] +impl NostrRepo for PostgresRepo { + + async fn start(&self) -> Result<()> { + info!("not implemented"); + Ok(()) + } + + async fn migrate_up(&self) -> Result { + run_migrations(&self.conn).await + } + + async fn write_event(&self, e: &Event) -> Result { + // start transaction + let mut tx = self.conn.begin().await?; + let start = Instant::now(); + + // get relevant fields from event and convert to blobs. + let id_blob = hex::decode(&e.id).ok(); + let pubkey_blob: Option> = hex::decode(&e.pubkey).ok(); + let delegator_blob: Option> = + e.delegated_by.as_ref().and_then(|d| hex::decode(d).ok()); + let event_str = serde_json::to_string(&e).unwrap(); + + // ignore if the event hash is a duplicate. + let mut ins_count = sqlx::query( + r#"INSERT INTO "event" +(id, pub_key, created_at, kind, "content", delegated_by) +VALUES($1, $2, $3, $4, $5, $6) +ON CONFLICT (id) DO NOTHING"#, + ) + .bind(&id_blob) + .bind(&pubkey_blob) + .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) + .bind(e.kind as i64) + .bind(event_str.into_bytes()) + .bind(delegator_blob) + .execute(&mut tx) + .await? + .rows_affected(); + + if ins_count == 0 { + // if the event was a duplicate, no need to insert event or + // pubkey references. This will abort the txn. + return Ok(0); + } + + // add all tags to the tag table + for tag in e.tags.iter() { + // ensure we have 2 values. + if tag.len() >= 2 { + let tag_name = &tag[0]; + let tag_val = &tag[1]; + // only single-char tags are searchable + let tag_char_opt = single_char_tagname(tag_name); + let query = "INSERT INTO tag (event_id, \"name\", value) VALUES($1, $2, $3) \ + ON CONFLICT (event_id, \"name\", value) DO NOTHING"; + match &tag_char_opt { + Some(_) => { + // if tag value is lowercase hex; + if is_lower_hex(tag_val) && (tag_val.len() % 2 == 0) { + sqlx::query(query) + .bind(&id_blob) + .bind(tag_name) + .bind(hex::decode(tag_val).ok()) + .execute(&mut tx) + .await?; + } else { + sqlx::query(query) + .bind(&id_blob) + .bind(tag_name) + .bind(tag_val.as_bytes()) + .execute(&mut tx) + .await?; + } + } + None => {} + } + } + } + + // if this event is replaceable update, hide every other replaceable + // event with the same kind from the same author that was issued + // earlier than this. + if e.kind == 3 && self.settings.cleanup_contact_list { + sqlx::query("delete from \"event\" where id != $1 and kind = 3 and pub_key = $2") + .bind(&id_blob) + .bind(hex::decode(&e.pubkey).ok()) + .execute(&mut tx) + .await?; + } else if e.kind == 0 || e.kind == 3 || (e.kind >= 10000 && e.kind < 20000) { + let update_count = sqlx::query("UPDATE \"event\" SET hidden = 1::bit(1) \ + WHERE id != $1 AND kind = $2 AND pub_key = $3 AND created_at <= $4 and hidden != 1::bit(1)") + .bind(&id_blob) + .bind(e.kind as i64) + .bind(hex::decode(&e.pubkey).ok()) + .bind(Utc.timestamp_opt(e.created_at as i64, 0).unwrap()) + .execute(&mut tx) + .await? + .rows_affected(); + if update_count > 0 { + info!( + "hid {} older replaceable kind {} events for author: {:?}", + update_count, + e.kind, + e.get_author_prefix() + ); + } + } + + // if this event is a deletion, hide the referenced events from the same author. + if e.kind == 5 { + let event_candidates = e.tag_values_by_name("e"); + let pub_keys: Vec> = event_candidates + .iter() + .filter(|x| is_hex(x) && x.len() == 64) + .filter_map(|x| hex::decode(x).ok()) + .collect(); + + let mut builder = QueryBuilder::new( + "UPDATE \"event\" SET hidden = 1::bit(1) WHERE kind != 5 AND pub_key = ", + ); + builder.push_bind(hex::decode(&e.pubkey).ok()); + builder.push(" AND id IN ("); + + let mut sep = builder.separated(", "); + for pk in pub_keys { + sep.push_bind(pk); + } + sep.push_unseparated(")"); + + let update_count = builder.build().execute(&mut tx).await?.rows_affected(); + info!( + "hid {} deleted events for author {:?}", + update_count, + e.get_author_prefix() + ); + } else { + // check if a deletion has already been recorded for this event. + // Only relevant for non-deletion events + let del_count = sqlx::query( + "SELECT e.id FROM \"event\" e \ + LEFT JOIN tag t ON e.id = t.event_id \ + WHERE e.pub_key = $1 AND t.\"name\" = 'e' AND e.kind = 5 AND t.value = $2 LIMIT 1", + ) + .bind(&pubkey_blob) + .bind(&id_blob) + .fetch_optional(&mut tx) + .await?; + + // check if a the query returned a result, meaning we should + // hid the current event + if del_count.is_some() { + // a deletion already existed, mark original event as hidden. + info!( + "hid event: {:?} due to existing deletion by author: {:?}", + e.get_event_id_prefix(), + e.get_author_prefix() + ); + sqlx::query("UPDATE \"event\" SET hidden = 1::bit(1) WHERE id = $1") + .bind(&id_blob) + .execute(&mut tx) + .await?; + // event was deleted, so let caller know nothing new + // arrived, preventing this from being sent to active + // subscriptions + ins_count = 0; + } + } + tx.commit().await?; + self.metrics + .write_events + .observe(start.elapsed().as_secs_f64()); + Ok(ins_count) + } + + async fn query_subscription( + &self, + sub: Subscription, + client_id: String, + query_tx: Sender, + mut abandon_query_rx: Receiver<()>, + ) -> Result<()> { + let start = Instant::now(); + let mut row_count: usize = 0; + + for filter in sub.filters.iter() { + let start = Instant::now(); + // generate SQL query + let q_filter = query_from_filter(filter); + if q_filter.is_none() { + debug!("Failed to generate query!"); + continue; + } + + debug!("SQL generated in {:?}", start.elapsed()); + + // cutoff for displaying slow queries + let slow_cutoff = Duration::from_millis(2000); + + // any client that doesn't cause us to generate new rows in 5 + // seconds gets dropped. + let abort_cutoff = Duration::from_secs(5); + + let start = Instant::now(); + let mut slow_first_event; + let mut last_successful_send = Instant::now(); + + // execute the query. Don't cache, since queries vary so much. + let mut q_filter = q_filter.unwrap(); + let q_build = q_filter.build(); + let sql = q_build.sql(); + let mut results = q_build.fetch(&self.conn); + + let mut first_result = true; + while let Some(row) = results.next().await { + if let Err(e) = row { + error!("Query failed: {} {} {:?}", e, sql, filter); + break; + } + let first_event_elapsed = start.elapsed(); + slow_first_event = first_event_elapsed >= slow_cutoff; + if first_result { + debug!( + "first result in {:?} (cid: {}, sub: {:?})", + first_event_elapsed, client_id, sub.id + ); + first_result = false; + } + + // logging for slow queries; show sub and SQL. + // to reduce logging; only show 1/16th of clients (leading 0) + if slow_first_event && client_id.starts_with("00") { + debug!( + "query req (slow): {:?} (cid: {}, sub: {:?})", + &sub, client_id, sub.id + ); + } else { + trace!( + "query req: {:?} (cid: {}, sub: {:?})", + &sub, + client_id, + sub.id + ); + } + + // check if this is still active; every 100 rows + if row_count % 100 == 0 && abandon_query_rx.try_recv().is_ok() { + debug!("query aborted (cid: {}, sub: {:?})", client_id, sub.id); + return Ok(()); + } + + row_count += 1; + let event_json: Vec = row.unwrap().get(0); + loop { + if query_tx.capacity() != 0 { + // we have capacity to add another item + break; + } else { + // the queue is full + trace!("db reader thread is stalled"); + if last_successful_send + abort_cutoff < Instant::now() { + // the queue has been full for too long, abort + info!("aborting database query due to slow client"); + return Ok(()); + } + // give the queue a chance to clear before trying again + async_std::task::sleep(Duration::from_millis(100)).await; + } + } + + // TODO: we could use try_send, but we'd have to juggle + // getting the query result back as part of the error + // result. + query_tx + .send(QueryResult { + sub_id: sub.get_id(), + event: String::from_utf8(event_json).unwrap(), + }) + .await + .ok(); + last_successful_send = Instant::now(); + } + } + query_tx + .send(QueryResult { + sub_id: sub.get_id(), + event: "EOSE".to_string(), + }) + .await + .ok(); + self.metrics + .query_sub + .observe(start.elapsed().as_secs_f64()); + debug!( + "query completed in {:?} (cid: {}, sub: {:?}, db_time: {:?}, rows: {})", + start.elapsed(), + client_id, + sub.id, + start.elapsed(), + row_count + ); + Ok(()) + } + + async fn optimize_db(&self) -> Result<()> { + // Not implemented + Ok(()) + } + + async fn create_verification_record(&self, event_id: &str, name: &str) -> Result<()> { + let mut tx = self.conn.begin().await?; + + sqlx::query("DELETE FROM user_verification WHERE \"name\" = $1") + .bind(name) + .execute(&mut tx) + .await?; + + sqlx::query("INSERT INTO user_verification (event_id, \"name\", verified_at) VALUES ($1, $2, now())") + .bind(hex::decode(event_id).ok()) + .bind(name) + .execute(&mut tx) + .await?; + + tx.commit().await?; + info!("saved new verification record for ({:?})", name); + Ok(()) + } + + async fn update_verification_timestamp(&self, id: u64) -> Result<()> { + // add some jitter to the verification to prevent everything from stacking up together. + let verify_time = now_jitter(600); + + // update verification time and reset any failure count + sqlx::query( + "UPDATE user_verification SET verified_at = $1, fail_count = 0 WHERE id = $2", + ) + .bind(Utc.timestamp_opt(verify_time as i64, 0).unwrap()) + .bind(id as i64) + .execute(&self.conn) + .await?; + + info!("verification updated for {}", id); + Ok(()) + } + + async fn fail_verification(&self, id: u64) -> Result<()> { + sqlx::query("UPDATE user_verification SET failed_at = now(), fail_count = fail_count + 1 WHERE id = $1") + .bind(id as i64) + .execute(&self.conn) + .await?; + Ok(()) + } + + async fn delete_verification(&self, id: u64) -> Result<()> { + sqlx::query("DELETE FROM user_verification WHERE id = $1") + .bind(id as i64) + .execute(&self.conn) + .await?; + Ok(()) + } + + async fn get_latest_user_verification(&self, pub_key: &str) -> Result { + let query = r#"SELECT + v.id, + v."name", + e.id as event_id, + e.pub_key, + e.created_at, + v.verified_at, + v.failed_at, + v.fail_count + FROM user_verification v + LEFT JOIN "event" e ON e.id = v.event_id + WHERE e.pub_key = $1 + ORDER BY e.created_at DESC, v.verified_at DESC, v.failed_at DESC + LIMIT 1"#; + sqlx::query_as::<_, VerificationRecord>(query) + .bind(hex::decode(pub_key).ok()) + .fetch_optional(&self.conn) + .await? + .ok_or(error::Error::SqlxError(RowNotFound)) + } + + async fn get_oldest_user_verification(&self, before: u64) -> Result { + let query = r#"SELECT + v.id, + v."name", + e.id as event_id, + e.pub_key, + e.created_at, + v.verified_at, + v.failed_at, + v.fail_count + FROM user_verification v + LEFT JOIN "event" e ON e.id = v.event_id + WHERE (v.verified_at < $1 OR v.verified_at IS NULL) + AND (v.failed_at < $1 OR v.failed_at IS NULL) + ORDER BY v.verified_at ASC, v.failed_at ASC + LIMIT 1"#; + sqlx::query_as::<_, VerificationRecord>(query) + .bind(Utc.timestamp_opt(before as i64, 0).unwrap()) + .fetch_optional(&self.conn) + .await? + .ok_or(error::Error::SqlxError(RowNotFound)) + } +} + +/// Create a dynamic SQL query and params from a subscription filter. +fn query_from_filter(f: &ReqFilter) -> Option> { + // if the filter is malformed, don't return anything. + if f.force_no_match { + return None; + } + + let mut query = QueryBuilder::new("SELECT e.\"content\", e.created_at FROM \"event\" e WHERE "); + + let mut push_and = false; + // Query for "authors", allowing prefix matches + if let Some(auth_vec) = &f.authors { + // filter out non-hex values + let auth_vec: Vec<&String> = auth_vec.iter().filter(|a| is_hex(a)).collect(); + + if !auth_vec.is_empty() { + query.push("("); + + // shortcut authors into "IN" query + let any_is_range = auth_vec.iter().any(|pk| pk.len() != 64); + if !any_is_range { + query.push("e.pub_key in ("); + let mut pk_sep = query.separated(", "); + for pk in auth_vec.iter() { + pk_sep.push_bind(hex::decode(pk).ok()); + } + query.push(") OR e.delegated_by in ("); + let mut pk_delegated_sep = query.separated(", "); + for pk in auth_vec.iter() { + pk_delegated_sep.push_bind(hex::decode(pk).ok()); + } + query.push(")"); + push_and = true; + } else { + let mut range_authors = query.separated(" OR "); + for auth in auth_vec { + match hex_range(auth) { + Some(HexSearch::Exact(ex)) => { + range_authors + .push("(e.pub_key = ") + .push_bind_unseparated(ex.clone()) + .push_unseparated(" OR e.delegated_by = ") + .push_bind_unseparated(ex) + .push_unseparated(")"); + } + Some(HexSearch::Range(lower, upper)) => { + range_authors + .push("((e.pub_key > ") + .push_bind_unseparated(lower.clone()) + .push_unseparated(" AND e.pub_key < ") + .push_bind_unseparated(upper.clone()) + .push_unseparated(") OR (e.delegated_by > ") + .push_bind_unseparated(lower) + .push_unseparated(" AND e.delegated_by < ") + .push_bind_unseparated(upper) + .push_unseparated("))"); + } + Some(HexSearch::LowerOnly(lower)) => { + range_authors + .push("(e.pub_key > ") + .push_bind_unseparated(lower.clone()) + .push_unseparated(" OR e.delegated_by > ") + .push_bind_unseparated(lower) + .push_unseparated(")"); + } + None => { + info!("Could not parse hex range from author {:?}", auth); + } + } + push_and = true; + } + } + query.push(")"); + } + } + + // Query for Kind + if let Some(ks) = &f.kinds { + if !ks.is_empty() { + if push_and { + query.push(" AND "); + } + push_and = true; + + query.push("e.kind in ("); + let mut list_query = query.separated(", "); + for k in ks.iter() { + list_query.push_bind(*k as i64); + } + query.push(")"); + } + } + + // Query for event, allowing prefix matches + if let Some(id_vec) = &f.ids { + // filter out non-hex values + let id_vec: Vec<&String> = id_vec.iter().filter(|a| is_hex(a)).collect(); + + if !id_vec.is_empty() { + if push_and { + query.push(" AND ("); + } else { + query.push("("); + } + push_and = true; + + // shortcut ids into "IN" query + let any_is_range = id_vec.iter().any(|pk| pk.len() != 64); + if !any_is_range { + query.push("id in ("); + let mut sep = query.separated(", "); + for id in id_vec.iter() { + sep.push_bind(hex::decode(id).ok()); + } + query.push(")"); + } else { + // take each author and convert to a hex search + let mut id_query = query.separated(" OR "); + for id in id_vec { + match hex_range(id) { + Some(HexSearch::Exact(ex)) => { + id_query + .push("(id = ") + .push_bind_unseparated(ex) + .push_unseparated(")"); + } + Some(HexSearch::Range(lower, upper)) => { + id_query + .push("(id > ") + .push_bind_unseparated(lower) + .push_unseparated(" AND id < ") + .push_bind_unseparated(upper) + .push_unseparated(")"); + } + Some(HexSearch::LowerOnly(lower)) => { + id_query + .push("(id > ") + .push_bind_unseparated(lower) + .push_unseparated(")"); + } + None => { + info!("Could not parse hex range from id {:?}", id); + } + } + } + } + + query.push(")"); + } + } + + // Query for tags + if let Some(map) = &f.tags { + if !map.is_empty() { + if push_and { + query.push(" AND "); + } + push_and = true; + + for (key, val) in map.iter() { + query.push("e.id IN (SELECT ee.id FROM \"event\" ee LEFT JOIN tag t on ee.id = t.event_id WHERE ee.hidden != 1::bit(1) and (t.\"name\" = ") + .push_bind(key.to_string()) + .push(" AND (value in ("); + + // plain value match first + let mut tag_query = query.separated(", "); + for v in val.iter() { + if (v.len() % 2 != 0) && !is_lower_hex(v) { + tag_query.push_bind(v.as_bytes()); + } else { + tag_query.push_bind(hex::decode(v).ok()); + } + } + query.push("))))"); + } + } + } + + // Query for timestamp + if f.since.is_some() { + if push_and { + query.push(" AND "); + } + push_and = true; + query + .push("e.created_at > ") + .push_bind(Utc.timestamp_opt(f.since.unwrap() as i64, 0).unwrap()); + } + + // Query for timestamp + if f.until.is_some() { + if push_and { + query.push(" AND "); + } + push_and = true; + query + .push("e.created_at < ") + .push_bind(Utc.timestamp_opt(f.until.unwrap() as i64, 0).unwrap()); + } + + // never display hidden events + if push_and { + query.push(" AND e.hidden != 1::bit(1)"); + } else { + query.push("e.hidden != 1::bit(1)"); + } + + // Apply per-filter limit to this query. + // The use of a LIMIT implies a DESC order, to capture only the most recent events. + if let Some(lim) = f.limit { + query.push(" ORDER BY e.created_at DESC LIMIT "); + query.push(lim.min(1000)); + } else { + query.push(" ORDER BY e.created_at ASC LIMIT "); + query.push(1000); + } + Some(query) +} + +impl FromRow<'_, PgRow> for VerificationRecord { + fn from_row(row: &'_ PgRow) -> std::result::Result { + let name = + Nip05Name::try_from(row.get::<'_, &str, &str>("name")).or(Err(RowNotFound))?; + Ok(VerificationRecord { + rowid: row.get::<'_, i64, &str>("id") as u64, + name, + address: hex::encode(row.get::<'_, Vec, &str>("pub_key")), + event: hex::encode(row.get::<'_, Vec, &str>("event_id")), + event_created: row.get::<'_, DateTime, &str>("created_at").timestamp() as u64, + last_success: None, + last_failure: match row.try_get::<'_, DateTime, &str>("failed_at") { + Ok(x) => Some(x.timestamp() as u64), + _ => None, + }, + failure_count: row.get::<'_, i32, &str>("fail_count") as u64, + }) + } +} diff --git a/src/repo/postgres_migration.rs b/src/repo/postgres_migration.rs new file mode 100644 index 0000000..5640396 --- /dev/null +++ b/src/repo/postgres_migration.rs @@ -0,0 +1,120 @@ +use crate::repo::postgres::PostgresPool; +use async_trait::async_trait; +use sqlx::{Executor, Postgres, Transaction}; + +#[async_trait] +pub trait Migration { + fn serial_number(&self) -> i64; + async fn run(&self, tx: &mut Transaction); +} + +struct SimpleSqlMigration { + pub serial_number: i64, + pub sql: Vec<&'static str>, +} + +#[async_trait] +impl Migration for SimpleSqlMigration { + fn serial_number(&self) -> i64 { + self.serial_number + } + + async fn run(&self, tx: &mut Transaction) { + for sql in self.sql.iter() { + tx.execute(*sql).await.unwrap(); + } + } +} + +/// Execute all migrations on the database. +pub async fn run_migrations(db: &PostgresPool) -> crate::error::Result { + prepare_migrations_table(db).await; + run_migration(m001::migration(), db).await; + Ok(m001::migration().serial_number() as usize) +} + +async fn prepare_migrations_table(db: &PostgresPool) { + sqlx::query("CREATE TABLE IF NOT EXISTS migrations (serial_number bigint)") + .execute(db) + .await + .unwrap(); +} + +async fn run_migration(migration: impl Migration, db: &PostgresPool) { + let row: i64 = + sqlx::query_scalar("SELECT COUNT(*) AS count FROM migrations WHERE serial_number = $1") + .bind(migration.serial_number()) + .fetch_one(db) + .await + .unwrap(); + + if row > 0 { + return; + } + + let mut transaction = db.begin().await.unwrap(); + migration.run(&mut transaction).await; + + sqlx::query("INSERT INTO migrations VALUES ($1)") + .bind(migration.serial_number()) + .execute(&mut transaction) + .await + .unwrap(); + + transaction.commit().await.unwrap(); +} + +mod m001 { + use crate::repo::postgres_migration::{Migration, SimpleSqlMigration}; + + pub fn migration() -> impl Migration { + SimpleSqlMigration { + serial_number: 1, + sql: vec![ + r#" +-- Events table +CREATE TABLE "event" ( + id bytea NOT NULL, + pub_key bytea NOT NULL, + created_at timestamp with time zone NOT NULL, + kind integer NOT NULL, + "content" bytea NOT NULL, + hidden bit(1) NOT NULL DEFAULT 0::bit(1), + delegated_by bytea NULL, + first_seen timestamp with time zone NOT NULL DEFAULT now(), + CONSTRAINT event_pkey PRIMARY KEY (id) +); +CREATE INDEX event_created_at_idx ON "event" (created_at,kind); +CREATE INDEX event_pub_key_idx ON "event" (pub_key); +CREATE INDEX event_delegated_by_idx ON "event" (delegated_by); + +-- Tags table +CREATE TABLE "tag" ( + id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY, + event_id bytea NOT NULL, + "name" varchar NOT NULL, + value bytea NOT NULL, + CONSTRAINT tag_fk FOREIGN KEY (event_id) REFERENCES "event"(id) ON DELETE CASCADE +); +CREATE INDEX tag_event_id_idx ON tag USING btree (event_id, name); +CREATE UNIQUE INDEX tag_event_id_value_idx ON tag (event_id,name,value); +CREATE INDEX tag_value_idx ON tag USING btree (value); + +-- NIP-05 Verfication table +CREATE TABLE "user_verification" ( + id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY, + event_id bytea NOT NULL, + "name" varchar NOT NULL, + verified_at timestamptz NULL, + failed_at timestamptz NULL, + fail_count int4 NULL DEFAULT 0, + CONSTRAINT user_verification_pk PRIMARY KEY (id), + CONSTRAINT user_verification_fk FOREIGN KEY (event_id) REFERENCES "event"(id) ON DELETE CASCADE +); +CREATE INDEX user_verification_event_id_idx ON user_verification USING btree (event_id); +CREATE INDEX user_verification_name_idx ON user_verification USING btree (name); + "#, + ], + } + } +}