From 9ad7b792252b4144af3cef5d3f815746e364c2f1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 12 Jan 2023 11:04:31 +0100 Subject: [PATCH] Basic ca echo --- .cargo/cargo-lock | 393 +++++++++++++++-------------------- daqingest/src/daemon.rs | 165 +++++++++++---- netfetch/src/ca.rs | 5 + netfetch/src/ca/conn.rs | 85 +++++++- netfetch/src/ca/connset.rs | 22 +- netfetch/src/ca/proto.rs | 9 + netfetch/src/insertworker.rs | 18 +- netfetch/src/zmtp.rs | 15 +- 8 files changed, 408 insertions(+), 304 deletions(-) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 1967927..57bd6ec 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" dependencies = [ "gimli", ] @@ -37,15 +37,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.66" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" +checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" [[package]] name = "arc-swap" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "arrayref" @@ -87,9 +87,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.59" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" +checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" dependencies = [ "proc-macro2", "quote", @@ -104,12 +104,12 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.17" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" +checksum = "1304eab461cf02bd70b083ed8273388f9724c549b316ba3d1e213ce0e9e7fb7e" dependencies = [ "async-trait", - "axum-core 0.2.9", + "axum-core", "bitflags", "bytes", "futures-util", @@ -117,13 +117,15 @@ dependencies = [ "http-body", "hyper", "itoa", - "matchit 0.5.0", + "matchit", "memchr", "mime", "percent-encoding", "pin-project-lite", + "rustversion", "serde", "serde_json", + "serde_path_to_error", "serde_urlencoded", "sync_wrapper", "tokio", @@ -133,56 +135,11 @@ dependencies = [ "tower-service", ] -[[package]] -name = "axum" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" -dependencies = [ - "async-trait", - "axum-core 0.3.0", - "bitflags", - "bytes", - "futures-util", - "http", - "http-body", - "hyper", - "itoa", - "matchit 0.7.0", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper", - "tower", - "tower-http", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" -version = "0.2.9" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http", - "http-body", - "mime", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" +checksum = "f487e40dc9daee24d8a1779df88522f159a54a980f99cfbe43db0be0bd3444a8" dependencies = [ "async-trait", "bytes", @@ -197,15 +154,15 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab84319d616cfb654d03394f38ab7e6f0919e181b1b57e1fd15e7fb4077d9a7" +checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" dependencies = [ "addr2line", "cc", "cfg-if", "libc", - "miniz_oxide 0.5.4", + "miniz_oxide", "object", "rustc-demangle", ] @@ -241,15 +198,6 @@ dependencies = [ "libc", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.3" @@ -279,9 +227,9 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "cc" -version = "1.0.77" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4" +checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" [[package]] name = "cfg-if" @@ -307,9 +255,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.29" +version = "4.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d" +checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" dependencies = [ "bitflags", "clap_derive", @@ -452,9 +400,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf" +checksum = "51d1075c37807dcf850c379432f0df05ba52cc30f279c5cfc43cc221ce7f8579" dependencies = [ "cc", "cxxbridge-flags", @@ -464,9 +412,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39" +checksum = "5044281f61b27bc598f2f6647d480aed48d2bf52d6eb0b627d84c0361b17aa70" dependencies = [ "cc", "codespan-reporting", @@ -479,15 +427,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12" +checksum = "61b50bc93ba22c27b0d31128d2d130a0a6b3d267ae27ef7e4fae2167dfe8781c" [[package]] name = "cxxbridge-macro" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6" +checksum = "39e61fda7e62115119469c7b3591fd913ecca96fb766cfd3f2e2502ab7bc87a5" dependencies = [ "proc-macro2", "quote", @@ -498,14 +446,22 @@ dependencies = [ name = "daqingest" version = "0.1.3" dependencies = [ + "async-channel", "bytes", "chrono", "clap", "err", + "futures-util", "log 0.0.1", "netfetch", + "netpod", "scylla", + "serde", + "stats", "taskrun", + "tokio", + "tokio-postgres", + "tracing", ] [[package]] @@ -521,22 +477,13 @@ dependencies = [ "parking_lot_core", ] -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - [[package]] name = "digest" version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ - "block-buffer 0.10.3", + "block-buffer", "crypto-common", "subtle", ] @@ -549,9 +496,9 @@ checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" [[package]] name = "erased-serde" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54558e0ba96fbe24280072642eceb9d7d442e32c7ec0ea9e7ecd7b4ea2cf4e11" +checksum = "e4ca605381c017ec7a5fef5e548f1cfaa419ed0f6df6367339300db74c92aa7d" dependencies = [ "serde", ] @@ -613,7 +560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", - "miniz_oxide 0.6.2", + "miniz_oxide", ] [[package]] @@ -743,9 +690,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.26.2" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d" +checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793" [[package]] name = "h2" @@ -806,15 +753,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.2.6" @@ -842,7 +780,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest 0.10.6", + "digest", ] [[package]] @@ -993,11 +931,11 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi", "io-lifetimes", "rustix", "windows-sys", @@ -1014,9 +952,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" [[package]] name = "js-sys" @@ -1035,30 +973,24 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.138" +version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "link-cplusplus" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369" +checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" dependencies = [ "cc", ] -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "linux-raw-sys" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" [[package]] name = "lock_api" @@ -1104,36 +1036,19 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" - [[package]] name = "matchit" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" -[[package]] -name = "md-5" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" -dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "md-5" version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest 0.10.6", + "digest", ] [[package]] @@ -1154,15 +1069,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34" -dependencies = [ - "adler", -] - [[package]] name = "miniz_oxide" version = "0.6.2" @@ -1190,7 +1096,7 @@ version = "0.0.2" dependencies = [ "arrayref", "async-channel", - "axum 0.5.17", + "axum", "bitshuffle", "byteorder", "bytes", @@ -1205,7 +1111,7 @@ dependencies = [ "lazy_static", "libc", "log 0.0.1", - "md-5 0.9.1", + "md-5", "netpod", "pin-project", "regex", @@ -1231,6 +1137,7 @@ dependencies = [ "chrono", "err", "futures-util", + "humantime-serde", "num-traits", "serde", "serde_json", @@ -1240,9 +1147,9 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.1" +version = "7.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c" dependencies = [ "memchr", "minimal-lexical", @@ -1290,11 +1197,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ - "hermit-abi 0.1.19", + "hermit-abi", "libc", ] @@ -1321,24 +1228,18 @@ dependencies = [ [[package]] name = "object" -version = "0.29.0" +version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53" +checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" - -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" [[package]] name = "os_str_bytes" @@ -1377,9 +1278,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1c2c742266c2f1041c914ba65355a83ae8747b05f208319784083583494b4b" +checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" [[package]] name = "percent-encoding" @@ -1448,7 +1349,7 @@ dependencies = [ "bytes", "fallible-iterator", "hmac", - "md-5 0.10.5", + "md-5", "memchr", "rand", "sha2", @@ -1509,18 +1410,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.11.3" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0b18e655c21ff5ac2084a5ad0611e827b3f92badf79f4910b5a5c58f4d87ff0" +checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" dependencies = [ "bytes", "prost-derive", @@ -1528,9 +1429,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.11.2" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306" +checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" dependencies = [ "anyhow", "itertools", @@ -1541,9 +1442,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.2" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a" +checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" dependencies = [ "bytes", "prost", @@ -1551,9 +1452,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.21" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" dependencies = [ "proc-macro2", ] @@ -1599,9 +1500,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" dependencies = [ "aho-corasick", "memchr", @@ -1653,9 +1554,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] name = "rustix" -version = "0.36.5" +version = "0.36.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" +checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549" dependencies = [ "bitflags", "errno", @@ -1667,15 +1568,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" [[package]] name = "ryu" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" [[package]] name = "scopeguard" @@ -1685,17 +1586,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "scratch" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" +checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" [[package]] name = "scylla" -version = "0.4.7" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16bd82cb3eb8961f45759695eee56f162e73c1e2e30042d9450dfa98d600ac1" +checksum = "4b9d4ef7fb24d95d30c4a8da782bb2afead3a8b66f32c805bb82a03722d7be33" dependencies = [ "arc-swap", + "async-trait", "bigdecimal", "byteorder", "bytes", @@ -1708,6 +1610,7 @@ dependencies = [ "num-bigint", "num_enum", "rand", + "scylla-cql", "scylla-macros", "smallvec", "snap", @@ -1720,10 +1623,31 @@ dependencies = [ ] [[package]] -name = "scylla-macros" -version = "0.1.1" +name = "scylla-cql" +version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13dc0caffb1274feb3df615e3260cb71a5a7a5d579adc49ba5544c87950a701c" +checksum = "6972061cbcc83754b4243d007ae51c1a1345a950b368cbdaad0186eac8799203" +dependencies = [ + "async-trait", + "bigdecimal", + "byteorder", + "bytes", + "chrono", + "lz4_flex", + "num-bigint", + "num_enum", + "scylla-macros", + "snap", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "scylla-macros" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e03b3a19daa79085439113c746d2946e5e6effd2d9039bf092bb08df915487b2" dependencies = [ "quote", "syn", @@ -1731,9 +1655,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] @@ -1750,9 +1674,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", @@ -1761,15 +1685,24 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db" +checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" dependencies = [ "itoa", "ryu", "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1784,14 +1717,15 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.26" +version = "0.9.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578a7433b776b56a35785ed5ce9a7e777ac0598aac5a6dd1b4b18a307c7fc71b" +checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834" dependencies = [ "indexmap", + "itoa", "ryu", "serde", - "yaml-rust", + "unsafe-libyaml", ] [[package]] @@ -1802,7 +1736,7 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.6", + "digest", ] [[package]] @@ -1924,9 +1858,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908" +checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" dependencies = [ "proc-macro2", "quote", @@ -1966,18 +1900,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", @@ -2048,9 +1982,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.23.0" +version = "1.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" +checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" dependencies = [ "autocfg", "bytes", @@ -2137,9 +2071,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.5.9" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" +checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f" dependencies = [ "serde", ] @@ -2152,7 +2086,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum 0.6.1", + "axum", "base64", "bytes", "futures-core", @@ -2303,9 +2237,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "twox-hash" @@ -2331,9 +2265,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-ident" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" +checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" [[package]] name = "unicode-normalization" @@ -2356,6 +2290,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unsafe-libyaml" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" + [[package]] name = "url" version = "2.3.1" @@ -2548,12 +2488,3 @@ name = "windows_x86_64_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" - -[[package]] -name = "yaml-rust" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" -dependencies = [ - "linked-hash-map", -] diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index fdad9c9..3301a6e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -5,13 +5,16 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; -use netfetch::ca::conn::CaConn; +use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; +use netfetch::ca::connset::CaConnSet; use netfetch::ca::findioc::FindIocRes; use netfetch::ca::findioc::FindIocStream; use netfetch::ca::store::DataStore; +use netfetch::ca::IngestCommons; use netfetch::conf::CaIngestOpts; use netfetch::errconv::ErrConv; +use netfetch::insertworker::Ttls; use netfetch::metrics::ExtraInsertsConf; use netfetch::store::CommonInsertItemQueue; use netpod::Database; @@ -20,6 +23,7 @@ use serde::Serialize; use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; +use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic; @@ -27,13 +31,14 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; use tokio_postgres::Client as PgClient; use tokio_postgres::Statement as PgStatement; const CHECK_CHANS_PER_TICK: usize = 10000; const FINDER_TIMEOUT: usize = 100; -const TIMEOUT_WARN: usize = 3000; +const TIMEOUT_WARN: usize = 6000; const FINDER_JOB_QUEUE_LEN_MAX: usize = 20; const FINDER_IN_FLIGHT_MAX: usize = 400; const FINDER_BATCH_SIZE: usize = 8; @@ -118,8 +123,10 @@ pub enum DaemonEvent { ChannelAdd(Channel), ChannelRemove(Channel), SearchDone(Result, Error>), + CaConnEvent(CaConnEvent), } +#[derive(Debug, Clone)] pub struct DaemonOpts { backend: String, local_epics_hostname: String, @@ -129,6 +136,7 @@ pub struct DaemonOpts { //search_excl: Vec, pgconf: Database, scyconf: ScyllaConfig, + ttls: Ttls, } impl DaemonOpts { @@ -193,7 +201,6 @@ pub struct Daemon { channel_states: BTreeMap, tx: Sender, rx: Receiver, - conns: BTreeMap, tokio::task::JoinHandle<()>)>, chan_check_next: Option, search_tx: Sender, ioc_finder_jh: tokio::task::JoinHandle<()>, @@ -213,6 +220,8 @@ pub struct Daemon { qu_addr_insert: PgStatement, ioc_addr_inserter_jh: tokio::task::JoinHandle<()>, ioc_addr_inserter_tx: Sender, + ingest_commons: Arc, + caconn_last_channel_check: Instant, } impl Daemon { @@ -241,16 +250,21 @@ impl Daemon { //trace!("insert queue item {item:?}"); match &item { netfetch::store::QueryItem::Insert(item) => { - item.pulse; + let shape_kind = match &item.shape { + netpod::Shape::Scalar => 0 as u32, + netpod::Shape::Wave(_) => 1, + netpod::Shape::Image(_, _) => 2, + }; histo .entry(item.series.clone()) - .and_modify(|(c, msp, lsp, pulse)| { + .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { *c += 1; *msp = item.ts_msp; *lsp = item.ts_lsp; *pulse = item.pulse; + // TODO should check that shape_kind stays the same. }) - .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse)); + .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); } _ => {} } @@ -266,10 +280,17 @@ impl Daemon { printed_last = tsnow; let mut all: Vec<_> = histo .iter() - .map(|(k, (c, msp, lsp, pulse))| (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse)) + .map(|(k, (c, msp, lsp, pulse, shape_kind))| { + (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind) + }) .collect(); all.sort_unstable(); - for (c, sid, msp, lsp, pulse) in all.into_iter().take(4) { + info!("Active scalar"); + for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) { + info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + } + info!("Active wave"); + for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) { info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); } histo.clear(); @@ -279,7 +300,7 @@ impl Daemon { }); } - let ingest_commons = netfetch::ca::IngestCommons { + let ingest_commons = IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), backend: opts.backend().into(), local_epics_hostname: opts.local_epics_hostname.clone(), @@ -289,10 +310,23 @@ impl Daemon { extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()), store_workers_rate: AtomicU64::new(20000), insert_frac: AtomicU64::new(1000), - ca_conn_set: netfetch::ca::connset::CaConnSet::new(), + ca_conn_set: CaConnSet::new(), }; let ingest_commons = Arc::new(ingest_commons); + tokio::task::spawn({ + let rx = ingest_commons.ca_conn_set.conn_item_rx(); + let tx = tx.clone(); + async move { + while let Ok(item) = rx.recv().await { + match tx.send(DaemonEvent::CaConnEvent(item)).await { + Ok(_) => {} + Err(e) => break, + } + } + } + }); + let qu_addr_insert = { const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; let sql = "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)"; @@ -309,11 +343,7 @@ impl Daemon { // TODO use a new stats type: let store_stats = Arc::new(stats::CaConnStats::new()); - let ttls = netfetch::insertworker::Ttls { - index: Duration::from_secs(60 * 60 * 24 * 4), - d0: Duration::from_secs(60 * 60 * 24 * 3), - d1: Duration::from_secs(60 * 60 * 4), - }; + let ttls = opts.ttls.clone(); let jh_insert_workers = netfetch::insertworker::spawn_scylla_insert_workers( opts.scyconf.clone(), insert_scylla_sessions, @@ -364,7 +394,6 @@ impl Daemon { channel_states: BTreeMap::new(), tx, rx, - conns: BTreeMap::new(), chan_check_next: None, search_tx, ioc_finder_jh, @@ -382,6 +411,8 @@ impl Daemon { qu_addr_insert, ioc_addr_inserter_jh: ioc_addr_inserter_task, ioc_addr_inserter_tx, + ingest_commons, + caconn_last_channel_check: Instant::now(), }; Ok(ret) } @@ -549,46 +580,51 @@ impl Daemon { Unassigned { assign_at } => { if *assign_at <= tsnow { if st.pending_op.is_none() { - if !self.conns.contains_key(addr) { + if self.ingest_commons.ca_conn_set.has_addr(&SocketAddr::V4(*addr)).await { + } else { debug!("==================== create CaConn for {ch:?}"); let backend = self.opts.backend().into(); let local_epics_hostname = self.opts.local_epics_hostname.clone(); let array_truncate = self.opts.array_truncate; - let insert_item_sender = self.common_insert_item_queue.sender().await; - let mut conn = CaConn::new( - backend, - addr.clone(), - local_epics_hostname, - self.datastore.clone(), - insert_item_sender, - array_truncate, - 256, - ); - let conn_tx = conn.conn_command_tx(); - let conn_fut = async move { while let Some(_item) = conn.next().await {} }; - let conn_jh = tokio::spawn(conn_fut); - self.conns.insert(*addr, (conn_tx, conn_jh)); + let insert_item_queue_sender = self.common_insert_item_queue.sender().await; + let insert_queue_max = 256; + let data_store = self.datastore.clone(); + let with_channels = Vec::new(); + // TODO want to atomically use or create a connection. + // TODO creating a connection may block too long, because it establishes TCP first. + self.ingest_commons + .ca_conn_set + .create_ca_conn( + backend, + *addr, + local_epics_hostname, + array_truncate, + insert_queue_max, + insert_item_queue_sender, + data_store, + with_channels, + ) + .await?; } - if let Some((tx, _)) = self.conns.get(addr) { - let tx = tx.clone(); - let (cmd, rx) = ConnCommand::channel_add(ch.id().into()); - let fut = async move { - tx.send(cmd).await?; - let res = rx.recv().await?; - debug!("answer from CaConn: {res:?}"); - if res != true { - warn!("problem from CaConn"); - } - Ok(()) - }; - st.pending_op = Some(ChanOp::ConnCmd(Box::pin(fut))); + { + let backend = self.opts.backend().into(); + let channel_name = ch.id().into(); + let ingest_commons = self.ingest_commons.clone(); + // This operation is meant to complete very quickly + self.ingest_commons + .ca_conn_set + .add_channel_to_addr( + backend, + SocketAddr::V4(*addr), + channel_name, + ingest_commons, + ) + .await?; let cs = ConnectionState { updated: tsnow, value: ConnectionStateValue::Unconnected, }; *state = WithAddressState::Assigned(cs) - } else { - error!("no CaConn for {ch:?}"); } } } @@ -696,9 +732,28 @@ impl Daemon { Ok(()) } + async fn check_caconn_chans(&mut self) -> Result<(), Error> { + if self.caconn_last_channel_check.elapsed() > Duration::from_millis(5000) { + info!("Issue channel check to all CaConn"); + let res = self + .ingest_commons + .ca_conn_set + .send_command_to_all(|| ConnCommand::check_channels_alive()) + .await?; + for x in res { + if !x { + warn!("bad check_channels_alive result"); + } + } + self.caconn_last_channel_check = Instant::now(); + } + Ok(()) + } + async fn handle_timer_tick(&mut self) -> Result<(), Error> { let tsnow = SystemTime::now(); self.check_chans().await?; + self.check_caconn_chans().await?; if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) { self.last_status_print = tsnow; info!( @@ -830,6 +885,19 @@ impl Daemon { ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), SearchDone(item) => self.handle_search_done(item).await, + CaConnEvent(item) => { + use netfetch::ca::conn::CaConnEventValue::*; + match item.value { + None => { + // TODO count, maybe reduce. + Ok(()) + } + EchoTimeout => { + error!("TODO on EchoTimeout remove the CaConn and reset channels"); + Ok(()) + } + } + } } } @@ -889,6 +957,11 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> pgconf: opts.postgresql().clone(), scyconf: opts.scylla().clone(), search_tgts, + ttls: Ttls { + index: opts.ttl_index(), + d0: opts.ttl_d0(), + d1: opts.ttl_d1(), + }, }; let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index e7be0be..3c1d7da 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -197,6 +197,11 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() }; let ingest_commons = Arc::new(ingest_commons); + tokio::spawn({ + let rx = ingest_commons.ca_conn_set.conn_item_rx(); + async move { while let Ok(_item) = rx.recv().await {} } + }); + // TODO use a new stats type: let store_stats = Arc::new(CaConnStats::new()); let ttls = crate::insertworker::Ttls { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 30de673..1d69b60 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -362,6 +362,18 @@ impl ConnCommand { } } +#[derive(Debug)] +pub enum CaConnEventValue { + None, + EchoTimeout, +} + +#[derive(Debug)] +pub struct CaConnEvent { + pub ts: Instant, + pub value: CaConnEventValue, +} + pub struct CaConn { state: CaConnState, shutdown: bool, @@ -392,6 +404,8 @@ pub struct CaConn { conn_backoff_beg: f32, inserts_counter: u64, extra_inserts_conf: ExtraInsertsConf, + ioc_ping_last: Instant, + ioc_ping_start: Option, } impl CaConn { @@ -434,6 +448,8 @@ impl CaConn { conn_backoff_beg: 0.02, inserts_counter: 0, extra_inserts_conf: ExtraInsertsConf::new(), + ioc_ping_last: Instant::now(), + ioc_ping_start: None, } } @@ -700,6 +716,25 @@ impl CaConn { fn check_channels_alive(&mut self) -> Result<(), Error> { let tsnow = Instant::now(); + trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg); + if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) { + if let Some(started) = self.ioc_ping_start { + if started.elapsed() > Duration::from_millis(4000) { + warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg); + self.shutdown = true; + } + } else { + self.ioc_ping_start = Some(Instant::now()); + if let Some(proto) = &mut self.proto { + trace!("push echo to {}", self.remote_addr_dbg); + let msg = CaMsg { ty: CaMsgTy::Echo }; + proto.push_out(msg); + } else { + warn!("can not push echo, no proto"); + self.shutdown = true; + } + } + } let mut alive_count = 0; let mut not_alive_count = 0; for (_, st) in &self.channels { @@ -1283,12 +1318,27 @@ impl CaConn { warn!("channel access error message {e:?}"); } CaMsgTy::AccessRightsRes(_) => {} - k => { - warn!("unexpected ca cmd {k:?}"); + CaMsgTy::Echo => { + let addr = &self.remote_addr_dbg; + if let Some(started) = self.ioc_ping_start { + let dt = started.elapsed().as_secs_f32() * 1e3; + if dt > 50. { + info!("Received Echo {dt:10.0}ms {addr:?}"); + } else if dt > 500. { + warn!("Received Echo {dt:10.0}ms {addr:?}"); + } + } else { + info!("Received Echo even though we didn't asked for it {addr:?}"); + } + self.ioc_ping_last = Instant::now(); + self.ioc_ping_start = None; + } + _ => { + warn!("Received unexpected protocol message {:?}", k); } } } - _ => {} + CaItem::Empty => {} } Ready(Some(Ok(()))) } @@ -1342,7 +1392,7 @@ impl CaConn { CaConnState::Unconnected => { let addr = self.remote_addr_dbg.clone(); trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); - let fut = tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)); + let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr)); self.state = CaConnState::Connecting(addr, Box::pin(fut)); None } @@ -1469,17 +1519,19 @@ impl CaConn { } impl Stream for CaConn { - type Item = Result<(), Error>; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.stats.caconn_poll_count_inc(); if self.shutdown { - info!("CaConn poll"); + info!("CaConn poll in shutdown"); } + let mut i1 = 0; let ret = loop { + i1 += 1; if self.shutdown { - info!("CaConn loop 1"); + info!("CaConn in shutdown loop 1"); } self.stats.caconn_loop1_count_inc(); if !self.shutdown { @@ -1493,9 +1545,11 @@ impl Stream for CaConn { if self.shutdown { if self.insert_item_queue.len() == 0 { trace!("no more items to flush"); - break Ready(Ok(())); + if i1 >= 10 { + break Ready(Ok(())); + } } else { - info!("more items {}", self.insert_item_queue.len()); + //info!("more items {}", self.insert_item_queue.len()); } } if self.insert_item_queue.len() >= self.insert_queue_max { @@ -1503,7 +1557,9 @@ impl Stream for CaConn { } if !self.shutdown { if let Some(v) = self.loop_inner(cx) { - break v; + if i1 >= 10 { + break v; + } } } }; @@ -1515,7 +1571,14 @@ impl Stream for CaConn { return Ready(None); } match ret { - Ready(x) => Ready(Some(x)), + Ready(Ok(())) => { + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::None, + }; + Ready(Some(Ok(item))) + } + Ready(Err(e)) => Ready(Some(Err(e))), Pending => Pending, } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 1f1ee05..dff1037 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,11 +1,11 @@ -use super::conn::ConnCommand; +use super::conn::{CaConnEvent, ConnCommand}; use super::store::DataStore; use super::IngestCommons; use crate::ca::conn::CaConn; use crate::errconv::ErrConv; use crate::rt::{JoinHandle, TokMx}; use crate::store::CommonInsertItemQueueSender; -use async_channel::Sender; +use async_channel::{Receiver, Sender}; use err::Error; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; @@ -67,15 +67,24 @@ impl CaConnRess { // There, make spawning part of this function? pub struct CaConnSet { ca_conn_ress: TokMx>, + conn_item_tx: Sender, + conn_item_rx: Receiver, } impl CaConnSet { pub fn new() -> Self { + let (conn_item_tx, conn_item_rx) = async_channel::bounded(10000); Self { ca_conn_ress: Default::default(), + conn_item_tx, + conn_item_rx, } } + pub fn conn_item_rx(&self) -> Receiver { + self.conn_item_rx.clone() + } + pub fn ca_conn_ress(&self) -> &TokMx> { &self.ca_conn_ress } @@ -108,21 +117,23 @@ impl CaConnSet { let conn = conn; let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); + let conn_item_tx = self.conn_item_tx.clone(); let conn_fut = async move { let stats = conn.stats(); let mut conn = conn; while let Some(item) = conn.next().await { match item { - Ok(_) => { + Ok(item) => { stats.conn_item_count_inc(); + conn_item_tx.send(item).await?; } Err(e) => { error!("CaConn gives error: {e:?}"); - break; + return Err(e); } } } - Ok::<_, Error>(()) + Ok(()) }; let jh = tokio::spawn(conn_fut); let ca_conn_ress = CaConnRess { @@ -234,6 +245,7 @@ impl CaConnSet { Ok(()) } + /// Add channel, or create a new CaConn and add the channel. pub async fn add_channel_to_addr( &self, backend: String, diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index b0667d1..0ed27f7 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -205,6 +205,7 @@ pub enum CaMsgTy { EventAddRes(EventAddRes), ReadNotify(ReadNotify), ReadNotifyRes(ReadNotifyRes), + Echo, } impl CaMsgTy { @@ -227,6 +228,7 @@ impl CaMsgTy { EventAddRes(_) => 0x01, ReadNotify(_) => 0x0f, ReadNotifyRes(_) => 0x0f, + Echo => 0x17, } } @@ -259,6 +261,7 @@ impl CaMsgTy { error!("should not attempt to serialize the response again"); panic!(); } + Echo => 0, } } @@ -284,6 +287,7 @@ impl CaMsgTy { EventAddRes(x) => x.data_type, ReadNotify(x) => x.data_type, ReadNotifyRes(x) => x.data_type, + Echo => 0, } } @@ -306,6 +310,7 @@ impl CaMsgTy { EventAddRes(x) => x.data_count, ReadNotify(x) => x.data_count, ReadNotifyRes(x) => x.data_count, + Echo => 0, } } @@ -328,6 +333,7 @@ impl CaMsgTy { EventAddRes(x) => x.status, ReadNotify(x) => x.sid, ReadNotifyRes(x) => x.sid, + Echo => 0, } } @@ -350,6 +356,7 @@ impl CaMsgTy { EventAddRes(x) => x.subid, ReadNotify(x) => x.ioid, ReadNotifyRes(x) => x.ioid, + Echo => 0, } } @@ -413,6 +420,7 @@ impl CaMsgTy { EventAddRes(_) => {} ReadNotify(_) => {} ReadNotifyRes(_) => {} + Echo => {} } } } @@ -708,6 +716,7 @@ impl CaMsg { }), } } + 0x17 => CaMsg { ty: CaMsgTy::Echo }, x => return Err(Error::with_msg_no_trace(format!("unsupported ca command {}", x))), }; Ok(msg) diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index ec919b3..74a0acd 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -43,6 +43,7 @@ async fn back_off_sleep(backoff_dt: &mut Duration) { tokio::time::sleep(*backoff_dt).await; } +#[derive(Debug, Clone)] pub struct Ttls { pub index: Duration, pub d0: Duration, @@ -130,9 +131,6 @@ pub async fn spawn_scylla_insert_workers( insert_item_queue.receiver() }; let ingest_commons = ingest_commons.clone(); - let ttl_msp = ttls.index; - let ttl_0d = ttls.d0; - let ttl_1d = ttls.d1; let fut = async move { let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone(); @@ -146,7 +144,7 @@ pub async fn spawn_scylla_insert_workers( }; match item { QueryItem::ConnectionStatus(item) => { - match crate::store::insert_connection_status(item, ttl_msp, &data_store, &stats).await { + match crate::store::insert_connection_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { stats.connection_status_insert_done_inc(); backoff = backoff_0; @@ -158,7 +156,7 @@ pub async fn spawn_scylla_insert_workers( } } QueryItem::ChannelStatus(item) => { - match crate::store::insert_channel_status(item, ttl_msp, &data_store, &stats).await { + match crate::store::insert_channel_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { stats.channel_status_insert_done_inc(); backoff = backoff_0; @@ -172,7 +170,9 @@ pub async fn spawn_scylla_insert_workers( QueryItem::Insert(item) => { let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire); if i1 % 1000 < insert_frac { - match crate::store::insert_item(item, ttl_msp, ttl_0d, ttl_1d, &data_store, &stats).await { + match crate::store::insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats) + .await + { Ok(_) => { stats.store_worker_insert_done_inc(); backoff = backoff_0; @@ -194,7 +194,7 @@ pub async fn spawn_scylla_insert_workers( item.ts as i64, item.ema, item.emd, - ttl_msp.as_secs() as i32, + ttls.index.as_secs() as i32, ); let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; match qres { @@ -216,7 +216,7 @@ pub async fn spawn_scylla_insert_workers( item.ts as i64, item.ema, item.emd, - ttl_msp.as_secs() as i32, + ttls.index.as_secs() as i32, ); let qres = data_store .scy @@ -242,7 +242,7 @@ pub async fn spawn_scylla_insert_workers( item.ivl, item.interest, item.evsize as i32, - ttl_msp.as_secs() as i32, + ttls.index.as_secs() as i32, ); let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; match qres { diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 51ebf84..fcb075c 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -3,6 +3,7 @@ use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; use crate::errconv::ErrConv; use crate::netbuf::NetBuf; +use crate::store::CommonInsertItemQueueSender; use async_channel::{Receiver, Sender}; #[allow(unused)] use bytes::BufMut; @@ -133,6 +134,7 @@ struct BsreadClient { do_pulse_id: bool, rcvbuf: Option, tmp_vals_pulse_map: Vec<(i64, i32, i64, i32)>, + insert_item_sender: CommonInsertItemQueueSender, scy: Arc, channel_writers: BTreeMap>, common_queries: Arc, @@ -144,6 +146,7 @@ impl BsreadClient { pub async fn new( opts: ZmtpClientOpts, source_addr: String, + insert_item_sender: CommonInsertItemQueueSender, scy: Arc, common_queries: Arc, ) -> Result { @@ -152,7 +155,8 @@ impl BsreadClient { do_pulse_id: opts.do_pulse_id, rcvbuf: opts.rcvbuf, opts, - tmp_vals_pulse_map: vec![], + tmp_vals_pulse_map: Vec::new(), + insert_item_sender, scy, channel_writers: Default::default(), common_queries, @@ -524,7 +528,14 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { let common_queries = Arc::new(common_queries); let mut jhs = vec![]; for source_addr in &opts.sources { - let client = BsreadClient::new(opts.clone(), source_addr.into(), scy.clone(), common_queries.clone()).await?; + let client = BsreadClient::new( + opts.clone(), + source_addr.into(), + todo!(), + scy.clone(), + common_queries.clone(), + ) + .await?; let fut = ClientRun::new(client); //clients.push(fut); let jh = tokio::spawn(fut);