diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index d40d2c8..df43655 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.7.8" @@ -87,9 +93,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -102,33 +108,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -148,15 +154,15 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "arrayref" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "async-channel" @@ -200,18 +206,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -245,16 +251,16 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] [[package]] name = "base64" -version = "0.21.7" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bincode" @@ -363,9 +369,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" dependencies = [ "serde", ] @@ -381,9 +387,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.104" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490" +checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" +dependencies = [ + "shlex", +] [[package]] name = "cfg-if" @@ -403,7 +412,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -435,9 +444,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.8" +version = "4.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d" +checksum = "ed6719fffa43d0d87e5fd8caeab59be1554fb028cd30edc88fc4369b17971019" dependencies = [ "clap_builder", "clap_derive", @@ -445,9 +454,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.8" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708" +checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" dependencies = [ "anstream", "anstyle", @@ -457,21 +466,21 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "cobs" @@ -481,9 +490,9 @@ checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "commonio" @@ -516,9 +525,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "corosensei" @@ -535,9 +544,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +checksum = "51e852e6dc9a5bed1fae92dd2375037bf2b768725bf3be87811edee3249d09ad" dependencies = [ "libc", ] @@ -714,7 +723,7 @@ dependencies = [ [[package]] name = "daqbuffer" -version = "0.5.1-aa.3" +version = "0.5.3-aa.0" dependencies = [ "bytes", "chrono", @@ -776,12 +785,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" dependencies = [ - "darling_core 0.20.9", - "darling_macro 0.20.9", + "darling_core 0.20.10", + "darling_macro 0.20.10", ] [[package]] @@ -800,16 +809,16 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -825,13 +834,13 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ - "darling_core 0.20.9", + "darling_core 0.20.10", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -971,9 +980,9 @@ dependencies = [ [[package]] name = "document-features" -version = "0.2.8" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef5282ad69563b5fc40319526ba27e0e7363d552a896f0297d54f767717f9b95" +checksum = "cb6969eaabd2421f8a2775cfd2471a2b634372b4a25d41e3bd647b79912850a0" dependencies = [ "litrs", ] @@ -1013,6 +1022,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced" +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + [[package]] name = "enum-iterator" version = "0.7.0" @@ -1035,23 +1050,23 @@ dependencies = [ [[package]] name = "enumset" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "226c0da7462c13fb57e5cc9e0dc8f0635e7d27f276a3a7fd30054647f669007d" +checksum = "d07a4b049558765cef5f0c1a273c3fc57084d768b44d2f98127aef4cceb17293" dependencies = [ "enumset_derive", ] [[package]] name = "enumset_derive" -version = "0.8.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08b6c6ab82d70f08844964ba10c7babb716de2ecaeab9be5717918a5177d3af" +checksum = "59c3b24c345d8c314966bdc1832f6c2635bfcce8e7cf363bd115987bba2ee242" dependencies = [ - "darling 0.20.9", + "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -1142,24 +1157,24 @@ checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "filetime" -version = "0.2.23" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.4.1", - "windows-sys 0.52.0", + "libredox", + "windows-sys 0.59.0", ] [[package]] name = "flate2" -version = "1.0.30" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1249,7 +1264,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -1333,9 +1348,9 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "h2" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ "atomic-waker", "bytes", @@ -1343,7 +1358,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.2.6", + "indexmap 2.4.0", "slab", "tokio", "tokio-util", @@ -1458,9 +1473,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http", @@ -1571,9 +1586,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", @@ -1592,9 +1607,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", "futures-channel", @@ -1662,9 +1677,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -1679,9 +1694,9 @@ checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767" [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "items_0" @@ -1731,7 +1746,7 @@ dependencies = [ name = "items_proc" version = "0.0.2" dependencies = [ - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -1751,9 +1766,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "js-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" dependencies = [ "wasm-bindgen", ] @@ -1772,9 +1787,20 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" + +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.3", +] [[package]] name = "linux-raw-sys" @@ -1888,14 +1914,24 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.8.11" +name = "miniz_oxide" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1988,7 +2024,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -2000,21 +2036,11 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" -version = "0.36.1" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -2055,9 +2081,9 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.2", + "redox_syscall 0.5.3", "smallvec", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -2124,7 +2150,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -2141,21 +2167,22 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "postcard" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a55c51ee6c0db07e68448e336cf8ea4131a620edefebf9893e759b2d793420f8" +checksum = "20ee10b999a00ca189ac2cb99f5db1ca71fb7371e3d5f493b879ca95d2a67220" dependencies = [ "cobs", - "embedded-io", + "embedded-io 0.4.0", + "embedded-io 0.6.1", "heapless", "serde", ] [[package]] name = "postgres-protocol" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" dependencies = [ "base64", "byteorder", @@ -2171,9 +2198,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" +checksum = "02048d9e032fb3cc3413bbf7b83a15d84a5d419778e2628751896d856498eee9" dependencies = [ "bytes", "chrono", @@ -2191,9 +2218,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "proc-macro-error" @@ -2350,9 +2380,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -2371,9 +2401,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -2421,9 +2451,9 @@ dependencies = [ [[package]] name = "rkyv" -version = "0.7.44" +version = "0.7.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cba464629b3394fc4dbc6f940ff8f5b4ff5c7aef40f29166fd4ad12acbc99c0" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" dependencies = [ "bitvec", "bytecheck", @@ -2440,9 +2470,9 @@ dependencies = [ [[package]] name = "rkyv_derive" -version = "0.7.44" +version = "0.7.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7dddfff8de25e6f62b9d64e6e432bf1c6736c57d20323e15ee10435fbda7c65" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" dependencies = [ "proc-macro2", "quote", @@ -2527,7 +2557,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -2538,9 +2568,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" +checksum = "b20b46cf4ea921ba41121ba9ddf933185cd830cbe2c4fa6272a6e274a6b7368d" dependencies = [ "arc-swap", "async-trait", @@ -2561,7 +2591,7 @@ dependencies = [ "smallvec", "snap", "socket2", - "thiserror 1.0.61", + "thiserror 1.0.63", "tokio", "tracing", "uuid", @@ -2569,9 +2599,9 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" +checksum = "27ea3cd3ff5bf9d7db7a6d65c54cecf52f7c40b8e3e32c8c2d6da84d23776ea4" dependencies = [ "async-trait", "byteorder", @@ -2579,21 +2609,21 @@ dependencies = [ "lz4_flex", "scylla-macros", "snap", - "thiserror 1.0.61", + "thiserror 1.0.63", "tokio", "uuid", ] [[package]] name = "scylla-macros" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" +checksum = "e50f3e2aec7ea9f495e029fb783eb34c64d26a8f2055e1d6b43d00e04d2fbda6" dependencies = [ - "darling 0.20.9", + "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -2610,6 +2640,7 @@ dependencies = [ "query", "scylla", "series", + "taskrun", ] [[package]] @@ -2635,9 +2666,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.203" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] @@ -2665,13 +2696,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -2682,25 +2713,26 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] [[package]] name = "serde_spanned" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" +checksum = "eb5b1b31579f3811bf615c144393417496f152e12ac8b7663bf664f4a815306d" dependencies = [ "serde", ] @@ -2711,7 +2743,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.4.0", "itoa", "ryu", "serde", @@ -2757,6 +2789,12 @@ dependencies = [ "memmap2 0.6.2", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2908,9 +2946,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -2936,9 +2974,9 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.12.14" +version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1fc403891a21bcfb7c37834ba66a547a8f402146eba7265b5a6d88059c9ff2f" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "taskrun" @@ -2957,52 +2995,53 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" dependencies = [ "cfg-if", "fastrand", + "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "thiserror" version = "0.0.1" -source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#ddbee753d83a11d7c716fbb6d7fe6678968eb0a9" +source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d" dependencies = [ - "thiserror-impl 1.0.61 (git+https://github.com/dominikwerder/thiserror.git?branch=cstm)", + "thiserror-impl 1.0.61", ] [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ - "thiserror-impl 1.0.61 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror-impl 1.0.63", ] [[package]] name = "thiserror-impl" version = "1.0.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] name = "thiserror-impl" -version = "1.0.61" -source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#ddbee753d83a11d7c716fbb6d7fe6678968eb0a9" +version = "1.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -3057,9 +3096,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.6.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" dependencies = [ "tinyvec_macros", ] @@ -3072,40 +3111,39 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", "tracing", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] name = "tokio-postgres" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" +checksum = "03adcf0147e203b6032c0b2d30be1415ba03bc348901f3ff1cc0df6a733e60c3" dependencies = [ "async-trait", "byteorder", @@ -3153,61 +3191,36 @@ dependencies = [ [[package]] name = "toml" -version = "0.7.8" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.19.15", -] - -[[package]] -name = "toml" -version = "0.8.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.22.14", + "toml_edit", ] [[package]] name = "toml_datetime" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.19.15" +version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" +checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.4.0", "serde", "serde_spanned", "toml_datetime", - "winnow 0.5.40", -] - -[[package]] -name = "toml_edit" -version = "0.22.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" -dependencies = [ - "indexmap 2.2.6", - "serde", - "serde_spanned", - "toml_datetime", - "winnow 0.6.13", + "winnow", ] [[package]] @@ -3227,15 +3240,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -3256,7 +3269,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -3325,9 +3338,9 @@ dependencies = [ [[package]] name = "typeid" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "059d83cc991e7a42fc37bd50941885db0888e34209f8cfd9aab07ddec03bc9cf" +checksum = "0e13db2e0ccd5e14a544e8a246ba2312cd25223f616442d7f2cb0e3db614236e" [[package]] name = "typenum" @@ -3337,9 +3350,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "typetag" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "661d18414ec032a49ece2d56eee03636e43c4e8d577047ab334c0ba892e29aaf" +checksum = "52ba3b6e86ffe0054b2c44f2d86407388b933b16cb0a70eea3929420db1d9bbe" dependencies = [ "erased-serde", "inventory", @@ -3350,13 +3363,13 @@ dependencies = [ [[package]] name = "typetag-impl" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac73887f47b9312552aa90ef477927ff014d63d1920ca8037c6c1951eab64bb1" +checksum = "70b20a22c42c8f1cd23ce5e34f165d4d37038f5b663ad20fb6adbdf029172483" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] [[package]] @@ -3382,9 +3395,9 @@ dependencies = [ [[package]] name = "unicode-properties" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" +checksum = "52ea75f83c0137a9b98608359a5f1af8144876eb67bcb1ce837368e906a9f524" [[package]] name = "unsafe-libyaml" @@ -3412,9 +3425,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.9.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom", ] @@ -3427,9 +3440,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "want" @@ -3454,34 +3467,35 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ "cfg-if", + "once_cell", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" dependencies = [ "bumpalo", "log 0.4.22", "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3489,28 +3503,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "wasmer" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3de82c1520fb75ade394d6c89cac2b05d1b7b4205a45099efcd078b0bc828375" +checksum = "c3a6e0f73e5ae361fe64db607eaf4ab2381d88ad2c1b0bb8cf254cf35d894687" dependencies = [ "bytes", "cfg-if", @@ -3523,7 +3537,7 @@ dependencies = [ "serde-wasm-bindgen", "shared-buffer", "target-lexicon", - "thiserror 1.0.61", + "thiserror 1.0.63", "tracing", "wasm-bindgen", "wasmer-compiler", @@ -3536,9 +3550,9 @@ dependencies = [ [[package]] name = "wasmer-compiler" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c18588e4b92adb2ea88a2897c93615538c7e40cfec8dc6efc3fcee63299a81" +checksum = "cb1e7c79507f5d55f1afd99984717e8380440cd98e13d542e4d00661f986f2d4" dependencies = [ "backtrace", "bytes", @@ -3555,7 +3569,7 @@ dependencies = [ "self_cell", "shared-buffer", "smallvec", - "thiserror 1.0.61", + "thiserror 1.0.63", "wasmer-types", "wasmer-vm", "wasmparser", @@ -3565,9 +3579,9 @@ dependencies = [ [[package]] name = "wasmer-compiler-cranelift" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf497986b83f1ae8589a22f046b803d6d1d77bbeda93e75baa89ab6501838f7c" +checksum = "8f3352014573750327646a690d32774312b0e8b7920e7e8ba00c0449eac18390" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -3584,31 +3598,31 @@ dependencies = [ [[package]] name = "wasmer-config" -version = "0.2.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a0f70c177b1c5062cfe0f5308c3317751796fef9403c22a0cd7b4cacd4ccd8" +checksum = "4b4a632496950fde9ad821e195ef1a301440076f7c7d80de55239a140359bcbd" dependencies = [ "anyhow", "bytesize", "derive_builder", "hex", - "indexmap 2.2.6", + "indexmap 2.4.0", "schemars", "semver", "serde", "serde_cbor", "serde_json", "serde_yaml", - "thiserror 1.0.61", - "toml 0.8.14", + "thiserror 1.0.63", + "toml", "url", ] [[package]] name = "wasmer-derive" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45ab99baa393da623dbca6390c17bd9cd7666e8c48f6b42b4f8c635106a09ca8" +checksum = "ac6b0b0580cfa1fc7ad58cca3626a742f2b2e5ccd51cfc5de43e8edb0d1daa4c" dependencies = [ "proc-macro-error", "proc-macro2", @@ -3618,9 +3632,9 @@ dependencies = [ [[package]] name = "wasmer-types" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aebf59870ee6e2ef0264d87e6a83f767f5cfe83e21470c0cc32e06231335e23" +checksum = "576442cc3d302ca215fd40aa7826a078571dca7eaa773d8cdedca14a2ec7c9a1" dependencies = [ "bytecheck", "enum-iterator", @@ -3632,16 +3646,16 @@ dependencies = [ "rkyv", "sha2", "target-lexicon", - "thiserror 1.0.61", + "thiserror 1.0.63", "webc", "xxhash-rust", ] [[package]] name = "wasmer-vm" -version = "4.3.3" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0df3b84ac2c450ad04931b558392cadd03d606d94ab15496e13ce56ee8320634" +checksum = "6483035d1df84a978cd6c6a35878e913dc8ec6311f8712548a922a75e87957ba" dependencies = [ "backtrace", "cc", @@ -3660,7 +3674,7 @@ dependencies = [ "more-asserts", "region", "scopeguard", - "thiserror 1.0.61", + "thiserror 1.0.63", "wasmer-types", "winapi", ] @@ -3672,15 +3686,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dbe55c8f9d0dbd25d9447a5a889ff90c0cc3feaa7395310d3d826b2c703eaab" dependencies = [ "bitflags 2.6.0", - "indexmap 2.2.6", + "indexmap 2.4.0", "semver", ] [[package]] name = "web-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" dependencies = [ "js-sys", "wasm-bindgen", @@ -3688,9 +3702,9 @@ dependencies = [ [[package]] name = "webc" -version = "6.0.0-rc1" +version = "6.0.0-rc2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1fc686c7b43c9bc630a499f6ae1f0a4c4bd656576a53ae8a147b0cc9bc983ad" +checksum = "fb3e2ccb43d303c5bd48f31db7a129481a9aaa5343d623f92951751df190df81" dependencies = [ "anyhow", "base64", @@ -3709,8 +3723,8 @@ dependencies = [ "shared-buffer", "tar", "tempfile", - "thiserror 1.0.61", - "toml 0.7.8", + "thiserror 1.0.63", + "toml", "url", "wasmer-config", ] @@ -3754,7 +3768,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -3770,37 +3784,22 @@ dependencies = [ "windows_x86_64_msvc 0.33.0", ] -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-targets" -version = "0.48.5" +name = "windows-sys" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -3809,22 +3808,16 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_gnullvm", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_gnullvm", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -3837,12 +3830,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -3855,12 +3842,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3879,12 +3860,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -3897,24 +3872,12 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -3927,12 +3890,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -3941,18 +3898,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.5.40" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" -dependencies = [ - "memchr", -] - -[[package]] -name = "winnow" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" dependencies = [ "memchr", ] @@ -3979,9 +3927,9 @@ dependencies = [ [[package]] name = "xxhash-rust" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63658493314859b4dfdf3fb8c1defd61587839def09582db50b8a4e93afca6bb" +checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "zerocopy" @@ -3989,6 +3937,7 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ + "byteorder", "zerocopy-derive", ] @@ -4000,5 +3949,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.75", ] diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 4b95e65..cb1922b 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.2" +version = "0.5.3-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index f2ff1d2..b37a462 100644 --- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -1,7 +1,7 @@ -use crate::ErrConv; use chrono::DateTime; use chrono::Utc; -use err::Error; +use err::thiserror; +use err::ThisError; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::ChConf; @@ -13,6 +13,19 @@ use netpod::TsMs; use std::time::Duration; use tokio_postgres::Client; +#[derive(Debug, ThisError)] +#[cstm(name = "DbChannelConfig")] +pub enum Error { + Pg(#[from] tokio_postgres::Error), + #[error("NotFound({0}, {1})")] + NotFound(SfDbChannel, NanoRange), + SeriesNotFound(String, u64), + BadScalarType(i32), + BadShape(Vec), + BadKind(i16), + NoInput, +} + /// It is an unsolved question as to how we want to uniquely address channels. /// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases /// are not solved. At the same time, it is desirable to avoid to complicate things for users. @@ -27,21 +40,6 @@ pub(super) async fn chconf_best_matching_for_name_and_range( pg: &Client, ) -> Result { debug!("chconf_best_matching_for_name_and_range {channel:?} {range:?}"); - #[cfg(DISABLED)] - if ncc.node_config.cluster.scylla.is_none() { - let e = Error::with_msg_no_trace(format!( - "chconf_best_matching_for_name_and_range but not a scylla backend" - )); - error!("{e}"); - return Err(e); - }; - #[cfg(DISABLED)] - if backend != ncc.node_config.cluster.backend { - warn!( - "mismatched backend {} vs {}", - backend, ncc.node_config.cluster.backend - ); - } let sql = concat!( "select unnest(tscs) as tsc, series, scalar_type, shape_dims", " from series_by_channel", @@ -52,10 +50,9 @@ pub(super) async fn chconf_best_matching_for_name_and_range( ); let res = pg .query(sql, &[&channel.backend(), &channel.name(), &channel.kind().to_db_i16()]) - .await - .err_conv()?; + .await?; if res.len() == 0 { - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?} {range:?}")); + let e = Error::NotFound(channel, range); warn!("{e}"); Err(e) } else if res.len() > 1 { @@ -67,8 +64,9 @@ pub(super) async fn chconf_best_matching_for_name_and_range( // TODO can I get a slice from psql driver? let shape_dims: Vec = r.get(3); let series = series as u64; - let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?; - let _shape = Shape::from_scylla_shape_dims(&shape_dims)?; + let _scalar_type = + ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::BadScalarType(scalar_type))?; + let _shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::BadShape(shape_dims))?; let tsms = tsc.signed_duration_since(DateTime::UNIX_EPOCH).num_milliseconds() as u64; let ts = TsMs::from_ms_u64(tsms); rows.push((ts, series)); @@ -88,8 +86,8 @@ pub(super) async fn chconf_best_matching_for_name_and_range( let shape_dims: Vec = r.get(3); let series = series as u64; let kind = channel.kind(); - let scalar_type = ScalarType::from_scylla_i32(scalar_type)?; - let shape = Shape::from_scylla_shape_dims(&shape_dims)?; + let scalar_type = ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::BadScalarType(scalar_type))?; + let shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::BadShape(shape_dims))?; let ret = ChConf::new(channel.backend(), series, kind, scalar_type, shape, channel.name()); Ok(ret) } @@ -97,7 +95,7 @@ pub(super) async fn chconf_best_matching_for_name_and_range( fn decide_best_matching_index(range: (TsMs, TsMs), rows: &[TsMs]) -> Result { if rows.len() < 1 { - let e = Error::with_msg_no_trace("decide_best_matching_index no rows"); + let e = Error::NoInput; warn!("{e}"); Err(e) } else { @@ -205,22 +203,22 @@ pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) - "select channel, scalar_type, shape_dims, kind from series_by_channel where facility = $1 and series = $2", &[&backend, &(series as i64)], ) - .await - .err_conv()?; + .await?; if res.len() < 1 { - let e = Error::with_public_msg_no_trace(format!( - "can not find channel information backend {backend} series {series}" - )); + let e = Error::SeriesNotFound(backend.into(), series); warn!("{e}"); Err(e) } else { let row = res.first().unwrap(); let name: String = row.get(0); - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; + let scalar_type = row.get::<_, i32>(1); + let scalar_type = + ScalarType::from_dtype_index(scalar_type as _).map_err(|_| Error::BadScalarType(scalar_type))?; // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; + let shape = row.get::<_, Vec>(2); + let shape = Shape::from_scylla_shape_dims(&shape).map_err(|_| Error::BadShape(shape))?; let kind: i16 = row.get(3); - let kind = SeriesKind::from_db_i16(kind)?; + let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::BadKind(kind))?; let ret = ChConf::new(backend, series, kind, scalar_type, shape, name); Ok(ret) } diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs index 664718f..510867e 100644 --- a/crates/dbconn/src/worker.rs +++ b/crates/dbconn/src/worker.rs @@ -23,6 +23,7 @@ pub enum Error { ChannelSend, ChannelRecv, Join, + ChannelConfig(#[from] crate::channelconfig::Error), } impl From for Error { @@ -39,8 +40,12 @@ impl err::ToErr for Error { #[derive(Debug)] enum Job { - ChConfBestMatchingNameRange(SfDbChannel, NanoRange, Sender>), - ChConfForSeries(String, u64, Sender>), + ChConfBestMatchingNameRange( + SfDbChannel, + NanoRange, + Sender>, + ), + ChConfForSeries(String, u64, Sender>), InfoForSeriesIds( Vec, Sender>, crate::channelinfo::Error>>, @@ -58,26 +63,28 @@ pub struct PgQueue { } impl PgQueue { - pub async fn chconf_for_series( - &self, - backend: &str, - series: u64, - ) -> Result>, Error> { - let (tx, rx) = async_channel::bounded(1); - let job = Job::ChConfForSeries(backend.into(), series, tx); - self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; - Ok(rx) - } - pub async fn chconf_best_matching_name_range( &self, channel: SfDbChannel, range: NanoRange, - ) -> Result>, Error> { + ) -> Result, netpod::AsyncChannelError> { let (tx, rx) = async_channel::bounded(1); let job = Job::ChConfBestMatchingNameRange(channel, range, tx); - self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; - Ok(rx) + self.tx.send(job).await.map_err(|_| netpod::AsyncChannelError::Send)?; + let res = rx.recv().await.map_err(|_| netpod::AsyncChannelError::Recv)?; + Ok(res) + } + + pub async fn chconf_for_series( + &self, + backend: &str, + series: u64, + ) -> Result, netpod::AsyncChannelError> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ChConfForSeries(backend.into(), series, tx); + self.tx.send(job).await.map_err(|_| netpod::AsyncChannelError::Send)?; + let res = rx.recv().await.map_err(|_| netpod::AsyncChannelError::Recv)?; + Ok(res) } pub async fn info_for_series_ids( diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 8864e29..25ea6eb 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -74,6 +74,57 @@ pub fn body_bytes>(body: D) -> StreamBody { http_body_util::StreamBody::new(Box::pin(stream)) } +pub fn internal_error() -> http::Response { + let mut res = http::Response::new(body_empty()); + *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + res +} + +pub fn error_response(msg: String, reqid: impl AsRef) -> http::Response { + let status = StatusCode::INTERNAL_SERVER_ERROR; + let js = serde_json::json!({ + "message": msg.to_string(), + "requestid": reqid.as_ref(), + }); + if let Ok(body) = serde_json::to_string_pretty(&js) { + match Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(body_string(body)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e}"); + internal_error() + } + } + } else { + internal_error() + } +} +pub fn not_found_response(msg: String, reqid: impl AsRef) -> http::Response { + let status = StatusCode::NOT_FOUND; + let js = serde_json::json!({ + "message": msg.to_string(), + "requestid": reqid.as_ref(), + }); + if let Ok(body) = serde_json::to_string_pretty(&js) { + match Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(body_string(body)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e}"); + internal_error() + } + } + } else { + internal_error() + } +} + pub trait IntoBody { fn into_body(self) -> StreamBody; } @@ -153,6 +204,7 @@ impl Stream for StreamIncoming { if x.is_data() { Ready(Some(Ok(x.into_data().unwrap()))) } else { + warn!("non-data in stream: {x:?}"); Ready(Some(Ok(Bytes::new()))) } } diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index e87517f..afbaab0 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,15 +1,16 @@ use crate::bodystream::response; -use crate::bodystream::response_err_msg; -use crate::bodystream::ToPublicResponse; use crate::channelconfig::ch_conf_from_binned; -use crate::err::Error; use crate::requests::accepts_json_or_all; use crate::requests::accepts_octets; use crate::ServiceSharedResources; use dbconn::worker::PgQueue; +use err::thiserror; +use err::ThisError; use http::Method; use http::StatusCode; use httpclient::body_empty; +use httpclient::error_response; +use httpclient::not_found_response; use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; @@ -25,69 +26,27 @@ use query::api4::binned::BinnedQuery; use tracing::Instrument; use url::Url; -async fn binned_json( - url: Url, - req: Requ, - ctx: &ReqCtx, - pgqueue: &PgQueue, - ncc: &NodeConfigCached, -) -> Result { - debug!("{:?}", req); - let reqid = crate::status_board() - .map_err(|e| Error::with_msg_no_trace(e.to_string()))? - .new_status_id(); - let (_head, _body) = req.into_parts(); - let query = BinnedQuery::from_url(&url).map_err(|e| { - error!("binned_json: {e:?}"); - let msg = format!("can not parse query: {}", e.msg()); - e.add_public_msg(msg) - })?; - // TODO handle None case better and return 404 - let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) - .await? - .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; - let span1 = span!( - Level::INFO, - "httpret::binned", - reqid, - beg = query.range().beg_u64() / SEC, - end = query.range().end_u64() / SEC, - ch = query.channel().name(), - ); - span1.in_scope(|| { - debug!("begin"); - }); - let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let open_bytes = Box::pin(open_bytes); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes) - .instrument(span1) - .await?; - let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; - Ok(ret) +#[derive(Debug, ThisError)] +#[cstm(name = "Api4Binned")] +pub enum Error { + ChannelNotFound, + BadQuery(String), + HttpLib(#[from] http::Error), + ChannelConfig(crate::channelconfig::Error), + Retrieval(#[from] crate::RetrievalError), + EventsCbor(#[from] streams::plaineventscbor::Error), + EventsJson(#[from] streams::plaineventsjson::Error), + ServerError, + BinnedStream(::err::Error), } -async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result { - let url = req_uri_to_url(req.uri())?; - if req - .uri() - .path_and_query() - .map_or(false, |x| x.as_str().contains("DOERR")) - { - Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?; - } - if accepts_json_or_all(&req.headers()) { - Ok(binned_json(url, req, ctx, pgqueue, ncc).await?) - } else if accepts_octets(&req.headers()) { - Ok(response_err_msg( - StatusCode::NOT_ACCEPTABLE, - format!("binary binned data not yet available"), - )?) - } else { - let ret = response_err_msg( - StatusCode::NOT_ACCEPTABLE, - format!("Unsupported Accept: {:?}", req.headers()), - )?; - Ok(ret) +impl From for Error { + fn from(value: crate::channelconfig::Error) -> Self { + use crate::channelconfig::Error::*; + match value { + NotFound(_) => Self::ChannelNotFound, + _ => Self::ChannelConfig(value), + } } } @@ -110,14 +69,86 @@ impl BinnedHandler { ncc: &NodeConfigCached, ) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } match binned(req, ctx, &shared_res.pgqueue, ncc).await { Ok(ret) => Ok(ret), - Err(e) => { - warn!("BinnedHandler handle sees: {e}"); - Ok(e.to_public_response()) - } + Err(e) => match e { + Error::ChannelNotFound => { + let res = not_found_response("channel not found".into(), ctx.reqid()); + Ok(res) + } + Error::BadQuery(msg) => { + let res = error_response(format!("bad query: {msg}"), ctx.reqid()); + Ok(res) + } + _ => { + error!("EventsHandler sees: {e}"); + Ok(error_response(e.public_message(), ctx.reqid())) + } + }, } } } + +async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result { + let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?; + if req + .uri() + .path_and_query() + .map_or(false, |x| x.as_str().contains("DOERR")) + { + Err(Error::ServerError)?; + } + if accepts_json_or_all(&req.headers()) { + Ok(binned_json(url, req, ctx, pgqueue, ncc).await?) + } else if accepts_octets(&req.headers()) { + Ok(error_response( + format!("binary binned data not yet available"), + ctx.reqid(), + )) + } else { + let ret = error_response(format!("Unsupported Accept: {:?}", req.headers()), ctx.reqid()); + Ok(ret) + } +} + +async fn binned_json( + url: Url, + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + ncc: &NodeConfigCached, +) -> Result { + debug!("{:?}", req); + let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id(); + let (_head, _body) = req.into_parts(); + let query = BinnedQuery::from_url(&url).map_err(|e| { + error!("binned_json: {e:?}"); + Error::BadQuery(e.to_string()) + })?; + // TODO handle None case better and return 404 + let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) + .await? + .ok_or_else(|| Error::ChannelNotFound)?; + let span1 = span!( + Level::INFO, + "httpret::binned", + reqid, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("begin"); + }); + let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); + let open_bytes = Box::pin(open_bytes); + let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes) + .instrument(span1) + .await + .map_err(|e| Error::BinnedStream(e))?; + let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; + // let ret = error_response(e.public_message(), ctx.reqid()); + Ok(ret) +} diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index 72d3a97..b4eb31e 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -1,5 +1,4 @@ use crate::bodystream::response; -use crate::bodystream::response_err_msg; use async_channel::Receiver; use async_channel::Sender; use bytes::Bytes; @@ -14,6 +13,7 @@ use http::Response; use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; +use httpclient::error_response; use httpclient::Requ; use httpclient::StreamResponse; use netpod::log::*; @@ -76,8 +76,7 @@ impl FindActiveHandler { Ok(ret) => Ok(ret), Err(e) => { error!("{e}"); - let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) - .map_err(|_| FindActiveError::InternalError)?; + let res = error_response(e.to_public_error().to_string(), "missing-req"); Ok(res) } } diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index d01569a..b23834f 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -1,4 +1,3 @@ -use crate::bodystream::response_err_msg; use crate::response; use crate::ReqCtx; use crate::ServiceSharedResources; @@ -10,6 +9,7 @@ use http::Method; use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; +use httpclient::error_response; use httpclient::read_body_bytes; use httpclient::Requ; use httpclient::StreamResponse; @@ -50,7 +50,7 @@ impl EventDataHandler { pub async fn handle( &self, req: Requ, - _ctx: &ReqCtx, + ctx: &ReqCtx, ncc: &NodeConfigCached, shared_res: Arc, ) -> Result { @@ -63,8 +63,7 @@ impl EventDataHandler { Ok(ret) => Ok(ret), Err(e) => { error!("{e}"); - let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) - .map_err(|_| EventDataError::InternalError)?; + let res = error_response(e.to_public_error().to_string(), ctx.reqid()); Ok(res) } } diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 14c903b..c73feb5 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -1,23 +1,25 @@ -use crate::bodystream::response_err_msg; use crate::channelconfig::chconf_from_events_quorum; -use crate::err::Error; use crate::requests::accepts_cbor_framed; use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::response; use crate::ServiceSharedResources; -use crate::ToPublicResponse; use bytes::Bytes; use bytes::BytesMut; use dbconn::worker::PgQueue; +use err::thiserror; +use err::ThisError; use futures_util::future; use futures_util::stream; use futures_util::Stream; use futures_util::StreamExt; +use http::header::CONTENT_TYPE; use http::Method; use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; +use httpclient::error_response; +use httpclient::not_found_response; use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; @@ -28,11 +30,36 @@ use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::APP_CBOR_FRAMED; +use netpod::APP_JSON; +use netpod::APP_JSON_FRAMED; +use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; use streams::instrument::InstrumentStream; use tracing::Instrument; +#[derive(Debug, ThisError)] +#[cstm(name = "Api4Events")] +pub enum Error { + ChannelNotFound, + HttpLib(#[from] http::Error), + ChannelConfig(crate::channelconfig::Error), + Retrieval(#[from] crate::RetrievalError), + EventsCbor(#[from] streams::plaineventscbor::Error), + EventsJson(#[from] streams::plaineventsjson::Error), +} + +impl From for Error { + fn from(value: crate::channelconfig::Error) -> Self { + use crate::channelconfig::Error::*; + match value { + NotFound(_) => Self::ChannelNotFound, + _ => Self::ChannelConfig(value), + } + } +} + pub struct EventsHandler {} impl EventsHandler { @@ -50,9 +77,9 @@ impl EventsHandler { ctx: &ReqCtx, shared_res: &ServiceSharedResources, ncc: &NodeConfigCached, - ) -> Result { + ) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } let self_name = "handle"; let url = req_uri_to_url(req.uri())?; @@ -73,10 +100,16 @@ impl EventsHandler { .await { Ok(ret) => Ok(ret), - Err(e) => { - error!("EventsHandler sees: {e}"); - Ok(e.to_public_response()) - } + Err(e) => match e { + Error::ChannelNotFound => { + let res = not_found_response("channel not found".into(), ctx.reqid()); + Ok(res) + } + _ => { + error!("EventsHandler sees: {e}"); + Ok(error_response(e.public_message(), ctx.reqid())) + } + }, } } } @@ -90,7 +123,7 @@ async fn plain_events( ) -> Result { let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) .await? - .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; + .ok_or_else(|| Error::ChannelNotFound)?; if accepts_cbor_framed(req.headers()) { Ok(plain_events_cbor_framed(req, evq, ch_conf, ctx, ncc).await?) } else if accepts_json_framed(req.headers()) { @@ -98,7 +131,7 @@ async fn plain_events( } else if accepts_json_or_all(req.headers()) { Ok(plain_events_json(req, evq, ch_conf, ctx, ncc).await?) } else { - let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?; + let ret = error_response(format!("unsupported accept"), ctx.reqid()); Ok(ret) } } @@ -124,7 +157,10 @@ async fn plain_events_cbor_framed( tracing::Span::none() }; let stream = InstrumentStream::new(stream, logspan); - let ret = response(StatusCode::OK).body(body_stream(stream))?; + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_CBOR_FRAMED) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(body_stream(stream))?; Ok(ret) } @@ -139,7 +175,10 @@ async fn plain_events_json_framed( let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; let stream = bytes_chunks_to_len_framed_str(stream); - let ret = response(StatusCode::OK).body(body_stream(stream))?; + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON_FRAMED) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(body_stream(stream))?; Ok(ret) } @@ -167,12 +206,15 @@ async fn plain_events_json( return Err(e.into()); } }; - let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(ToJsonBody::from(&item).into_body())?; debug!("{self_name} response created"); Ok(ret) } -fn bytes_chunks_to_framed(stream: S) -> impl Stream> +fn bytes_chunks_to_framed(stream: S) -> impl Stream> where S: Stream>, T: Into, @@ -191,19 +233,19 @@ where b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); let mut b3 = BytesMut::with_capacity(16); b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); - stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + stream::iter([Ok::<_, crate::err::Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) } Err(e) => { - let e = Error::with_msg_no_trace(e.to_string()); + let e = crate::err::Error::with_msg_no_trace(e.to_string()); stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())]) } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) } -fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> +fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> where - S: Stream>, + S: Stream>, T: Into, { use future::ready; @@ -214,10 +256,10 @@ where let s = y.into(); let mut b2 = String::with_capacity(16); write!(b2, "\n{}\n", s.len()).unwrap(); - stream::iter([Ok::<_, Error>(b2), Ok(s)]) + stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s)]) } Err(e) => { - let e = Error::with_msg_no_trace(e.to_string()); + let e = crate::err::Error::with_msg_no_trace(e.to_string()); stream::iter([Err(e), Ok(String::new())]) } }) diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index f306ac2..68e68c7 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -17,14 +17,6 @@ where Response::builder().status(status) } -pub fn response_err_msg(status: StatusCode, msg: T) -> Result -where - T: ToString, -{ - let ret = response(status).body(body_string(msg))?; - Ok(ret) -} - pub trait ToPublicResponse { fn to_public_response(&self) -> StreamResponse; } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index e2803e0..39a2d9c 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,7 +1,6 @@ -use crate::err::Error; use crate::response; use crate::ServiceSharedResources; -use crate::ToPublicResponse; +use core::fmt; use dbconn::create_connection; use dbconn::worker::PgQueue; use futures_util::StreamExt; @@ -36,12 +35,191 @@ use netpod::APP_JSON; use nodenet::configquorum::find_config_basics_quorum; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; -use scyllaconn::errconv::ErrConv; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; use url::Url; +#[derive(Debug)] +pub enum Error { + NotFound(SfDbChannel), + ConfigQuorum(nodenet::configquorum::Error), + ConfigNode(nodenet::channelconfig::Error), + Http(crate::Error), + HttpCrate(http::Error), + // TODO create dedicated error type for query parsing + BadQuery(err::Error), + MissingBackend, + MissingScalarType, + MissingShape, + MissingShapeKind, + MissingEdge, + Uri(netpod::UriError), + ChannelConfigQuery(err::Error), + ExpectScyllaBackend, + Pg(dbconn::pg::Error), + Scylla(String), + Join, + OtherErr(err::Error), + PgWorker(dbconn::worker::Error), + Async(netpod::AsyncChannelError), + ChannelConfig(dbconn::channelconfig::Error), +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let name = "HttpChannelConfigError"; + write!(fmt, "{name}(")?; + match self { + Error::NotFound(chn) => write!(fmt, "NotFound({chn}")?, + Error::ConfigQuorum(e) => write!(fmt, "ConfigQuorum({e})")?, + Error::ConfigNode(e) => write!(fmt, "ConfigNode({e})")?, + Error::Http(e) => write!(fmt, "Http({e})")?, + Error::HttpCrate(e) => write!(fmt, "HttpCrate({e})")?, + Error::BadQuery(e) => write!(fmt, "BadQuery({e})")?, + Error::MissingBackend => write!(fmt, "MissingBackend")?, + Error::MissingScalarType => write!(fmt, "MissingScalarType")?, + Error::MissingShape => write!(fmt, "MissingShape")?, + Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?, + Error::MissingEdge => write!(fmt, "MissingEdge")?, + Error::Uri(x) => write!(fmt, "Uri({x})")?, + Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?, + Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?, + Error::Pg(e) => write!(fmt, "Pg({e})")?, + Error::Scylla(e) => write!(fmt, "Scylla({e})")?, + Error::Join => write!(fmt, "Join")?, + Error::OtherErr(e) => write!(fmt, "OtherErr({e})")?, + Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?, + Error::Async(e) => write!(fmt, "Async({e})")?, + Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?, + } + write!(fmt, ")")?; + Ok(()) + } +} + +fn other_err_error(e: err::Error) -> Error { + Error::OtherErr(e) +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(e: crate::Error) -> Self { + Self::Http(e) + } +} +impl From for Error { + fn from(e: http::Error) -> Self { + Self::HttpCrate(e) + } +} + +impl From for Error { + fn from(e: nodenet::configquorum::Error) -> Self { + use nodenet::configquorum::Error::*; + match e { + NotFound(a) => Self::NotFound(a), + _ => Self::ConfigQuorum(e), + } + } +} + +impl From for Error { + fn from(e: nodenet::channelconfig::Error) -> Self { + match e { + nodenet::channelconfig::Error::NotFoundChannel(a) => Self::NotFound(a), + _ => Self::ConfigNode(e), + } + } +} + +impl From for Error { + fn from(e: netpod::UriError) -> Self { + Self::Uri(e) + } +} + +impl From for Error { + fn from(e: dbconn::pg::Error) -> Self { + Self::Pg(e) + } +} + +impl From for Error { + fn from(e: dbconn::worker::Error) -> Self { + Self::PgWorker(e) + } +} + +impl From for Error { + fn from(e: scyllaconn::scylla::cql_to_rust::FromRowError) -> Self { + Self::Scylla(e.to_string()) + } +} + +impl From for Error { + fn from(e: scyllaconn::scylla::transport::errors::QueryError) -> Self { + Self::Scylla(e.to_string()) + } +} + +impl From for Error { + fn from(e: scyllaconn::scylla::transport::iterator::NextRowError) -> Self { + Self::Scylla(e.to_string()) + } +} + +impl From for Error { + fn from(_e: taskrun::tokio::task::JoinError) -> Self { + Self::Join + } +} + +impl From for Error { + fn from(e: netpod::AsyncChannelError) -> Self { + Self::Async(e) + } +} + +impl From for Error { + fn from(e: dbconn::channelconfig::Error) -> Self { + Self::ChannelConfig(e) + } +} + +impl From for crate::err::Error { + fn from(e: Error) -> Self { + Self::with_msg_no_trace(format!("{e} TODO add public message")) + } +} + +impl Error { + fn to_public_response(self) -> http::Response { + use httpclient::internal_error; + let status = StatusCode::INTERNAL_SERVER_ERROR; + let js = serde_json::json!({ + "message": self.to_string(), + }); + if let Ok(body) = serde_json::to_string_pretty(&js) { + match response(status) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(body_string(body)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e}"); + internal_error() + } + } + } else { + internal_error() + } + } +} + +impl crate::IntoBoxedError for Error {} + pub async fn chconf_from_events_quorum( q: &PlainEventsQuery, ctx: &ReqCtx, @@ -118,7 +296,7 @@ impl ChannelConfigHandler { node_config: &NodeConfigCached, ) -> Result { let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url)?; + let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?; match conf { @@ -176,7 +354,7 @@ impl ChannelConfigsHandler { async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result { info!("channel_configs"); let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url)?; + let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; info!("channel_configs for q {q:?}"); let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?; let ret = response(StatusCode::OK) @@ -235,7 +413,7 @@ impl ChannelConfigQuorumHandler { ) -> Result { info!("channel_config_quorum"); let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url)?; + let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::ChannelConfigQuery(e))?; info!("channel_config_quorum for q {q:?}"); let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?; @@ -266,12 +444,12 @@ impl FromUrl for ChannelsWithTypeQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("scalar_type") - .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalar_type"))?; //let scalar_type = ScalarType::from_bsread_str(s)?; let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; let s = pairs .get("shape") - .ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing shape"))?; let shape = Shape::from_dims_str(s)?; Ok(Self { scalar_type, shape }) } @@ -302,19 +480,19 @@ impl FromUrl for ScyllaChannelEventSeriesIdQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let backend = pairs .get("backend") - .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))? .into(); let name = pairs .get("channelName") - .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing channelName"))? .into(); let s = pairs .get("scalarType") - .ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalarType"))?; let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; let s = pairs .get("shape") - .ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing shape"))?; let shape = Shape::from_dims_str(s)?; let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true"; Ok(Self { @@ -351,15 +529,15 @@ impl FromUrl for ScyllaChannelsActiveQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("tsedge") - .ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing tsedge"))?; let tsedge: u64 = s.parse()?; let s = pairs .get("shapeKind") - .ok_or_else(|| Error::with_public_msg_no_trace("missing shapeKind"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing shapeKind"))?; let shape_kind: u32 = s.parse()?; let s = pairs .get("scalarType") - .ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?; + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalarType"))?; let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; info!("parsed scalar type inp: {s:?} val: {scalar_type:?}"); Ok(Self { @@ -390,7 +568,7 @@ impl ScyllaChannelsActive { .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; - let q = ScyllaChannelsActiveQuery::from_url(&url)?; + let q = ScyllaChannelsActiveQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; let res = self.get_channels(&q, node_config).await?; let body = ToJsonBody::from(&res).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -411,8 +589,10 @@ impl ScyllaChannelsActive { .node_config .cluster .scylla_st() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::conn::create_scy_session(scyco).await?; + .ok_or_else(|| Error::ExpectScyllaBackend)?; + let scy = scyllaconn::conn::create_scy_session(scyco) + .await + .map_err(other_err_error)?; // Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid. let tsedge = q.tsedge / 10 / (6 * 2) * (6 * 2); info!( @@ -427,11 +607,10 @@ impl ScyllaChannelsActive { "select series from series_by_ts_msp where part = ? and ts_msp = ? and shape_kind = ? and scalar_type = ?", (part as i32, tsedge as i32, q.shape_kind as i32, q.scalar_type.to_scylla_i32()), ) - .await - .err_conv()?; + .await.map_err(|e| Error::Scylla(e.to_string()))?; while let Some(row) = res.next().await { - let row = row.err_conv()?; - let (series,): (i64,) = row.into_typed().err_conv()?; + let row = row?; + let (series,): (i64,) = row.into_typed()?; ret.push(series as u64); } } @@ -456,11 +635,11 @@ impl FromUrl for IocForChannelQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let backend = pairs .get("backend") - .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))? .into(); let name = pairs .get("channelName") - .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? + .ok_or_else(|| err::Error::with_public_msg_no_trace("missing channelName"))? .into(); Ok(Self { backend, name }) } @@ -492,16 +671,13 @@ impl IocForChannel { .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; - let q = IocForChannelQuery::from_url(&url)?; + let q = IocForChannelQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; match self.find(&q, node_config).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => { - let body = body_string(format!("{:?}", e.public_msg())); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body)?) - } + Err(e) => Ok(e.to_public_response()), } } else { Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) @@ -517,7 +693,7 @@ impl IocForChannel { node_config: &NodeConfigCached, ) -> Result, Error> { let dbconf = &node_config.node_config.cluster.database; - let (pg_client, pgjh) = create_connection(dbconf).await?; + let (pg_client, pgjh) = create_connection(dbconf).await.map_err(other_err_error)?; let rows = pg_client .query( "select addr from ioc_by_channel where facility = $1 and channel = $2", @@ -525,7 +701,7 @@ impl IocForChannel { ) .await?; drop(pg_client); - pgjh.await??; + pgjh.await?.map_err(other_err_error)?; if let Some(row) = rows.first() { let ioc_addr = row.get(0); let ret = IocForChannelRes { ioc_addr }; @@ -593,14 +769,13 @@ impl ScyllaSeriesTsMsp { .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; - let q = ScyllaSeriesTsMspQuery::from_url(&url)?; + let q = ScyllaSeriesTsMspQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; match self.get_ts_msps(&q, shared_res).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(body_string(format!("{:?}", e.public_msg())))?), + Err(e) => Ok(e.to_public_response()), } } else { Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) @@ -623,12 +798,7 @@ impl ScyllaSeriesTsMsp { let chconf = shared_res .pgqueue .chconf_best_matching_name_range(q.channel.clone(), nano_range) - .await - .map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))? - .recv() - .await - .unwrap() - .unwrap(); + .await??; use scyllaconn::SeriesId; let sid = SeriesId::new(chconf.series()); let scyqueue = shared_res.scyqueue.clone().unwrap(); @@ -710,8 +880,7 @@ impl AmbigiousChannelNames { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(body_string(format!("{:?}", e.public_msg())))?), + Err(e) => Ok(e.to_public_response()), } } else { Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) @@ -723,7 +892,7 @@ impl AmbigiousChannelNames { async fn process(&self, ncc: &NodeConfigCached) -> Result { let dbconf = &ncc.node_config.cluster.database; - let (pg_client, pgjh) = create_connection(dbconf).await?; + let (pg_client, pgjh) = create_connection(dbconf).await.map_err(other_err_error)?; let rows = pg_client .query( "select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series", @@ -731,14 +900,14 @@ impl AmbigiousChannelNames { ) .await?; drop(pg_client); - pgjh.await??; + pgjh.await?.map_err(other_err_error)?; let mut ret = AmbigiousChannelNamesResponse { ambigious: Vec::new() }; for row in rows { let g = AmbigiousChannel { series: row.get::<_, i64>(0) as u64, name: row.get(1), - scalar_type: ScalarType::from_scylla_i32(row.get(2))?, - shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?, + scalar_type: ScalarType::from_scylla_i32(row.get(2)).map_err(other_err_error)?, + shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3)).map_err(other_err_error)?, }; ret.ambigious.push(g); } @@ -798,8 +967,7 @@ impl GenerateScyllaTestData { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(body_string(format!("{:?}", e.public_msg())))?), + Err(e) => Ok(e.to_public_response()), } } else { Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) @@ -811,25 +979,24 @@ impl GenerateScyllaTestData { async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> { let scyconf = node_config.node_config.cluster.scylla_st().unwrap(); - let scy = scyllaconn::conn::create_scy_session(scyconf).await?; + let scy = scyllaconn::conn::create_scy_session(scyconf) + .await + .map_err(other_err_error)?; let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. // Only later delete also from the `ts_msp` table. let it = scy .query_iter("select ts_msp from ts_msp where series = ?", (series as i64,)) - .await - .err_conv()?; + .await?; let mut it = it.into_typed::<(i64,)>(); while let Some(row) = it.next().await { - let row = row.map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let row = row?; let values = (series as i64, row.0); scy.query("delete from events_scalar_f64 where series = ? and ts_msp = ?", values) - .await - .err_conv()?; + .await?; } scy.query("delete from ts_msp where series = ?", (series as i64,)) - .await - .err_conv()?; + .await?; // Generate let (msps, lsps, pulses, vals) = test_data_f64_01(); @@ -840,8 +1007,7 @@ impl GenerateScyllaTestData { "insert into ts_msp (series, ts_msp) values (?, ?)", (series as i64, msp as i64), ) - .await - .err_conv()?; + .await?; } last = msp; } @@ -850,8 +1016,7 @@ impl GenerateScyllaTestData { "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)", (series as i64, msp as i64, lsp as i64, pulse as i64, val), ) - .await - .err_conv()?; + .await?; } Ok(()) } diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index 0606233..1ff3e8e 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -1,7 +1,6 @@ use err::ToPublicError; use serde::Deserialize; use serde::Serialize; -use serde_json::Value as JsVal; use std::fmt; use taskrun::tokio; @@ -108,3 +107,6 @@ impl Convable for std::array::TryFromSliceError {} impl Convable for err::anyhow::Error {} impl Convable for crate::RetrievalError {} impl Convable for httpclient::Error {} +impl Convable for netpod::UriError {} +impl Convable for nodenet::configquorum::Error {} +impl Convable for nodenet::channelconfig::Error {} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index dcb1949..1dec158 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -87,6 +87,8 @@ impl IntoBoxedError for api4::databuffer_tools::FindActiveError {} impl IntoBoxedError for std::string::FromUtf8Error {} impl IntoBoxedError for std::io::Error {} impl IntoBoxedError for dbconn::worker::Error {} +impl IntoBoxedError for netpod::UriError {} +impl IntoBoxedError for crate::api4::binned::Error {} impl From for RetrievalError where diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 8982828..00c93cd 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -4,7 +4,6 @@ pub mod api4; use crate::api1::channel_search_configs_v1; use crate::api1::channel_search_list_v1; use crate::api1::gather_json_2_v1; -use crate::bodystream::response_err_msg; use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; @@ -21,6 +20,7 @@ use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; use httpclient::body_string; +use httpclient::error_response; use httpclient::http; use httpclient::http::header; use httpclient::read_body_bytes; @@ -509,9 +509,9 @@ where let mut query = match QT::from_url(&url) { Ok(k) => k, Err(_) => { - let msg = format!("malformed request or missing parameters {:?}", req.uri()); + let msg = format!("malformed request or missing parameters {}", req.uri()); warn!("{msg}"); - return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?); + return Ok(error_response(msg, ctx.reqid())); } }; trace!("proxy_backend_query {:?} {:?}", query, req.uri()); diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs index 322e988..e9f1331 100644 --- a/crates/httpret/src/proxy/api4/events.rs +++ b/crates/httpret/src/proxy/api4/events.rs @@ -8,6 +8,7 @@ use crate::ReqCtx; use http::header; use http::Method; use http::Request; +use http::Response; use http::StatusCode; use http::Uri; use httpclient::body_empty; @@ -68,7 +69,7 @@ impl EventsHandler { let url = req_uri_to_url(&head.uri)?; let pairs = get_url_query_pairs(&url); let evq = PlainEventsQuery::from_pairs(&pairs)?; - debug!("{:?}", evq); + debug!("handle_framed {evq:?}"); let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?; let url_str = format!( "{}{}", @@ -92,10 +93,14 @@ impl EventsHandler { let (head, body) = res.into_parts(); if head.status != StatusCode::OK { warn!("backend returned error: {head:?}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) - } else { - debug!("backend returned OK"); - Ok(response(StatusCode::OK).body(body_stream(StreamIncoming::new(body)))?) } + let mut resb = Response::builder().status(head.status); + for h in head.headers { + if let (Some(hn), hv) = h { + resb = resb.header(hn, hv); + } + } + let res = resb.body(body_stream(StreamIncoming::new(body)))?; + Ok(res) } } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index dd21331..9d35922 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -63,7 +63,9 @@ use bytes::Bytes; use chrono::DateTime; use chrono::TimeZone; use chrono::Utc; +use err::thiserror; use err::Error; +use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use http::Request; @@ -103,6 +105,7 @@ pub const APP_CBOR_FRAMED: &str = "application/cbor-framed"; pub const APP_JSON_FRAMED: &str = "application/json-framed"; pub const ACCEPT_ALL: &str = "*/*"; pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id"; +pub const HEADER_NAME_REQUEST_ID: &str = "requestid"; pub const CONNECTION_STATUS_DIV: DtMs = DtMs::from_ms_u64(1000 * 60 * 60); // pub const TS_MSP_GRID_UNIT: DtMs = DtMs::from_ms_u64(1000 * 10); @@ -176,6 +179,13 @@ impl CmpZero for usize { } } +#[derive(Debug, err::ThisError)] +#[cstm(name = "AsyncChannelError")] +pub enum AsyncChannelError { + Send, + Recv, +} + pub struct BodyStream { //pub receiver: async_channel::Receiver>, pub inner: Box> + Send + Unpin>, @@ -1071,6 +1081,16 @@ impl SfDbChannel { } } +impl fmt::Display for SfDbChannel { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "SfDbChannel {{ series: {:?}, backend: {:?}, name: {:?}, kind: {:?} }}", + self.series, self.backend, self.name, self.kind + ) + } +} + impl FromUrl for SfDbChannel { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); @@ -4123,14 +4143,18 @@ pub fn status_board_init() { }); } -pub fn req_uri_to_url(uri: &Uri) -> Result { +#[derive(Debug, ThisError)] +#[cstm(name = "UriError")] +pub enum UriError { + ParseError(Uri), +} + +pub fn req_uri_to_url(uri: &Uri) -> Result { if uri.scheme().is_none() { format!("dummy:{uri}") .parse() - .map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}"))) + .map_err(|_| UriError::ParseError(uri.clone())) } else { - uri.to_string() - .parse() - .map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}"))) + uri.to_string().parse().map_err(|_| UriError::ParseError(uri.clone())) } } diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index 05ff755..d2af8cf 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -60,6 +60,12 @@ impl fmt::Debug for NanoRange { } } +impl fmt::Display for NanoRange { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + impl NanoRange { pub fn from_date_time(beg: DateTime, end: DateTime) -> Self { Self { diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 8487f71..862fcbf 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -1,5 +1,6 @@ use dbconn::worker::PgQueue; -use err::Error; +use err::thiserror; +use err::ThisError; use httpclient::url::Url; use netpod::log::*; use netpod::range::evrange::NanoRange; @@ -20,6 +21,50 @@ use netpod::Shape; use netpod::APP_JSON; use serde::Serialize; +#[derive(Debug, ThisError)] +#[cstm(name = "ChannelConfigNode")] +pub enum Error { + NotFoundChannel(SfDbChannel), + ChannelConfig(dbconn::channelconfig::Error), + DbWorker(#[from] dbconn::worker::Error), + DiskConfig(#[from] disk::channelconfig::ConfigError), + BackendConfigError, + BadTestSetup, + HttpReqError, + HttpClient(#[from] httpclient::Error), + ConfigParse(#[from] disk::parse::channelconfig::ConfigParseError), + JsonParse(#[from] serde_json::Error), + SearchWithGivenSeries, + AsyncSend, + AsyncRecv, + Todo, +} + +impl From for Error { + fn from(_value: async_channel::RecvError) -> Self { + Error::AsyncRecv + } +} + +impl From for Error { + fn from(value: netpod::AsyncChannelError) -> Self { + match value { + netpod::AsyncChannelError::Send => Self::AsyncSend, + netpod::AsyncChannelError::Recv => Self::AsyncRecv, + } + } +} + +impl From for Error { + fn from(value: dbconn::channelconfig::Error) -> Self { + use dbconn::channelconfig::Error::*; + match value { + NotFound(chn, _) => Self::NotFoundChannel(chn), + _ => Self::ChannelConfig(value), + } + } +} + const TEST_BACKEND: &str = "testbackend-00"; fn channel_config_test_backend(channel: SfDbChannel) -> Result { @@ -92,8 +137,7 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result { debug!("channel_config config {config:?}"); @@ -135,10 +175,7 @@ pub async fn channel_config( None => Ok(None), } } else { - return Err( - Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend())) - .add_public_msg(format!("no channel config for backend {}", channel.backend())), - ); + Err(Error::BackendConfigError) } } @@ -154,7 +191,7 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re ChannelTypeConfigGen::Scylla(x) => ChannelConfigsGen::Scylla(x), ChannelTypeConfigGen::SfDatabuffer(_) => { // ChannelConfigsGen::SfDatabuffer(todo!()) - let e = Error::with_msg_no_trace("channel_configs test backend TODO SfDatabuffer"); + let e = Error::BadTestSetup; warn!("{e}"); return Err(e); } @@ -168,15 +205,10 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re Ok(ChannelConfigsGen::Scylla(ret)) } else if ncc.node.sf_databuffer.is_some() { debug!("channel_config channel {channel:?}"); - let configs = disk::channelconfig::channel_configs(channel.clone(), ncc) - .await - .map_err(|e| Error::from(e.to_string()))?; + let configs = disk::channelconfig::channel_configs(channel.clone(), ncc).await?; Ok(ChannelConfigsGen::SfDatabuffer(configs)) } else { - return Err( - Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend())) - .add_public_msg(format!("no channel config for backend {}", channel.backend())), - ); + return Err(Error::BackendConfigError); } } @@ -196,12 +228,9 @@ pub async fn http_get_channel_config( let ret: ChannelConfigResponse = serde_json::from_slice(&res.body)?; Ok(Some(ret)) } else { - let b = &res.body; - let s = String::from_utf8_lossy(&b[0..b.len().min(256)]); - Err(Error::with_msg_no_trace(format!( - "http_get_channel_config {} {}", - res.head.status, s - ))) + // let b = &res.body; + // let s = String::from_utf8_lossy(&b[0..b.len().min(256)]); + Err(Error::HttpReqError) } } @@ -210,43 +239,21 @@ async fn scylla_chconf_from_sf_db_channel( channel: SfDbChannel, pgqueue: &PgQueue, ) -> Result { + trace!("scylla_chconf_from_sf_db_channel {:?}", channel); if let Some(series) = channel.series() { - let ret = pgqueue - .chconf_for_series(channel.backend(), series) - .await? - .recv() - .await??; + let ret = pgqueue.chconf_for_series(channel.backend(), series).await??; Ok(ret) } else { // TODO let called function allow to return None instead of error-not-found - let ret = pgqueue - .chconf_best_matching_name_range(channel, range) - .await? - .recv() - .await??; + let ret = pgqueue.chconf_best_matching_name_range(channel, range).await??; Ok(ret) } } async fn scylla_all_chconf_from_sf_db_channel(channel: &SfDbChannel, _ncc: &NodeConfigCached) -> Result { if let Some(_) = channel.series() { - let e = Error::with_msg_no_trace(format!( - "scylla_all_chconf_from_sf_db_channel but series anyways specified {channel:?}" - )); - // dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await - warn!("{e}"); - Err(e) + Err(Error::SearchWithGivenSeries) } else { - #[cfg(DISABLED)] - { - // TODO let called function allow to return None instead of error-not-found - let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) - .await - .map_err(Error::from)?; - Ok(Some(ChannelTypeConfigGen::Scylla(ret))) - } - let e = Error::with_msg_no_trace(format!("scylla_all_chconf_from_sf_db_channel TODO")); - warn!("{e}"); - Err(e) + Err(Error::Todo) } } diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index b5618d0..a91b657 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -1,6 +1,7 @@ use crate::channelconfig::http_get_channel_config; use dbconn::worker::PgQueue; -use err::Error; +use err::thiserror; +use err::ThisError; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::ChConf; @@ -16,6 +17,30 @@ use std::collections::BTreeMap; use std::time::Duration; use taskrun::tokio; +#[derive(Debug, ThisError)] +#[cstm(name = "ConfigQuorum")] +pub enum Error { + NotFound(SfDbChannel), + MissingTimeRange, + Timeout, + ChannelConfig(crate::channelconfig::Error), + ExpectSfDatabufferBackend, + UnsupportedBackend, + BadTimeRange, + DbWorker(#[from] dbconn::worker::Error), + FindChannel(#[from] dbconn::FindChannelError), +} + +impl From for Error { + fn from(value: crate::channelconfig::Error) -> Self { + use crate::channelconfig::Error::*; + match value { + NotFoundChannel(chn) => Self::NotFound(chn), + _ => Self::ChannelConfig(value), + } + } +} + fn decide_sf_ch_config_quorum(inp: Vec) -> Result, Error> { let mut histo = BTreeMap::new(); for item in inp { @@ -55,7 +80,7 @@ async fn find_sf_ch_config_quorum( ) -> Result, Error> { let range = match range { SeriesRange::TimeRange(x) => x, - SeriesRange::PulseRange(_) => return Err(Error::with_msg_no_trace("expect TimeRange")), + SeriesRange::PulseRange(_) => return Err(Error::MissingTimeRange), }; let mut all = Vec::new(); for node in &ncc.node_config.cluster.nodes { @@ -71,16 +96,14 @@ async fn find_sf_ch_config_quorum( http_get_channel_config(qu, node.baseurl(), ctx), ) .await - .map_err(|_| Error::with_msg_no_trace("timeout"))??; + .map_err(|_| Error::Timeout)??; all.push(res); } let all: Vec<_> = all.into_iter().filter_map(|x| x).collect(); let qu = decide_sf_ch_config_quorum(all)?; match qu { Some(item) => match item { - ChannelTypeConfigGen::Scylla(_) => Err(Error::with_msg_no_trace( - "find_sf_ch_config_quorum not a sf-databuffer config", - )), + ChannelTypeConfigGen::Scylla(_) => Err(Error::ExpectSfDatabufferBackend), ChannelTypeConfigGen::SfDatabuffer(item) => Ok(Some(item)), }, None => Ok(None), @@ -98,11 +121,7 @@ pub async fn find_config_basics_quorum( if let Some(_cfg) = &ncc.node.sf_databuffer { let channel = if channel.name().is_empty() { if let Some(_) = channel.series() { - pgqueue - .find_sf_channel_by_series(channel) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))? - .map_err(|e| Error::with_msg_no_trace(e.to_string()))? + pgqueue.find_sf_channel_by_series(channel).await?? } else { channel } @@ -114,12 +133,10 @@ pub async fn find_config_basics_quorum( None => Ok(None), } } else if let Some(_) = &ncc.node_config.cluster.scylla_st() { - let range = netpod::range::evrange::NanoRange::try_from(&range)?; + let range = netpod::range::evrange::NanoRange::try_from(&range).map_err(|_| Error::BadTimeRange)?; let ret = crate::channelconfig::channel_config(range, channel, pgqueue, ncc).await?; Ok(ret) } else { - Err(Error::with_msg_no_trace( - "find_config_basics_quorum not supported backend", - )) + Err(Error::UnsupportedBackend) } } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 2c0b87f..f505b04 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -53,6 +53,15 @@ macro_rules! warn_item { }; } +#[allow(unused)] +macro_rules! trace_every_event { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[derive(Debug, Clone)] pub struct EventReadOpts { pub with_values: bool, @@ -433,7 +442,7 @@ impl Stream for EventsStreamRt { use items_2::merger::Mergeable; trace_fetch!("ReadingBck FetchEvents got len {}", x.len()); for ts in Mergeable::tss(&x) { - trace_fetch!("ReadingBck FetchEvents ts {}", ts.fmt()); + trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); } if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) { trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); @@ -480,10 +489,10 @@ impl Stream for EventsStreamRt { }, ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { Ready(Ok(x)) => { + use items_2::merger::Mergeable; trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len()); - for ts_ns in x.tss() { - let ts = TsNano::from_ns(*ts_ns).to_ts_ms(); - trace_fetch!("ReadingFwd FetchEvents ts {}", ts.fmt()); + for ts in Mergeable::tss(&x) { + trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); } self.out.push_back(x); self.setup_fwd_read(); diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index 356fefb..af59606 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -4,12 +4,19 @@ use crate::firsterr::non_empty; use crate::firsterr::only_first_err; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use err::Error; +use err::thiserror; +use err::ThisError; use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; +#[derive(Debug, ThisError)] +#[cstm(name = "PlainEventsCbor")] +pub enum Error { + Stream(#[from] crate::plaineventsstream::Error), +} + pub async fn plain_events_cbor_stream( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index d599711..b578e7e 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -5,7 +5,8 @@ use crate::json_stream::events_stream_to_json_stream; use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::StreamExt; use items_0::collect_s::Collectable; use items_0::on_sitemty_data; @@ -17,6 +18,14 @@ use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use std::time::Instant; +#[derive(Debug, ThisError)] +#[cstm(name = "PlainEventsJson")] +pub enum Error { + Stream(#[from] crate::plaineventsstream::Error), + Collect(err::Error), + Json(#[from] serde_json::Error), +} + pub async fn plain_events_json( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, @@ -49,7 +58,8 @@ pub async fn plain_events_json( Some(evq.range().clone()), None, ) - .await?; + .await + .map_err(Error::Collect)?; debug!("plain_events_json collected"); let jsval = serde_json::to_value(&collected)?; debug!("plain_events_json json serialized"); diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index a53abf0..de63145 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -2,7 +2,8 @@ use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::transform::build_merged_event_transform; -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; @@ -18,6 +19,12 @@ use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; use std::pin::Pin; +#[derive(Debug, ThisError)] +#[cstm(name = "PlainEventsStream")] +pub enum Error { + OtherErr(#[from] err::Error), +} + pub type DynEventsStream = Pin>> + Send>>; pub async fn dyn_events_stream( @@ -86,9 +93,9 @@ async fn transform_wasm( stream: INP, _wasmname: &str, _ctx: &ReqCtx, -) -> Result>>, Error>> + Send, Error> +) -> Result>>, err::Error>> + Send, err::Error> where - INP: Stream>>, Error>> + Send + 'static, + INP: Stream>>, err::Error>> + Send + 'static, { let ret: Pin>> + Send>> = Box::pin(stream); Ok(ret)