From f8b3c1533b7f56554d3cc74d4a5d7c2b54bae968 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 7 Mar 2025 11:46:51 +0100 Subject: [PATCH] Fix warnings --- .cargo/cargo-lock | 618 ++++++++++++++---- crates/daqbuffer/Cargo.toml | 2 +- crates/daqbufp2/Cargo.toml | 3 + crates/daqbufp2/src/test/api4/common.rs | 1 + crates/daqbufp2/src/test/api4/eventsjson.rs | 6 +- crates/daqbufp2/src/test/api4/pulseiddiff.rs | 4 +- crates/daqbufp2/src/test/archapp.rs | 2 +- crates/daqbufp2/src/test/binnedjson.rs | 6 +- .../src/test/binnedjson/channelarchiver.rs | 8 +- crates/daqbufp2/src/test/timeweightedjson.rs | 1 + crates/disk/Cargo.toml | 3 + crates/disk/src/channelconfig.rs | 2 +- crates/disk/src/eventchunkermultifile.rs | 2 +- crates/dq/Cargo.toml | 5 +- crates/dq/src/dq.rs | 42 -- crates/dq/src/lib.rs | 1 + crates/httpclient/Cargo.toml | 9 +- crates/httpret/src/api4/accounting.rs | 8 +- crates/httpret/src/api4/eventdata.rs | 2 +- crates/httpret/src/cache.rs | 1 + crates/httpret/src/http3.rs | 13 +- crates/httpret/src/httpret.rs | 6 +- crates/httpret/src/proxy/api4/backend.rs | 1 - crates/httpret/src/pulsemap.rs | 8 +- crates/nodenet/src/client.rs | 2 +- crates/nodenet/src/conn/test.rs | 6 +- crates/scyllaconn/src/accounting/toplist.rs | 1 - crates/scyllaconn/src/bincache.rs | 4 +- crates/scyllaconn/src/events.rs | 575 ---------------- crates/scyllaconn/src/events2/events.rs | 618 +++++++++++++++++- .../src/events2/onebeforeandbulk.rs | 70 +- crates/scyllaconn/src/events2/prepare.rs | 27 +- crates/scyllaconn/src/scyllaconn.rs | 1 - crates/scyllaconn/src/worker.rs | 8 +- crates/streamio/Cargo.toml | 4 - crates/streamio/src/tcprawclient.rs | 1 + crates/streamio/src/tcpreadasbytes.rs | 11 +- crates/taskrun/Cargo.toml | 2 + crates/taskrun/src/taskrun.rs | 10 +- 39 files changed, 1208 insertions(+), 886 deletions(-) delete mode 100644 crates/dq/src/dq.rs create mode 100644 crates/dq/src/lib.rs delete mode 100644 crates/scyllaconn/src/events.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index e8763a3..85f9648 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -126,9 +126,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" [[package]] name = "arc-swap" @@ -189,9 +189,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.86" +version = "0.1.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d" +checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97" dependencies = [ "proc-macro2", "quote", @@ -271,9 +271,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] name = "block-buffer" @@ -319,19 +319,25 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.12" +version = "1.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "755717a7de9ec452bf7f3f1a3099085deabd7f2962b861dae91ecd7a365903d2" +checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.0" @@ -339,10 +345,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "chrono" -version = "0.4.39" +name = "cfg_aliases" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + +[[package]] +name = "chrono" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -350,7 +362,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets", + "windows-link", ] [[package]] @@ -382,9 +394,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.28" +version = "4.5.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e77c3243bd94243c03672cb5154667347c457ca271254724f9f393aee1c05ff" +checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" dependencies = [ "clap_builder", "clap_derive", @@ -392,9 +404,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.27" +version = "4.5.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b26884eb4b57140e4d2d93652abfa49498b938b3c9179f9fc487b0acc3edad7" +checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" dependencies = [ "anstream", "anstyle", @@ -470,6 +482,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -600,8 +622,9 @@ dependencies = [ [[package]] name = "daqbuf-items-0" -version = "0.0.3" +version = "0.0.4" dependencies = [ + "autoerr", "bincode", "bytes", "chrono", @@ -612,7 +635,6 @@ dependencies = [ "futures-util", "serde", "serde_json", - "thiserror 0.0.1", "typetag", ] @@ -640,7 +662,6 @@ dependencies = [ "rmp-serde", "serde", "serde_json", - "thiserror 0.0.1", "typetag", ] @@ -675,6 +696,7 @@ dependencies = [ name = "daqbuf-parse" version = "0.0.3" dependencies = [ + "autoerr", "byteorder", "bytes", "chrono", @@ -757,7 +779,7 @@ dependencies = [ [[package]] name = "daqbuffer" -version = "0.5.5-aa.6" +version = "0.5.5-aa.10" dependencies = [ "bytes", "chrono", @@ -967,9 +989,9 @@ dependencies = [ [[package]] name = "either" -version = "1.13.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" [[package]] name = "embedded-io" @@ -985,15 +1007,15 @@ checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" [[package]] name = "equivalent" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "erased-serde" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" dependencies = [ "serde", "typeid", @@ -1033,10 +1055,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] -name = "flate2" -version = "1.0.35" +name = "fastrand" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + +[[package]] +name = "flate2" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc" dependencies = [ "crc32fast", "miniz_oxide", @@ -1173,8 +1201,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1197,9 +1227,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "h2" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccae279728d634d083c00f6099cb58f01cc99c145b84b8be2f6c74618d79922e" +checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" dependencies = [ "atomic-waker", "bytes", @@ -1214,6 +1244,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e7675a0963b47a6d12fe44c279918b4ffb19baee838ac37f48d2722ad5bc6ab" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-quinn" +version = "0.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c799f413fceeea505236c4d8132f084ff4b55a652288d91439ee93dc24d855" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "tokio", + "tokio-util", +] + [[package]] name = "h5out" version = "0.0.1" @@ -1336,15 +1394,16 @@ dependencies = [ [[package]] name = "httparse" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2d708df4e7140240a16cd6ab0ab65c972d7433ab77819ea693fde9c43811e2a" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "httpclient" -version = "0.0.2" +version = "0.0.3" dependencies = [ "async-channel 1.9.0", + "autoerr", "bytes", "daqbuf-err", "daqbuf-netpod", @@ -1358,7 +1417,6 @@ dependencies = [ "hyper-util", "serde", "serde_json", - "thiserror 0.0.1", "tokio", "tracing", "url", @@ -1374,7 +1432,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" name = "httpret" version = "0.4.1" dependencies = [ - "async-channel 1.9.0", + "async-channel 2.3.1", "autoerr", "brotli", "bytes", @@ -1393,6 +1451,8 @@ dependencies = [ "disk", "flate2", "futures-util", + "h3", + "h3-quinn", "http", "http-body-util", "httpclient", @@ -1401,13 +1461,18 @@ dependencies = [ "itertools", "md-5", "nodenet", - "rand 0.8.5", + "pin-project", + "quinn", + "rand 0.9.0", "regex", + "rustls", + "rustls-pki-types", "scyllaconn", "serde", "serde_json", "streamio", "taskrun", + "time", "tracing", "tracing-futures", "url", @@ -1649,9 +1714,9 @@ dependencies = [ [[package]] name = "inventory" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54b12ebb6799019b044deaf431eadfe23245b259bba5a2c0796acec3943a3cdb" +checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83" dependencies = [ "rustversion", ] @@ -1673,9 +1738,29 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "js-sys" @@ -1695,15 +1780,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" [[package]] name = "litemap" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" +checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" [[package]] name = "lock_api" @@ -1717,9 +1802,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.25" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "lz4_flex" @@ -1754,9 +1839,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" dependencies = [ "adler2", ] @@ -1778,6 +1863,7 @@ version = "0.0.2" dependencies = [ "arrayref", "async-channel 1.9.0", + "autoerr", "byteorder", "bytes", "chrono", @@ -1882,6 +1968,12 @@ version = "1.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "overload" version = "0.1.1" @@ -1949,18 +2041,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.9" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.9" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", @@ -2041,18 +2133,72 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" dependencies = [ "unicode-ident", ] [[package]] -name = "quote" -version = "1.0.38" +name = "quinn" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +dependencies = [ + "bytes", + "futures-io", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +dependencies = [ + "bytes", + "getrandom 0.2.15", + "rand 0.8.5", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "slab", + "thiserror 2.0.12", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "quote" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" dependencies = [ "proc-macro2", ] @@ -2075,8 +2221,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", - "rand_core 0.9.0", - "zerocopy 0.8.17", + "rand_core 0.9.3", + "zerocopy 0.8.23", ] [[package]] @@ -2096,7 +2242,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -2110,12 +2256,11 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.9.0" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ "getrandom 0.3.1", - "zerocopy 0.8.17", ] [[package]] @@ -2138,13 +2283,12 @@ dependencies = [ [[package]] name = "redis" -version = "0.27.6" +version = "0.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +checksum = "e37ec3fd44bea2ec947ba6cc7634d7999a6590aca7c35827c250bc0de502bda6" dependencies = [ "arc-swap", "combine", - "itertools", "itoa", "num-bigint", "percent-encoding", @@ -2156,9 +2300,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.8" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" dependencies = [ "bitflags", ] @@ -2192,6 +2336,20 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "ring" +version = "0.17.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.15", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rmp" version = "0.8.14" @@ -2220,6 +2378,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2230,16 +2394,118 @@ dependencies = [ ] [[package]] -name = "rustversion" -version = "1.0.19" +name = "rustls" +version = "0.23.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" +checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] + +[[package]] +name = "rustls-platform-verifier" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c7dc240fec5517e6c4eab3310438636cfe6391dfc345ba013109909a90d136" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" [[package]] name = "ryu" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea1a2d0a644769cc99faa24c3ad26b379b786fe7c36fd3c546254801650e6dd" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] [[package]] name = "scopeguard" @@ -2272,7 +2538,7 @@ dependencies = [ "smallvec", "snap", "socket2", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "tracing", "uuid", @@ -2291,7 +2557,7 @@ dependencies = [ "scylla-macros", "snap", "stable_deref_trait", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "uuid", "yoke", @@ -2329,16 +2595,40 @@ dependencies = [ ] [[package]] -name = "semver" -version = "1.0.25" +name = "security-framework" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "num-bigint", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.217" +version = "1.0.218" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" dependencies = [ "serde_derive", ] @@ -2355,9 +2645,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.218" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" dependencies = [ "proc-macro2", "quote", @@ -2366,9 +2656,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.138" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "itoa", "memchr", @@ -2447,9 +2737,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.2" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "snap" @@ -2516,7 +2806,6 @@ dependencies = [ "serde_cbor", "serde_json", "taskrun", - "thiserror 0.0.1", "tokio", "tokio-stream", "typetag", @@ -2547,9 +2836,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.98" +version = "2.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" dependencies = [ "proc-macro2", "quote", @@ -2592,11 +2881,20 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.11" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl 2.0.11", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -2611,9 +2909,20 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.11" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", @@ -2632,9 +2941,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.37" +version = "0.3.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +checksum = "dad298b01a40a23aac4580b67e3dbedb7cc8402f3592d7f49469de2ea4aecdd8" dependencies = [ "deranged", "itoa", @@ -2647,15 +2956,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "765c97a5b985b7c11d7bc27fa927dc4fe6af3a6dfb021d28deb60d3bf51e76ef" [[package]] name = "time-macros" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +checksum = "e8093bc3e81c3bc5f7879de09619d06c9a5a5e45ca44dfeeb7225bae38005c5c" dependencies = [ "num-conv", "time-core", @@ -2682,9 +2991,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" dependencies = [ "tinyvec_macros", ] @@ -2787,6 +3096,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2869,21 +3179,21 @@ dependencies = [ [[package]] name = "typeid" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e13db2e0ccd5e14a544e8a246ba2312cd25223f616442d7f2cb0e3db614236e" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" [[package]] name = "typenum" -version = "1.17.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" [[package]] name = "typetag" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "044fc3365ddd307c297fe0fe7b2e70588cdab4d0f62dc52055ca0d11b174cf0e" +checksum = "73f22b40dd7bfe8c14230cf9702081366421890435b2d625fa92b4acc4c3de6f" dependencies = [ "erased-serde", "inventory", @@ -2894,9 +3204,9 @@ dependencies = [ [[package]] name = "typetag-impl" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9d30226ac9cbd2d1ff775f74e8febdab985dab14fb14aa2582c29a92d5555dc" +checksum = "35f5380909ffc31b4de4f4bdf96b877175a016aa2ca98cee39fcfd8c4d53d952" dependencies = [ "proc-macro2", "quote", @@ -2911,9 +3221,9 @@ checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" [[package]] name = "unicode-ident" -version = "1.0.16" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" [[package]] name = "unicode-normalization" @@ -2936,6 +3246,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.4" @@ -2967,9 +3283,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.13.1" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced87ca4be083373936a67f8de945faa23b6b42384bd5b64434850802c6dccd0" +checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" dependencies = [ "getrandom 0.3.1", ] @@ -2986,6 +3302,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3084,6 +3410,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-root-certs" +version = "0.26.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09aed61f5e8d2c18344b3faa33a4c837855fe56642757754775548fee21386c4" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "1.5.2" @@ -3111,6 +3456,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -3126,6 +3480,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-link" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3" + [[package]] name = "windows-sys" version = "0.52.0" @@ -3265,11 +3625,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.17" +version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" +checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" dependencies = [ - "zerocopy-derive 0.8.17", + "zerocopy-derive 0.8.23", ] [[package]] @@ -3285,9 +3645,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.17" +version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" +checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" dependencies = [ "proc-macro2", "quote", @@ -3296,18 +3656,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", @@ -3315,6 +3675,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + [[package]] name = "zerovec" version = "0.10.4" diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 34f7cbc..1999977 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.5-aa.9" +version = "0.5.5-aa.10" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/daqbufp2/Cargo.toml b/crates/daqbufp2/Cargo.toml index bcaa700..11f73ec 100644 --- a/crates/daqbufp2/Cargo.toml +++ b/crates/daqbufp2/Cargo.toml @@ -32,3 +32,6 @@ items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../../../daqbuf-items-2", package = "daqbuf-items-2" } streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" } parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" } + +[features] +DISABLED = [] diff --git a/crates/daqbufp2/src/test/api4/common.rs b/crates/daqbufp2/src/test/api4/common.rs index c0e6413..1b18349 100644 --- a/crates/daqbufp2/src/test/api4/common.rs +++ b/crates/daqbufp2/src/test/api4/common.rs @@ -33,6 +33,7 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re } // TODO improve by a more information-rich return type. +#[allow(unused)] pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result { let ctx = ReqCtx::for_test(); let t1 = Utc::now(); diff --git a/crates/daqbufp2/src/test/api4/eventsjson.rs b/crates/daqbufp2/src/test/api4/eventsjson.rs index f363bce..e32bf86 100644 --- a/crates/daqbufp2/src/test/api4/eventsjson.rs +++ b/crates/daqbufp2/src/test/api4/eventsjson.rs @@ -2,7 +2,6 @@ use crate::nodes::require_test_hosts_running; use crate::test::api4::common::fetch_events_json; use chrono::Utc; use daqbuf_err::Error; -use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; @@ -36,7 +35,8 @@ fn events_plain_json_00() -> Result<(), Error> { "1970-01-01T00:20:04.000Z", "1970-01-01T00:21:10.000Z", )?; - let jsv = fetch_events_json(query, cluster).await?; + // TODO + let _jsv = fetch_events_json(query, cluster).await?; // let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; // Tim-weighted uses one event before requested range: // assert_eq!(res.len(), 133); @@ -51,7 +51,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> { let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; - let jsv = events_plain_json( + let _jsv = events_plain_json( SfDbChannel::from_name(TEST_BACKEND, "test-gen-i32-dim0-v01"), "1970-01-03T23:59:55.000Z", "1970-01-04T00:00:01.000Z", diff --git a/crates/daqbufp2/src/test/api4/pulseiddiff.rs b/crates/daqbufp2/src/test/api4/pulseiddiff.rs index 8ca272d..8b444fd 100644 --- a/crates/daqbufp2/src/test/api4/pulseiddiff.rs +++ b/crates/daqbufp2/src/test/api4/pulseiddiff.rs @@ -1,7 +1,7 @@ use crate::nodes::require_test_hosts_running; use crate::test::api4::common::fetch_events_json; use daqbuf_err::Error; -use items_0::test::f32_iter_cmp_near; +// use items_0::test::f32_iter_cmp_near; use netpod::range::evrange::NanoRange; use netpod::SfDbChannel; use query::api4::events::PlainEventsQuery; @@ -27,7 +27,7 @@ fn events_plain_json_00() -> Result<(), Error> { "1970-01-01T00:20:04.000Z", "1970-01-01T00:21:10.000Z", )?; - let jsv = fetch_events_json(query, cluster).await?; + let _jsv = fetch_events_json(query, cluster).await?; // let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range // assert_eq!(res.ts_anchor_sec(), 1204); diff --git a/crates/daqbufp2/src/test/archapp.rs b/crates/daqbufp2/src/test/archapp.rs index 7a72119..7dd6b8d 100644 --- a/crates/daqbufp2/src/test/archapp.rs +++ b/crates/daqbufp2/src/test/archapp.rs @@ -11,7 +11,7 @@ fn get_events_1() -> Result<(), Error> { } // TODO re-use test data in dedicated archapp converter. let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async { let rh = require_archapp_test_host_running()?; let cluster = &rh.cluster; diff --git a/crates/daqbufp2/src/test/binnedjson.rs b/crates/daqbufp2/src/test/binnedjson.rs index 9e8bbb9..1935b07 100644 --- a/crates/daqbufp2/src/test/binnedjson.rs +++ b/crates/daqbufp2/src/test/binnedjson.rs @@ -9,7 +9,7 @@ fn get_sls_archive_1() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -37,7 +37,7 @@ fn get_sls_archive_3() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -65,7 +65,7 @@ fn get_sls_archive_wave_2() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; diff --git a/crates/daqbufp2/src/test/binnedjson/channelarchiver.rs b/crates/daqbufp2/src/test/binnedjson/channelarchiver.rs index 5845120..356b598 100644 --- a/crates/daqbufp2/src/test/binnedjson/channelarchiver.rs +++ b/crates/daqbufp2/src/test/binnedjson/channelarchiver.rs @@ -7,7 +7,7 @@ fn get_scalar_2_events() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -56,7 +56,7 @@ fn get_scalar_2_binned() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -84,7 +84,7 @@ fn get_wave_1_events() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -131,7 +131,7 @@ fn get_wave_1_binned() -> Result<(), Error> { } // TODO re-use test data in dedicated convert application. let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; - #[cfg(DISABLED)] + #[cfg(feature = "DISABLED")] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; diff --git a/crates/daqbufp2/src/test/timeweightedjson.rs b/crates/daqbufp2/src/test/timeweightedjson.rs index dab5093..c331bea 100644 --- a/crates/daqbufp2/src/test/timeweightedjson.rs +++ b/crates/daqbufp2/src/test/timeweightedjson.rs @@ -95,5 +95,6 @@ async fn get_json_common( return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); } let ret = DataResult { avgs }; + let _ = &ret.avgs; Ok(ret) } diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index 9d02ad1..3ee3000 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -42,3 +42,6 @@ streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" } streamio = { path = "../streamio" } httpclient = { path = "../httpclient" } bitshuffle = { path = "../../../daqbuf-bitshuffle", package = "daqbuf-bitshuffle" } + +[features] +DISABLED = [] diff --git a/crates/disk/src/channelconfig.rs b/crates/disk/src/channelconfig.rs index 2613410..e4c957f 100644 --- a/crates/disk/src/channelconfig.rs +++ b/crates/disk/src/channelconfig.rs @@ -79,7 +79,7 @@ async fn read_local_config_real( Ok(buf) => parse_config(&buf), Err(e) => match e.kind() { ErrorKind::NotFound => Err(ConfigParseError::FileNotFound), - ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied(path.clone())), + ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied(path.to_string_lossy().into())), e => { error!("read_local_config_real {e:?}"); Err(ConfigParseError::IO) diff --git a/crates/disk/src/eventchunkermultifile.rs b/crates/disk/src/eventchunkermultifile.rs index 52b5148..8aca1e2 100644 --- a/crates/disk/src/eventchunkermultifile.rs +++ b/crates/disk/src/eventchunkermultifile.rs @@ -282,7 +282,7 @@ impl Stream for EventChunkerMultifile { } // TODO re-enable tests generate data on the fly. -#[cfg(DISABLED)] +#[cfg(feature = "DISABLED")] #[cfg(test)] mod test { use crate::eventchunker::EventChunkerConf; diff --git a/crates/dq/Cargo.toml b/crates/dq/Cargo.toml index 80bec4d..1b5cb9a 100644 --- a/crates/dq/Cargo.toml +++ b/crates/dq/Cargo.toml @@ -2,10 +2,7 @@ name = "dq" version = "0.1.0" authors = ["Dominik Werder "] -edition = "2021" - -[lib] -path = "src/dq.rs" +edition = "2024" [dependencies] tokio = { version = "1.43.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } diff --git a/crates/dq/src/dq.rs b/crates/dq/src/dq.rs deleted file mode 100644 index 9e77193..0000000 --- a/crates/dq/src/dq.rs +++ /dev/null @@ -1,42 +0,0 @@ -use bytes::BufMut; -use std::fmt; - -trait WritableValue: fmt::Debug { - fn put_value(&self, buf: &mut Vec); -} - -impl WritableValue for u32 { - fn put_value(&self, buf: &mut Vec) { - buf.put_u32_le(*self); - } -} - -impl WritableValue for i8 { - fn put_value(&self, buf: &mut Vec) { - buf.put_i8(*self); - } -} - -impl WritableValue for i16 { - fn put_value(&self, buf: &mut Vec) { - buf.put_i16_le(*self); - } -} - -impl WritableValue for i32 { - fn put_value(&self, buf: &mut Vec) { - buf.put_i32_le(*self); - } -} - -impl WritableValue for f32 { - fn put_value(&self, buf: &mut Vec) { - buf.put_f32_le(*self); - } -} - -impl WritableValue for f64 { - fn put_value(&self, buf: &mut Vec) { - buf.put_f64_le(*self); - } -} diff --git a/crates/dq/src/lib.rs b/crates/dq/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/dq/src/lib.rs @@ -0,0 +1 @@ + diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 6fb744a..c97c8c9 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "httpclient" -version = "0.0.2" +version = "0.0.3" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] futures-util = "0.3.31" @@ -19,11 +19,8 @@ hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] } hyper-util = { version = "0.1.10", features = ["full"] } bytes = "1.10.0" async-channel = "1.9.0" +autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" } streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" } -thiserror = "=0.0.1" - -[patch.crates-io] -thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index e7b5dd2..55cd366 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -14,7 +14,6 @@ use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; -use items_2::accounting::AccountingEvents; use netpod::log::*; use netpod::req_uri_to_url; use netpod::ttl::RetentionTime; @@ -23,7 +22,6 @@ use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; -use query::api4::AccountingIngestedBytesQuery; use query::api4::AccountingToplistQuery; use scyllaconn::accounting::toplist::UsageData; use serde::Deserialize; @@ -57,6 +55,7 @@ impl AccountedIngested { self.shapes.push(shape); } + #[allow(unused)] fn sort_by_counts(&mut self) { let mut tmp: Vec<_> = self .counts @@ -70,6 +69,7 @@ impl AccountedIngested { self.reorder_by_index_list(&tmp); } + #[allow(unused)] fn sort_by_bytes(&mut self) { let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect(); tmp.sort_unstable(); @@ -85,6 +85,7 @@ impl AccountedIngested { self.shapes = tmp.iter().map(|&x| self.shapes[x].clone()).collect(); } + #[allow(unused)] fn truncate(&mut self, len: usize) { self.names.truncate(len); self.counts.truncate(len); @@ -273,12 +274,13 @@ async fn fetch_data( _ncc: &NodeConfigCached, ) -> Result { let list_len_max = 10000000; + let _ = list_len_max; if let Some(scyqu) = &shared_res.scyqueue { let x = scyqu .accounting_read_ts(rt, ts) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let mut ret = resolve_usages(x, &shared_res.pgqueue).await?; + let ret = resolve_usages(x, &shared_res.pgqueue).await?; // ret.dim0.sort_by_bytes(); // ret.dim1.sort_by_bytes(); // ret.dim0.truncate(list_len_max); diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index 7a4d177..c3c9955 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -83,7 +83,7 @@ impl EventDataHandler { .await .map_err(|_| EventDataError::InternalError)?; let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; - debug!("{:?}", evsubq); + info!("{:?}", evsubq); let logspan = if evsubq.log_level() == "trace" { trace!("emit trace span"); tracing::span!(tracing::Level::INFO, "log_span_trace") diff --git a/crates/httpret/src/cache.rs b/crates/httpret/src/cache.rs index 1923d6f..c7b7046 100644 --- a/crates/httpret/src/cache.rs +++ b/crates/httpret/src/cache.rs @@ -5,6 +5,7 @@ use std::collections::BTreeMap; use std::sync::Mutex; use std::time::SystemTime; +#[allow(unused)] pub struct Dummy(u32); pub enum CachePortal { diff --git a/crates/httpret/src/http3.rs b/crates/httpret/src/http3.rs index 07a3944..8abe612 100644 --- a/crates/httpret/src/http3.rs +++ b/crates/httpret/src/http3.rs @@ -11,19 +11,13 @@ use quinn::EndpointConfig; use quinn::Incoming; use rustls::pki_types::pem::PemObject; use rustls::server::ProducesTickets; -use std::future::Future; use std::net::SocketAddr; -use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use std::time::Duration; use taskrun::tokio; const EARLY_DATA_MAX: u32 = u32::MAX * 0; -macro_rules! info { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } - macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } #[derive(Debug)] @@ -38,11 +32,11 @@ impl ProducesTickets for TicketerCustom { 60 * 60 * 24 } - fn encrypt(&self, plain: &[u8]) -> Option> { + fn encrypt(&self, _plain: &[u8]) -> Option> { todo!() } - fn decrypt(&self, cipher: &[u8]) -> Option> { + fn decrypt(&self, _cipher: &[u8]) -> Option> { todo!() } } @@ -111,6 +105,7 @@ impl Http3Support { Ok(ret) } + #[allow(unused)] async fn new_plain_quic(bind_addr: SocketAddr) -> Result { let key = PemObject::from_pem_file("key.pem")?; let cert = PemObject::from_pem_file("cert.pem")?; @@ -160,6 +155,7 @@ impl Http3Support { } } + #[allow(unused)] async fn handle_incoming_inner_1(inc: Incoming, addr_remote: SocketAddr) -> Result<(), Error> { debug!("handle_incoming_inner_1 new incoming {:?}", addr_remote); let conn1 = inc.accept()?.await?; @@ -184,6 +180,7 @@ impl Http3Support { Ok(()) } + #[allow(unused)] async fn handle_incoming_inner_2(inc: Incoming, addr_remote: SocketAddr) -> Result<(), Error> { let selfname = "handle_incoming_inner_2"; debug!("{} new incoming {:?}", selfname, addr_remote); diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index dccc5f5..b7fe833 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -123,7 +123,8 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res } // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?; - let pgworker_jh = taskrun::spawn(async move { + // TODO use + let _pgworker_jh = taskrun::spawn(async move { let x = pgworker.work().await; match x { Ok(()) => {} @@ -143,7 +144,8 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res error!("{e}"); RetrievalError::TextError(e.to_string()) })?; - let scylla_worker_jh = taskrun::spawn(async move { + // TODO use + let _scylla_worker_jh = taskrun::spawn(async move { let x = scylla_worker.work().await; match x { Ok(()) => {} diff --git a/crates/httpret/src/proxy/api4/backend.rs b/crates/httpret/src/proxy/api4/backend.rs index 97f7853..d862144 100644 --- a/crates/httpret/src/proxy/api4/backend.rs +++ b/crates/httpret/src/proxy/api4/backend.rs @@ -9,7 +9,6 @@ use httpclient::Requ; use httpclient::StreamResponse; use netpod::ProxyConfig; use netpod::ReqCtx; -use netpod::ServiceVersion; use std::collections::BTreeMap; pub struct BackendListHandler {} diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 4b1fc39..1143fdf 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -427,7 +427,8 @@ impl IndexChannelHttpFunction { async fn index(req: Requ, do_print: bool, node_config: &NodeConfigCached) -> Result { // TODO avoid double-insert on central storage. - let (pgc, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + // TODO + let (pgc, _pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; // TODO remove update of static columns when older clients are removed. let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; let insert_01 = pgc.prepare(sql).await?; @@ -1012,7 +1013,8 @@ impl MapPulseLocalHttpFunction { }) .unwrap_or_else(|| String::from("missing x-req-from")); let ts1 = Instant::now(); - let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + // TODO + let (conn, _pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)"; let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?; let cands: Vec<_> = rows @@ -1552,6 +1554,8 @@ impl MarkClosedHttpFunction { pub async fn mark_closed(node_config: &NodeConfigCached) -> Result<(), Error> { let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + // TODO + let _ = &pgjh; let sql = "select distinct channel from map_pulse_files order by channel"; let rows = conn.query(sql, &[]).await?; let chns: Vec<_> = rows.iter().map(|r| r.get::<_, String>(0)).collect(); diff --git a/crates/nodenet/src/client.rs b/crates/nodenet/src/client.rs index 396d13b..92e627e 100644 --- a/crates/nodenet/src/client.rs +++ b/crates/nodenet/src/client.rs @@ -10,7 +10,7 @@ use httpclient::http; use httpclient::hyper::StatusCode; use httpclient::hyper::Uri; use items_0::streamitem::sitem_data; -use items_0::streamitem::sitem_err2_from_string; +// use items_0::streamitem::sitem_err2_from_string; use items_2::framable::Framable; use netpod::log::*; use netpod::Cluster; diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 9fb47f0..689e61d 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -20,9 +20,7 @@ use netpod::timeunits::DAY; use netpod::timeunits::SEC; use netpod::ByteOrder; use netpod::Cluster; -use netpod::Database; use netpod::DtNano; -use netpod::FileIoBufferSize; use netpod::Node; use netpod::NodeConfig; use netpod::NodeConfigCached; @@ -98,8 +96,8 @@ fn raw_data_00() { eprintln!("written"); con.shutdown().await.unwrap(); eprintln!("shut down"); - - let (netin, netout) = con.into_split(); + // TODO use? + let (netin, _netout) = con.into_split(); let mut frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), qu.inmem_bufcap()); while let Some(frame) = frames.next().await { match frame { diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs index 691c3bf..6a554f5 100644 --- a/crates/scyllaconn/src/accounting/toplist.rs +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -1,5 +1,4 @@ use crate::log::*; -use daqbuf_err as err; use futures_util::TryStreamExt; use netpod::ttl::RetentionTime; use netpod::TsMs; diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index b93caa6..74bc9ed 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -2,6 +2,7 @@ use crate::events2::prepare::StmtsCache; use crate::events2::prepare::StmtsEvents; use crate::log::*; use crate::worker::ScyllaQueue; +use daqbuf_series::msp::PrebinnedPartitioning; use futures_util::TryStreamExt; use items_0::merge::MergeableTy; use items_2::binning::container_bins::ContainerBins; @@ -11,7 +12,6 @@ use netpod::TsNano; use scylla::Session as ScySession; use std::ops::Range; use streams::timebin::cached::reader::BinsReadRes; -use daqbuf_series::msp::PrebinnedPartitioning; async fn scylla_read_prebinned_f32( series: u64, @@ -137,7 +137,7 @@ pub async fn worker_read( scy: &ScySession, ) -> Result, streams::timebin::cached::reader::Error> { let partt = PrebinnedPartitioning::try_from(bin_len)?; - let div = partt.msp_div(); + let div = partt.patch_dt(); let params = ( series as i64, bin_len.ms() as i32, diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs deleted file mode 100644 index d9bfc9a..0000000 --- a/crates/scyllaconn/src/events.rs +++ /dev/null @@ -1,575 +0,0 @@ -use crate::events2::events::EventReadOpts; -use crate::events2::prepare::StmtsEvents; -use crate::log::*; -use crate::range::ScyllaSeriesRange; -use crate::worker::ScyllaQueue; -use core::fmt; -use daqbuf_series::SeriesId; -use futures_util::Future; -use futures_util::TryStreamExt; -use items_0::scalar_ops::ScalarOps; -use items_0::timebin::BinningggContainerEventsDyn; -use items_0::Appendable; -use items_0::Empty; -use items_0::WithLen; -use items_2::binning::container_events::ContainerEvents; -use netpod::ttl::RetentionTime; -use netpod::DtNano; -use netpod::EnumVariant; -use netpod::TsMs; -use netpod::TsNano; -use scylla::Session; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Instant; -use tracing::Instrument; - -macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } - -autoerr::create_error_v1!( - name(Error, "ScyllaReadEvents"), - enum variants { - Prepare(#[from] crate::events2::prepare::Error), - ScyllaQuery(#[from] scylla::transport::errors::QueryError), - ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaWorker(Box), - ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), - MissingQuery(String), - NotTokenAware, - RangeEndOverflow, - InvalidFuture, - TestError(String), - Logic, - TodoUnsupported, - }, -); - -impl From for Error { - fn from(value: crate::worker::Error) -> Self { - Self::ScyllaWorker(Box::new(value)) - } -} - -pub(super) trait ValTy: Sized + 'static { - type ScaTy: ScalarOps + std::default::Default; - type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>; - type ScyRowTy: for<'a, 'b> scylla::deserialize::DeserializeRow<'a, 'b>; - type Container: BinningggContainerEventsDyn + Empty + Appendable; - fn from_valueblob(inp: Vec) -> Self; - fn table_name() -> &'static str; - fn default() -> Self; - fn is_valueblob() -> bool; - fn st_name() -> &'static str; - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>>; - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self); -} - -macro_rules! impl_scaty_scalar { - ($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { - impl ValTy for $st { - type ScaTy = $st; - type ScyTy = $st_scy; - type ScyRowTy = (i64, $st_scy); - type Container = ContainerEvents; - - fn from_valueblob(_inp: Vec) -> Self { - panic!("unused") - } - - fn table_name() -> &'static str { - concat!("scalar_", $table_name) - } - - fn default() -> Self { - ::default() - } - - fn is_valueblob() -> bool { - false - } - - fn st_name() -> &'static str { - $st_name - } - - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { - let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); - (ts, inp.1 as Self::ScaTy) - } - } - }; -} - -macro_rules! impl_scaty_array { - ($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { - impl ValTy for $vt { - type ScaTy = $st; - type ScyTy = $st_scy; - type ScyRowTy = (i64, $st_scy); - type Container = ContainerEvents>; - - fn from_valueblob(inp: Vec) -> Self { - if inp.len() < 32 { - ::default() - } else { - let en = std::mem::size_of::(); - let n = (inp.len().max(32) - 32) / en; - let mut c = Vec::with_capacity(n); - for i in 0..n { - let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)]; - let p1 = r1 as *const _ as *const $st; - let v1 = unsafe { p1.read_unaligned() }; - c.push(v1); - } - c - } - } - - fn table_name() -> &'static str { - concat!("array_", $table_name) - } - - fn default() -> Self { - Vec::new() - } - - fn is_valueblob() -> bool { - true - } - - fn st_name() -> &'static str { - $st_name - } - - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { - let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); - (ts, inp.1 .into_iter().map(|x| x as _).collect()) - } - } - }; -} - -impl ValTy for EnumVariant { - type ScaTy = EnumVariant; - type ScyTy = i16; - type ScyRowTy = (i64, i16, String); - type Container = ContainerEvents; - - fn from_valueblob(_inp: Vec) -> Self { - panic!("unused") - } - - fn table_name() -> &'static str { - "array_string" - } - - fn default() -> Self { - ::default() - } - - fn is_valueblob() -> bool { - false - } - - fn st_name() -> &'static str { - "enum" - } - - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { - let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); - (ts, EnumVariant::new(inp.1 as u16, inp.2)) - } -} - -impl ValTy for Vec { - type ScaTy = String; - type ScyTy = Vec; - type ScyRowTy = (i64, Vec); - type Container = ContainerEvents>; - - fn from_valueblob(_inp: Vec) -> Self { - panic!("unused") - } - - fn table_name() -> &'static str { - "array_string" - } - - fn default() -> Self { - Vec::new() - } - - fn is_valueblob() -> bool { - false - } - - fn st_name() -> &'static str { - "string" - } - - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - let fut = read_next_values_2::(opts, jobtrace, scy, stmts); - Box::pin(fut) - } - - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { - let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); - (ts, inp.1) - } -} - -impl_scaty_scalar!(u8, i8, "u8", "u8"); -impl_scaty_scalar!(u16, i16, "u16", "u16"); -impl_scaty_scalar!(u32, i32, "u32", "u32"); -impl_scaty_scalar!(u64, i64, "u64", "u64"); -impl_scaty_scalar!(i8, i8, "i8", "i8"); -impl_scaty_scalar!(i16, i16, "i16", "i16"); -impl_scaty_scalar!(i32, i32, "i32", "i32"); -impl_scaty_scalar!(i64, i64, "i64", "i64"); -impl_scaty_scalar!(f32, f32, "f32", "f32"); -impl_scaty_scalar!(f64, f64, "f64", "f64"); -impl_scaty_scalar!(bool, bool, "bool", "bool"); -impl_scaty_scalar!(String, String, "string", "string"); - -impl_scaty_array!(Vec, u8, Vec, "u8", "u8"); -impl_scaty_array!(Vec, u16, Vec, "u16", "u16"); -impl_scaty_array!(Vec, u32, Vec, "u32", "u32"); -impl_scaty_array!(Vec, u64, Vec, "u64", "u64"); -impl_scaty_array!(Vec, i8, Vec, "i8", "i8"); -impl_scaty_array!(Vec, i16, Vec, "i16", "i16"); -impl_scaty_array!(Vec, i32, Vec, "i32", "i32"); -impl_scaty_array!(Vec, i64, Vec, "i64", "i64"); -impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); -impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); -impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); - -#[derive(Debug)] -pub enum ReadEventKind { - Create, - FutgenCallingReadNextValues, - FutgenFutureCreated, - CallExecuteIter, - ScyllaReadRow(u32), - ScyllaReadRowDone(u32), - ReadNextValuesFutureDone, - EventsStreamRtSees(u32), -} - -#[derive(Debug)] -pub struct ReadJobTrace { - jobid: u64, - ts0: Instant, - events: Vec<(Instant, ReadEventKind)>, -} - -impl ReadJobTrace { - pub fn new() -> Self { - static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); - Self { - jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel), - ts0: Instant::now(), - events: Vec::with_capacity(128), - } - } - - pub fn add_event_now(&mut self, kind: ReadEventKind) { - self.events.push((Instant::now(), kind)) - } -} - -impl fmt::Display for ReadJobTrace { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?; - for (ts, kind) in &self.events { - let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32(); - write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?; - } - Ok(()) - } -} - -#[derive(Debug)] -pub(super) struct ReadNextValuesOpts { - rt: RetentionTime, - series: u64, - ts_msp: TsMs, - range: ScyllaSeriesRange, - fwd: bool, - readopts: EventReadOpts, - scyqueue: ScyllaQueue, -} - -impl ReadNextValuesOpts { - pub(super) fn new( - rt: RetentionTime, - series: SeriesId, - ts_msp: TsMs, - range: ScyllaSeriesRange, - fwd: bool, - readopts: EventReadOpts, - scyqueue: ScyllaQueue, - ) -> Self { - Self { - rt, - series: series.id(), - ts_msp, - range, - fwd, - readopts, - scyqueue, - } - } -} - -pub(super) struct ReadNextValuesParams { - pub opts: ReadNextValuesOpts, - pub jobtrace: ReadJobTrace, -} - -pub(super) async fn read_next_values( - params: ReadNextValuesParams, -) -> Result<(Box, ReadJobTrace), Error> -where - ST: ValTy, -{ - let opts = params.opts; - let jobtrace = params.jobtrace; - // TODO could take scyqeue out of opts struct. - let scyqueue = opts.scyqueue.clone(); - let level = taskrun::query_log_level(); - let futgen = move |scy: Arc, stmts: Arc, mut jobtrace: ReadJobTrace| { - // TODO avoid this - // opts.jobtrace = jobtrace; - let fut = async move { - // let jobtrace = &mut opts.jobtrace; - let logspan = if level == Level::DEBUG { - tracing::span!(Level::INFO, "log_span_debug") - } else if level == Level::TRACE { - tracing::span!(Level::INFO, "log_span_trace") - } else { - tracing::Span::none() - }; - jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues); - let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts).instrument(logspan); - match fut.await.map_err(crate::worker::Error::from) { - Ok((ret, mut jobtrace)) => { - jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone); - Ok((ret, jobtrace)) - } - Err(e) => Err(e), - } - }; - Box::pin(fut) - as Pin< - Box< - dyn Future< - Output = Result<(Box, ReadJobTrace), crate::worker::Error>, - > + Send, - >, - > - }; - let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?; - Ok((res, jobtrace)) -} - -async fn read_next_values_2( - opts: ReadNextValuesOpts, - mut jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, -) -> Result<(Box, ReadJobTrace), Error> -where - ST: ValTy, -{ - trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); - let series = opts.series; - let ts_msp = opts.ts_msp; - let range = opts.range; - let table_name = ST::table_name(); - let with_values = opts.readopts.with_values(); - if range.end() > TsNano::from_ns(i64::MAX as u64) { - return Err(Error::RangeEndOverflow); - } - let ret = if opts.fwd { - let ts_lsp_min = if range.beg() > ts_msp.ns() { - range.beg().delta(ts_msp.ns()) - } else { - DtNano::from_ns(0) - }; - let ts_lsp_max = if range.end() > ts_msp.ns() { - range.end().delta(ts_msp.ns()) - } else { - DtNano::from_ns(0) - }; - trace_fetch!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", - ts_msp.fmt(), - ts_lsp_min, - ts_lsp_max, - table_name, - ); - let qu = stmts - .rt(&opts.rt) - .lsp(!opts.fwd, with_values) - .shape(ST::is_valueblob()) - .st(ST::st_name())?; - let qu = { - let mut qu = qu.clone(); - if qu.is_token_aware() == false { - return Err(Error::NotTokenAware); - } - qu.set_page_size(10000); - // qu.disable_paging(); - qu - }; - let params = ( - series as i64, - ts_msp.ms() as i64, - ts_lsp_min.ns() as i64, - ts_lsp_max.ns() as i64, - ); - trace_fetch!("FWD event search params {:?}", params); - jobtrace.add_event_now(ReadEventKind::CallExecuteIter); - let res = scy.execute_iter(qu.clone(), params).await?; - { - let mut ret = ::Container::empty(); - // TODO must branch already here depending on what input columns we expect - if with_values { - if ::is_valueblob() { - let mut it = res.rows_stream::<(i64, Vec)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_valueblob(row.1); - ret.push(ts, value); - } - ret - } else { - let mut i = 0; - let mut it = res.rows_stream::<::ScyRowTy>()?; - while let Some(row) = it.try_next().await? { - let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); - // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - // let value = ::from_scyty(row.1); - ret.push(ts, value); - i += 1; - if i % 2000 == 0 { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); - } - } - { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); - } - ret - } - } else { - let mut it = res.rows_stream::<(i64,)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::default(); - ret.push(ts, value); - } - ret - } - } - } else { - let ts_lsp_max = if ts_msp.ns() < range.beg() { - range.beg().delta(ts_msp.ns()) - } else { - DtNano::from_ns(0) - }; - trace_fetch!( - "BCK ts_msp {} ts_lsp_max {} {}", - ts_msp.fmt(), - ts_lsp_max, - table_name, - ); - let qu = stmts - .rt(&opts.rt) - .lsp(!opts.fwd, with_values) - .shape(ST::is_valueblob()) - .st(ST::st_name())?; - let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); - trace_fetch!("BCK event search params {:?}", params); - let res = scy.execute_iter(qu.clone(), params).await?; - { - let mut ret = ::Container::empty(); - // TODO must branch already here depending on what input columns we expect - if with_values { - if ::is_valueblob() { - let mut it = res.rows_stream::<(i64, Vec)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_valueblob(row.1); - ret.push(ts, value); - } - ret - } else { - let mut i = 0; - let mut it = res.rows_stream::<::ScyRowTy>()?; - while let Some(row) = it.try_next().await? { - let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); - // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - // let value = ::from_scyty(row.1); - ret.push(ts, value); - i += 1; - if i % 2000 == 0 { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); - } - } - { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); - } - ret - } - } else { - let mut it = res.rows_stream::<(i64,)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::default(); - ret.push(ts, value); - } - ret - } - } - }; - trace_fetch!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); - let ret = Box::new(ret); - Ok((ret, jobtrace)) -} diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 92fd4a3..118aadb 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -1,49 +1,62 @@ use super::msp::MspStreamRt; -use crate::events::read_next_values; -use crate::events::ReadJobTrace; -use crate::events::ReadNextValuesOpts; +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use daqbuf_err as err; use daqbuf_series::SeriesId; -use err::thiserror; -use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; +use items_0::container::ByteEstimate; use items_0::merge::DrainIntoNewDynResult; use items_0::merge::MergeableDyn; +use items_0::scalar_ops::ScalarOps; use items_0::timebin::BinningggContainerEventsDyn; +use items_0::Appendable; +use items_0::Empty; +use items_0::WithLen; +use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; -use netpod::log::*; +use netpod::log; use netpod::ttl::RetentionTime; use netpod::ChConf; +use netpod::DtNano; use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use netpod::TsMsVecFmt; use netpod::TsNano; +use scylla::Session; use std::collections::VecDeque; +use std::fmt; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; +use std::time::Instant; +use taskrun::tracing; -macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ) } -macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) } -macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_msp_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } + +macro_rules! trace_every_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } + +macro_rules! warn_item { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) } #[derive(Debug, Clone)] pub struct EventReadOpts { @@ -66,21 +79,34 @@ impl EventReadOpts { } } -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaEvents")] -pub enum Error { - Worker(#[from] crate::worker::Error), - Events(#[from] crate::events::Error), - Msp(#[from] crate::events2::msp::Error), - Unordered, - OutOfRange, - BadBatch, - ReadQueueEmptyBck, - ReadQueueEmptyFwd, - Logic, - TruncateLogic, - AlreadyTaken, - DrainFailure, +autoerr::create_error_v1!( + name(Error, "ScyllaEvents"), + enum variants { + Worker(Box), + Msp(#[from] crate::events2::msp::Error), + Unordered, + OutOfRange, + BadBatch, + ReadQueueEmptyBck, + ReadQueueEmptyFwd, + Logic, + TruncateLogic, + AlreadyTaken, + DrainFailure, + RangeEndOverflow, + NotTokenAware, + Prepare(#[from] crate::events2::prepare::Error), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaWorker(Box), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), + }, +); + +impl From for Error { + fn from(e: crate::worker::Error) -> Self { + Error::Worker(Box::new(e)) + } } struct FetchMsp { @@ -218,6 +244,89 @@ impl ReadingFwd { } } +#[derive(Debug)] +pub enum ReadEventKind { + Create, + FutgenCallingReadNextValues, + FutgenFutureCreated, + CallExecuteIter, + ScyllaReadRow(u32), + ScyllaReadRowDone(u32), + ReadNextValuesFutureDone, + EventsStreamRtSees(u32), +} + +#[derive(Debug)] +pub struct ReadJobTrace { + jobid: u64, + ts0: Instant, + events: Vec<(Instant, ReadEventKind)>, +} + +impl ReadJobTrace { + pub fn new() -> Self { + static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + Self { + jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel), + ts0: Instant::now(), + events: Vec::with_capacity(128), + } + } + + pub fn add_event_now(&mut self, kind: ReadEventKind) { + self.events.push((Instant::now(), kind)) + } +} + +impl fmt::Display for ReadJobTrace { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?; + for (ts, kind) in &self.events { + let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32(); + write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?; + } + Ok(()) + } +} + +#[derive(Debug)] +pub(super) struct ReadNextValuesOpts { + rt: RetentionTime, + series: u64, + ts_msp: TsMs, + range: ScyllaSeriesRange, + fwd: bool, + readopts: EventReadOpts, + scyqueue: ScyllaQueue, +} + +impl ReadNextValuesOpts { + pub(super) fn new( + rt: RetentionTime, + series: SeriesId, + ts_msp: TsMs, + range: ScyllaSeriesRange, + fwd: bool, + readopts: EventReadOpts, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + rt, + series: series.id(), + ts_msp, + range, + fwd, + readopts, + scyqueue, + } + } +} + +struct ReadNextValuesParams { + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, +} + #[derive(Clone)] struct MakeFutInfo { scyqueue: ScyllaQueue, @@ -346,7 +455,7 @@ impl EventsStreamRt { if false { taskrun::tokio::time::sleep(Duration::from_millis(10)).await; } - let params = crate::events::ReadNextValuesParams { opts, jobtrace }; + let params = ReadNextValuesParams { opts, jobtrace }; let ret = match &shape { Shape::Scalar => match &scalar_type { ScalarType::U8 => read_next_values::(params).await, @@ -663,8 +772,7 @@ impl Stream for EventsStreamRt { match st.qu.poll_next_unpin(cx) { Ready(Some(x)) => match x { Ok((evs, mut jobtrace)) => { - jobtrace - .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); + jobtrace.add_event_now(ReadEventKind::EventsStreamRtSees(evs.len() as u32)); trace_fetch!("ReadingFwd {jobtrace}"); for ts in MergeableDyn::tss_for_testing(evs.as_ref()) { trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); @@ -741,3 +849,451 @@ fn trait_assert_try() { fn phantomval() -> T { panic!() } + +async fn read_next_values_2( + opts: ReadNextValuesOpts, + mut jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, +) -> Result<(Box, ReadJobTrace), Error> +where + ST: ValTy, +{ + trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); + let series = opts.series; + let ts_msp = opts.ts_msp; + let range = opts.range; + let table_name = ST::table_name(); + let with_values = opts.readopts.with_values(); + if range.end() > TsNano::from_ns(i64::MAX as u64) { + return Err(Error::RangeEndOverflow); + } + let ret = if opts.fwd { + let ts_lsp_min = if range.beg() > ts_msp.ns() { + range.beg().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + let ts_lsp_max = if range.end() > ts_msp.ns() { + range.end().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + trace_fetch!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", + ts_msp.fmt(), + ts_lsp_min, + ts_lsp_max, + table_name + ); + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, with_values) + .shape(ST::is_valueblob()) + .st(ST::st_name())?; + let qu = { + let mut qu = qu.clone(); + if qu.is_token_aware() == false { + return Err(Error::NotTokenAware); + } + qu.set_page_size(10000); + // qu.disable_paging(); + qu + }; + let params = ( + series as i64, + ts_msp.ms() as i64, + ts_lsp_min.ns() as i64, + ts_lsp_max.ns() as i64, + ); + trace_fetch!("FWD event search params {:?}", params); + jobtrace.add_event_now(ReadEventKind::CallExecuteIter); + let res = scy.execute_iter(qu.clone(), params).await?; + { + let mut ret = ::Container::empty(); + // TODO must branch already here depending on what input columns we expect + if with_values { + if ::is_valueblob() { + let mut it = res.rows_stream::<(i64, Vec)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_valueblob(row.1); + ret.push(ts, value); + } + ret + } else { + let mut i = 0; + let mut it = res.rows_stream::<::ScyRowTy>()?; + while let Some(row) = it.try_next().await? { + let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + // let value = ::from_scyty(row.1); + ret.push(ts, value); + i += 1; + if i % 2000 == 0 { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); + } + } + { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); + } + ret + } + } else { + let mut it = res.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::default(); + ret.push(ts, value); + } + ret + } + } + } else { + let ts_lsp_max = if ts_msp.ns() < range.beg() { + range.beg().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + trace_fetch!( + "BCK ts_msp {} ts_lsp_max {} {}", + ts_msp.fmt(), + ts_lsp_max, + table_name + ); + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, with_values) + .shape(ST::is_valueblob()) + .st(ST::st_name())?; + let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); + trace_fetch!("BCK event search params {:?}", params); + let res = scy.execute_iter(qu.clone(), params).await?; + { + let mut ret = ::Container::empty(); + // TODO must branch already here depending on what input columns we expect + if with_values { + if ::is_valueblob() { + let mut it = res.rows_stream::<(i64, Vec)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_valueblob(row.1); + ret.push(ts, value); + } + ret + } else { + let mut i = 0; + let mut it = res.rows_stream::<::ScyRowTy>()?; + while let Some(row) = it.try_next().await? { + let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + // let value = ::from_scyty(row.1); + ret.push(ts, value); + i += 1; + if i % 2000 == 0 { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); + } + } + { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); + } + ret + } + } else { + let mut it = res.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::default(); + ret.push(ts, value); + } + ret + } + } + }; + let byte_est = ret.byte_estimate(); + trace_fetch!( + "read ts_msp {} len {} byte_est {}", + ts_msp.fmt(), + ret.len(), + byte_est + ); + let ret = Box::new(ret); + Ok((ret, jobtrace)) +} + +async fn read_next_values( + params: ReadNextValuesParams, +) -> Result<(Box, ReadJobTrace), Error> +where + ST: ValTy, +{ + let opts = params.opts; + let jobtrace = params.jobtrace; + // TODO could take scyqeue out of opts struct. + let scyqueue = opts.scyqueue.clone(); + let level = taskrun::query_log_level(); + let futgen = move |scy: Arc, stmts: Arc, mut jobtrace: ReadJobTrace| { + // TODO avoid this + // opts.jobtrace = jobtrace; + let fut = async move { + // let jobtrace = &mut opts.jobtrace; + let logspan = if level == log::Level::DEBUG { + tracing::span!(log::Level::INFO, "log_span_debug") + } else if level == log::Level::TRACE { + tracing::span!(log::Level::INFO, "log_span_trace") + } else { + tracing::Span::none() + }; + jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues); + let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts); + let fut = tracing::Instrument::instrument(fut, logspan); + match fut.await.map_err(crate::worker::Error::from) { + Ok((ret, mut jobtrace)) => { + jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone); + Ok((ret, jobtrace)) + } + Err(e) => Err(e), + } + }; + Box::pin(fut) + as Pin< + Box< + dyn Future< + Output = Result<(Box, ReadJobTrace), crate::worker::Error>, + > + Send, + >, + > + }; + let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?; + Ok((res, jobtrace)) +} + +trait ValTy: Sized + 'static { + type ScaTy: ScalarOps + std::default::Default; + type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>; + type ScyRowTy: for<'a, 'b> scylla::deserialize::DeserializeRow<'a, 'b>; + type Container: BinningggContainerEventsDyn + Empty + Appendable; + fn from_valueblob(inp: Vec) -> Self; + fn table_name() -> &'static str; + fn default() -> Self; + fn is_valueblob() -> bool; + fn st_name() -> &'static str; + fn read_next_values_trait( + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, + ) -> Pin, ReadJobTrace), Error>> + Send>>; + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self); +} + +macro_rules! impl_scaty_scalar { + ($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { + impl ValTy for $st { + type ScaTy = $st; + type ScyTy = $st_scy; + type ScyRowTy = (i64, $st_scy); + type Container = ContainerEvents; + + fn from_valueblob(_inp: Vec) -> Self { + panic!("unused") + } + + fn table_name() -> &'static str { + concat!("scalar_", $table_name) + } + + fn default() -> Self { + ::default() + } + + fn is_valueblob() -> bool { + false + } + + fn st_name() -> &'static str { + $st_name + } + + fn read_next_values_trait( + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) + } + + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1 as Self::ScaTy) + } + } + }; +} + +macro_rules! impl_scaty_array { + ($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { + impl ValTy for $vt { + type ScaTy = $st; + type ScyTy = $st_scy; + type ScyRowTy = (i64, $st_scy); + type Container = ContainerEvents>; + + fn from_valueblob(inp: Vec) -> Self { + if inp.len() < 32 { + ::default() + } else { + let en = std::mem::size_of::(); + let n = (inp.len().max(32) - 32) / en; + let mut c = Vec::with_capacity(n); + for i in 0..n { + let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)]; + let p1 = r1 as *const _ as *const $st; + let v1 = unsafe { p1.read_unaligned() }; + c.push(v1); + } + c + } + } + + fn table_name() -> &'static str { + concat!("array_", $table_name) + } + + fn default() -> Self { + Vec::new() + } + + fn is_valueblob() -> bool { + true + } + + fn st_name() -> &'static str { + $st_name + } + + fn read_next_values_trait( + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) + } + + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1 .into_iter().map(|x| x as _).collect()) + } + } + }; +} + +impl ValTy for EnumVariant { + type ScaTy = EnumVariant; + type ScyTy = i16; + type ScyRowTy = (i64, i16, String); + type Container = ContainerEvents; + + fn from_valueblob(_inp: Vec) -> Self { + panic!("unused") + } + + fn table_name() -> &'static str { + "array_string" + } + + fn default() -> Self { + ::default() + } + + fn is_valueblob() -> bool { + false + } + + fn st_name() -> &'static str { + "enum" + } + + fn read_next_values_trait( + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) + } + + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, EnumVariant::new(inp.1 as u16, inp.2)) + } +} + +impl ValTy for Vec { + type ScaTy = String; + type ScyTy = Vec; + type ScyRowTy = (i64, Vec); + type Container = ContainerEvents>; + + fn from_valueblob(_inp: Vec) -> Self { + panic!("unused") + } + + fn table_name() -> &'static str { + "array_string" + } + + fn default() -> Self { + Vec::new() + } + + fn is_valueblob() -> bool { + false + } + + fn st_name() -> &'static str { + "string" + } + + fn read_next_values_trait( + opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, + scy: Arc, + stmts: Arc, + ) -> Pin, ReadJobTrace), Error>> + Send>> { + let fut = read_next_values_2::(opts, jobtrace, scy, stmts); + Box::pin(fut) + } + + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1) + } +} + +impl_scaty_scalar!(u8, i8, "u8", "u8"); +impl_scaty_scalar!(u16, i16, "u16", "u16"); +impl_scaty_scalar!(u32, i32, "u32", "u32"); +impl_scaty_scalar!(u64, i64, "u64", "u64"); +impl_scaty_scalar!(i8, i8, "i8", "i8"); +impl_scaty_scalar!(i16, i16, "i16", "i16"); +impl_scaty_scalar!(i32, i32, "i32", "i32"); +impl_scaty_scalar!(i64, i64, "i64", "i64"); +impl_scaty_scalar!(f32, f32, "f32", "f32"); +impl_scaty_scalar!(f64, f64, "f64", "f64"); +impl_scaty_scalar!(bool, bool, "bool", "bool"); +impl_scaty_scalar!(String, String, "string", "string"); + +impl_scaty_array!(Vec, u8, Vec, "u8", "u8"); +impl_scaty_array!(Vec, u16, Vec, "u16", "u16"); +impl_scaty_array!(Vec, u32, Vec, "u32", "u32"); +impl_scaty_array!(Vec, u64, Vec, "u64", "u64"); +impl_scaty_array!(Vec, i8, Vec, "i8", "i8"); +impl_scaty_array!(Vec, i16, Vec, "i16", "i16"); +impl_scaty_array!(Vec, i32, Vec, "i32", "i32"); +impl_scaty_array!(Vec, i64, Vec, "i64", "i64"); +impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); +impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); +impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); diff --git a/crates/scyllaconn/src/events2/onebeforeandbulk.rs b/crates/scyllaconn/src/events2/onebeforeandbulk.rs index 8c9c0ae..a6cd2f5 100644 --- a/crates/scyllaconn/src/events2/onebeforeandbulk.rs +++ b/crates/scyllaconn/src/events2/onebeforeandbulk.rs @@ -1,5 +1,3 @@ -use daqbuf_err as err; -use err::thiserror; use futures_util::Stream; use futures_util::StreamExt; use items_0::merge::DrainIntoDstResult; @@ -13,9 +11,9 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_transition { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_transition { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) } macro_rules! tracer_poll_enter { ($self:expr) => { @@ -33,18 +31,18 @@ macro_rules! tracer_loop_enter { }; } -#[allow(unused)] -macro_rules! debug_fetch { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } +macro_rules! debug_fetch { ($($arg:expr),*) => ( if true { debug!($($arg),*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "EventsOneBeforeAndBulk")] -pub enum Error { - Unordered, - Logic, - Input(Box), - LimitPoll, - LimitLoop, -} +autoerr::create_error_v1!( + name(Error, "EventsOneBeforeAndBulk"), + enum variants { + Unordered, + Logic, + Input(Box), + LimitPoll, + LimitLoop, + }, +); #[derive(Debug)] pub enum Output { @@ -164,7 +162,7 @@ where } // Separate events into before and bulk let ppp = MergeableTy::find_lowest_index_ge(&item, self.ts0); - trace_transition!("partition_point {ppp:?} {n:?}", n = item.len()); + trace_transition!("partition_point {:?} {:?}", ppp, item.len()); if let Some(pp) = ppp { if pp == 0 { // all entries are bulk @@ -172,12 +170,22 @@ where self.state = State::Bulk; if let Some(before) = self.consume_buf_get_latest() { self.out.push_back(item); + let emit_len = before.len(); let item = Output::Before(before); - trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); + trace_emit!( + "State::Begin Before {} emit_len {}", + self.dbgname, + emit_len + ); Ready(Some(Ok(item))) } else { + let emit_len = item.len(); let item = Output::Bulk(item); - trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); + trace_emit!( + "State::Begin Bulk {} emit_len {}", + self.dbgname, + emit_len + ); Ready(Some(Ok(item))) } } else { @@ -189,19 +197,21 @@ where DrainIntoDstResult::Done => { if let Some(before) = self.consume_buf_get_latest() { self.out.push_back(item); + let emit_len = before.len(); let item = Output::Before(before); trace_emit!( - "State::Begin Before {} emit {:?}", + "State::Begin Before {} emit_len {}", self.dbgname, - item + emit_len ); Ready(Some(Ok(item))) } else { + let emit_len = item.len(); let item = Output::Bulk(item); trace_emit!( - "State::Begin Bulk {} emit {:?}", + "State::Begin Bulk {} emit_len {}", self.dbgname, - item + emit_len ); Ready(Some(Ok(item))) } @@ -214,19 +224,21 @@ where self.buf = Some(buf); if let Some(before) = self.consume_buf_get_latest() { self.out.push_back(item); + let emit_len = before.len(); let item = Output::Before(before); trace_emit!( - "State::Begin Before {} emit {:?}", + "State::Begin Before {} emit_len {}", self.dbgname, - item + emit_len ); Ready(Some(Ok(item))) } else { + let emit_len = item.len(); let item = Output::Bulk(item); trace_emit!( - "State::Begin Bulk {} emit {:?}", + "State::Begin Bulk {} emit_len {}", self.dbgname, - item + emit_len ); Ready(Some(Ok(item))) } @@ -270,8 +282,9 @@ where self.state = State::Done; trace_transition!("transition from Begin to end of stream"); if let Some(before) = self.consume_buf_get_latest() { + let emit_len = before.len(); let item = Output::Before(before); - trace_emit!("State::Begin EOS {} emit {:?}", self.dbgname, item); + trace_emit!("State::Begin EOS {} emit_len {}", self.dbgname, emit_len); Ready(Some(Ok(item))) } else { trace_emit!("State::Begin EOS {} emit None", self.dbgname); @@ -307,8 +320,9 @@ where if item.len() == 0 { self.seen_empty_during_bulk = true; } + let item_len = item.len(); let item = Output::Bulk(item); - trace_emit!("State::Bulk data {} emit {:?}", self.dbgname, item); + trace_emit!("State::Bulk data {} item_len {}", self.dbgname, item_len); Ready(Some(Ok(item))) } } diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index e75fff7..4af3f7a 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -1,22 +1,19 @@ -use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use netpod::ttl::RetentionTime; use scylla::prepared_statement::PreparedStatement; use scylla::Session; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaPrepare")] -pub enum Error { - ScyllaQuery(#[from] scylla::transport::errors::QueryError), - ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), - ScyllaWorker(Box), - MissingQuery(String), - RangeEndOverflow, - InvalidFuture, - TestError(String), -} +autoerr::create_error_v1!( + name(Error, "ScyllaPrepare"), + enum variants { + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaWorker(Box), + MissingQuery(String), + RangeEndOverflow, + InvalidFuture, + TestError(String), + }, +); #[derive(Debug)] pub struct StmtsLspShape { diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index 5da9244..958db83 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -2,7 +2,6 @@ pub mod accounting; pub mod bincache; pub mod conn; pub mod errconv; -pub mod events; pub mod events2; pub mod range; pub mod schema; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index b0e2c48..813ed87 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,5 +1,5 @@ use crate::conn::create_scy_session_no_ks; -use crate::events::ReadJobTrace; +use crate::events2::events::ReadJobTrace; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; @@ -28,7 +28,7 @@ autoerr::create_error_v1!( enum variants { ScyllaConnection(err::Error), Prepare(#[from] crate::events2::prepare::Error), - EventsQuery(#[from] crate::events::Error), + Events(#[from] crate::events2::events::Error), Msp(#[from] crate::events2::msp::Error), ChannelSend, ChannelRecv, @@ -270,7 +270,9 @@ impl ScyllaWorker { // TODO count for stats } } - Job::WriteCacheF32(_, _, tx) => { + Job::WriteCacheF32(a, b, tx) => { + let _ = a; + let _ = b; // let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; let res = Err(streams::timebin::cached::reader::Error::TodoImpl); if tx.send(res).await.is_err() { diff --git a/crates/streamio/Cargo.toml b/crates/streamio/Cargo.toml index d6e01c4..8103d36 100644 --- a/crates/streamio/Cargo.toml +++ b/crates/streamio/Cargo.toml @@ -20,7 +20,6 @@ crc32fast = "1.4.2" byteorder = "1.5.0" async-channel = "1.9.0" rand_xoshiro = "0.6.0" -thiserror = "=0.0.1" autoerr = "0.0.3" chrono = { version = "0.4.39", features = ["serde"] } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } @@ -35,6 +34,3 @@ http-body-util = "0.1.2" [dev-dependencies] taskrun = { path = "../taskrun" } - -[patch.crates-io] -thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/streamio/src/tcprawclient.rs b/crates/streamio/src/tcprawclient.rs index 2b772da..c4048d0 100644 --- a/crates/streamio/src/tcprawclient.rs +++ b/crates/streamio/src/tcprawclient.rs @@ -62,6 +62,7 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( Ok(Box::pin(items)) } +#[allow(unused)] async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> where // TODO group bounds in new trait diff --git a/crates/streamio/src/tcpreadasbytes.rs b/crates/streamio/src/tcpreadasbytes.rs index 916439c..45e63cf 100644 --- a/crates/streamio/src/tcpreadasbytes.rs +++ b/crates/streamio/src/tcpreadasbytes.rs @@ -5,11 +5,12 @@ use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "TcpReadAsBytes")] -pub enum Error { - IO(#[from] std::io::Error), -} +autoerr::create_error_v1!( + name(Error, "TcpReadAsBytes"), + enum variants { + IO(#[from] std::io::Error), + }, +); pub struct TcpReadAsBytes { inp: INP, diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml index ac1742d..3fe3aab 100644 --- a/crates/taskrun/Cargo.toml +++ b/crates/taskrun/Cargo.toml @@ -22,6 +22,8 @@ daqbuf-err = { path = "../../../daqbuf-err" } [features] with-console = [] #console-subscriber = { version = "0.3.0" } +DISABLED_LOKI = [] +DISABLED_CONSOLE = [] with-loki = [] #tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index e952e8e..21b557c 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -215,7 +215,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - allow = true; + // allow = true; allow } else { false @@ -251,7 +251,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { sr = g.parent(); } } - allow = true; + // allow = true; allow } else { false @@ -286,14 +286,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { let reg = tracing_subscriber::registry(); - #[cfg(DISABLED_CONSOLE)] + #[cfg(feature = "DISABLED_CONSOLE")] let reg = { let (console_layer, console_server) = console_subscriber::ConsoleLayer::builder().build(); tokio::spawn(console_server.serve()); reg.with(console_layer) }; - #[cfg(DISABLED_CONSOLE)] + #[cfg(feature = "DISABLED_CONSOLE")] let reg = { let pid = std::process::id(); // let cspn = format!("/tmp/daqbuffer.tokio.console.pid.{pid}"); @@ -315,7 +315,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { format!("{e}") })?; } - #[cfg(DISABLED_LOKI)] + #[cfg(feature = "DISABLED_LOKI")] // TODO tracing_loki seems not well composable, try open telemetry instead. if false { /*let fmt_layer = tracing_subscriber::fmt::Layer::new()