From f869047cf5de62c15649f4788facd997b1a17202 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 14 Sep 2023 10:14:50 +0200 Subject: [PATCH] WIP --- .cargo/cargo-lock | 255 ++++++++++++++++++--------------- daqingest/src/daemon.rs | 39 +++-- dbpg/Cargo.toml | 1 + dbpg/src/seriesbychannel.rs | 29 ++-- netfetch/src/ca/conn.rs | 42 ++++-- netfetch/src/ca/connset.rs | 261 ++++++++++++++++++++++++++++------ netfetch/src/ca/statemap.rs | 23 ++- netfetch/src/insertworker.rs | 1 - netfetch/src/lib.rs | 1 - netfetch/src/metrics.rs | 66 +++++++-- netfetch/src/throttletrace.rs | 5 + netfetch/src/timebin.rs | 3 + scywr/src/insertworker.rs | 71 ++++----- scywr/src/iteminsertqueue.rs | 13 +- stats/src/stats.rs | 51 ++++++- stats_proc/src/stats_proc.rs | 10 +- 16 files changed, 611 insertions(+), 260 deletions(-) delete mode 100644 netfetch/src/insertworker.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index b18cb6d..a42a6ee 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -68,9 +68,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" +checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" [[package]] name = "anstyle-parse" @@ -143,7 +143,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -233,9 +233,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.3" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "batchtools" @@ -344,9 +344,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" @@ -365,25 +365,24 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.28" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f" +checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", - "time 0.1.45", "wasm-bindgen", "windows-targets", ] [[package]] name = "clap" -version = "4.4.2" +version = "4.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a13b88d2c62ff462f88e4a121f17a82c1af05693a2f192b5c38d14de73c19f6" +checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6" dependencies = [ "clap_builder", "clap_derive", @@ -410,7 +409,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -646,6 +645,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -667,10 +676,9 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.2.0-alpha.0" +version = "0.2.0-alpha.1" dependencies = [ "async-channel", - "batchtools", "bytes", "chrono", "clap", @@ -712,7 +720,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -723,7 +731,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -751,6 +759,7 @@ dependencies = [ "md-5", "netpod", "series", + "stats", "taskrun", "tokio-postgres", ] @@ -827,7 +836,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -902,6 +911,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "finl_unicode" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" + [[package]] name = "flate2" version = "1.0.27" @@ -1004,7 +1019,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -1064,7 +1079,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -1463,7 +1478,7 @@ dependencies = [ name = "items_proc" version = "0.0.2" dependencies = [ - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -1513,15 +1528,15 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" [[package]] name = "linux-raw-sys" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" +checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" [[package]] name = "lock_api" @@ -1590,9 +1605,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.2" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memmap2" @@ -1603,6 +1618,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d28bba84adfe6646737845bc5ebbfa2c08424eb1c37e94a1fd2a82adb56a872" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -1649,7 +1673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys 0.48.0", ] @@ -1684,6 +1708,7 
@@ dependencies = [ "arrayref", "async-channel", "axum", + "batchtools", "bitshuffle", "byteorder", "bytes", @@ -1817,14 +1842,14 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] name = "object" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "memchr", ] @@ -1858,7 +1883,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -1869,9 +1894,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.92" +version = "0.9.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" dependencies = [ "cc", "libc", @@ -1972,7 +1997,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -2010,7 +2035,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ - "base64 0.21.3", + "base64 0.21.4", "byteorder", "bytes", "fallible-iterator", @@ -2076,9 +2101,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" dependencies = [ "unicode-ident", ] @@ -2250,13 +2275,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", ] @@ -2271,9 +2296,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -2381,9 +2406,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.11" +version = "0.38.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453" +checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" dependencies = [ "bitflags 2.4.0", "errno", @@ -2444,10 +2469,10 @@ dependencies = [ "scylla-macros", "smallvec", "snap", - "socket2 0.5.3", + "socket2 0.5.4", "strum", "strum_macros", - "thiserror 1.0.47", + "thiserror 1.0.48", "tokio", "tracing", "uuid", @@ -2469,7 +2494,7 @@ dependencies = [ "num_enum", "scylla-macros", "snap", - "thiserror 1.0.47", + "thiserror 1.0.48", "tokio", "uuid", 
] @@ -2529,6 +2554,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c309e515543e67811222dbc9e3dd7e1056279b782e1dacffe4242b718734fb6" + [[package]] name = "semver" version = "1.0.18" @@ -2573,14 +2604,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] name = "serde_json" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -2650,6 +2681,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared-buffer" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cf61602ee61e2f83dd016b3e6387245291cf728ea071c378b35088125b4d995" +dependencies = [ + "bytes", + "memmap2 0.6.2", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2714,9 +2755,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", "windows-sys 0.48.0", @@ -2760,7 +2801,7 @@ version = "0.0.1" dependencies = [ "quote", "stats_types", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -2796,10 +2837,11 @@ dependencies = [ [[package]] name = "stringprep" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da" +checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6" dependencies = [ + "finl_unicode", "unicode-bidi", "unicode-normalization", ] @@ -2848,9 +2890,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" dependencies = [ "proc-macro2", "quote", @@ -2885,7 +2927,7 @@ dependencies = [ "err", "futures-util", "lazy_static", - "time 0.3.28", + "time", "tokio", "tracing", "tracing-subscriber", @@ -2914,11 +2956,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ - "thiserror-impl 1.0.47", + "thiserror-impl 1.0.48", ] [[package]] @@ -2928,18 +2970,18 @@ source = "git+https://github.com/dominikwerder/thiserror.git#052df05c18b5f26b462 dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 
2.0.29", + "syn 2.0.33", ] [[package]] @@ -2952,17 +2994,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "time" version = "0.3.28" @@ -3020,7 +3051,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.5.4", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -3044,7 +3075,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -3077,7 +3108,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand", - "socket2 0.5.3", + "socket2 0.5.4", "tokio", "tokio-util", "whoami", @@ -3116,9 +3147,9 @@ checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.19.14" +version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap 2.0.0", "toml_datetime", @@ -3133,7 +3164,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", "axum", - "base64 0.21.3", + "base64 0.21.4", "bytes", "futures-core", "futures-util", @@ -3206,7 +3237,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", ] [[package]] @@ -3243,7 +3274,7 @@ dependencies = [ "sharded-slab", "smallvec", "thread_local", - "time 0.3.28", + "time", "tracing", "tracing-core", "tracing-log", @@ -3279,9 +3310,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" @@ -3357,12 +3388,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -3390,7 +3415,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", "wasm-bindgen-shared", ] @@ -3435,7 +3460,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.33", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3448,9 +3473,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasmer" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7142dbb91ede83cc0aef2301fa75fcc7e0c9e5a7d5358e3c4f3a7249fe9ce8" +checksum = "b7954f3bdeb6cc9b46ddfbab7d795fc6a249a79f51c102d95679390526cf43d8" dependencies = [ "bytes", "cfg-if", @@ -3461,8 +3486,9 @@ dependencies = [ 
"rustc-demangle", "serde", "serde-wasm-bindgen", + "shared-buffer", "target-lexicon", - "thiserror 1.0.47", + "thiserror 1.0.48", "wasm-bindgen", "wasm-bindgen-downcast", "wasmer-compiler", @@ -3475,21 +3501,25 @@ dependencies = [ [[package]] name = "wasmer-compiler" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b99c70711ec7631b602a9fc95577c40df21e8f3916159c9d80c3fb4f77abdc" +checksum = "7c72380ddff4d9710366527100183df6f943caab5b63adc7cac6d90bacdf4628" dependencies = [ "backtrace", + "bytes", "cfg-if", "enum-iterator", "enumset", "lazy_static", "leb128", - "memmap2", + "memmap2 0.5.10", "more-asserts", "region", + "rkyv", + "self_cell", + "shared-buffer", "smallvec", - "thiserror 1.0.47", + "thiserror 1.0.48", "wasmer-types", "wasmer-vm", "wasmparser", @@ -3498,9 +3528,9 @@ dependencies = [ [[package]] name = "wasmer-compiler-cranelift" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52aef2ef35513a04fed54de9a7dc9c469d4742a5c2e378a5f7e2a79b1327e3bd" +checksum = "957adcc170f007bf26293c7cd95352c1e471d7bf4295a1dfda796702a79d9648" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -3517,9 +3547,9 @@ dependencies = [ [[package]] name = "wasmer-derive" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25bb1425c9e4dc3e2d3aacd6e82e22e27a3127379e0d09bcbdf25ff376229162" +checksum = "6cd92754ce26eec6136ae6282d96ca59632bd47237ecba82824de16e34b3ce6b" dependencies = [ "proc-macro-error", "proc-macro2", @@ -3529,9 +3559,9 @@ dependencies = [ [[package]] name = "wasmer-types" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7e32ed799fa8c0d96ca9615d9ea8006857a0f0c18e7c2ed8082bd5c63a9ea70" +checksum = "d93ca35c9a184e7d561f496c5d6da835c8309a8f76fcca1384f5800127395735" dependencies = [ "bytecheck", "enum-iterator", @@ -3540,19 +3570,20 @@ dependencies = [ "more-asserts", "rkyv", "target-lexicon", - "thiserror 1.0.47", + "thiserror 1.0.48", ] [[package]] name = "wasmer-vm" -version = "4.1.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0847513cb176b5d62a6f65d6ae474594935e726a10e9e3387177d9cbf8b8cda0" +checksum = "200ff711048c0a2cb045d4984a673a1edea18f1e14f024694a43f4951922998b" dependencies = [ "backtrace", "cc", "cfg-if", "corosensei", + "crossbeam-queue", "dashmap", "derivative", "enum-iterator", @@ -3565,7 +3596,7 @@ dependencies = [ "more-asserts", "region", "scopeguard", - "thiserror 1.0.47", + "thiserror 1.0.48", "wasmer-types", "winapi", ] diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 9f9bc4e..743c060 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -27,6 +27,8 @@ use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use stats::DaemonStats; +use stats::InsertWorkerStats; +use stats::SeriesByChannelStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; @@ -87,8 +89,8 @@ pub struct Daemon { last_status_print: SystemTime, insert_workers_jh: Vec>>, stats: Arc, + insert_worker_stats: Arc, shutting_down: bool, - insert_rx_weak: WeakReceiver, connset_ctrl: CaConnSetCtrl, connset_status_last: CheckPeriodic, // TODO should be a stats object? 
@@ -100,10 +102,14 @@ impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); + let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); + let insert_worker_stats = Arc::new(InsertWorkerStats::new()); + // TODO keep join handles and await later - let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let (channel_info_query_tx, jhs, jh) = + dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap); let query_item_tx_weak = query_item_tx.downgrade(); @@ -159,9 +165,9 @@ impl Daemon { opts.scyconf.clone(), opts.insert_scylla_sessions, opts.insert_worker_count, - query_item_rx.clone(), + query_item_rx, insert_worker_opts, - store_stats.clone(), + insert_worker_stats.clone(), use_rate_limit_queue, ttls, ) @@ -214,8 +220,8 @@ impl Daemon { last_status_print: SystemTime::now(), insert_workers_jh, stats: Arc::new(DaemonStats::new()), + insert_worker_stats, shutting_down: false, - insert_rx_weak: query_item_rx.downgrade(), connset_ctrl: conn_set_ctrl, connset_status_last: CheckPeriodic::Waiting(Instant::now()), insert_workers_running: AtomicU64::new(0), @@ -231,7 +237,7 @@ impl Daemon { async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> { match &self.connset_status_last { CheckPeriodic::Waiting(since) => { - if *since + Duration::from_millis(2000) < ts1 { + if *since + Duration::from_millis(5000) < ts1 { debug!("======================================== issue health check CaConn"); self.connset_ctrl.check_health().await?; self.connset_status_last = CheckPeriodic::Ongoing(ts1); @@ -242,7 +248,7 @@ impl Daemon { } CheckPeriodic::Ongoing(since) => { let dt = ts1.saturating_duration_since(*since); - if dt > Duration::from_millis(4000) { + if dt > Duration::from_millis(2000) { error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3); } } @@ -254,7 +260,7 @@ impl Daemon { if self.shutting_down { let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); let nitems = self - .insert_rx_weak + .query_item_tx_weak .upgrade() .map(|x| (x.sender_count(), x.receiver_count(), x.len())); info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems); @@ -592,13 +598,19 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> let daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); let daemon_stats = daemon.stats().clone(); + let connset_cmd_tx = daemon.connset_ctrl.sender().clone(); let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); let metrics_jh = { - let stats_set = StatsSet::new(daemon_stats); - let fut = netfetch::metrics::metrics_service(opts.api_bind(), dcom, stats_set, metrics_shutdown_rx); + let stats_set = StatsSet::new( + daemon_stats, + daemon.connset_ctrl.stats().clone(), + daemon.insert_worker_stats.clone(), + ); + let fut = + netfetch::metrics::metrics_service(opts.api_bind(), dcom, connset_cmd_tx, stats_set, metrics_shutdown_rx); tokio::task::spawn(fut) }; @@ -615,9 +627,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> } thr_msg.trigger_fmt("sent ChannelAdd", &[&i as 
&_]); i += 1; - // if i % 100 == 0 { - // debug!("sent {} ChannelAdd", i); - // } } debug!("{} configured channels applied", channels.len()); daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index 5e4fee2..47739e9 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -10,6 +10,7 @@ err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } batchtools = { path = "../batchtools" } +stats = { path = "../stats" } series = { path = "../series" } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } futures-util = "0.3" diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index ab573a5..9a6c1d7 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -10,7 +10,9 @@ use md5::Digest; use netpod::Database; use series::series::Existence; use series::SeriesId; +use stats::SeriesByChannelStats; use std::pin::Pin; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -97,10 +99,15 @@ struct Worker { qu_select: PgStatement, qu_insert: PgStatement, batch_rx: Receiver>, + stats: Arc, } impl Worker { - async fn new(db: &Database, batch_rx: Receiver>) -> Result { + async fn new( + db: &Database, + batch_rx: Receiver>, + stats: Arc, + ) -> Result { let pg = crate::conn::make_pg_client(db).await?; let sql = concat!( "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", @@ -125,6 +132,7 @@ impl Worker { qu_select, qu_insert, batch_rx, + stats, }; Ok(ret) } @@ -319,16 +327,14 @@ impl Worker { channel: r.channel, series: r.series, }; - trace3!("try to send result for {:?}", item); + // trace3!("try to send result for {:?}", item); let fut = r.tx.make_send(Ok(item)); - match tokio::time::timeout(Duration::from_millis(2000), fut).await { - Ok(Ok(())) => {} - Ok(Err(_e)) => { - warn!("can not deliver result"); - return Err(Error::ChannelError); - } - Err(_) => { - debug!("timeout can not deliver result"); + match fut.await { + Ok(()) => {} + Err(_e) => { + //warn!("can not deliver result"); + // return Err(Error::ChannelError); + self.stats.res_tx_fail.inc(); } } } @@ -341,6 +347,7 @@ impl Worker { pub async fn start_lookup_workers( worker_count: usize, db: &Database, + stats: Arc, ) -> Result< ( Sender, @@ -356,7 +363,7 @@ pub async fn start_lookup_workers( let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); let mut jhs = Vec::new(); for _ in 0..worker_count { - let mut worker = Worker::new(db, batch_rx.clone()).await?; + let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?; let jh = tokio::task::spawn(async move { worker.work().await }); jhs.push(jh); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3519be1..fba3be5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -185,7 +185,6 @@ struct CreatedState { info_store_msp_last: u32, } -#[allow(unused)] #[derive(Clone, Debug)] enum ChannelState { Init(ChannelStatusSeriesId), @@ -392,9 +391,14 @@ impl ConnCommand { } } +#[derive(Debug)] +pub struct CheckHealthResult { + pub channel_statuses: BTreeMap, +} + #[derive(Debug)] pub enum ConnCommandResultKind { - CheckHealth, + CheckHealth(CheckHealthResult), } #[derive(Debug)] @@ -403,6 +407,17 @@ pub struct ConnCommandResult { pub kind: ConnCommandResultKind, } +impl ConnCommandResult { + pub fn id(&self) -> usize { + self.id + 
} + + fn make_id() -> usize { + static ID: AtomicUsize = AtomicUsize::new(0); + ID.fetch_add(1, atomic::Ordering::AcqRel) + } +} + #[derive(Debug)] pub enum CaConnEventValue { None, @@ -562,12 +577,20 @@ impl CaConn { } } // TODO return the result + let mut channel_statuses = BTreeMap::new(); + for (k, v) in self.channels.iter() { + let name = self + .name_by_cid(*k) + .map_or_else(|| format!("{k:?}"), ToString::to_string); + let info = v.to_info(name.clone(), self.remote_addr_dbg); + channel_statuses.insert(name, info); + } + let health = CheckHealthResult { channel_statuses }; let res = ConnCommandResult { - id: 0, - kind: ConnCommandResultKind::CheckHealth, + id: ConnCommandResult::make_id(), + kind: ConnCommandResultKind::CheckHealth(health), }; self.cmd_res_queue.push_back(res); - //self.stats.caconn_command_can_not_reply.inc(); } fn cmd_find_channel(&self, pattern: &str) { @@ -869,7 +892,7 @@ impl CaConn { fn check_channels_alive(&mut self) -> Result<(), Error> { let tsnow = Instant::now(); - trace!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg); + trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg); if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) { if let Some(started) = self.ioc_ping_start { if started.elapsed() > Duration::from_millis(4000) { @@ -884,7 +907,7 @@ impl CaConn { } else { self.ioc_ping_start = Some(Instant::now()); if let Some(proto) = &mut self.proto { - debug!("ping to {}", self.remote_addr_dbg); + trace2!("ping to {}", self.remote_addr_dbg); let msg = CaMsg { ty: CaMsgTy::Echo }; proto.push_out(msg); } else { @@ -1762,7 +1785,10 @@ impl Stream for CaConn { self.stats.caconn_poll_count.inc(); let poll_ts1 = Instant::now(); let ret = loop { - self.thr_msg_poll.trigger("CaConn::poll_next"); + let qlen = self.insert_item_queue.len(); + if qlen >= 200 { + self.thr_msg_poll.trigger_fmt("CaConn::poll_next", &[&qlen]); + } break if let CaConnState::EndOfStream = self.state { Ready(None) } else if let Err(e) = self.as_mut().handle_own_ticker(cx) { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 2b5a46b..6a9251a 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,3 +1,5 @@ +use super::conn::ChannelStateInfo; +use super::conn::ConnCommandResult; use super::findioc::FindIocRes; use super::statemap; use super::statemap::ChannelState; @@ -19,6 +21,7 @@ use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; use atomic::AtomicUsize; +use core::fmt; use dbpg::seriesbychannel::BoxedSend; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; @@ -32,6 +35,7 @@ use netpod::Database; use netpod::Shape; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; +use serde::Serialize; use series::series::Existence; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -108,6 +112,7 @@ pub struct CaConnRes { state: CaConnState, sender: Sender, stats: Arc, + cmd_queue: VecDeque, // TODO await on jh jh: JoinHandle>, } @@ -147,12 +152,29 @@ pub struct ChannelRemove { name: String, } +#[derive(Debug, Clone, Serialize)] +pub struct ChannelStatusesResponse { + pub channels_ca_conn: BTreeMap, + pub channels_ca_conn_set: BTreeMap, +} + +pub struct ChannelStatusesRequest { + pub tx: Sender, +} + +impl fmt::Debug for ChannelStatusesRequest { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ChannelStatusesRequest").finish() + } +} + #[derive(Debug)] 
pub enum ConnSetCmd { ChannelAdd(ChannelAdd), ChannelRemove(ChannelRemove), CheckHealth(Instant), Shutdown, + ChannelStatuses(ChannelStatusesRequest), } #[derive(Debug)] @@ -160,6 +182,10 @@ pub enum CaConnSetEvent { ConnSetCmd(ConnSetCmd), } +impl CaConnSetEvent { + // pub fn new_cmd_channel_statuses() -> (Self, Receiver) {} +} + #[derive(Debug, Clone)] pub enum CaConnSetItem { Error(Error), @@ -169,10 +195,15 @@ pub enum CaConnSetItem { pub struct CaConnSetCtrl { tx: Sender, rx: Receiver, + stats: Arc, jh: JoinHandle>, } impl CaConnSetCtrl { + pub fn sender(&self) -> Sender { + self.tx.clone() + } + pub fn receiver(&self) -> Receiver { self.rx.clone() } @@ -211,6 +242,10 @@ impl CaConnSetCtrl { self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; Ok(()) } + + pub fn stats(&self) -> &Arc { + &self.stats + } } #[derive(Debug)] @@ -239,6 +274,7 @@ pub struct CaConnSet { local_epics_hostname: String, ca_conn_ress: BTreeMap, channel_states: ChannelStateMap, + ca_conn_channel_states: BTreeMap, connset_inp_rx: Receiver, channel_info_query_queue: VecDeque, channel_info_query_sender: SenderPolling, @@ -248,6 +284,7 @@ pub struct CaConnSet { find_ioc_query_queue: VecDeque, find_ioc_query_sender: SenderPolling, find_ioc_res_rx: Receiver>, + storage_insert_tx: Sender, storage_insert_queue: VecDeque, storage_insert_sender: SenderPolling, ca_conn_res_tx: Sender<(SocketAddr, CaConnEvent)>, @@ -257,11 +294,12 @@ pub struct CaConnSet { shutdown_stopping: bool, shutdown_done: bool, chan_check_next: Option, - stats: CaConnSetStats, + stats: Arc, ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, + did_connset_out_queue: bool, } impl CaConnSet { @@ -272,18 +310,20 @@ impl CaConnSet { channel_info_query_tx: Sender, pgconf: Database, ) -> CaConnSetCtrl { - let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(5000); - let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(5000); - let (connset_out_tx, connset_out_rx) = async_channel::bounded(5000); - let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(5000); + let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200); + let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); + let (connset_out_tx, connset_out_rx) = async_channel::bounded(200); + let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400); let (find_ioc_query_tx, ioc_finder_jh) = super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); - let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(5000); + let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400); + let stats = Arc::new(CaConnSetStats::new()); let connset = Self { backend, local_epics_hostname, ca_conn_ress: BTreeMap::new(), channel_states: ChannelStateMap::new(), + ca_conn_channel_states: BTreeMap::new(), connset_inp_rx, channel_info_query_queue: VecDeque::new(), channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()), @@ -293,6 +333,7 @@ impl CaConnSet { find_ioc_query_queue: VecDeque::new(), find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx), find_ioc_res_rx, + storage_insert_tx: storage_insert_tx.clone(), storage_insert_queue: VecDeque::new(), storage_insert_sender: SenderPolling::new(storage_insert_tx), ca_conn_res_tx, @@ -300,7 +341,7 @@ impl CaConnSet { shutdown_stopping: false, shutdown_done: false, chan_check_next: None, - stats: CaConnSetStats::new(), + stats: 
stats.clone(), connset_out_tx, connset_out_queue: VecDeque::new(), // connset_out_sender: SenderPolling::new(connset_out_tx), @@ -308,17 +349,20 @@ impl CaConnSet { await_ca_conn_jhs: VecDeque::new(), thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)), thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), + did_connset_out_queue: false, }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); CaConnSetCtrl { tx: connset_inp_tx, rx: connset_out_rx, + stats, jh, } } async fn run(mut this: CaConnSet) -> Result<(), Error> { + debug!("CaConnSet run begin"); loop { let x = this.next().await; match x { @@ -331,6 +375,7 @@ impl CaConnSet { // this.find_ioc_query_tx.sender_count(), // this.find_ioc_query_tx.receiver_count() // ); + debug!("CaConnSet EndOfStream"); this.ioc_finder_jh .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))??; @@ -353,6 +398,7 @@ impl CaConnSet { // ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1), ConnSetCmd::Shutdown => self.handle_shutdown(), + ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x), }, } } @@ -361,7 +407,7 @@ impl CaConnSet { match ev.value { CaConnEventValue::None => Ok(()), CaConnEventValue::EchoTimeout => todo!(), - CaConnEventValue::ConnCommandResult(_) => todo!(), + CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), CaConnEventValue::QueryItem(item) => { self.storage_insert_queue.push_back(item); Ok(()) @@ -405,6 +451,7 @@ impl CaConnSet { since: SystemTime::now(), }), }); + self.stats.channel_wait_for_status_id.inc(); let item = ChannelInfoQuery { backend: add.backend, channel: add.name, @@ -436,6 +483,7 @@ impl CaConnSet { }, }, }; + self.stats.channel_wait_for_address.inc(); let qu = IocAddrQuery { name: add.name }; self.find_ioc_query_queue.push_back(qu); } else { @@ -562,12 +610,44 @@ impl CaConnSet { self.thr_msg_storage_len .trigger_fmt("msg", &[&self.storage_insert_sender.len()]); debug!("TODO handle_check_health"); + + // Trigger already the next health check, but use the current data that we have. + + // TODO try to deliver a command to CaConn + // Add some queue for commands to CaConn to the ress. + // Fail here if that queue gets too long. + // Try to push the commands periodically. 
+ for (_, res) in self.ca_conn_ress.iter_mut() { + let item = ConnCommand::check_health(); + res.cmd_queue.push_back(item); + } + let ts2 = Instant::now(); let item = CaConnSetItem::Healthy(ts1, ts2); self.connset_out_queue.push_back(item); Ok(()) } + fn handle_channel_statuses_req(&mut self, req: ChannelStatusesRequest) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } + debug!("handle_channel_statuses_req"); + let item = ChannelStatusesResponse { + channels_ca_conn: self.ca_conn_channel_states.clone(), + channels_ca_conn_set: self + .channel_states + .inner() + .iter() + .map(|(k, v)| (k.id().to_string(), v.clone())) + .collect(), + }; + if req.tx.try_send(item).is_err() { + self.stats.response_tx_fail.inc(); + } + Ok(()) + } + fn handle_shutdown(&mut self) -> Result<(), Error> { if self.shutdown_stopping { return Ok(()); @@ -585,22 +665,45 @@ impl CaConnSet { Ok(()) } + fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> { + use crate::ca::conn::ConnCommandResultKind::*; + match res.kind { + CheckHealth(health) => { + // debug!("handle_conn_command_result {addr}"); + for (k, v) in health.channel_statuses { + self.ca_conn_channel_states.insert(k, v); + } + Ok(()) + } + } + } + fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { - debug!("handle_ca_conn_eos {addr}"); + trace2!("handle_ca_conn_eos {addr}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { + self.stats.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); } else { - self.stats.ca_conn_task_eos_non_exist.inc(); + self.stats.ca_conn_eos_unexpected().inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } + self.remove_status_for_addr(addr)?; debug!("still CaConn left {}", self.ca_conn_ress.len()); Ok(()) } + fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { + self.ca_conn_channel_states + .retain(|_k, v| SocketAddr::V4(v.addr) != addr); + Ok(()) + } + fn ready_for_end_of_stream(&self) -> bool { - if self.ca_conn_ress.len() > 0 { + if !self.shutdown_stopping { false - } else if self.await_ca_conn_jhs.len() > 1 { + } else if self.ca_conn_ress.len() > 0 { + false + } else if self.await_ca_conn_jhs.len() > 0 { false } else { true @@ -626,12 +729,14 @@ impl CaConnSet { ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); - let ca_conn_res_tx = self.ca_conn_res_tx.clone(); - let jh = tokio::spawn(Self::ca_conn_item_merge(conn, ca_conn_res_tx, addr)); + let tx1 = self.ca_conn_res_tx.clone(); + let tx2 = self.storage_insert_tx.clone(); + let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr)); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), sender: conn_tx, stats: conn_stats, + cmd_queue: VecDeque::new(), jh, }; Ok(ca_conn_res) @@ -639,7 +744,8 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, - tx: Sender<(SocketAddr, CaConnEvent)>, + tx1: Sender<(SocketAddr, CaConnEvent)>, + tx2: Sender, addr: SocketAddr, ) -> Result<(), Error> { trace2!("ca_conn_consumer begin {}", addr); @@ -650,7 +756,23 @@ impl CaConnSet { match item { Ok(item) => { stats.conn_item_count.inc(); - tx.send((addr, item)).await?; + match item.value { + CaConnEventValue::QueryItem(x) => { + tx2.send(x).await; + } + CaConnEventValue::None => { + tx1.send((addr, item)).await; + } + CaConnEventValue::EchoTimeout => { + tx1.send((addr, item)).await; + } + CaConnEventValue::ConnCommandResult(_) => { + tx1.send((addr, item)).await; + } 
+ CaConnEventValue::EndOfStream => { + tx1.send((addr, item)).await; + } + } } Err(e) => { error!("CaConn gives error: {e:?}"); @@ -659,7 +781,7 @@ impl CaConnSet { } } trace2!("ca_conn_consumer ended {}", addr); - tx.send(( + tx1.send(( addr, CaConnEvent { ts: Instant::now(), @@ -932,7 +1054,9 @@ impl CaConnSet { Ok(()) } + // TODO should use both counters and values fn update_channel_state_counts(&mut self) -> (u64,) { + return (0,); let mut unknown_address = 0; let mut search_pending = 0; let mut unassigned = 0; @@ -979,6 +1103,31 @@ impl CaConnSet { self.stats.channel_assigned.__set(assigned); (search_pending,) } + + fn try_push_ca_conn_cmds(&mut self) { + // debug!("try_push_ca_conn_cmds"); + for (_, v) in self.ca_conn_ress.iter_mut() { + loop { + break if let Some(item) = v.cmd_queue.pop_front() { + match v.sender.try_send(item) { + Ok(()) => continue, + Err(e) => match e { + async_channel::TrySendError::Full(e) => { + self.stats.try_push_ca_conn_cmds_full.inc(); + v.cmd_queue.push_front(e); + break; + } + async_channel::TrySendError::Closed(_) => { + // TODO + self.stats.try_push_ca_conn_cmds_closed.inc(); + break; + } + }, + } + }; + } + } + } } impl Stream for CaConnSet { @@ -986,13 +1135,39 @@ impl Stream for CaConnSet { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + self.stats.poll_fn_begin().inc(); + debug!("CaConnSet poll"); loop { - self.thr_msg_poll_1.trigger("CaConnSet"); + self.stats.poll_loop_begin().inc(); + let n1 = self.channel_info_query_queue.len(); + let p2 = self.channel_info_query_sender.len(); + let p3 = self.channel_info_res_rx.len(); + self.thr_msg_poll_1 + .trigger_fmt("CaConnSet channel_info_query_queue", &[&n1, &p2, &p3]); + + self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _); + self.stats + .channel_info_query_sender_len + .set(self.channel_info_query_sender.len().unwrap_or(0) as _); + self.stats + .channel_info_res_tx_len + .set(self.channel_info_res_tx.len() as _); + self.stats + .find_ioc_query_sender_len + .set(self.find_ioc_query_sender.len().unwrap_or(0) as _); + self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _); let mut have_pending = false; - if let Some(item) = self.connset_out_queue.pop_front() { - break Ready(Some(item)); + self.try_push_ca_conn_cmds(); + + if self.did_connset_out_queue { + self.did_connset_out_queue = false; + } else { + if let Some(item) = self.connset_out_queue.pop_front() { + self.did_connset_out_queue = true; + break Ready(Some(item)); + } } if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() { @@ -1016,19 +1191,23 @@ impl Stream for CaConnSet { } } } - Pending => {} + Pending => { + have_pending = true; + } } } + // TODO should never send from here, track. 
if self.storage_insert_sender.is_idle() { if let Some(item) = self.storage_insert_queue.pop_front() { + self.stats.logic_error().inc(); self.storage_insert_sender.send(item); } } if self.storage_insert_sender.is_sending() { match self.storage_insert_sender.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(e)) => { + Ready(Ok(())) => {} + Ready(Err(_)) => { let e = Error::with_msg_no_trace("can not send into channel"); error!("{e}"); break Ready(Some(CaConnSetItem::Error(e))); @@ -1046,8 +1225,8 @@ impl Stream for CaConnSet { } if self.find_ioc_query_sender.is_sending() { match self.find_ioc_query_sender.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(e)) => { + Ready(Ok(())) => {} + Ready(Err(_)) => { let e = Error::with_msg_no_trace("can not send into channel"); error!("{e}"); break Ready(Some(CaConnSetItem::Error(e))); @@ -1065,8 +1244,8 @@ impl Stream for CaConnSet { } if self.channel_info_query_sender.is_sending() { match self.channel_info_query_sender.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(e)) => { + Ready(Ok(())) => {} + Ready(Err(_)) => { let e = Error::with_msg_no_trace("can not send into channel"); error!("{e}"); break Ready(Some(CaConnSetItem::Error(e))); @@ -1079,7 +1258,7 @@ impl Stream for CaConnSet { let item = match self.find_ioc_res_rx.poll_next_unpin(cx) { Ready(Some(x)) => match self.handle_ioc_query_result(x) { - Ok(()) => continue, + Ok(()) => Ready(None), Err(e) => Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => Ready(None), @@ -1095,7 +1274,7 @@ impl Stream for CaConnSet { let item = match self.ca_conn_res_rx.poll_next_unpin(cx) { Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) { - Ok(()) => continue, + Ok(()) => Ready(None), Err(e) => Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => Ready(None), @@ -1111,7 +1290,7 @@ impl Stream for CaConnSet { let item = match self.channel_info_res_rx.poll_next_unpin(cx) { Ready(Some(x)) => match self.handle_series_lookup_result(x) { - Ok(()) => continue, + Ok(()) => Ready(None), Err(e) => Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => Ready(None), @@ -1127,7 +1306,7 @@ impl Stream for CaConnSet { let item = match self.connset_inp_rx.poll_next_unpin(cx) { Ready(Some(x)) => match self.handle_event(x) { - Ok(()) => continue, + Ok(()) => Ready(None), Err(e) => Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => Ready(None), @@ -1141,19 +1320,21 @@ impl Stream for CaConnSet { _ => {} } - break if have_pending { - if self.shutdown_stopping && self.ready_for_end_of_stream() { - Ready(None) + break if self.ready_for_end_of_stream() { + if have_pending { + self.stats.ready_for_end_of_stream_with_pending().inc(); } else { - Pending + self.stats.ready_for_end_of_stream_no_pending().inc(); } - } else if self.shutdown_stopping && self.ready_for_end_of_stream() { - debug!("nothing to do but shutdown"); Ready(None) } else { - let e = Error::with_msg_no_trace("connset not pending and not shutdown"); - error!("{e}"); - Ready(Some(CaConnSetItem::Error(e))) + if have_pending { + self.stats.poll_pending().inc(); + Pending + } else { + self.stats.poll_reloop().inc(); + continue; + } }; } } diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 0ef2339..dff54b8 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -33,23 +33,20 @@ impl CaConnState { } } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub enum ConnectionStateValue { Unconnected, - Connected { - //#[serde(with = "serde_Instant")] - 
since: SystemTime, - }, + Connected, } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct ConnectionState { //#[serde(with = "serde_Instant")] pub updated: SystemTime, pub value: ConnectionStateValue, } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub enum WithAddressState { Unassigned { //#[serde(with = "serde_Instant")] @@ -58,7 +55,7 @@ pub enum WithAddressState { Assigned(ConnectionState), } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub enum WithStatusSeriesIdStateInner { UnknownAddress { since: SystemTime, @@ -76,12 +73,12 @@ pub enum WithStatusSeriesIdStateInner { }, } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct WithStatusSeriesIdState { pub inner: WithStatusSeriesIdStateInner, } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, Serialize)] pub enum ActiveChannelState { Init { since: SystemTime, @@ -95,18 +92,18 @@ pub enum ActiveChannelState { }, } -#[derive(Debug)] +#[derive(Debug, Clone, Serialize)] pub enum ChannelStateValue { Active(ActiveChannelState), ToRemove { addr: Option }, } -#[derive(Debug)] +#[derive(Debug, Clone, Serialize)] pub struct ChannelState { pub value: ChannelStateValue, } -#[derive(Debug)] +#[derive(Debug, Clone, Serialize)] pub struct ChannelStateMap { map: BTreeMap, } diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs deleted file mode 100644 index 8b13789..0000000 --- a/netfetch/src/insertworker.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index e101dbf..79c23f2 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -2,7 +2,6 @@ pub mod ca; pub mod conf; pub mod daemon_common; pub mod errconv; -pub mod insertworker; pub mod linuxhelper; pub mod metrics; pub mod netbuf; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 7511424..c479bdc 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,3 +1,8 @@ +use crate::ca::conn::ChannelStateInfo; +use crate::ca::connset::CaConnSetEvent; +use crate::ca::connset::ChannelStatusesRequest; +use crate::ca::connset::ChannelStatusesResponse; +use crate::ca::connset::ConnSetCmd; use crate::ca::METRICS; use crate::daemon_common::DaemonEvent; use async_channel::Receiver; @@ -10,10 +15,12 @@ use log::*; use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; use serde::Serialize; +use stats::CaConnSetStats; use stats::CaConnStats; use stats::CaConnStatsAgg; use stats::CaConnStatsAggDiff; use stats::DaemonStats; +use stats::InsertWorkerStats; use std::collections::HashMap; use std::net::SocketAddrV4; use std::sync::atomic::Ordering; @@ -23,11 +30,21 @@ use taskrun::tokio; pub struct StatsSet { daemon: Arc, + ca_conn_set: Arc, + insert_worker_stats: Arc, } impl StatsSet { - pub fn new(daemon: Arc) -> Self { - Self { daemon } + pub fn new( + daemon: Arc, + ca_conn_set: Arc, + insert_worker_stats: Arc, + ) -> Self { + Self { + daemon, + ca_conn_set, + insert_worker_stats, + } } } @@ -102,13 +119,28 @@ async fn channel_state(params: HashMap, dcom: Arc) - axum::Json(false) } -async fn channel_states( - params: HashMap, - dcom: Arc, -) -> axum::Json> { - let limit = params.get("limit").map(|x| x.parse()).unwrap_or(Ok(40)).unwrap_or(40); - error!("TODO channel_state"); - axum::Json(Vec::new()) +// axum::Json +async fn channel_states(params: HashMap, tx: Sender) -> String { + let limit = params + .get("limit") + .map(|x| x.parse().ok()) + .unwrap_or(None) + .unwrap_or(40); + let (tx2, rx2) = 
async_channel::bounded(1); + let req = ChannelStatusesRequest { tx: tx2 }; + let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); + // TODO handle error + tx.send(item).await; + let res = rx2.recv().await.unwrap(); + match serde_json::to_string(&res) { + Ok(x) => x, + Err(e) => { + error!("Serialize error {e}"); + Err::<(), _>(e).unwrap(); + panic!(); + } + } + // axum::Json(res) } async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc) -> axum::Json { @@ -136,7 +168,7 @@ impl DaemonComm { } } -fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { +fn make_routes(dcom: Arc, connset_cmd_tx: Sender, stats_set: StatsSet) -> axum::Router { use axum::extract; use axum::routing::get; use axum::routing::put; @@ -162,8 +194,12 @@ fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { get({ // || async move { - info!("metrics"); - let s1 = stats_set.daemon.prometheus(); + debug!("metrics"); + let mut s1 = stats_set.daemon.prometheus(); + let s2 = stats_set.ca_conn_set.prometheus(); + let s3 = stats_set.insert_worker_stats.prometheus(); + s1.push_str(&s2); + s1.push_str(&s3); s1 } }), @@ -186,7 +222,8 @@ fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { "/daqingest/channel/states", get({ let dcom = dcom.clone(); - |Query(params): Query>| channel_states(params, dcom) + let tx = connset_cmd_tx.clone(); + |Query(params): Query>| channel_states(params, tx) }), ) .route( @@ -248,11 +285,12 @@ fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { pub async fn metrics_service( bind_to: String, dcom: Arc, + connset_cmd_tx: Sender, stats_set: StatsSet, shutdown_signal: Receiver, ) { let addr = bind_to.parse().unwrap(); - let router = make_routes(dcom, stats_set).into_make_service(); + let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service(); axum::Server::bind(&addr) .serve(router) .with_graceful_shutdown(async move { diff --git a/netfetch/src/throttletrace.rs b/netfetch/src/throttletrace.rs index 49d3ab8..c5ba9b9 100644 --- a/netfetch/src/throttletrace.rs +++ b/netfetch/src/throttletrace.rs @@ -36,6 +36,11 @@ impl ThrottleTrace { debug!("{} {:?} (count {})", msg, params[0], self.count); } else if params.len() == 2 { debug!("{} {:?} {:?} (count {})", msg, params[0], params[1], self.count); + } else if params.len() == 3 { + debug!( + "{} {:?} {:?} {:?} (count {})", + msg, params[0], params[1], params[2], self.count + ); } else { debug!("{} (count {})", msg, self.count); } diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs index 1a23de3..fb22ede 100644 --- a/netfetch/src/timebin.rs +++ b/netfetch/src/timebin.rs @@ -139,6 +139,9 @@ impl ConnTimeBin { self.tick_fn = Box::new(tick::); self.did_setup = true; } + STRING => { + trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + } _ => { warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index ec8c0c7..a5eb9c1 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -11,6 +11,7 @@ use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::ScyllaConfig; use stats::CaConnStats; +use stats::InsertWorkerStats; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -20,26 +21,26 @@ use std::time::Instant; use taskrun::tokio; use taskrun::tokio::task::JoinHandle; -fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::iteminsertqueue::Error) { +fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: 
&crate::iteminsertqueue::Error) { use crate::iteminsertqueue::Error; match err { Error::DbOverload => { - stats.store_worker_insert_overload.inc(); + stats.db_overload().inc(); } Error::DbTimeout => { - stats.store_worker_insert_timeout.inc(); + stats.db_timeout().inc(); } Error::DbUnavailable => { - stats.store_worker_insert_unavailable.inc(); + stats.db_unavailable().inc(); } Error::DbError(e) => { if false { warn!("db error {e}"); } - stats.store_worker_insert_error.inc(); + stats.db_error().inc(); } Error::QueryError(_) => { - stats.store_worker_insert_error.inc(); + stats.query_error().inc(); } } } @@ -75,7 +76,7 @@ async fn rate_limiter_worker( rate: Arc, inp: Receiver, tx: Sender, - stats: Arc, + stats: Arc, ) { let mut ts_forward_last = Instant::now(); let mut ivl_ema = stats::Ema64::with_k(0.00001); @@ -103,7 +104,7 @@ async fn rate_limiter_worker( let ivl2 = Duration::from_nanos(ema2.ema() as u64); if allowed_to_drop && ivl2 < dt_min { //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - stats.store_worker_ratelimit_drop.inc(); + stats.ratelimit_drop().inc(); } else { if tx.send(item).await.is_err() { break; @@ -113,7 +114,7 @@ async fn rate_limiter_worker( let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; ivl_ema.update(dt_ns.min(MS * 100) as f32); ts_forward_last = tsnow; - stats.inter_ivl_ema.set(ivl_ema.ema() as u64); + // stats.inter_ivl_ema.set(ivl_ema.ema() as u64); } } } @@ -123,7 +124,7 @@ async fn rate_limiter_worker( fn rate_limiter( inp: Receiver, opts: Arc, - stats: Arc, + stats: Arc, ) -> Receiver { let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256)); tokio::spawn(rate_limiter_worker(opts.store_workers_rate.clone(), inp, tx, stats)); @@ -136,7 +137,7 @@ async fn worker( ttls: Ttls, insert_worker_opts: Arc, data_store: Arc, - stats: Arc, + stats: Arc, ) -> Result<(), Error> { insert_worker_opts .insert_workers_running @@ -146,7 +147,7 @@ async fn worker( let mut i1 = 0; loop { let item = if let Ok(item) = item_inp.recv().await { - stats.store_worker_item_recv.inc(); + stats.item_recv.inc(); item } else { break; @@ -155,7 +156,7 @@ async fn worker( QueryItem::ConnectionStatus(item) => { match insert_connection_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { - stats.connection_status_insert_done.inc(); + stats.inserted_connection_status().inc(); backoff = backoff_0; } Err(e) => { @@ -167,7 +168,7 @@ async fn worker( QueryItem::ChannelStatus(item) => { match insert_channel_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { - stats.channel_status_insert_done.inc(); + stats.inserted_channel_status().inc(); backoff = backoff_0; } Err(e) => { @@ -177,22 +178,26 @@ async fn worker( } } QueryItem::Insert(item) => { - let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); - if i1 % 1000 < insert_frac { - match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await { - Ok(_) => { - stats.store_worker_insert_done.inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } + if true { + stats.inserted_values().inc(); } else { - stats.store_worker_fraction_drop.inc(); + let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); + if i1 % 1000 < insert_frac { + match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await { + Ok(_) => { + stats.inserted_values().inc(); + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + 
back_off_sleep(&mut backoff).await; + } + } + } else { + stats.fraction_drop().inc(); + } + i1 += 1; } - i1 += 1; } QueryItem::Mute(item) => { let values = ( @@ -206,7 +211,7 @@ async fn worker( let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; match qres { Ok(_) => { - stats.mute_insert_done.inc(); + stats.inserted_mute().inc(); backoff = backoff_0; } Err(e) => { @@ -230,7 +235,7 @@ async fn worker( .await; match qres { Ok(_) => { - stats.ivl_insert_done.inc(); + stats.inserted_interval().inc(); backoff = backoff_0; } Err(e) => { @@ -252,7 +257,7 @@ async fn worker( let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; match qres { Ok(_) => { - stats.channel_info_insert_done.inc(); + stats.inserted_channel_info().inc(); backoff = backoff_0; } Err(e) => { @@ -281,7 +286,7 @@ async fn worker( .await; match qres { Ok(_) => { - stats.store_worker_insert_binned_done.inc(); + stats.inserted_binned().inc(); backoff = backoff_0; } Err(e) => { @@ -305,7 +310,7 @@ pub async fn spawn_scylla_insert_workers( insert_worker_count: usize, item_inp: Receiver, insert_worker_opts: Arc, - store_stats: Arc, + store_stats: Arc, use_rate_limit_queue: bool, ttls: Ttls, ) -> Result>>, Error> { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 84179d1..a49b3a2 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -13,6 +13,7 @@ use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use series::SeriesId; use stats::CaConnStats; +use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::sync::atomic; use std::sync::atomic::AtomicU64; @@ -388,12 +389,12 @@ pub async fn insert_item( ttl_0d: Duration, ttl_1d: Duration, data_store: &DataStore, - stats: &CaConnStats, + stats: &InsertWorkerStats, ) -> Result<(), Error> { if item.msp_bump { let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; - stats.inserts_msp.inc(); + stats.inserts_msp().inc(); } if let Some(ts_msp_grid) = item.ts_msp_grid { let params = ( @@ -408,7 +409,7 @@ pub async fn insert_item( .scy .execute(&data_store.qu_insert_series_by_ts_msp, params) .await?; - stats.inserts_msp_grid.inc(); + stats.inserts_msp_grid().inc(); } use DataValue::*; match item.val { @@ -467,7 +468,7 @@ pub async fn insert_item( } } } - stats.inserts_val.inc(); + stats.inserts_value().inc(); Ok(()) } @@ -475,7 +476,7 @@ pub async fn insert_connection_status( item: ConnectionStatusItem, ttl: Duration, data_store: &DataStore, - _stats: &CaConnStats, + _stats: &InsertWorkerStats, ) -> Result<(), Error> { let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * netpod::timeunits::SEC; @@ -497,7 +498,7 @@ pub async fn insert_channel_status( item: ChannelStatusItem, ttl: Duration, data_store: &DataStore, - _stats: &CaConnStats, + _stats: &InsertWorkerStats, ) -> Result<(), Error> { let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * netpod::timeunits::SEC; diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 6ffe9df..5282440 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -221,11 +221,60 @@ stats_proc::stats_struct!(( ca_conn_task_join_done_ok, ca_conn_task_join_done_err, ca_conn_task_join_err, - ca_conn_task_eos_non_exist, + ca_conn_eos_ok, + ca_conn_eos_unexpected, + response_tx_fail, + 
+            try_push_ca_conn_cmds_full,
+            try_push_ca_conn_cmds_closed,
+            channel_wait_for_status_id,
+            channel_wait_for_address,
+            logic_error,
+            ready_for_end_of_stream_with_pending,
+            ready_for_end_of_stream_no_pending,
+            poll_fn_begin,
+            poll_loop_begin,
+            poll_pending,
+            poll_reloop,
+        ),
+        values(
+            storage_insert_tx_len,
+            channel_info_query_sender_len,
+            channel_info_res_tx_len,
+            find_ioc_query_sender_len,
+            ca_conn_res_tx_len,
         ),
     ),
     // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
     // diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
+    stats_struct(
+        name(SeriesByChannelStats),
+        prefix(seriesbychannel),
+        counters(res_tx_fail, res_tx_timeout,),
+    ),
+    stats_struct(
+        name(InsertWorkerStats),
+        prefix(insert_worker),
+        counters(
+            item_recv,
+            inserted_values,
+            inserted_connection_status,
+            inserted_channel_status,
+            fraction_drop,
+            inserted_mute,
+            inserted_interval,
+            inserted_channel_info,
+            inserted_binned,
+            db_overload,
+            db_timeout,
+            db_unavailable,
+            db_error,
+            query_error,
+            inserts_msp,
+            inserts_msp_grid,
+            inserts_value,
+            ratelimit_drop,
+        )
+    ),
 ));
 
 // #[cfg(DISABLED)]
diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs
index cc431db..b61add6 100644
--- a/stats_proc/src/stats_proc.rs
+++ b/stats_proc/src/stats_proc.rs
@@ -101,14 +101,14 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
     }
     for x in &st.values {
         let n = x.to_string();
-        let nn = if let Some(pre) = &st.prefix {
-            format!("{pre}_{n}")
+        let pre = if let Some(x) = &st.prefix {
+            format!("_{}", x)
         } else {
-            n.to_string()
+            String::new()
         };
         buf.push_str(&format!(
-            "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n",
-            nn, n
+            "ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load()));\n",
+            pre, n, n
         ));
     }
     format!(
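
Note on the last hunk: the stats_proc codegen now folds the optional struct prefix into the metric name as "daqingest_<prefix>_<name>" (plain "daqingest_<name>" when no prefix is declared) instead of pre-concatenating the name. The following is a minimal standalone sketch of that rendering rule only; the helper render_prometheus_lines, its inputs, and the sample counter values are illustrative and not part of the patch.

// Sketch of the metric-line rendering rule switched to in stats_proc.rs above.
fn render_prometheus_lines(prefix: Option<&str>, counters: &[(&str, u64)]) -> String {
    let mut out = String::new();
    // Mirrors the patched codegen: `pre` is "_<prefix>" when a prefix is declared, else empty.
    let pre = match prefix {
        Some(p) => format!("_{}", p),
        None => String::new(),
    };
    for (name, value) in counters {
        // One line per metric in Prometheus text exposition format.
        out.push_str(&format!("daqingest{}_{} {}\n", pre, name, value));
    }
    out
}

fn main() {
    // Hypothetical sample values; "insert_worker" is the prefix declared for InsertWorkerStats in stats.rs.
    let counters = [("item_recv", 42u64), ("inserted_values", 7)];
    print!("{}", render_prometheus_lines(Some("insert_worker"), &counters));
    // Prints:
    // daqingest_insert_worker_item_recv 42
    // daqingest_insert_worker_inserted_values 7
}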