diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 6406d91..99becac 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" dependencies = [ "gimli", ] @@ -57,15 +57,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.66" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" +checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" [[package]] name = "arc-swap" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "arrayref" @@ -107,9 +107,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.59" +version = "0.1.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" +checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" dependencies = [ "proc-macro2", "quote", @@ -124,9 +124,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.1" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" +checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc" dependencies = [ "async-trait", "axum-core", @@ -153,9 +153,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" +checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34" dependencies = [ "async-trait", "bytes", @@ -170,15 +170,15 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab84319d616cfb654d03394f38ab7e6f0919e181b1b57e1fd15e7fb4077d9a7" +checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" dependencies = [ "addr2line", "cc", "cfg-if", "libc", - "miniz_oxide 0.5.4", + "miniz_oxide", "object", "rustc-demangle", ] @@ -234,9 +234,9 @@ dependencies = [ [[package]] name = "bson" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d76085681585d39016f4d3841eb019201fc54d2dd0d92ad1e4fab3bfb32754" +checksum = "8746d07211bb12a7c34d995539b4a2acd4e0b0e757de98ce2ab99bcf17443fad" dependencies = [ "ahash", "base64", @@ -253,9 +253,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "byteorder" @@ -271,9 +271,9 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "cc" -version = "1.0.77" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4" +checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" [[package]] name = "cfg-if" @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.29" +version = "4.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d" +checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76" dependencies = [ "bitflags", "clap_derive", @@ -341,9 +341,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.0.21" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -354,9 +354,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade" dependencies = [ "os_str_bytes", ] @@ -396,9 +396,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" dependencies = [ "crossbeam-utils", ] @@ -558,9 +558,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.83" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf" +checksum = "322296e2f2e5af4270b54df9e85a02ff037e271af20ba3e7fe1575515dc840b8" dependencies = [ "cc", "cxxbridge-flags", @@ -570,9 +570,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.83" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39" +checksum = "017a1385b05d631e7875b1f151c9f012d37b53491e2a87f65bff5c262b2111d8" dependencies = [ "cc", "codespan-reporting", @@ -585,15 +585,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.83" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12" +checksum = "c26bbb078acf09bc1ecda02d4223f03bdd28bd4874edcb0379138efc499ce971" [[package]] name = "cxxbridge-macro" -version = "1.0.83" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6" +checksum = "357f40d1f06a24b60ae1fe122542c1fb05d28d32acb2aed064e84bc2ad1e252e" dependencies = [ "proc-macro2", "quote", @@ -602,7 +602,7 @@ dependencies = [ [[package]] name = "daqbuffer" -version = "4.1.0" +version = "0.3.6" dependencies = [ "bytes", "chrono", @@ -619,6 +619,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_yaml", "taskrun", "tokio", "tracing", @@ -667,7 +668,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core 0.9.5", + "parking_lot_core 0.9.6", ] [[package]] @@ -769,15 +770,15 @@ dependencies = [ [[package]] name = "either" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" [[package]] name = "erased-serde" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54558e0ba96fbe24280072642eceb9d7d442e32c7ec0ea9e7ecd7b4ea2cf4e11" +checksum = "e4ca605381c017ec7a5fef5e548f1cfaa419ed0f6df6367339300db74c92aa7d" dependencies = [ "serde", ] @@ -848,7 +849,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", - "miniz_oxide 0.6.2", + "miniz_oxide", ] [[package]] @@ -1040,9 +1041,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.26.2" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d" +checksum = "221996f774192f0f718773def8201c4ae31f02616a54ccfc2d358bb0e5cefdec" [[package]] name = "h2" @@ -1107,15 +1108,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.2.6" @@ -1209,7 +1201,7 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "httpret" -version = "0.0.2" +version = "0.3.6" dependencies = [ "async-channel", "bytes", @@ -1248,6 +1240,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.23" @@ -1352,24 +1354,24 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] name = "is-terminal" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi", "io-lifetimes", "rustix", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] @@ -1417,6 +1419,7 @@ dependencies = [ "erased-serde", "err", "futures-util", + "humantime-serde", "items", "items_0", "items_proc", @@ -1447,9 +1450,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" [[package]] name = "js-sys" @@ -1468,24 +1471,24 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.138" +version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "link-cplusplus" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369" +checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" dependencies = [ "cc", ] [[package]] name = "linux-raw-sys" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" [[package]] name = "lock_api" @@ -1575,15 +1578,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34" -dependencies = [ - "adler", -] - [[package]] name = "miniz_oxide" version = "0.6.2" @@ -1602,7 +1596,7 @@ dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] @@ -1632,6 +1626,7 @@ dependencies = [ "chrono", "err", "futures-util", + "humantime-serde", "num-traits", "serde", "serde_json", @@ -1672,14 +1667,23 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.1" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", ] +[[package]] +name = "nom8" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae01545c9c7fc4486ab7debaf2aad7003ac19431791868fb2e8066df97fad2f8" +dependencies = [ + "memchr", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1733,28 +1737,28 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ - "hermit-abi 0.1.19", + "hermit-abi", "libc", ] [[package]] name = "num_enum" -version = "0.5.7" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +checksum = "8d829733185c1ca374f17e52b762f24f535ec625d2cc1f070e34c8a9068f341b" dependencies = [ "num_enum_derive", ] [[package]] name = "num_enum_derive" -version = "0.5.7" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +checksum = "2be1598bf1c313dcdd12092e3f1920f463462525a21b7b4e11b4168353d0123e" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1764,24 +1768,24 @@ dependencies = [ [[package]] name = "object" -version = "0.29.0" +version = "0.30.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53" +checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" [[package]] name = "openssl" -version = "0.10.44" +version = "0.10.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29d971fd5722fec23977260f6e81aa67d2f22cadbdc2aa049f1022d9a3be1566" +checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1" dependencies = [ "bitflags", "cfg-if", @@ -1811,9 +1815,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.79" +version = "0.9.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5454462c0eced1e97f2ec09036abc8da362e66802f66fd20f86854d9d8cbcbc4" +checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7" dependencies = [ "autocfg", "cc", @@ -1852,7 +1856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.5", + "parking_lot_core 0.9.6", ] [[package]] @@ -1871,15 +1875,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] @@ -1902,9 +1906,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1c2c742266c2f1041c914ba65355a83ae8747b05f208319784083583494b4b" +checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" [[package]] name = "percent-encoding" @@ -2008,13 +2012,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-crate" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" +checksum = "66618389e4ec1c7afe67d51a9bf34ff9236480f8d51e7489b7d5ab0303c13f34" dependencies = [ "once_cell", - "thiserror", - "toml", + "toml_edit", ] [[package]] @@ -2043,18 +2046,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.11.3" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b18e655c21ff5ac2084a5ad0611e827b3f92badf79f4910b5a5c58f4d87ff0" +checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" dependencies = [ "bytes", "prost-derive", @@ -2062,9 +2065,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.11.2" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306" +checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" dependencies = [ "anyhow", "itertools", @@ -2075,9 +2078,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.2" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a" +checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" dependencies = [ "bytes", "prost", @@ -2085,9 +2088,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.21" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" dependencies = [ "proc-macro2", ] @@ -2133,9 +2136,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" dependencies = [ "aho-corasick", "memchr", @@ -2196,38 +2199,37 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] name = "rustix" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ "bitflags", "errno", "io-lifetimes", "libc", "linux-raw-sys", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] name = "rustversion" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" [[package]] name = "ryu" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" [[package]] name = "schannel" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" +checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3" dependencies = [ - "lazy_static", - "windows-sys 0.36.1", + "windows-sys", ] [[package]] @@ -2238,15 +2240,15 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "scratch" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" +checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" [[package]] name = "scylla" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345a33a39eb25c82e48f7e82cb7378ec724e72a48579bb9c83e1c511d5b44fb6" +checksum = "4b9d4ef7fb24d95d30c4a8da782bb2afead3a8b66f32c805bb82a03722d7be33" dependencies = [ "arc-swap", "async-trait", @@ -2276,10 +2278,11 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.0.2" +version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f01015d74993239c1ad6fa89e4c0baed8af37efaaaf1fb9a5abecc119a7e31c6" +checksum = "6972061cbcc83754b4243d007ae51c1a1345a950b368cbdaad0186eac8799203" dependencies = [ + "async-trait", "bigdecimal", "byteorder", "bytes", @@ -2296,9 +2299,9 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13dc0caffb1274feb3df615e3260cb71a5a7a5d579adc49ba5544c87950a701c" +checksum = "e03b3a19daa79085439113c746d2946e5e6effd2d9039bf092bb08df915487b2" dependencies = [ "quote", "syn", @@ -2326,13 +2329,14 @@ dependencies = [ "serde_json", "tokio", "tokio-postgres", + "tracing", ] [[package]] name = "security-framework" -version = "2.7.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" +checksum = "7c4437699b6d34972de58652c68b98cb5b53a4199ab126db8e20ec8ded29a721" dependencies = [ "bitflags", "core-foundation", @@ -2343,9 +2347,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.6.1" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4" dependencies = [ "core-foundation-sys", "libc", @@ -2353,18 +2357,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] [[package]] name = "serde_bytes" -version = "0.11.7" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfc50e8183eeeb6178dcb167ae34a8051d63535023ae38b5d8d12beae193d37b" +checksum = "718dc5fff5b36f99093fc49b280cfc96ce6fc824317783bff5a1fed0c7a64819" dependencies = [ "serde", ] @@ -2381,9 +2385,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", @@ -2392,9 +2396,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db" +checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" dependencies = [ "indexmap", "itoa", @@ -2402,6 +2406,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha2" version = "0.10.6" @@ -2534,9 +2551,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908" +checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" dependencies = [ "proc-macro2", "quote", @@ -2581,27 +2598,27 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] [[package]] name = "thiserror" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", @@ -2681,9 +2698,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.23.0" +version = "1.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", "bytes", @@ -2695,7 +2712,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "windows-sys 0.42.0", + "windows-sys", ] [[package]] @@ -2779,12 +2796,20 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.5.9" +name = "toml_datetime" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" +checksum = "4553f467ac8e3d374bc9a177a26801e5d0f9b211aa1673fb137a403afd1c9cf5" + +[[package]] +name = "toml_edit" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c59d8dd7d0dcbc6428bf7aa2f0e823e26e43b3c9aca15bbc9475d23e5fa12b" dependencies = [ - "serde", + "indexmap", + "nom8", + "toml_datetime", ] [[package]] @@ -2981,9 +3006,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "twox-hash" @@ -3003,15 +3028,15 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.8" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" +checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" [[package]] name = "unicode-normalization" @@ -3034,6 +3059,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unsafe-libyaml" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" + [[package]] name = "url" version = "2.3.1" @@ -3180,19 +3211,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-sys" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" -dependencies = [ - "windows_aarch64_msvc 0.36.1", - "windows_i686_gnu 0.36.1", - "windows_i686_msvc 0.36.1", - "windows_x86_64_gnu 0.36.1", - "windows_x86_64_msvc 0.36.1", -] - [[package]] name = "windows-sys" version = "0.42.0" @@ -3200,82 +3218,52 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.42.0", - "windows_i686_gnu 0.42.0", - "windows_i686_msvc 0.42.0", - "windows_x86_64_gnu 0.42.0", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.42.0", + "windows_x86_64_msvc", ] [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" -version = "0.36.1" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" -version = "0.36.1" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" - -[[package]] -name = "windows_i686_gnu" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" -version = "0.36.1" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" - -[[package]] -name = "windows_i686_msvc" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" -version = "0.36.1" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" -version = "0.36.1" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index f50940f..0fb1aa1 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -108,7 +108,7 @@ async fn go() -> Result<(), Error> { } SubCmd::Logappend(k) => { let jh = tokio::task::spawn_blocking(move || { - taskrun::append::append(&k.dir, std::io::stdin()).unwrap(); + taskrun::append::append(&k.dir, k.total_size_max_bytes(), std::io::stdin()).unwrap(); }); jh.await.map_err(Error::from_string)?; } diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs index 0ccf18c..0f363ea 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -78,4 +78,12 @@ pub struct BinnedClient { pub struct Logappend { #[arg(long)] pub dir: String, + #[arg(long)] + pub total_mb: Option, +} + +impl Logappend { + pub fn total_size_max_bytes(&self) -> u64 { + 1024 * 1024 * self.total_mb.unwrap_or(20) + } } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index fe221df..9f7be4b 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,18 +1,42 @@ -use crate::decode::{BigEndian, Endianness, LittleEndian}; -use crate::decode::{EventValueFromBytes, EventValueShape, EventsDecodedStream, NumFromBytes}; -use crate::decode::{EventValuesDim0Case, EventValuesDim1Case}; +use crate::decode::BigEndian; +use crate::decode::Endianness; +use crate::decode::EventValueFromBytes; +use crate::decode::EventValueShape; +use crate::decode::EventValuesDim0Case; +use crate::decode::EventValuesDim1Case; +use crate::decode::EventsDecodedStream; +use crate::decode::LittleEndian; +use crate::decode::NumFromBytes; use crate::eventblobs::EventChunkerMultifile; use err::Error; -use futures_util::{Stream, StreamExt}; +use futures_util::Stream; +use futures_util::StreamExt; use items::eventfull::EventFull; -use items::numops::{BoolNum, NumOps, StringNum}; -use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; +use items::numops::BoolNum; +use items::numops::NumOps; +use items::numops::StringNum; +use items::EventsNodeProcessor; +use items::Framable; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::query::PlainEventsQuery; -use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; +use netpod::AggKind; +use netpod::ByteOrder; +use netpod::ByteSize; +use netpod::Channel; +use netpod::DiskIoTune; +use netpod::NanoRange; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::Shape; +use parse::channelconfig::extract_matching_config_entry; +use parse::channelconfig::read_local_config; +use parse::channelconfig::ConfigEntry; +use parse::channelconfig::MatchingConfigEntry; use std::collections::VecDeque; use std::pin::Pin; use streams::eventchunker::EventChunkerConf; @@ -322,6 +346,7 @@ pub fn make_remote_event_blobs_stream( disk_io_tune: DiskIoTune, node_config: &NodeConfigCached, ) -> Result>, Error> { + info!("make_remote_event_blobs_stream"); let shape = match entry.to_shape() { Ok(k) => k, Err(e) => return Err(e)?, @@ -352,7 +377,7 @@ pub fn make_remote_event_blobs_stream( pub async fn make_event_blobs_pipe( evq: &PlainEventsQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { info!("make_event_blobs_pipe {evq:?}"); if false { match dbconn::channel_exists(evq.channel(), &node_config).await { @@ -377,11 +402,12 @@ pub async fn make_event_blobs_pipe( DiskIoTune::default(), node_config, )?; - let s = event_blobs.map(|item: ItemType| Box::new(item) as Box); + /*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box); //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); let pipe: Pin> + Send>>; pipe = Box::pin(s); - pipe + pipe*/ + Box::pin(event_blobs) as _ } else { let event_blobs = make_local_event_blobs_stream( range.clone(), @@ -393,11 +419,12 @@ pub async fn make_event_blobs_pipe( DiskIoTune::default(), node_config, )?; - let s = event_blobs.map(|item: ItemType| Box::new(item) as Box); + /*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box); //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); let pipe: Pin> + Send>>; pipe = Box::pin(s); - pipe + pipe*/ + Box::pin(event_blobs) as _ }; Ok(pipe) } diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index a773e71..49b2d65 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -7,6 +7,7 @@ use http::Request; use http::Response; use http::StatusCode; use hyper::Body; +use items_2::channelevents::ChannelStatusEvent; use items_2::channelevents::ConnStatusEvent; use netpod::log::*; use netpod::query::ChannelStateEventsQuery; @@ -72,17 +73,20 @@ impl ConnectionStatusEvents { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let _scy = scyllaconn::create_scy_session(scyco).await?; let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; - let series = chconf.series; - let do_one_before_range = true; - let mut stream = + let _series = chconf.series; + let _do_one_before_range = true; + let ret = Vec::new(); + if true { + return Err(Error::with_msg_no_trace("TODO channel_status fetch_data")); + } + /*let mut stream = scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy); - let mut ret = Vec::new(); while let Some(item) = stream.next().await { let item = item?; ret.push(item); - } + }*/ Ok(ret) } } @@ -136,7 +140,7 @@ impl ChannelStatusEvents { &self, q: &ChannelStateEventsQuery, node_config: &NodeConfigCached, - ) -> Result, Error> { + ) -> Result, Error> { let scyco = node_config .node_config .cluster diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 9976391..ec1e094 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -88,6 +88,7 @@ async fn plain_events_json( info!("httpret plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; + info!("plain_events_json query {query:?}"); let chconf = chconf_from_events_json(&query, node_config) .await .map_err(Error::from)?; diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 7ea1d60..309e48d 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -1,32 +1,159 @@ use crate::err::Error; use crate::response; +use async_channel::Receiver; +use async_channel::Sender; +use bytes::Buf; use bytes::BufMut; -use bytes::{Buf, BytesMut}; -use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use bytes::BytesMut; +use futures_util::stream::FuturesOrdered; +use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; -use http::{Method, StatusCode, Uri}; -use hyper::{Body, Request, Response}; +use http::Method; +use http::StatusCode; +use http::Uri; +use hyper::Body; +use hyper::Request; +use hyper::Response; use netpod::log::*; use netpod::AppendToUrl; use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use netpod::NodeConfigCached; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::collections::BTreeMap; use std::future::Future; +use std::io::SeekFrom; use std::path::Path; +use std::path::PathBuf; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use std::{io::SeekFrom, path::PathBuf}; +use std::sync::Mutex; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use tokio::task::JoinHandle; +use tokio::time::error::Elapsed; use url::Url; +struct Dummy; + +enum CachePortal { + Fresh, + Existing(Receiver), + Known(V), +} + +impl CachePortal {} + +enum CacheEntry { + Waiting(SystemTime, Sender, Receiver), + Known(SystemTime, V), +} + +impl CacheEntry { + fn ts(&self) -> &SystemTime { + match self { + CacheEntry::Waiting(ts, _, _) => ts, + CacheEntry::Known(ts, _) => ts, + } + } +} + +struct CacheInner { + map: BTreeMap>, +} + +impl CacheInner +where + K: Ord, +{ + const fn new() -> Self { + Self { map: BTreeMap::new() } + } + + fn housekeeping(&mut self) { + if self.map.len() > 200 { + info!("trigger housekeeping with len {}", self.map.len()); + let mut v: Vec<_> = self.map.iter().map(|(k, v)| (v.ts(), k)).collect(); + v.sort(); + let ts0 = v[v.len() / 2].0.clone(); + //let tsnow = SystemTime::now(); + //let tscut = tsnow.checked_sub(Duration::from_secs(60 * 10)).unwrap_or(tsnow); + self.map.retain(|_k, v| v.ts() >= &ts0); + info!("housekeeping kept len {}", self.map.len()); + } + } +} + +struct Cache { + inner: Mutex>, +} + +impl Cache +where + K: Ord, + V: Clone, +{ + const fn new() -> Self { + Self { + inner: Mutex::new(CacheInner::new()), + } + } + + fn housekeeping(&self) { + let mut g = self.inner.lock().unwrap(); + g.housekeeping(); + } + + fn portal(&self, key: K) -> CachePortal { + use std::collections::btree_map::Entry; + let mut g = self.inner.lock().unwrap(); + g.housekeeping(); + match g.map.entry(key) { + Entry::Vacant(e) => { + let (tx, rx) = async_channel::bounded(16); + let ret = CachePortal::Fresh; + let v = CacheEntry::Waiting(SystemTime::now(), tx, rx); + e.insert(v); + ret + } + Entry::Occupied(e) => match e.get() { + CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()), + CacheEntry::Known(_ts, v) => CachePortal::Known(v.clone()), + }, + } + } + + fn set_value(&self, key: K, val: V) { + let mut g = self.inner.lock().unwrap(); + if let Some(e) = g.map.get_mut(&key) { + match e { + CacheEntry::Waiting(ts, tx, _rx) => { + let tx = tx.clone(); + *e = CacheEntry::Known(*ts, val); + tx.close(); + } + CacheEntry::Known(_ts, _val) => { + error!("set_value already known"); + } + } + } else { + error!("set_value no entry for key"); + } + } +} + +static CACHE: Cache = Cache::new(); + pub struct MapPulseHisto { _pulse: u64, _tss: Vec, @@ -41,6 +168,9 @@ const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/"; const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/"; const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/"; +const MAP_PULSE_LOCAL_TIMEOUT: Duration = Duration::from_millis(8000); +const MAP_PULSE_QUERY_TIMEOUT: Duration = Duration::from_millis(10000); + async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> { let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "set client_min_messages = 'warning'"; @@ -389,7 +519,7 @@ impl IndexFullHttpFunction { let n1 = files.len().min(3); let m1 = files.len() - n1; for ch in &files[m1..] { - info!(" index over {:?}", ch); + trace!(" index over {:?}", ch); } for mp in files[m1..].into_iter() { match mp { @@ -463,7 +593,12 @@ impl IndexFullHttpFunction { } } } - info!("latest for {channel_name} {latest_pair:?}"); + if channel_name.contains("SAT-CVME-TIFALL5:EvtSet") + || channel_name.contains("SINSB04") + || channel_name.contains("SINSB03") + { + info!("latest for {channel_name} {latest_pair:?}"); + } Ok(msg) } @@ -500,7 +635,7 @@ impl UpdateTaskGuard { pub async fn abort_wait(&mut self) -> Result<(), Error> { if let Some(jh) = self.jh.take() { info!("UpdateTaskGuard::abort_wait"); - let fut = tokio::time::timeout(Duration::from_millis(6000), async { jh.await }); + let fut = tokio::time::timeout(Duration::from_millis(20000), async { jh.await }); Ok(fut.await???) } else { Ok(()) @@ -527,6 +662,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) break; } let ts1 = Instant::now(); + CACHE.housekeeping(); match IndexFullHttpFunction::index(&node_config).await { Ok(_) => {} Err(e) => { @@ -536,7 +672,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) } let ts2 = Instant::now(); let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; - info!("Done update task {:.0} ms", dt); + info!("Done update task {:.0}ms", dt); } Ok(()) } @@ -721,7 +857,7 @@ impl HasBackend for MapPulseQuery { impl HasTimeout for MapPulseQuery { fn timeout(&self) -> Duration { - Duration::from_millis(2000) + MAP_PULSE_QUERY_TIMEOUT } } @@ -860,6 +996,7 @@ impl MapPulseLocalHttpFunction { let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..] .parse() .map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?; + let req_from = req.headers().get("x-req-from").map_or(None, |x| Some(format!("{x:?}"))); let ts1 = Instant::now(); let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)"; @@ -875,9 +1012,10 @@ impl MapPulseLocalHttpFunction { (channel, hostname, timebin as u32, split as u32, ks as u32) }) .collect(); - trace!( - "database query took {}s", - Instant::now().duration_since(ts1).as_secs_f32() + info!( + "map pulse local req-from {:?} candidate list in {:.0}ms", + req_from, + Instant::now().duration_since(ts1).as_secs_f32() * 1e3 ); //let mut msg = String::new(); //use std::fmt::Write; @@ -1016,23 +1154,46 @@ impl MapPulseHistoHttpFunction { node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse ); let uri: Uri = s.parse()?; - let fut = hyper::Client::new().get(uri); - let fut = tokio::time::timeout(Duration::from_millis(1000), fut); + let req = Request::get(uri) + .header("x-req-from", &node_config.node.host) + .body(Body::empty())?; + let fut = hyper::Client::new().request(req); + //let fut = hyper::Client::new().get(uri); + let fut = tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut); futs.push_back(fut); } use futures_util::stream::StreamExt; let mut map = BTreeMap::new(); - while let Some(Ok(Ok(res))) = futs.next().await { - if let Ok(b) = hyper::body::to_bytes(res.into_body()).await { - if let Ok(lm) = serde_json::from_slice::(&b) { - for ts in lm.tss { - let a = map.get(&ts); - if let Some(&j) = a { - map.insert(ts, j + 1); - } else { - map.insert(ts, 1); + while let Some(futres) = futs.next().await { + match futres { + Ok(res) => match res { + Ok(res) => match hyper::body::to_bytes(res.into_body()).await { + Ok(body) => match serde_json::from_slice::(&body) { + Ok(lm) => { + for ts in lm.tss { + let a = map.get(&ts); + if let Some(&j) = a { + map.insert(ts, j + 1); + } else { + map.insert(ts, 1); + } + } + } + Err(e) => { + error!("pulse map sub request pulse {pulse} serde error {e}"); + } + }, + Err(e) => { + error!("pulse map sub request pulse {pulse} body error {e}"); } + }, + Err(e) => { + error!("pulse map sub request pulse {pulse} error {e}"); } + }, + Err(e) => { + let _: Elapsed = e; + error!("pulse map sub request timed out pulse {pulse}"); } } } @@ -1063,19 +1224,56 @@ impl MapPulseHttpFunction { info!("MapPulseHttpFunction handle uri: {:?}", req.uri()); let urls = format!("{}", req.uri()); let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?; - let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; - let mut i1 = 0; - let mut max = 0; - for i2 in 0..histo.tss.len() { - if histo.counts[i2] > max { - max = histo.counts[i2]; - i1 = i2; + match CACHE.portal(pulse) { + CachePortal::Fresh => { + info!("value not yet in cache pulse {pulse}"); + let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if max > 0 { + let val = histo.tss[i1]; + CACHE.set_value(pulse, val); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + } else { + Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) + } + } + CachePortal::Existing(rx) => { + info!("waiting for already running pulse map pulse {pulse}"); + match rx.recv().await { + Ok(_) => { + error!("should never recv from existing operation pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + Err(_e) => { + info!("woken up while value wait pulse {pulse}"); + match CACHE.portal(pulse) { + CachePortal::Known(val) => { + info!("good, value after wakeup pulse {pulse}"); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + } + CachePortal::Fresh => { + error!("woken up, but portal fresh pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + CachePortal::Existing(..) => { + error!("woken up, but portal existing pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + } + } + } + } + CachePortal::Known(val) => { + info!("value already in cache pulse {pulse} ts {val}"); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) } - } - if max > 0 { - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?) - } else { - Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) } } } @@ -1103,7 +1301,69 @@ impl Api4MapPulseHttpFunction { info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = MapPulseQuery::from_url(&url)?; - let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?; + let pulse = q.pulse; + + let ret = match CACHE.portal(pulse) { + CachePortal::Fresh => { + info!("value not yet in cache pulse {pulse}"); + let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if histo.tss.len() > 1 { + warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo); + } + if max > 0 { + let val = histo.tss[i1]; + CACHE.set_value(pulse, val); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + } else { + Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) + } + } + CachePortal::Existing(rx) => { + info!("waiting for already running pulse map pulse {pulse}"); + match rx.recv().await { + Ok(_) => { + error!("should never recv from existing operation pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + Err(_e) => { + info!("woken up while value wait pulse {pulse}"); + match CACHE.portal(pulse) { + CachePortal::Known(val) => { + info!("good, value after wakeup pulse {pulse}"); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + } + CachePortal::Fresh => { + error!("woken up, but portal fresh pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + CachePortal::Existing(..) => { + error!("woken up, but portal existing pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } + } + } + } + } + CachePortal::Known(val) => { + info!("value already in cache pulse {pulse} ts {val}"); + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + } + }; + let ts2 = Instant::now(); + info!( + "Api4MapPulseHttpFunction took {:.2}s", + ts2.duration_since(ts1).as_secs_f32() + ); + ret + /*let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?; let mut i1 = 0; let mut max = 0; for i2 in 0..histo.tss.len() { @@ -1124,7 +1384,7 @@ impl Api4MapPulseHttpFunction { Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?) } else { Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) - } + }*/ } } diff --git a/items/src/items.rs b/items/src/items.rs index b7cd971..8288da9 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -55,6 +55,7 @@ pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; +pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; pub fn bool_is_false(j: &bool) -> bool { *j == false @@ -525,7 +526,17 @@ pub trait TimeBinnableDyn: } pub trait TimeBinnableDynStub: - fmt::Debug + FramableInner + FrameType + FrameTypeInnerDyn + WithLen + RangeOverlapInfo + Any + AsAnyRef + Sync + Send + 'static + fmt::Debug + + FramableInner + + FrameType + + FrameTypeInnerDyn + + WithLen + + RangeOverlapInfo + + Any + + AsAnyRef + + Sync + + Send + + 'static { } @@ -748,10 +759,16 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK I64 => Box::new(K::::empty()), F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), - _ => err::todoval(), + _ => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } } } - _ => err::todoval(), + _ => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } }, Shape::Wave(_n) => match agg_kind { AggKind::DimXBins1 => { @@ -761,12 +778,36 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK U8 => Box::new(K::::empty()), F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), - _ => err::todoval(), + BOOL => Box::new(K::::empty()), + _ => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } } } - _ => err::todoval(), + AggKind::Plain => { + use ScalarType::*; + type K = waveevents::WaveEvents; + match scalar_type { + U8 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), + _ => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } + } + } + _ => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } }, - Shape::Image(..) => err::todoval(), + Shape::Image(..) => { + error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind); + err::todoval() + } } } diff --git a/items/src/numops.rs b/items/src/numops.rs index 56c4865..569a4f8 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -1,7 +1,7 @@ use items_0::subfr::SubFrId; -use num_traits::{Bounded, Float, Zero}; use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::cmp::Ordering; use std::fmt::Debug; use std::ops::Add; @@ -117,8 +117,8 @@ pub trait NumOps: + 'static + Unpin + Debug - + Zero - + Bounded + //+ Zero + //+ Bounded + PartialOrd + SubFrId + Serialize @@ -128,10 +128,11 @@ pub trait NumOps: fn min_or_nan() -> Self; fn max_or_nan() -> Self; fn is_nan(&self) -> bool; + fn zero() -> Self; } macro_rules! impl_num_ops { - ($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => { + ($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident, $zero:expr) => { impl NumOps for $ty { fn min_or_nan() -> Self { $ty::$min_or_nan @@ -142,16 +143,51 @@ macro_rules! impl_num_ops { fn is_nan(&self) -> bool { $is_nan(self) } + fn zero() -> Self { + $zero + } } }; } +impl AsPrimF32 for bool { + fn as_prim_f32(&self) -> f32 { + if *self { + 1. + } else { + 0. + } + } +} + +impl NumOps for bool { + fn min_or_nan() -> Self { + todo!() + } + + fn max_or_nan() -> Self { + todo!() + } + + fn is_nan(&self) -> bool { + todo!() + } + + fn zero() -> Self { + false + } +} + fn is_nan_int(_x: &T) -> bool { false } -fn is_nan_float(x: &T) -> bool { - x.is_nan() +fn is_nan_f32(x: &f32) -> bool { + f32::is_nan(*x) +} + +fn is_nan_f64(x: &f64) -> bool { + f64::is_nan(*x) } pub trait AsPrimF32 { @@ -192,18 +228,18 @@ impl AsPrimF32 for StringNum { } } -impl_num_ops!(u8, MIN, MAX, is_nan_int); -impl_num_ops!(u16, MIN, MAX, is_nan_int); -impl_num_ops!(u32, MIN, MAX, is_nan_int); -impl_num_ops!(u64, MIN, MAX, is_nan_int); -impl_num_ops!(i8, MIN, MAX, is_nan_int); -impl_num_ops!(i16, MIN, MAX, is_nan_int); -impl_num_ops!(i32, MIN, MAX, is_nan_int); -impl_num_ops!(i64, MIN, MAX, is_nan_int); -impl_num_ops!(f32, NAN, NAN, is_nan_float); -impl_num_ops!(f64, NAN, NAN, is_nan_float); -impl_num_ops!(BoolNum, MIN, MAX, is_nan_int); -impl_num_ops!(StringNum, MIN, MAX, is_nan_int); +impl_num_ops!(u8, MIN, MAX, is_nan_int, 0); +impl_num_ops!(u16, MIN, MAX, is_nan_int, 0); +impl_num_ops!(u32, MIN, MAX, is_nan_int, 0); +impl_num_ops!(u64, MIN, MAX, is_nan_int, 0); +impl_num_ops!(i8, MIN, MAX, is_nan_int, 0); +impl_num_ops!(i16, MIN, MAX, is_nan_int, 0); +impl_num_ops!(i32, MIN, MAX, is_nan_int, 0); +impl_num_ops!(i64, MIN, MAX, is_nan_int, 0); +impl_num_ops!(f32, NAN, NAN, is_nan_f32, 0.); +impl_num_ops!(f64, NAN, NAN, is_nan_f64, 0.); +impl_num_ops!(BoolNum, MIN, MAX, is_nan_int, BoolNum(0)); +impl_num_ops!(StringNum, MIN, MAX, is_nan_int, StringNum(String::new())); impl SubFrId for StringNum { const SUB: u32 = 0x0d; diff --git a/items_0/src/scalar_ops.rs b/items_0/src/scalar_ops.rs index f5f77fa..0b6a2a1 100644 --- a/items_0/src/scalar_ops.rs +++ b/items_0/src/scalar_ops.rs @@ -42,6 +42,16 @@ impl_as_prim_f32!(i64); impl_as_prim_f32!(f32); impl_as_prim_f32!(f64); +impl AsPrimF32 for bool { + fn as_prim_f32_b(&self) -> f32 { + if *self { + 1. + } else { + 0. + } + } +} + pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static { @@ -49,7 +59,7 @@ pub trait ScalarOps: fn equal_slack(&self, rhs: &Self) -> bool; } -macro_rules! impl_num_ops { +macro_rules! impl_scalar_ops { ($ty:ident, $zero:expr, $equal_slack:ident) => { impl ScalarOps for $ty { fn zero_b() -> Self { @@ -75,13 +85,18 @@ fn equal_f64(a: f64, b: f64) -> bool { (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) } -impl_num_ops!(u8, 0, equal_int); -impl_num_ops!(u16, 0, equal_int); -impl_num_ops!(u32, 0, equal_int); -impl_num_ops!(u64, 0, equal_int); -impl_num_ops!(i8, 0, equal_int); -impl_num_ops!(i16, 0, equal_int); -impl_num_ops!(i32, 0, equal_int); -impl_num_ops!(i64, 0, equal_int); -impl_num_ops!(f32, 0., equal_f32); -impl_num_ops!(f64, 0., equal_f64); +fn equal_bool(a: bool, b: bool) -> bool { + a == b +} + +impl_scalar_ops!(u8, 0, equal_int); +impl_scalar_ops!(u16, 0, equal_int); +impl_scalar_ops!(u32, 0, equal_int); +impl_scalar_ops!(u64, 0, equal_int); +impl_scalar_ops!(i8, 0, equal_int); +impl_scalar_ops!(i16, 0, equal_int); +impl_scalar_ops!(i32, 0, equal_int); +impl_scalar_ops!(i64, 0, equal_int); +impl_scalar_ops!(f32, 0., equal_f32); +impl_scalar_ops!(f64, 0., equal_f64); +impl_scalar_ops!(bool, false, equal_bool); diff --git a/items_0/src/subfr.rs b/items_0/src/subfr.rs index fcea23f..a822bf5 100644 --- a/items_0/src/subfr.rs +++ b/items_0/src/subfr.rs @@ -41,3 +41,7 @@ impl SubFrId for f32 { impl SubFrId for f64 { const SUB: u32 = 0x0c; } + +impl SubFrId for bool { + const SUB: u32 = 0x0d; +} diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index 438502e..da49755 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -19,6 +19,7 @@ chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } +humantime-serde = "1.1.1" err = { path = "../err" } items = { path = "../items" } items_0 = { path = "../items_0" } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 827fb29..a719f13 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -10,9 +10,12 @@ use items_0::AsAnyRef; use netpod::log::*; use netpod::BinnedRange; use netpod::NanoRange; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::fmt; +use std::time::Duration; +use std::time::SystemTime; // TODO maybe rename to ChannelStatus? #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -25,8 +28,6 @@ impl ConnStatus { pub fn from_ca_ingest_status_kind(k: u32) -> Self { match k { 1 => Self::Connect, - 2 => Self::Disconnect, - 3 => Self::Disconnect, _ => Self::Disconnect, } } @@ -35,12 +36,47 @@ impl ConnStatus { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ConnStatusEvent { pub ts: u64, + #[serde(with = "humantime_serde")] + //pub datetime: chrono::DateTime, + pub datetime: SystemTime, pub status: ConnStatus, } impl ConnStatusEvent { pub fn new(ts: u64, status: ConnStatus) -> Self { - Self { ts, status } + let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000); + Self { ts, datetime, status } + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum ChannelStatus { + Connect, + Disconnect, +} + +impl ChannelStatus { + pub fn from_ca_ingest_status_kind(k: u32) -> Self { + match k { + 1 => Self::Connect, + _ => Self::Disconnect, + } + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct ChannelStatusEvent { + pub ts: u64, + #[serde(with = "humantime_serde")] + //pub datetime: chrono::DateTime, + pub datetime: SystemTime, + pub status: ChannelStatus, +} + +impl ChannelStatusEvent { + pub fn new(ts: u64, status: ChannelStatus) -> Self { + let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000); + Self { ts, datetime, status } } } @@ -88,6 +124,7 @@ mod serde_channel_events { use super::{ChannelEvents, Events}; use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; + use crate::eventsdim1::EventsDim1; use items_0::subfr::SubFrId; use serde::de::{self, EnumAccess, VariantAccess, Visitor}; use serde::ser::SerializeSeq; @@ -174,6 +211,22 @@ mod serde_channel_events { let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } + bool::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), + } + } else if e0 == EventsDim1::::serde_id() { + match e1 { + f32::SUB => { + let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + bool::SUB => { + let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), } } else { @@ -324,7 +377,9 @@ mod test_channel_events_serde { use bincode::DefaultOptions; use items_0::bincode; use items_0::Empty; - use serde::{Deserialize, Serialize}; + use serde::Deserialize; + use serde::Serialize; + use std::time::SystemTime; #[test] fn channel_events() { @@ -382,6 +437,7 @@ mod test_channel_events_serde { evs.push(12, 3, 3.2f32); let status = ConnStatusEvent { ts: 567, + datetime: SystemTime::UNIX_EPOCH, status: crate::channelevents::ConnStatus::Connect, }; let item = ChannelEvents::Status(status); diff --git a/items_2/src/databuffereventblobs.rs b/items_2/src/databuffereventblobs.rs new file mode 100644 index 0000000..c1eaeea --- /dev/null +++ b/items_2/src/databuffereventblobs.rs @@ -0,0 +1,24 @@ +use items::FrameType; +use items::FrameTypeInnerStatic; +use serde::Serialize; + +pub struct DatabufferEventBlob {} + +impl FrameTypeInnerStatic for DatabufferEventBlob { + const FRAME_TYPE_ID: u32 = items::DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID; +} + +impl FrameType for DatabufferEventBlob { + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } +} + +impl Serialize for DatabufferEventBlob { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + todo!() + } +} diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs new file mode 100644 index 0000000..ebf6aa6 --- /dev/null +++ b/items_2/src/eventsdim1.rs @@ -0,0 +1,1059 @@ +use crate::binsdim0::BinsDim0; +use crate::pulse_offs_from_abs; +use crate::ts_offs_from_abs; +use crate::IsoDateTime; +use crate::RangeOverlapInfo; +use crate::TimeBinnable; +use crate::TimeBinnableType; +use crate::TimeBinnableTypeAggregator; +use crate::TimeBinner; +use err::Error; +use items_0::scalar_ops::ScalarOps; +use items_0::AsAnyMut; +use items_0::{AsAnyRef, Empty, Events, WithLen}; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::BinnedRange; +use netpod::NanoRange; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; +use std::fmt; +use std::mem; + +#[allow(unused)] +macro_rules! trace2 { + (EN$($arg:tt)*) => (); + ($($arg:tt)*) => (trace!($($arg)*)); +} + +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim1 { + pub tss: VecDeque, + pub pulses: VecDeque, + pub values: VecDeque>, +} + +impl EventsDim1 { + #[inline(always)] + pub fn push(&mut self, ts: u64, pulse: u64, value: Vec) { + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.values.push_back(value); + } + + #[inline(always)] + pub fn push_front(&mut self, ts: u64, pulse: u64, value: Vec) { + self.tss.push_front(ts); + self.pulses.push_front(pulse); + self.values.push_front(value); + } + + pub fn serde_id() -> &'static str { + "EventsDim1" + } + + pub fn tss(&self) -> &VecDeque { + &self.tss + } +} + +impl AsAnyRef for EventsDim1 +where + NTY: ScalarOps, +{ + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl AsAnyMut for EventsDim1 +where + NTY: ScalarOps, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl Empty for EventsDim1 { + fn empty() -> Self { + Self { + tss: VecDeque::new(), + pulses: VecDeque::new(), + values: VecDeque::new(), + } + } +} + +impl fmt::Debug for EventsDim1 +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + if false { + write!( + fmt, + "EventsDim1 {{ count {} ts {:?} vals {:?} }}", + self.tss.len(), + self.tss.iter().map(|x| x / SEC).collect::>(), + self.values, + ) + } else { + write!( + fmt, + "EventsDim1 {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", + self.tss.len(), + self.tss.front().map(|x| x / SEC), + self.tss.back().map(|x| x / SEC), + self.values.front(), + self.values.back(), + ) + } + } +} + +impl WithLen for EventsDim1 { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl RangeOverlapInfo for EventsDim1 { + fn ends_before(&self, range: NanoRange) -> bool { + if let Some(&max) = self.tss.back() { + max < range.beg + } else { + true + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + if let Some(&max) = self.tss.back() { + max >= range.end + } else { + true + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + if let Some(&min) = self.tss.front() { + min >= range.end + } else { + true + } + } +} + +impl TimeBinnableType for EventsDim1 +where + NTY: ScalarOps, +{ + // TODO + type Output = BinsDim0; + type Aggregator = EventsDim1Aggregator; + + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let self_name = std::any::type_name::(); + debug!( + "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, do_time_weight) + } +} + +#[derive(Debug)] +pub struct EventsDim1Collector { + vals: EventsDim1, + range_final: bool, + timed_out: bool, +} + +impl EventsDim1Collector { + pub fn new() -> Self { + Self { + vals: EventsDim1::empty(), + range_final: false, + timed_out: false, + } + } +} + +impl WithLen for EventsDim1Collector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim1CollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: VecDeque, + #[serde(rename = "tsNs")] + ts_off_ns: VecDeque, + #[serde(rename = "pulseAnchor")] + pulse_anchor: u64, + #[serde(rename = "pulseOff")] + pulse_off: VecDeque, + #[serde(rename = "values")] + values: VecDeque>, + #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + range_final: bool, + #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + timed_out: bool, + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, +} + +impl EventsDim1CollectorOutput { + pub fn len(&self) -> usize { + self.values.len() + } + + pub fn ts_anchor_sec(&self) -> u64 { + self.ts_anchor_sec + } + + pub fn ts_off_ms(&self) -> &VecDeque { + &self.ts_off_ms + } + + pub fn pulse_anchor(&self) -> u64 { + self.pulse_anchor + } + + pub fn pulse_off(&self) -> &VecDeque { + &self.pulse_off + } + + /// Note: only used for unit tests. + pub fn values_to_f32(&self) -> VecDeque> { + self.values + .iter() + .map(|x| x.iter().map(|x| x.as_prim_f32_b()).collect()) + .collect() + } + + pub fn range_complete(&self) -> bool { + self.range_final + } + + pub fn timed_out(&self) -> bool { + self.timed_out + } +} + +impl AsAnyRef for EventsDim1CollectorOutput +where + NTY: ScalarOps, +{ + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl AsAnyMut for EventsDim1CollectorOutput +where + NTY: ScalarOps, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_s::ToJsonResult for EventsDim1CollectorOutput { + fn to_json_result(&self) -> Result, Error> { + let k = serde_json::to_value(self)?; + Ok(Box::new(k)) + } +} + +impl items_0::collect_c::Collected for EventsDim1CollectorOutput {} + +impl items_0::collect_s::CollectorType for EventsDim1Collector { + type Input = EventsDim1; + type Output = EventsDim1CollectorOutput; + + fn ingest(&mut self, src: &mut Self::Input) { + self.vals.tss.append(&mut src.tss); + self.vals.pulses.append(&mut src.pulses); + self.vals.values.append(&mut src.values); + } + + fn set_range_complete(&mut self) { + self.range_final = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self, range: Option, _binrange: Option) -> Result { + // If we timed out, we want to hint the client from where to continue. + // This is tricky: currently, client can not request a left-exclusive range. + // We currently give the timestamp of the last event plus a small delta. + // The amount of the delta must take into account what kind of timestamp precision the client + // can parse and handle. + let continue_at = if self.timed_out { + if let Some(ts) = self.vals.tss.back() { + Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS)) + } else { + if let Some(range) = &range { + Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC)) + } else { + warn!("can not determine continue-at parameters"); + None + } + } + } else { + None + }; + let tss_sl = self.vals.tss.make_contiguous(); + let pulses_sl = self.vals.pulses.make_contiguous(); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = pulse_offs_from_abs(pulses_sl); + let ret = Self::Output { + ts_anchor_sec, + ts_off_ms, + ts_off_ns, + pulse_anchor, + pulse_off: pulse_off, + values: mem::replace(&mut self.vals.values, VecDeque::new()), + range_final: self.range_final, + timed_out: self.timed_out, + continue_at, + }; + Ok(ret) + } +} + +impl items_0::collect_s::CollectableType for EventsDim1 { + type Collector = EventsDim1Collector; + + fn new_collector() -> Self::Collector { + Self::Collector::new() + } +} + +impl items_0::collect_c::Collector for EventsDim1Collector { + fn len(&self) -> usize { + self.vals.len() + } + + fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { + if let Some(item) = item.as_any_mut().downcast_mut::>() { + items_0::collect_s::CollectorType::ingest(self, item) + } else { + error!("EventsDim0Collector::ingest unexpected item {:?}", item); + } + } + + fn set_range_complete(&mut self) { + items_0::collect_s::CollectorType::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + items_0::collect_s::CollectorType::set_timed_out(self) + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, err::Error> { + match items_0::collect_s::CollectorType::result(self, range, binrange) { + Ok(x) => Ok(Box::new(x)), + Err(e) => Err(e.into()), + } + } +} + +pub struct EventsDim1Aggregator { + range: NanoRange, + count: u64, + min: NTY, + max: NTY, + sumc: u64, + sum: f32, + int_ts: u64, + last_seen_ts: u64, + last_seen_val: Option, + did_min_max: bool, + do_time_weight: bool, + events_taken_count: u64, + events_ignored_count: u64, +} + +impl Drop for EventsDim1Aggregator { + fn drop(&mut self) { + // TODO collect as stats for the request context: + trace!( + "taken {} ignored {}", + self.events_taken_count, + self.events_ignored_count + ); + } +} + +impl EventsDim1Aggregator { + pub fn new(range: NanoRange, do_time_weight: bool) -> Self { + let int_ts = range.beg; + Self { + range, + count: 0, + min: NTY::zero_b(), + max: NTY::zero_b(), + sum: 0., + sumc: 0, + int_ts, + last_seen_ts: 0, + last_seen_val: None, + did_min_max: false, + do_time_weight, + events_taken_count: 0, + events_ignored_count: 0, + } + } + + // TODO reduce clone.. optimize via more traits to factor the trade-offs? + fn apply_min_max(&mut self, val: NTY) { + trace!( + "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", + val, + self.last_seen_val, + self.count, + self.sumc, + self.min, + self.max + ); + if self.did_min_max == false { + self.did_min_max = true; + self.min = val.clone(); + self.max = val.clone(); + } else { + if self.min > val { + self.min = val.clone(); + } + if self.max < val { + self.max = val.clone(); + } + } + } + + fn apply_event_unweight(&mut self, val: NTY) { + trace!("TODO check again result_reset_unweight"); + err::todo(); + let vf = val.as_prim_f32_b(); + self.apply_min_max(val); + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + } + + fn apply_event_time_weight(&mut self, ts: u64) { + if let Some(v) = &self.last_seen_val { + let vf = v.as_prim_f32_b(); + let v2 = v.clone(); + if ts > self.range.beg { + self.apply_min_max(v2); + } + let w = if self.do_time_weight { + (ts - self.int_ts) as f32 * 1e-9 + } else { + 1. + }; + if vf.is_nan() { + } else { + self.sum += vf * w; + self.sumc += 1; + } + self.int_ts = ts; + } else { + debug!( + "apply_event_time_weight NO VALUE {}", + ts as i64 - self.range.beg as i64 + ); + } + } + + fn ingest_unweight(&mut self, item: &::Input) { + trace!("TODO check again result_reset_unweight"); + err::todo(); + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + if ts < self.range.beg { + self.events_ignored_count += 1; + } else if ts >= self.range.end { + self.events_ignored_count += 1; + return; + } else { + error!("TODO ingest_unweight"); + err::todo(); + //self.apply_event_unweight(val); + self.count += 1; + self.events_taken_count += 1; + } + } + } + + fn ingest_time_weight(&mut self, item: &::Input) { + let self_name = std::any::type_name::(); + trace!("{self_name}::ingest_time_weight item len {}", item.len()); + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val); + if ts < self.int_ts { + if self.last_seen_val.is_none() { + info!( + "ingest_time_weight event before range, only set last ts {} val {:?}", + ts, val + ); + } + self.events_ignored_count += 1; + self.last_seen_ts = ts; + error!("TODO ingest_time_weight"); + err::todo(); + //self.last_seen_val = Some(val); + } else if ts >= self.range.end { + self.events_ignored_count += 1; + return; + } else { + if false && self.last_seen_val.is_none() { + // TODO no longer needed or? + info!( + "call apply_min_max without last val, use current instead {} {:?}", + ts, val + ); + // TODO: self.apply_min_max(val.clone()); + } + self.apply_event_time_weight(ts); + self.count += 1; + self.last_seen_ts = ts; + error!("TODO ingest_time_weight"); + err::todo(); + //self.last_seen_val = Some(val); + self.events_taken_count += 1; + } + } + } + + fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsDim0 { + trace!("TODO check again result_reset_unweight"); + err::todo(); + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / self.sumc as f32; + (self.min.clone(), self.max.clone(), avg) + } else { + let g = match &self.last_seen_val { + Some(x) => x.clone(), + None => NTY::zero_b(), + }; + (g.clone(), g.clone(), g.as_prim_f32_b()) + }; + let ret = BinsDim0 { + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.sum = 0f32; + self.sumc = 0; + self.did_min_max = false; + ret + } + + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsDim0 { + // TODO check callsite for correct expand status. + if expand { + debug!("result_reset_time_weight calls apply_event_time_weight"); + self.apply_event_time_weight(self.range.end); + } else { + debug!("result_reset_time_weight NO EXPAND"); + } + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / (self.range.delta() as f32 * 1e-9); + (self.min.clone(), self.max.clone(), avg) + } else { + let g = match &self.last_seen_val { + Some(x) => x.clone(), + None => NTY::zero_b(), + }; + (g.clone(), g.clone(), g.as_prim_f32_b()) + }; + let ret = BinsDim0 { + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.sum = 0.; + self.sumc = 0; + self.did_min_max = false; + self.min = NTY::zero_b(); + self.max = NTY::zero_b(); + ret + } +} + +impl TimeBinnableTypeAggregator for EventsDim1Aggregator { + type Input = EventsDim1; + type Output = BinsDim0; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + if true { + trace!("{} ingest {} events", std::any::type_name::(), item.len()); + } + if false { + for (i, &ts) in item.tss.iter().enumerate() { + trace!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); + } + } + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) + } + } + + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + trace!("result_reset {} {}", range.beg, range.end); + if self.do_time_weight { + self.result_reset_time_weight(range, expand) + } else { + self.result_reset_unweight(range, expand) + } + } +} + +impl TimeBinnable for EventsDim1 { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + let ret = EventsDim1TimeBinner::::new(edges.into(), do_time_weight).unwrap(); + Box::new(ret) + } + + fn to_box_to_json_result(&self) -> Box { + let k = serde_json::to_value(self).unwrap(); + Box::new(k) as _ + } +} + +impl Events for EventsDim1 { + fn as_time_binnable(&self) -> &dyn TimeBinnable { + self as &dyn TimeBinnable + } + + fn verify(&self) -> bool { + let mut good = true; + let mut ts_max = 0; + for ts in &self.tss { + let ts = *ts; + if ts < ts_max { + good = false; + error!("unordered event data ts {} ts_max {}", ts, ts_max); + } + ts_max = ts_max.max(ts); + } + good + } + + fn output_info(&self) { + if false { + info!("output_info len {}", self.tss.len()); + if self.tss.len() == 1 { + info!( + " only: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + } else if self.tss.len() > 1 { + info!( + " first: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + let n = self.tss.len() - 1; + info!( + " last: ts {} pulse {} value {:?}", + self.tss[n], self.pulses[n], self.values[n] + ); + } + } + } + + fn as_collectable_mut(&mut self) -> &mut dyn items_0::collect_s::Collectable { + self + } + + fn as_collectable_with_default_ref(&self) -> &dyn items_0::collect_c::CollectableWithDefault { + self + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn items_0::collect_c::CollectableWithDefault { + self + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + let tss = self.tss.drain(..n1).collect(); + let pulses = self.pulses.drain(..n1).collect(); + let values = self.values.drain(..n1).collect(); + let ret = Self { tss, pulses, values }; + Box::new(ret) + } + + fn move_into_fresh(&mut self, ts_end: u64) -> Box { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + let tss = self.tss.drain(..n1).collect(); + let pulses = self.pulses.drain(..n1).collect(); + let values = self.values.drain(..n1).collect(); + let ret = Self { tss, pulses, values }; + Box::new(ret) + } + + fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()> { + // TODO as_any and as_any_mut are declared on unrealted traits. Simplify. + if let Some(tgt) = tgt.as_mut().as_any_mut().downcast_mut::() { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + // TODO make it harder to forget new members when the struct may get modified in the future + tgt.tss.extend(self.tss.drain(..n1)); + tgt.pulses.extend(self.pulses.drain(..n1)); + tgt.values.extend(self.values.drain(..n1)); + Ok(()) + } else { + eprintln!("downcast to EventsDim0 FAILED"); + Err(()) + } + } + + fn ts_min(&self) -> Option { + self.tss.front().map(|&x| x) + } + + fn ts_max(&self) -> Option { + self.tss.back().map(|&x| x) + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + if let Some(other) = other.as_any_ref().downcast_ref::() { + self == other + } else { + false + } + } + + fn serde_id(&self) -> &'static str { + Self::serde_id() + } + + fn nty_id(&self) -> u32 { + NTY::SUB + } + + fn clone_dyn(&self) -> Box { + Box::new(self.clone()) + } +} + +pub struct EventsDim1TimeBinner { + edges: VecDeque, + agg: EventsDim1Aggregator, + ready: Option< as TimeBinnableTypeAggregator>::Output>, + range_complete: bool, +} + +impl EventsDim1TimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Result { + if edges.len() < 2 { + return Err(Error::with_msg_no_trace(format!("need at least 2 edges"))); + } + let self_name = std::any::type_name::(); + trace!("{self_name}::new edges {edges:?}"); + let agg = EventsDim1Aggregator::new( + NanoRange { + beg: edges[0], + end: edges[1], + }, + do_time_weight, + ); + let ret = Self { + edges, + agg, + ready: None, + range_complete: false, + }; + Ok(ret) + } + + fn next_bin_range(&mut self) -> Option { + let self_name = std::any::type_name::(); + if self.edges.len() >= 3 { + self.edges.pop_front(); + let ret = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + trace!("{self_name} next_bin_range {} {}", ret.beg, ret.end); + Some(ret) + } else { + self.edges.clear(); + trace!("{self_name} next_bin_range None"); + None + } + } +} + +impl TimeBinner for EventsDim1TimeBinner { + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } + + fn ingest(&mut self, item: &dyn TimeBinnable) { + let self_name = std::any::type_name::(); + trace2!( + "TimeBinner for EventsDim1TimeBinner {:?}\n{:?}\n------------------------------------", + self.edges.iter().take(2).collect::>(), + item + ); + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("{self_name} no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + loop { + while item.starts_after(self.agg.range().clone()) { + trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); + self.cycle(); + if self.edges.len() < 2 { + warn!("{self_name} no more bin in edges B"); + return; + } + } + if item.ends_before(self.agg.range().clone()) { + trace!("{self_name} IGNORE ITEM BECAUSE ends_before\n------------- -----------"); + return; + } else { + if self.edges.len() < 2 { + trace!("{self_name} edge list exhausted"); + return; + } else { + if let Some(item) = item + .as_any_ref() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + // TODO collect statistics associated with this request: + trace!("{self_name} FEED THE ITEM..."); + self.agg.ingest(item); + if item.ends_after(self.agg.range().clone()) { + trace!("{self_name} FED ITEM, ENDS AFTER."); + self.cycle(); + if self.edges.len() < 2 { + warn!("{self_name} no more bin in edges C"); + return; + } else { + trace!("{self_name} FED ITEM, CYCLED, CONTINUE."); + } + } else { + trace!("{self_name} FED ITEM."); + break; + } + } else { + panic!("{self_name} not correct item type"); + }; + } + } + } + } + + fn push_in_progress(&mut self, push_empty: bool) { + let self_name = std::any::type_name::(); + trace!("{self_name}::push_in_progress"); + // TODO expand should be derived from AggKind. Is it still required after all? + // TODO here, the expand means that agg will assume that the current value is kept constant during + // the rest of the time range. + if self.edges.len() >= 2 { + let expand = true; + let range_next = if let Some(x) = self.next_bin_range() { + Some(x) + } else { + None + }; + let mut bins = if let Some(range_next) = range_next { + self.agg.result_reset(range_next, expand) + } else { + let range_next = NanoRange { + beg: u64::MAX - 1, + end: u64::MAX, + }; + self.agg.result_reset(range_next, expand) + }; + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + fn cycle(&mut self) { + let self_name = std::any::type_name::(); + trace!("{self_name}::cycle"); + // TODO refactor this logic. + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(range) = self.next_bin_range() { + let mut bins = BinsDim0::::empty(); + bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn empty(&self) -> Box { + let ret = as TimeBinnableTypeAggregator>::Output::empty(); + Box::new(ret) + } +} + +// TODO remove this struct? +#[derive(Debug)] +pub struct EventsDim1CollectorDyn {} + +impl EventsDim1CollectorDyn { + pub fn new() -> Self { + Self {} + } +} + +impl items_0::collect_c::CollectorDyn for EventsDim1CollectorDyn { + fn len(&self) -> usize { + todo!() + } + + fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn result( + &mut self, + _range: Option, + _binrange: Option, + ) -> Result, err::Error> { + todo!() + } +} + +impl items_0::collect_c::CollectorDyn for EventsDim1Collector { + fn len(&self) -> usize { + WithLen::len(self) + } + + fn ingest(&mut self, item: &mut dyn items_0::collect_c::CollectableWithDefault) { + let x = item.as_any_mut(); + if let Some(item) = x.downcast_mut::>() { + items_0::collect_s::CollectorType::ingest(self, item) + } else { + // TODO need possibility to return error + () + } + } + + fn set_range_complete(&mut self) { + items_0::collect_s::CollectorType::set_range_complete(self); + } + + fn set_timed_out(&mut self) { + items_0::collect_s::CollectorType::set_timed_out(self); + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, err::Error> { + items_0::collect_s::CollectorType::result(self, range, binrange) + .map(|x| Box::new(x) as _) + .map_err(|e| e.into()) + } +} + +impl items_0::collect_c::CollectableWithDefault for EventsDim1 { + fn new_collector(&self) -> Box { + let coll = EventsDim1Collector::::new(); + Box::new(coll) + } +} + +impl items_0::collect_c::Collectable for EventsDim1 { + fn new_collector(&self) -> Box { + Box::new(EventsDim1Collector::::new()) + } +} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 18bde2e..2ac09f3 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,7 +1,9 @@ pub mod binsdim0; pub mod binsxbindim0; pub mod channelevents; +pub mod databuffereventblobs; pub mod eventsdim0; +pub mod eventsdim1; pub mod eventsxbindim0; pub mod merger; pub mod merger_cev; @@ -222,23 +224,47 @@ pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &Ag I64 => Box::new(K::::empty()), F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } } } _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + }, + Shape::Wave(..) => match agg_kind { + AggKind::Plain => { + use ScalarType::*; + type K = eventsdim1::EventsDim1; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), + _ => { + error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } }, - Shape::Wave(..) => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } Shape::Image(..) => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } } @@ -273,10 +299,33 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK err::todoval() } }, - Shape::Wave(..) => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } + Shape::Wave(..) => match agg_kind { + AggKind::Plain => { + use ScalarType::*; + type K = eventsdim1::EventsDim1; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), + _ => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + }, Shape::Image(..) => { error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 38f2452..7961b4d 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -237,6 +237,7 @@ fn merge03() { let inp2_events_a = { let ev = ConnStatusEvent { ts: 1199, + datetime: std::time::SystemTime::UNIX_EPOCH, status: ConnStatus::Disconnect, }; let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( @@ -248,6 +249,7 @@ fn merge03() { let inp2_events_b = { let ev = ConnStatusEvent { ts: 1199, + datetime: std::time::SystemTime::UNIX_EPOCH, status: ConnStatus::Disconnect, }; let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 174a853..ef229ca 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,14 +1,22 @@ use err::Error; -use futures_util::{Stream, StreamExt}; -use items::frame::{decode_frame, make_term_frame}; -use items::{EventQueryJsonStringFrame, Framable, RangeCompletableItem, Sitemty, StreamItem}; +use futures_util::Stream; +use futures_util::StreamExt; +use items::eventfull::EventFull; +use items::frame::decode_frame; +use items::frame::make_term_frame; +use items::EventQueryJsonStringFrame; +use items::Framable; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use items_0::Empty; use items_2::channelevents::ChannelEvents; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::AggKind; -use netpod::{NodeConfigCached, PerfOpts}; +use netpod::NodeConfigCached; +use netpod::PerfOpts; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; @@ -48,6 +56,125 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { } } +async fn make_channel_events_stream( + evq: PlainEventsQuery, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + if evq.channel().backend() == "test-inmem" { + warn!("TEST BACKEND DATA"); + use netpod::timeunits::MS; + let node_count = node_config.node_config.cluster.nodes.len(); + let node_ix = node_config.ix; + if evq.channel().name() == "inmem-d0-i32" { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + for i in 0..20 { + let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; + let pulse = 1 + node_ix as u64 + node_count as u64 * i; + item.push(ts, pulse, pulse as _); + } + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let stream = futures_util::stream::iter([item]); + Ok(Box::pin(stream)) + } else if evq.channel().name() == "inmem-d0-f32" { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + for i in 0..20 { + let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; + let pulse = 1 + node_ix as u64 + node_count as u64 * i; + item.push(ts, pulse, ts as _); + } + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let stream = futures_util::stream::iter([item]); + Ok(Box::pin(stream)) + } else { + let stream = futures_util::stream::empty(); + Ok(Box::pin(stream)) + } + } else if let Some(conf) = &node_config.node_config.cluster.scylla { + // TODO depends in general on the query + // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. + let do_one_before_range = false; + // TODO use better builder pattern with shortcuts for production and dev defaults + let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let scyco = conf; + let scy = scyllaconn::create_scy_session(scyco).await?; + let series = f.series; + let scalar_type = f.scalar_type; + let shape = f.shape; + let do_test_stream_error = false; + debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}"); + let stream = scyllaconn::events::EventsStreamScylla::new( + series, + evq.range().clone(), + do_one_before_range, + scalar_type, + shape, + scy, + do_test_stream_error, + ); + let stream = stream + .map(move |item| match &item { + Ok(k) => match k { + ChannelEvents::Events(k) => { + let n = k.len(); + let d = evq.event_delay(); + (item, n, d.clone()) + } + ChannelEvents::Status(_) => (item, 1, None), + }, + Err(_) => (item, 1, None), + }) + .then(|(item, n, d)| async move { + if let Some(d) = d { + warn!("sleep {} times {:?}", n, d); + tokio::time::sleep(d).await; + } + item + }) + .map(|item| { + let item = match item { + Ok(item) => match item { + ChannelEvents::Events(item) => { + let item = ChannelEvents::Events(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + ChannelEvents::Status(item) => { + let item = ChannelEvents::Status(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + }, + Err(e) => Err(e), + }; + item + }); + Ok(Box::pin(stream)) + } else if let Some(_) = &node_config.node.channel_archiver { + let e = Error::with_msg_no_trace("archapp not built"); + Err(e) + } else if let Some(_) = &node_config.node.archiver_appliance { + let e = Error::with_msg_no_trace("archapp not built"); + Err(e) + } else { + Ok(disk::raw::conn::make_event_pipe(&evq, node_config).await?) + } +} + +async fn make_event_blobs_stream( + evq: PlainEventsQuery, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + info!("make_event_blobs_stream"); + let stream = disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await?; + Ok(stream) +} + async fn events_conn_handler_inner_try( stream: TcpStream, addr: SocketAddr, @@ -117,138 +244,31 @@ async fn events_conn_handler_inner_try( return Err((e, netout).into()); } - let p1: Pin> + Send>> = if evq.channel().backend() == "test-inmem" { - warn!("TEST BACKEND DATA"); - use netpod::timeunits::MS; - let node_count = node_config.node_config.cluster.nodes.len(); - let node_ix = node_config.ix; - if evq.channel().name() == "inmem-d0-i32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, pulse as _); + let mut stream: Pin> + Send>> = + if let AggKind::EventBlobs = evq.agg_kind() { + match make_event_blobs_stream(evq, node_config).await { + Ok(stream) => { + let stream = stream.map(|x| Box::new(x) as _); + Box::pin(stream) + } + Err(e) => { + return Err((e, netout).into()); + } } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let stream = futures_util::stream::iter([item]); - Box::pin(stream) - } else if evq.channel().name() == "inmem-d0-f32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, ts as _); - } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let stream = futures_util::stream::iter([item]); - Box::pin(stream) } else { - let stream = futures_util::stream::empty(); - Box::pin(stream) - } - } else if let Some(conf) = &node_config.node_config.cluster.scylla { - // TODO depends in general on the query - // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. - let do_one_before_range = false; - // TODO use better builder pattern with shortcuts for production and dev defaults - let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); - let f = match f { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - let scyco = conf; - let scy = match scyllaconn::create_scy_session(scyco).await { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - let series = f.series; - let scalar_type = f.scalar_type; - let shape = f.shape; - let do_test_stream_error = false; - debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}"); - let stream = scyllaconn::events::EventsStreamScylla::new( - series, - evq.range().clone(), - do_one_before_range, - scalar_type, - shape, - scy, - do_test_stream_error, - ); - let stream = stream - .map(|item| match &item { - Ok(k) => match k { - ChannelEvents::Events(k) => { - let n = k.len(); - let d = evq.event_delay(); - (item, n, d.clone()) - } - ChannelEvents::Status(_) => (item, 1, None), - }, - Err(_) => (item, 1, None), - }) - .then(|(item, n, d)| async move { - if let Some(d) = d { - warn!("sleep {} times {:?}", n, d); - tokio::time::sleep(d).await; + match make_channel_events_stream(evq, node_config).await { + Ok(stream) => { + let stream = stream.map(|x| Box::new(x) as _); + Box::pin(stream) } - item - }) - .map(|item| { - let item = match item { - Ok(item) => match item { - ChannelEvents::Events(item) => { - let item = ChannelEvents::Events(item); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - item - } - ChannelEvents::Status(item) => { - let item = ChannelEvents::Status(item); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - item - } - }, - Err(e) => Err(e), - }; - item - }); - Box::pin(stream) - } else if let Some(_) = &node_config.node.channel_archiver { - let e = Error::with_msg_no_trace("archapp not built"); - return Err((e, netout))?; - } else if let Some(_) = &node_config.node.archiver_appliance { - let e = Error::with_msg_no_trace("archapp not built"); - return Err((e, netout))?; - } else { - let stream = match evq.agg_kind() { - AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { - Ok(_stream) => { - let e = Error::with_msg_no_trace("TODO make_event_blobs_pipe"); - return Err((e, netout))?; + Err(e) => { + return Err((e, netout).into()); } - Err(e) => return Err((e, netout))?, - }, - _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - }, + } }; - stream - }; - let p1 = p1.inspect(|x| { - items::on_sitemty_range_complete!(x, warn!("GOOD ----------- SEE RangeComplete in conn.rs")); - }); - - let mut p1 = p1; let mut buf_len_histo = HistoLog2::new(5); - while let Some(item) = p1.next().await { + while let Some(item) = stream.next().await { let item = item.make_frame(); match item { Ok(buf) => { diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 20ad709..0514f9e 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -3,17 +3,13 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; -use futures_util::StreamExt; use items_0::Empty; use items_0::Events; use items_0::WithLen; use items_2::channelevents::ChannelEvents; -use items_2::channelevents::ConnStatus; -use items_2::channelevents::ConnStatusEvent; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim1::EventsDim1; use netpod::log::*; -use netpod::query::ChannelStateEventsQuery; -use netpod::timeunits::*; use netpod::NanoRange; use netpod::ScalarType; use netpod::Shape; @@ -61,7 +57,7 @@ async fn find_ts_msp( macro_rules! read_next_scalar_values { ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { async fn $fname( - series: i64, + series: u64, ts_msp: u64, range: NanoRange, fwd: bool, @@ -97,7 +93,10 @@ macro_rules! read_next_scalar_values { " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" ); let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) + .query( + cql, + (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), + ) .await .err_conv()?; let mut last_before = None; @@ -135,7 +134,7 @@ macro_rules! read_next_scalar_values { " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" ); let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) .await .err_conv()?; let mut seen_before = false; @@ -166,37 +165,107 @@ macro_rules! read_next_scalar_values { macro_rules! read_next_array_values { ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { async fn $fname( - series: i64, + series: u64, ts_msp: u64, - _range: NanoRange, - _fwd: bool, + range: NanoRange, + fwd: bool, scy: Arc, - ) -> Result, Error> { - // TODO change return type: so far EventsDim1 does not exist. - error!("TODO read_next_array_values"); - err::todo(); - if true { - return Err(Error::with_msg_no_trace("redo based on scalar case")); - } + ) -> Result, Error> { type ST = $st; - type _SCYTY = $scyty; - trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ?" - ); - let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; - let ret = EventsDim0::::empty(); - /* - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - ret.push(ts, pulse, value); + type SCYTY = $scyty; + if ts_msp >= range.end { + warn!( + "given ts_msp {} >= range.end {} not necessary to read this", + ts_msp, range.end + ); } - */ + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let ret = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD {} ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", + stringify!($fname), + ts_msp, + ts_lsp_min, + ts_lsp_max, + range.beg, + range.end, + stringify!($table_name) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ); + let res = scy + .query( + cql, + (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), + ) + .await + .err_conv()?; + let mut last_before = None; + let mut ret = EventsDim1::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2.into_iter().map(|x| x as ST).collect(); + if ts >= range.end { + } else if ts >= range.beg { + ret.push(ts, pulse, value); + } else { + if last_before.is_none() { + warn!("encounter event before range in forward read {ts}"); + } + last_before = Some((ts, pulse, value)); + } + } + ret + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + trace!( + "BCK {} ts_msp {} ts_lsp_max {} beg {} end {} {}", + stringify!($fname), + ts_msp, + ts_lsp_max, + range.beg, + range.end, + stringify!($table_name) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ); + let res = scy + .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()?; + let mut seen_before = false; + let mut ret = EventsDim1::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2.into_iter().map(|x| x as ST).collect(); + if ts >= range.end { + } else if ts < range.beg { + ret.push(ts, pulse, value); + } else { + if !seen_before { + warn!("encounter event before range in forward read {ts}"); + } + seen_before = true; + } + } + ret + }; trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); Ok(ret) } @@ -214,7 +283,8 @@ read_next_scalar_values!(read_next_values_scalar_i64, i64, i64, "events_scalar_i read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); -read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); +read_next_array_values!(read_next_values_array_u16, u16, i16, "events_array_u16"); +read_next_array_values!(read_next_values_array_bool, bool, bool, "events_array_bool"); macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ @@ -234,19 +304,20 @@ macro_rules! read_values { } struct ReadValues { - series: i64, + series: u64, scalar_type: ScalarType, shape: Shape, range: NanoRange, ts_msps: VecDeque, fwd: bool, fut: Pin, Error>> + Send>>, + fut_done: bool, scy: Arc, } impl ReadValues { fn new( - series: i64, + series: u64, scalar_type: ScalarType, shape: Shape, range: NanoRange, @@ -264,6 +335,7 @@ impl ReadValues { fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( "future not initialized", )))), + fut_done: false, scy, }; ret.next(); @@ -273,6 +345,7 @@ impl ReadValues { fn next(&mut self) -> bool { if let Some(ts_msp) = self.ts_msps.pop_front() { self.fut = self.make_fut(ts_msp); + self.fut_done = false; true } else { false @@ -321,6 +394,10 @@ impl ReadValues { ScalarType::U16 => { read_values!(read_next_values_array_u16, self, ts_msp) } + ScalarType::BOOL => { + info!("attempt to read bool"); + read_values!(read_next_values_array_bool, self, ts_msp) + } _ => { error!("TODO ReadValues add more types"); err::todoval() @@ -401,7 +478,7 @@ impl EventsStreamScylla { if let Some(msp) = self.ts_msp_b1.clone() { trace!("Try ReadBack1"); let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -413,7 +490,7 @@ impl EventsStreamScylla { } else if self.ts_msps.len() >= 1 { trace!("Go straight for forward read"); let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -433,7 +510,7 @@ impl EventsStreamScylla { self.outqueue.push_back(item); if self.ts_msps.len() > 0 { let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -449,7 +526,7 @@ impl EventsStreamScylla { if let Some(msp) = self.ts_msp_b2.clone() { trace!("Try ReadBack2"); let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -461,7 +538,7 @@ impl EventsStreamScylla { } else if self.ts_msps.len() >= 1 { trace!("No 2nd back MSP, go for forward read"); let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -483,7 +560,7 @@ impl EventsStreamScylla { } if self.ts_msps.len() >= 1 { let st = ReadValues::new( - self.series as i64, + self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), @@ -534,10 +611,12 @@ impl Stream for EventsStreamScylla { }, FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { + st.fut_done = true; self.back_1_done(item); continue; } Ready(Err(e)) => { + st.fut_done = true; self.state = FrState::Done; Ready(Some(Err(e))) } @@ -545,10 +624,12 @@ impl Stream for EventsStreamScylla { }, FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { + st.fut_done = true; self.back_2_done(item); continue; } Ready(Err(e)) => { + st.fut_done = true; self.state = FrState::Done; Ready(Some(Err(e))) } @@ -556,6 +637,7 @@ impl Stream for EventsStreamScylla { }, FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { + st.fut_done = true; if !st.next() { trace!("ReadValues exhausted"); self.state = FrState::Done; @@ -565,7 +647,10 @@ impl Stream for EventsStreamScylla { } continue; } - Ready(Err(e)) => Ready(Some(Err(e))), + Ready(Err(e)) => { + st.fut_done = true; + Ready(Some(Err(e))) + } Pending => Pending, }, FrState::Done => Ready(None), @@ -573,57 +658,3 @@ impl Stream for EventsStreamScylla { } } } - -async fn _channel_state_events( - evq: &ChannelStateEventsQuery, - scy: Arc, -) -> Result> + Send>>, Error> { - let (tx, rx) = async_channel::bounded(8); - let evq = evq.clone(); - let fut = async move { - let div = DAY; - let mut ts_msp = evq.range().beg / div * div; - loop { - let series = (evq - .channel() - .series() - .ok_or(Error::with_msg_no_trace(format!("series id not given"))))?; - let params = (series as i64, ts_msp as i64); - let mut res = scy - .query_iter( - "select ts_lsp, kind from channel_status where series = ? and ts_msp = ?", - params, - ) - .await - .err_conv()?; - while let Some(row) = res.next().await { - let row = row.err_conv()?; - let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?; - let ts = ts_msp + ts_lsp as u64; - let kind = kind as u32; - if ts >= evq.range().beg && ts < evq.range().end { - let status = match kind { - 1 => ConnStatus::Connect, - 2 => ConnStatus::Disconnect, - _ => { - let e = Error::with_msg_no_trace(format!("bad status kind {kind}")); - let e2 = Error::with_msg_no_trace(format!("bad status kind {kind}")); - let _ = tx.send(Err(e)).await; - return Err(e2); - } - }; - let ev = ConnStatusEvent { ts, status }; - tx.send(Ok(ev)).await.map_err(|e| format!("{e}"))?; - } - } - ts_msp += div; - if ts_msp >= evq.range().end { - break; - } - } - Ok(()) - }; - // TODO join the task (better: rewrite as proper stream) - tokio::spawn(fut); - Ok(Box::pin(rx)) -} diff --git a/scyllaconn/src/status.rs b/scyllaconn/src/status.rs index 32b8379..f53afac 100644 --- a/scyllaconn/src/status.rs +++ b/scyllaconn/src/status.rs @@ -1,7 +1,10 @@ use crate::errconv::ErrConv; use err::Error; -use futures_util::{Future, FutureExt, Stream}; -use items_2::channelevents::{ConnStatus, ConnStatusEvent}; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use items_2::channelevents::ChannelStatus; +use items_2::channelevents::ChannelStatusEvent; use netpod::log::*; use netpod::NanoRange; use netpod::CONNECTION_STATUS_DIV; @@ -9,7 +12,10 @@ use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::SystemTime; async fn read_next_status_events( series: u64, @@ -18,7 +24,7 @@ async fn read_next_status_events( fwd: bool, do_one_before: bool, scy: Arc, -) -> Result, Error> { +) -> Result, Error> { if ts_msp >= range.end { warn!( "given ts_msp {} >= range.end {} not necessary to read this", @@ -73,10 +79,10 @@ async fn read_next_status_events( let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; let kind = row.1 as u32; - // from netfetch::store::ChannelStatus - let ev = ConnStatusEvent { + let ev = ChannelStatusEvent { ts, - status: ConnStatus::from_ca_ingest_status_kind(kind), + datetime: SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000), + status: ChannelStatus::from_ca_ingest_status_kind(kind), }; if ts >= range.end { } else if ts >= range.beg { @@ -101,7 +107,7 @@ struct ReadValues { ts_msps: VecDeque, fwd: bool, do_one_before_range: bool, - fut: Pin, Error>> + Send>>, + fut: Pin, Error>> + Send>>, scy: Arc, } @@ -142,7 +148,7 @@ impl ReadValues { fn make_fut( &mut self, ts_msp: u64, - ) -> Pin, Error>> + Send>> { + ) -> Pin, Error>> + Send>> { info!("make fut for {ts_msp}"); let fut = read_next_status_events( self.series, @@ -168,7 +174,7 @@ pub struct StatusStreamScylla { range: NanoRange, do_one_before_range: bool, scy: Arc, - outbuf: VecDeque, + outbuf: VecDeque, } impl StatusStreamScylla { @@ -185,7 +191,7 @@ impl StatusStreamScylla { } impl Stream for StatusStreamScylla { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/taskrun/src/append.rs b/taskrun/src/append.rs index fbc2375..84ed1d6 100644 --- a/taskrun/src/append.rs +++ b/taskrun/src/append.rs @@ -4,6 +4,8 @@ use std::fs; use std::io::{BufWriter, Read, Seek, SeekFrom, Stdin, Write}; use std::path::{Path, PathBuf}; +const MAX_PER_FILE: u64 = 1024 * 1024 * 2; + pub struct Buffer { buf: Vec, wp: usize, @@ -97,7 +99,7 @@ impl Buffer { } fn parse_lines(buf: &[u8]) -> Result<(Vec>, usize), Error> { - let mut ret = vec![]; + let mut ret = Vec::new(); let mut i1 = 0; let mut i2 = 0; while i1 < buf.len() { @@ -134,9 +136,6 @@ fn parse_lines(buf: &[u8]) -> Result<(Vec>, usize), Error> { Ok((ret, i2)) } -const MAX_PER_FILE: u64 = 1024 * 1024 * 2; -const MAX_TOTAL_SIZE: u64 = 1024 * 1024 * 20; - struct Fileinfo { path: PathBuf, name: String, @@ -144,7 +143,7 @@ struct Fileinfo { } fn file_list(dir: &Path) -> Result, Error> { - let mut ret = vec![]; + let mut ret = Vec::new(); let rd = fs::read_dir(&dir)?; for e in rd { let e = e?; @@ -191,7 +190,7 @@ fn next_file(dir: &Path) -> Result, Error> { Ok(ret) } -pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { +pub fn append_inner(dirname: &str, total_size_max: u64, mut stdin: Stdin) -> Result<(), Error> { let mut bytes_written = 0; let dir = PathBuf::from(dirname); let mut fout = open_latest_or_new(&dir)?; @@ -265,7 +264,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { let l1 = fout.seek(SeekFrom::End(0))?; if l1 >= MAX_PER_FILE { let rd = fs::read_dir(&dir)?; - let mut w = vec![]; + let mut w = Vec::new(); for e in rd { let e = e?; let fnos = e.file_name(); @@ -282,7 +281,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x); write!(&mut fout, "[APPEND-LENTOT] {}\n", lentot)?; for q in w { - if lentot <= MAX_TOTAL_SIZE as u64 { + if lentot <= total_size_max { break; } write!(&mut fout, "[APPEND-REMOVE] {} {}\n", q.1, q.0.to_string_lossy())?; @@ -303,8 +302,8 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { } } -pub fn append(dirname: &str, stdin: Stdin) -> Result<(), Error> { - match append_inner(dirname, stdin) { +pub fn append(dirname: &str, total_size_max: u64, stdin: Stdin) -> Result<(), Error> { + match append_inner(dirname, total_size_max, stdin) { Ok(k) => { eprintln!("append_inner has returned"); Ok(k)