From fde7872dca2f2b2b57d3c12c0f682cf2c3f94493 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 12 May 2025 11:32:15 +0200 Subject: [PATCH] Update deps, wip on bin read index --- .cargo/cargo-lock | 241 +++++++------- crates/daqbuf-redis/Cargo.toml | 4 +- crates/httpret/src/api4.rs | 1 + crates/httpret/src/api4/binned_v2.rs | 302 ++++++++++++++++++ crates/httpret/src/channelconfig.rs | 9 +- crates/httpret/src/err.rs | 1 + crates/httpret/src/httpret.rs | 2 + crates/nodenet/src/scylla.rs | 13 +- crates/scyllaconn/Cargo.toml | 7 +- crates/scyllaconn/src/accounting/totals.rs | 4 +- crates/scyllaconn/src/binned2.rs | 4 + .../scyllaconn/src/binned2/binnedrtbinlen.rs | 47 +++ .../scyllaconn/src/binned2/binnedrtmsplsps.rs | 82 +++++ crates/scyllaconn/src/binned2/intraday.rs | 1 + crates/scyllaconn/src/binned2/msplspiter.rs | 51 +++ crates/scyllaconn/src/binwriteindex.rs | 11 +- .../src/binwriteindex/read_all_coarse.rs | 54 ++++ crates/scyllaconn/src/conn.rs | 21 +- crates/scyllaconn/src/events2/events.rs | 12 +- .../scyllaconn/src/{scyllaconn.rs => lib.rs} | 1 + crates/scyllaconn/src/worker.rs | 7 +- 21 files changed, 703 insertions(+), 172 deletions(-) create mode 100644 crates/httpret/src/api4/binned_v2.rs create mode 100644 crates/scyllaconn/src/binned2.rs create mode 100644 crates/scyllaconn/src/binned2/binnedrtbinlen.rs create mode 100644 crates/scyllaconn/src/binned2/binnedrtmsplsps.rs create mode 100644 crates/scyllaconn/src/binned2/intraday.rs create mode 100644 crates/scyllaconn/src/binned2/msplspiter.rs create mode 100644 crates/scyllaconn/src/binwriteindex/read_all_coarse.rs rename crates/scyllaconn/src/{scyllaconn.rs => lib.rs} (96%) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 1d0ebfb..e4b9e6d 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -19,14 +19,14 @@ checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" [[package]] name = "ahash" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" dependencies = [ "cfg-if", "once_cell", "version_check", - "zerocopy 0.7.35", + "zerocopy", ] [[package]] @@ -325,9 +325,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.21" +version = "1.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0" +checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" dependencies = [ "shlex", ] @@ -394,9 +394,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.37" +version = "4.5.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eccb054f56cbd38340b380d4a8e69ef1f02f1af43db2f0cc817a4774d80ae071" +checksum = "ed93b9805f8ba930df42c2590f05453d5ec36cbb85d018868a5b24d31f6ac000" dependencies = [ "clap_builder", "clap_derive", @@ -404,9 +404,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.37" +version = "4.5.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd9466fac8543255d3b1fcad4762c5e116ffe808c8a3043d4263cd4fd4862a2" +checksum = "379026ff283facf611b0ea629334361c4211d1b12ee01024eec1591133b04120" dependencies = [ "anstream", "anstyle", @@ -747,7 +747,7 @@ dependencies = [ [[package]] name = "daqbuf-streams" -version = "0.0.3" +version = "0.0.4" dependencies = [ "arrayref", "async-channel 2.3.1", @@ -1210,9 +1210,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ "cfg-if", "js-sys", @@ -1559,21 +1559,22 @@ dependencies = [ [[package]] name = "icu_collections" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" dependencies = [ "displaydoc", - "yoke", + "potential_utf", + "yoke 0.8.0", "zerofrom", "zerovec", ] [[package]] -name = "icu_locid" -version = "1.5.0" +name = "icu_locale_core" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" dependencies = [ "displaydoc", "litemap", @@ -1582,31 +1583,11 @@ dependencies = [ "zerovec", ] -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" - [[package]] name = "icu_normalizer" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" dependencies = [ "displaydoc", "icu_collections", @@ -1614,67 +1595,54 @@ dependencies = [ "icu_properties", "icu_provider", "smallvec", - "utf16_iter", - "utf8_iter", - "write16", "zerovec", ] [[package]] name = "icu_normalizer_data" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a" dependencies = [ "displaydoc", "icu_collections", - "icu_locid_transform", + "icu_locale_core", "icu_properties_data", "icu_provider", - "tinystr", + "potential_utf", + "zerotrie", "zerovec", ] [[package]] name = "icu_properties_data" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" +checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04" [[package]] name = "icu_provider" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" dependencies = [ "displaydoc", - "icu_locid", - "icu_provider_macros", + "icu_locale_core", "stable_deref_trait", "tinystr", "writeable", - "yoke", + "yoke 0.8.0", "zerofrom", + "zerotrie", "zerovec", ] -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "ident_case" version = "1.0.1" @@ -1694,9 +1662,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" dependencies = [ "icu_normalizer", "icu_properties", @@ -1797,9 +1765,9 @@ checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "litemap" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" [[package]] name = "lock_api" @@ -1817,6 +1785,12 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lz4_flex" version = "0.11.3" @@ -2127,6 +2101,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2139,7 +2122,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.8.25", + "zerocopy", ] [[package]] @@ -2153,9 +2136,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.7" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012" +checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ "bytes", "cfg_aliases", @@ -2174,12 +2157,13 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.11" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b" +checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "bytes", - "getrandom 0.3.2", + "getrandom 0.3.3", + "lru-slab", "rand 0.9.1", "ring", "rustc-hash", @@ -2278,7 +2262,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom 0.3.2", + "getrandom 0.3.3", ] [[package]] @@ -2310,11 +2294,10 @@ dependencies = [ [[package]] name = "redis" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "438a4e5f8e9aa246d6f3666d6978441bf1b37d5f417b50c4dd220be09f5fcc17" +checksum = "0bc1ea653e0b2e097db3ebb5b7f678be339620b8041f66b30a308c1d45d36a7f" dependencies = [ - "arc-swap", "combine", "itoa", "num-bigint", @@ -2449,11 +2432,12 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ "web-time", + "zeroize", ] [[package]] @@ -2485,9 +2469,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.2" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7149975849f1abb3832b246010ef62ccc80d3a76169517ada7188252b9cfb437" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -2577,7 +2561,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "uuid", - "yoke", + "yoke 0.7.5", ] [[package]] @@ -2594,7 +2578,7 @@ dependencies = [ [[package]] name = "scyllaconn" -version = "0.0.1" +version = "0.0.2" dependencies = [ "async-channel 2.3.1", "autoerr", @@ -2997,9 +2981,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.7.6" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" dependencies = [ "displaydoc", "zerovec", @@ -3279,12 +3263,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -3303,7 +3281,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ - "getrandom 0.3.2", + "getrandom 0.3.3", ] [[package]] @@ -3712,17 +3690,11 @@ dependencies = [ "bitflags", ] -[[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - [[package]] name = "writeable" -version = "0.5.5" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" [[package]] name = "yoke" @@ -3732,7 +3704,19 @@ checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", - "yoke-derive", + "yoke-derive 0.7.5", + "zerofrom", +] + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive 0.8.0", "zerofrom", ] @@ -3749,12 +3733,15 @@ dependencies = [ ] [[package]] -name = "zerocopy" -version = "0.7.35" +name = "yoke-derive" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ - "zerocopy-derive 0.7.35", + "proc-macro2", + "quote", + "syn", + "synstructure", ] [[package]] @@ -3763,18 +3750,7 @@ version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" dependencies = [ - "zerocopy-derive 0.8.25", -] - -[[package]] -name = "zerocopy-derive" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "zerocopy-derive", ] [[package]] @@ -3816,21 +3792,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" [[package]] -name = "zerovec" -version = "0.10.4" +name = "zerotrie" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" dependencies = [ - "yoke", + "displaydoc", + "yoke 0.8.0", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +dependencies = [ + "yoke 0.8.0", "zerofrom", "zerovec-derive", ] [[package]] name = "zerovec-derive" -version = "0.10.3" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", diff --git a/crates/daqbuf-redis/Cargo.toml b/crates/daqbuf-redis/Cargo.toml index 04d4f4a..9e5b9b4 100644 --- a/crates/daqbuf-redis/Cargo.toml +++ b/crates/daqbuf-redis/Cargo.toml @@ -2,11 +2,11 @@ name = "daqbuf-redis" version = "0.0.1" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" autoerr = "0.0.3" taskrun = { path = "../taskrun" } -redis = { version = "0.30.0", features = [] } +redis = { version = "0.31.0", features = [] } diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index 7e63603..be21a19 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -1,6 +1,7 @@ pub mod accounting; pub mod backend; pub mod binned; +pub mod binned_v2; pub mod binwriteindex; pub mod databuffer_tools; pub mod docs; diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs new file mode 100644 index 0000000..dbd592d --- /dev/null +++ b/crates/httpret/src/api4/binned_v2.rs @@ -0,0 +1,302 @@ +use crate::bodystream::response; +use crate::channelconfig::ch_conf_from_binned; +use crate::requests::accepts_cbor_framed; +use crate::requests::accepts_json_framed; +use crate::requests::accepts_json_or_all; +use crate::requests::accepts_octets; +use crate::ServiceSharedResources; +use daqbuf_err as err; +use dbconn::worker::PgQueue; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use http::header::CONTENT_TYPE; +use http::request::Parts; +use http::Method; +use http::StatusCode; +use httpclient::bad_request_response; +use httpclient::body_empty; +use httpclient::body_stream; +use httpclient::error_response; +use httpclient::error_status_response; +use httpclient::not_found_response; +use httpclient::IntoBody; +use httpclient::Requ; +use httpclient::StreamResponse; +use httpclient::ToJsonBody; +use netpod::log; +use netpod::req_uri_to_url; +use netpod::timeunits::SEC; +use netpod::ttl::RetentionTime; +use netpod::ChannelTypeConfigGen; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::ReqCtx; +use netpod::APP_CBOR_FRAMED; +use netpod::APP_JSON; +use netpod::APP_JSON_FRAMED; +use netpod::HEADER_NAME_REQUEST_ID; +use nodenet::client::OpenBoxedBytesViaHttp; +use nodenet::scylla::ScyllaEventReadProvider; +use query::api4::binned::BinWriteIndexQuery; +use query::api4::binned::BinnedQuery; +use scyllaconn::worker::ScyllaQueue; +use series::msp::PrebinnedPartitioning; +use series::SeriesId; +use std::pin::Pin; +use std::sync::Arc; +use streams::eventsplainreader::DummyCacheReadProvider; +use streams::eventsplainreader::SfDatabufferEventReadProvider; +use streams::streamtimeout::StreamTimeout2; +use streams::timebin::cached::reader::EventsReadProvider; +use streams::timebin::CacheReadProvider; +use tracing::Instrument; +use tracing::Span; +use url::Url; + +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "Api4BinnedV2"), + enum variants { + ChannelNotFound, + BadQuery(String), + HttpLib(#[from] http::Error), + ChannelConfig(crate::channelconfig::Error), + Retrieval(#[from] crate::RetrievalError), + EventsCbor(#[from] streams::plaineventscbor::Error), + EventsJson(#[from] streams::plaineventsjson::Error), + ServerError, + BinnedStream(err::Error), + TimebinnedJson(#[from] streams::timebinnedjson::Error), + ReadAllCoarse(#[from] scyllaconn::binwriteindex::read_all_coarse::Error), + }, +); + +impl From for Error { + fn from(value: crate::channelconfig::Error) -> Self { + use crate::channelconfig::Error::*; + match value { + NotFound(_) => Self::ChannelNotFound, + _ => Self::ChannelConfig(value), + } + } +} + +impl From for crate::RetrievalError { + fn from(value: Error) -> Self { + crate::RetrievalError::TextError(value.to_string()) + } +} + +pub struct BinnedV2Handler {} + +impl BinnedV2Handler { + pub fn handler(req: &Requ) -> Option { + if req.uri().path() == "/api/4/private/binnedv2" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, + ) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); + } + match handle_request(req, ctx, &shared_res.pgqueue, shared_res.scyqueue.clone(), ncc).await { + Ok(ret) => Ok(ret), + Err(e) => match e { + Error::ChannelNotFound => { + let res = not_found_response("channel not found".into(), ctx.reqid()); + Ok(res) + } + Error::BadQuery(msg) => { + let res = bad_request_response(msg, ctx.reqid()); + Ok(res) + } + _ => { + error!("EventsHandler sees: {}", e); + Ok(error_response(e.to_string(), ctx.reqid())) + } + }, + } + } +} + +async fn handle_request( + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, +) -> Result { + let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?; + if req + .uri() + .path_and_query() + .map_or(false, |x| x.as_str().contains("DOERR")) + { + Err(Error::ServerError)?; + } + let reqid = ctx.reqid(); + let (head, _body) = req.into_parts(); + let query = BinnedQuery::from_url(&url).map_err(|e| { + error!("handle_request: {}", e); + Error::BadQuery(e.to_string()) + })?; + info!("{:?}", query); + let logspan = if query.log_level() == "trace" { + trace!("enable trace for handler"); + tracing::span!(tracing::Level::INFO, "log_span_trace") + } else if query.log_level() == "debug" { + debug!("enable debug for handler"); + tracing::span!(tracing::Level::INFO, "log_span_debug") + } else { + tracing::Span::none() + }; + let span1 = tracing::span!( + tracing::Level::INFO, + "binwriteindex", + reqid, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("binned begin {:?}", query); + }); + binned_instrumented(head, ctx, url, query, pgqueue, scyqueue, ncc, logspan.clone()) + .instrument(logspan) + .instrument(span1) + .await +} + +async fn binned_instrumented( + head: Parts, + ctx: &ReqCtx, + url: Url, + query: BinnedQuery, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, + logspan: Span, +) -> Result { + let res2 = HandleRes2::new(ctx, logspan, url, query.clone(), pgqueue, scyqueue, ncc).await?; + if accepts_json_framed(&head.headers) { + Ok(binned_json_framed(res2, ctx, ncc).await?) + } else { + let ret = error_response(format!("unsupported accept: {:?}", &head.headers), ctx.reqid()); + Ok(ret) + } +} + +fn make_read_provider( + chname: &str, + scyqueue: Option, + open_bytes: Pin>, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> (Arc, Arc) { + let events_read_provider = if chname.starts_with("unittest") { + let x = streams::teststream::UnitTestStream::new(); + Arc::new(x) + } else if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + // TODO do not clone the request. Pass an Arc up to here. + let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes); + Arc::new(x) + } else { + panic!("unexpected backend") + }; + let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + let x = DummyCacheReadProvider::new(); + Arc::new(x) + } else { + panic!("unexpected backend") + }; + (events_read_provider, cache_read_provider) +} + +async fn binned_json_framed( + res2: HandleRes2<'_>, + ctx: &ReqCtx, + _ncc: &NodeConfigCached, +) -> Result { + let series = SeriesId::new(res2.ch_conf.series().unwrap()); + let range = res2.query.range().to_time().unwrap(); + let scyqueue = res2.scyqueue.as_ref().unwrap(); + let res = scyllaconn::binwriteindex::read_all_coarse::read_all_coarse(series, range, scyqueue).await?; + let mut strings = Vec::new(); + for e in res { + strings.push(format!("{:?}", e)); + } + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(ToJsonBody::from(&strings).into_body())?; + Ok(ret) +} + +struct HandleRes2<'a> { + logspan: Span, + url: Url, + query: BinnedQuery, + ch_conf: ChannelTypeConfigGen, + events_read_provider: Arc, + cache_read_provider: Arc, + timeout_provider: Box, + pgqueue: &'a PgQueue, + scyqueue: Option, +} + +impl<'a> HandleRes2<'a> { + async fn new( + ctx: &ReqCtx, + logspan: Span, + url: Url, + query: BinnedQuery, + pgqueue: &'a PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, + ) -> Result { + let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) + .await? + .ok_or_else(|| Error::ChannelNotFound)?; + let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let (events_read_provider, cache_read_provider) = + make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc); + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let ret = Self { + logspan, + url, + query, + ch_conf, + events_read_provider, + cache_read_provider, + timeout_provider, + pgqueue, + scyqueue, + }; + Ok(ret) + } +} diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 59f761d..baf0ef9 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -68,6 +68,7 @@ autoerr::create_error_v1!( Async(#[from] netpod::AsyncChannelError), ChannelConfig(#[from] dbconn::channelconfig::Error), Netpod(#[from] netpod::Error), + ScyllaConn(#[from] scyllaconn::conn::Error), ScyllaExecution(#[from] scyllaconn::scylla::errors::ExecutionError), ScyllaPagerExecution(#[from] scyllaconn::scylla::errors::PagerExecutionError), ScyllanextRow(#[from] scyllaconn::scylla::errors::NextRowError), @@ -484,9 +485,7 @@ impl ScyllaChannelsActive { .cluster .scylla_st() .ok_or_else(|| Error::ExpectScyllaBackend)?; - let scy = scyllaconn::conn::create_scy_session(scyco) - .await - .map_err(other_err_error)?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; // Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid. let tsedge = q.tsedge / 10 / (6 * 2) * (6 * 2); info!( @@ -875,9 +874,7 @@ impl GenerateScyllaTestData { async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> { let scyconf = node_config.node_config.cluster.scylla_st().unwrap(); - let scy = scyllaconn::conn::create_scy_session(scyconf) - .await - .map_err(other_err_error)?; + let scy = scyllaconn::conn::create_scy_session(scyconf).await?; let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. // Only later delete also from the `ts_msp` table. diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index 8970893..afdfa5a 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -115,3 +115,4 @@ impl Convable for query::api4::Error {} impl Convable for query::api4::events::Error {} impl Convable for netpod::Error {} impl Convable for crate::http3::Error {} +impl Convable for scyllaconn::conn::Error {} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 0073dc5..560b83c 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -381,6 +381,8 @@ async fn http_service_inner( } } else if let Some(h) = api4::binwriteindex::BinWriteIndexHandler::handler(&req) { Ok(h.handle(req, ctx, &shared_res, &node_config).await?) + } else if let Some(h) = api4::binned_v2::BinnedV2Handler::handler(&req) { + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) { Ok(h.handle(req, ctx, &node_config, shared_res) .await diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 84db512..c0d4aea 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -1,6 +1,4 @@ use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -25,11 +23,12 @@ use std::task::Poll; use streams::timebin::cached::reader::EventsReadProvider; use taskrun::tokio; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaChannelEventStream")] -pub enum Error { - MergeRt(#[from] mergert::Error), -} +autoerr::create_error_v1!( + name(Error, "ScyllaChannelEventStream"), + enum variants { + MergeRt(#[from] mergert::Error), + }, +); pub async fn scylla_channel_event_stream( evq: EventsSubQuery, diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index cf39d61..6c9234b 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -1,11 +1,8 @@ [package] name = "scyllaconn" -version = "0.0.1" +version = "0.0.2" authors = ["Dominik Werder "] -edition = "2021" - -[lib] -path = "src/scyllaconn.rs" +edition = "2024" [dependencies] futures-util = "0.3.31" diff --git a/crates/scyllaconn/src/accounting/totals.rs b/crates/scyllaconn/src/accounting/totals.rs index 7558f73..8b99abe 100644 --- a/crates/scyllaconn/src/accounting/totals.rs +++ b/crates/scyllaconn/src/accounting/totals.rs @@ -9,10 +9,10 @@ use items_0::Empty; use items_0::Extendable; use items_0::WithLen; use items_2::accounting::AccountingEvents; +use netpod::EMIT_ACCOUNTING_SNAP; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::timeunits; -use netpod::EMIT_ACCOUNTING_SNAP; use scylla::statement::prepared::PreparedStatement; use std::collections::VecDeque; use std::pin::Pin; @@ -215,7 +215,7 @@ impl Stream for AccountingStreamScylla { continue; } } - FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { + FrState::ReadValues(st) => match st.fut.poll_unpin(cx) { Ready(Ok(mut item)) => { if !st.next() { self.state = FrState::Done; diff --git a/crates/scyllaconn/src/binned2.rs b/crates/scyllaconn/src/binned2.rs new file mode 100644 index 0000000..56fe580 --- /dev/null +++ b/crates/scyllaconn/src/binned2.rs @@ -0,0 +1,4 @@ +pub mod binnedrtbinlen; +pub mod binnedrtmsplsps; +pub mod intraday; +pub mod msplspiter; diff --git a/crates/scyllaconn/src/binned2/binnedrtbinlen.rs b/crates/scyllaconn/src/binned2/binnedrtbinlen.rs new file mode 100644 index 0000000..cf8c419 --- /dev/null +++ b/crates/scyllaconn/src/binned2/binnedrtbinlen.rs @@ -0,0 +1,47 @@ +use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use daqbuf_series::msp::PrebinnedPartitioning; +use netpod::BinnedRange; +use netpod::TsNano; +use netpod::ttl::RetentionTime; + +/* +Given RT, PBP and range, loop over all the bins to retrieve. + +When there is no content, skip over. +*/ + +pub struct BinnedRtBinlenStream { + series: SeriesId, + rt: RetentionTime, + pbp: PrebinnedPartitioning, + range: BinnedRange, + scyqueue: ScyllaQueue, +} + +impl BinnedRtBinlenStream { + pub fn new( + series: SeriesId, + rt: RetentionTime, + pbp: PrebinnedPartitioning, + range: BinnedRange, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + series, + rt, + pbp, + range, + scyqueue, + } + } + + fn make_next_fut(&mut self) -> Option<()> { + let series = self.series.clone(); + let rt = self.rt.clone(); + let msp = todo!(); + let binlen = todo!(); + let lsps = todo!(); + super::binnedrtmsplsps::BinnedRtMspLsps::new(series, rt, msp, binlen, lsps, self.scyqueue.clone()); + } +} diff --git a/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs new file mode 100644 index 0000000..4eaffbc --- /dev/null +++ b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs @@ -0,0 +1,82 @@ +/* +Fetches the bins for a given RT, binlen and MSP. +Issues the scylla commands. +Assembles the results. +Does basic sanity checks. +May re-chunk the result if too large. +*/ + +use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use items_2::binning::container_bins::ContainerBins; +use netpod::DtMs; +use netpod::ttl::RetentionTime; +use std::fmt; +use std::pin::Pin; + +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "BinnedRtMsp"), + enum variants { + ReadJob(#[from] streams::timebin::cached::reader::Error), + }, +); + +type Fut = + Pin, streams::timebin::cached::reader::Error>> + Send>>; + +struct FutW(Fut); + +impl fmt::Debug for FutW { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("Fut").finish() + } +} + +pub struct BinnedRtMspLsps { + series: SeriesId, + rt: RetentionTime, + msp: MspU32, + lsps: (LspU32, LspU32), + binlen: DtMs, + scyqueue: ScyllaQueue, + fut: Option, +} + +impl BinnedRtMspLsps { + pub fn new( + series: SeriesId, + rt: RetentionTime, + msp: MspU32, + binlen: DtMs, + lsps: (LspU32, LspU32), + scyqueue: ScyllaQueue, + ) -> Self { + Self { + series, + rt, + msp, + lsps, + binlen, + scyqueue, + fut: None, + } + } + + fn make_next_fut(&mut self) -> Option { + let rt = self.rt.clone(); + let series = self.series.id(); + let binlen = self.binlen.clone(); + let msp = self.msp.to_u64(); + let offs = self.lsps.0.to_u32()..self.lsps.1.to_u32(); + // SAFETY we only use scyqueue while we self are alive. + let scyqueue = unsafe { &mut *(&mut self.scyqueue as *mut ScyllaQueue) }; + let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs); + let fut = Box::pin(fut); + Some(fut) + } +} diff --git a/crates/scyllaconn/src/binned2/intraday.rs b/crates/scyllaconn/src/binned2/intraday.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/scyllaconn/src/binned2/intraday.rs @@ -0,0 +1 @@ + diff --git a/crates/scyllaconn/src/binned2/msplspiter.rs b/crates/scyllaconn/src/binned2/msplspiter.rs new file mode 100644 index 0000000..0487940 --- /dev/null +++ b/crates/scyllaconn/src/binned2/msplspiter.rs @@ -0,0 +1,51 @@ +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use netpod::TsMs; +use netpod::range::evrange::NanoRange; + +#[derive(Debug, Clone)] +pub struct MspLspItem { + pub msp: MspU32, + pub lsp: LspU32, +} + +#[derive(Debug)] +pub struct MspLspIter { + range: NanoRange, + pbp: PrebinnedPartitioning, + ts: TsMs, +} + +impl MspLspIter { + pub fn new(range: NanoRange, pbp: PrebinnedPartitioning) -> Self { + let ts = range.beg_ts().to_ts_ms(); + Self { range, pbp, ts } + } +} + +impl Iterator for MspLspIter { + type Item = (MspU32, LspU32); + + fn next(&mut self) -> Option { + if self.ts >= self.range.end_ts().to_ts_ms() { + None + } else { + let x = self.pbp.msp_lsp(self.ts); + let msp = MspU32(x.0); + let lsp = LspU32(x.1); + self.ts = self.ts.add_dt_ms(self.pbp.bin_len()); + Some((msp, lsp)) + } + } +} + +#[test] +fn test_iter_00() { + let range = NanoRange::from_strings("", "").unwrap(); + let pbp = PrebinnedPartitioning::Sec1; + let mut it = MspLspIter::new(range, pbp); + for x in it { + eprintln!("{:?}", x); + } +} diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs index a1f5357..4595442 100644 --- a/crates/scyllaconn/src/binwriteindex.rs +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -1,4 +1,5 @@ pub mod bwxcmb; +pub mod read_all_coarse; use crate::worker::ScyllaQueue; use daqbuf_series::msp::MspU32; @@ -10,6 +11,7 @@ use futures_util::Stream; use netpod::log; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; +use netpod::DtMs; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -45,7 +47,6 @@ pub struct BinWriteIndexEntry { #[derive(Debug)] pub struct BinWriteIndexSet { - pub rt: RetentionTime, pub msp: MspU32, pub entries: VecDeque, } @@ -78,7 +79,12 @@ impl BinWriteIndexRtStream { info!("{}::new INFO/DEBUG test", Self::type_name()); debug!("{}::new", Self::type_name()); let (msp_beg, lsp_beg) = pbp.msp_lsp(range.beg_ts().to_ts_ms()); - let (msp_end, lsp_end) = pbp.msp_lsp(range.end_ts().add_dt_nano(pbp.bin_len().dt_ns()).to_ts_ms()); + let (msp_end, lsp_end) = pbp.msp_lsp( + range + .end_ts() + .add_dt_nano(DtMs::from_ms_u64(pbp.bin_len().ms() - 1).dt_ns()) + .to_ts_ms(), + ); BinWriteIndexRtStream { rt1, series, @@ -148,7 +154,6 @@ impl Stream for BinWriteIndexRtStream { Ready(Ok(x)) => { self.fut1 = None; let item = BinWriteIndexSet { - rt: self.rt1.clone(), msp: MspU32(x.0), entries: x.3, }; diff --git a/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs new file mode 100644 index 0000000..5bf2a17 --- /dev/null +++ b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs @@ -0,0 +1,54 @@ +use super::BinWriteIndexRtStream; +use crate::worker::ScyllaQueue; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use daqbuf_series::SeriesId; +use futures_util::TryStreamExt; +use netpod::log; +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use netpod::DtMs; +use std::collections::VecDeque; + +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "BinIndexReadAllCoarse"), + enum variants { + Worker(#[from] crate::worker::Error), + BinWriteIndexRead(#[from] super::Error), + }, +); + +pub async fn read_all_coarse( + series: SeriesId, + range: NanoRange, + scyqueue: &ScyllaQueue, +) -> Result, Error> { + let rts = { + use RetentionTime::*; + [Long, Medium, Short] + }; + let mut ret = VecDeque::new(); + for rt in rts { + let pbp = PrebinnedPartitioning::Day1; + let mut stream = BinWriteIndexRtStream::new(rt.clone(), series, pbp, range.clone(), scyqueue.clone()); + while let Some(x) = stream.try_next().await? { + for e in x.entries { + let binlen = DtMs::from_ms_u64(e.binlen as u64); + let item = (rt.clone(), x.msp.clone(), e.lsp, binlen); + ret.push_back(item); + } + } + } + Ok(ret) +} + +pub fn select_potential_binlen(options: VecDeque<(RetentionTime, MspU32, u32, DtMs)>) -> Result<(), Error> { + // Check first if there are common binlen over all the range. + // If not, filter out the options which could build content from finer resolution. + // Then heuristically select the best match. + // PrebinnedPartitioning::Day1.msp_lsp(val) + todo!() +} diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index 79bb57b..01c285f 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -1,19 +1,23 @@ -use crate::errconv::ErrConv; -use daqbuf_err as err; -use err::Error; use netpod::log::*; use netpod::ScyllaConfig; use scylla::client::execution_profile::ExecutionProfileBuilder; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; +use scylla::errors::NewSessionError; use scylla::statement::Consistency; use std::sync::Arc; +autoerr::create_error_v1!( + name(Error, "ScyllaSessionCreate"), + enum variants { + ScyllaSessionNew(#[from] NewSessionError), + ScyllaUseKeyspace(#[from] scylla::errors::UseKeyspaceError), + }, +); + pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, Error> { let scy = create_scy_session_no_ks(scyconf).await?; - scy.use_keyspace(&scyconf.keyspace, true) - .await - .map_err(Error::from_string)?; + scy.use_keyspace(&scyconf.keyspace, true).await?; let ret = Arc::new(scy); Ok(ret) } @@ -24,12 +28,11 @@ pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result Result<(), Error> { - let scy = create_scy_session_no_ks(&self.scyconf_st) - .await - .map_err(Error::ScyllaConnection)?; + let scy = create_scy_session_no_ks(&self.scyconf_st).await?; let scy = Arc::new(scy); let kss = [ self.scyconf_st.keyspace.as_str(),