Dominik Werder
2023-09-14 10:14:50 +02:00
parent c17109bbb9
commit f869047cf5
16 changed files with 611 additions and 260 deletions

View File

@@ -68,9 +68,9 @@ dependencies = [
[[package]]
name = "anstyle"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea"
checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46"
[[package]]
name = "anstyle-parse"
@@ -143,7 +143,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -233,9 +233,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.3"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53"
checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2"
[[package]]
name = "batchtools"
@@ -344,9 +344,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "cc"
@@ -365,25 +365,24 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.28"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f"
checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"time 0.1.45",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "clap"
version = "4.4.2"
version = "4.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a13b88d2c62ff462f88e4a121f17a82c1af05693a2f192b5c38d14de73c19f6"
checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6"
dependencies = [
"clap_builder",
"clap_derive",
@@ -410,7 +409,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -646,6 +645,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
@@ -667,10 +676,9 @@ dependencies = [
[[package]]
name = "daqingest"
version = "0.2.0-alpha.0"
version = "0.2.0-alpha.1"
dependencies = [
"async-channel",
"batchtools",
"bytes",
"chrono",
"clap",
@@ -712,7 +720,7 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -723,7 +731,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
dependencies = [
"darling_core",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -751,6 +759,7 @@ dependencies = [
"md-5",
"netpod",
"series",
"stats",
"taskrun",
"tokio-postgres",
]
@@ -827,7 +836,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -902,6 +911,12 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764"
[[package]]
name = "finl_unicode"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6"
[[package]]
name = "flate2"
version = "1.0.27"
@@ -1004,7 +1019,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -1064,7 +1079,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasi",
]
[[package]]
@@ -1463,7 +1478,7 @@ dependencies = [
name = "items_proc"
version = "0.0.2"
dependencies = [
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -1513,15 +1528,15 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "libc"
version = "0.2.147"
version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "linux-raw-sys"
version = "0.4.5"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503"
checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128"
[[package]]
name = "lock_api"
@@ -1590,9 +1605,9 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.6.2"
version = "2.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e"
checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
[[package]]
name = "memmap2"
@@ -1603,6 +1618,15 @@ dependencies = [
"libc",
]
[[package]]
name = "memmap2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d28bba84adfe6646737845bc5ebbfa2c08424eb1c37e94a1fd2a82adb56a872"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.8.0"
@@ -1649,7 +1673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasi",
"windows-sys 0.48.0",
]
@@ -1684,6 +1708,7 @@ dependencies = [
"arrayref",
"async-channel",
"axum",
"batchtools",
"bitshuffle",
"byteorder",
"bytes",
@@ -1817,14 +1842,14 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
name = "object"
version = "0.32.0"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
@@ -1858,7 +1883,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -1869,9 +1894,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.92"
version = "0.9.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b"
checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d"
dependencies = [
"cc",
"libc",
@@ -1972,7 +1997,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -2010,7 +2035,7 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520"
dependencies = [
"base64 0.21.3",
"base64 0.21.4",
"byteorder",
"bytes",
"fallible-iterator",
@@ -2076,9 +2101,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.66"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [
"unicode-ident",
]
@@ -2250,13 +2275,13 @@ dependencies = [
[[package]]
name = "regex"
version = "1.9.4"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29"
checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.3.7",
"regex-automata 0.3.8",
"regex-syntax 0.7.5",
]
@@ -2271,9 +2296,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629"
checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
dependencies = [
"aho-corasick",
"memchr",
@@ -2381,9 +2406,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.11"
version = "0.38.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453"
checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662"
dependencies = [
"bitflags 2.4.0",
"errno",
@@ -2444,10 +2469,10 @@ dependencies = [
"scylla-macros",
"smallvec",
"snap",
"socket2 0.5.3",
"socket2 0.5.4",
"strum",
"strum_macros",
"thiserror 1.0.47",
"thiserror 1.0.48",
"tokio",
"tracing",
"uuid",
@@ -2469,7 +2494,7 @@ dependencies = [
"num_enum",
"scylla-macros",
"snap",
"thiserror 1.0.47",
"thiserror 1.0.48",
"tokio",
"uuid",
]
@@ -2529,6 +2554,12 @@ dependencies = [
"libc",
]
[[package]]
name = "self_cell"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c309e515543e67811222dbc9e3dd7e1056279b782e1dacffe4242b718734fb6"
[[package]]
name = "semver"
version = "1.0.18"
@@ -2573,14 +2604,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
name = "serde_json"
version = "1.0.105"
version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
dependencies = [
"itoa",
"ryu",
@@ -2650,6 +2681,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shared-buffer"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cf61602ee61e2f83dd016b3e6387245291cf728ea071c378b35088125b4d995"
dependencies = [
"bytes",
"memmap2 0.6.2",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@@ -2714,9 +2755,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.3"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
dependencies = [
"libc",
"windows-sys 0.48.0",
@@ -2760,7 +2801,7 @@ version = "0.0.1"
dependencies = [
"quote",
"stats_types",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -2796,10 +2837,11 @@ dependencies = [
[[package]]
name = "stringprep"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da"
checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6"
dependencies = [
"finl_unicode",
"unicode-bidi",
"unicode-normalization",
]
@@ -2848,9 +2890,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.29"
version = "2.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668"
dependencies = [
"proc-macro2",
"quote",
@@ -2885,7 +2927,7 @@ dependencies = [
"err",
"futures-util",
"lazy_static",
"time 0.3.28",
"time",
"tokio",
"tracing",
"tracing-subscriber",
@@ -2914,11 +2956,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7"
dependencies = [
"thiserror-impl 1.0.47",
"thiserror-impl 1.0.48",
]
[[package]]
@@ -2928,18 +2970,18 @@ source = "git+https://github.com/dominikwerder/thiserror.git#052df05c18b5f26b462
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
name = "thiserror-impl"
version = "1.0.47"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -2952,17 +2994,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "time"
version = "0.3.28"
@@ -3020,7 +3051,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.3",
"socket2 0.5.4",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
@@ -3044,7 +3075,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -3077,7 +3108,7 @@ dependencies = [
"postgres-protocol",
"postgres-types",
"rand",
"socket2 0.5.3",
"socket2 0.5.4",
"tokio",
"tokio-util",
"whoami",
@@ -3116,9 +3147,9 @@ checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
[[package]]
name = "toml_edit"
version = "0.19.14"
version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.0.0",
"toml_datetime",
@@ -3133,7 +3164,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum",
"base64 0.21.3",
"base64 0.21.4",
"bytes",
"futures-core",
"futures-util",
@@ -3206,7 +3237,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
]
[[package]]
@@ -3243,7 +3274,7 @@ dependencies = [
"sharded-slab",
"smallvec",
"thread_local",
"time 0.3.28",
"time",
"tracing",
"tracing-core",
"tracing-log",
@@ -3279,9 +3310,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
@@ -3357,12 +3388,6 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@@ -3390,7 +3415,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
"wasm-bindgen-shared",
]
@@ -3435,7 +3460,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.29",
"syn 2.0.33",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -3448,9 +3473,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "wasmer"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7142dbb91ede83cc0aef2301fa75fcc7e0c9e5a7d5358e3c4f3a7249fe9ce8"
checksum = "b7954f3bdeb6cc9b46ddfbab7d795fc6a249a79f51c102d95679390526cf43d8"
dependencies = [
"bytes",
"cfg-if",
@@ -3461,8 +3486,9 @@ dependencies = [
"rustc-demangle",
"serde",
"serde-wasm-bindgen",
"shared-buffer",
"target-lexicon",
"thiserror 1.0.47",
"thiserror 1.0.48",
"wasm-bindgen",
"wasm-bindgen-downcast",
"wasmer-compiler",
@@ -3475,21 +3501,25 @@ dependencies = [
[[package]]
name = "wasmer-compiler"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5b99c70711ec7631b602a9fc95577c40df21e8f3916159c9d80c3fb4f77abdc"
checksum = "7c72380ddff4d9710366527100183df6f943caab5b63adc7cac6d90bacdf4628"
dependencies = [
"backtrace",
"bytes",
"cfg-if",
"enum-iterator",
"enumset",
"lazy_static",
"leb128",
"memmap2",
"memmap2 0.5.10",
"more-asserts",
"region",
"rkyv",
"self_cell",
"shared-buffer",
"smallvec",
"thiserror 1.0.47",
"thiserror 1.0.48",
"wasmer-types",
"wasmer-vm",
"wasmparser",
@@ -3498,9 +3528,9 @@ dependencies = [
[[package]]
name = "wasmer-compiler-cranelift"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52aef2ef35513a04fed54de9a7dc9c469d4742a5c2e378a5f7e2a79b1327e3bd"
checksum = "957adcc170f007bf26293c7cd95352c1e471d7bf4295a1dfda796702a79d9648"
dependencies = [
"cranelift-codegen",
"cranelift-entity",
@@ -3517,9 +3547,9 @@ dependencies = [
[[package]]
name = "wasmer-derive"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bb1425c9e4dc3e2d3aacd6e82e22e27a3127379e0d09bcbdf25ff376229162"
checksum = "6cd92754ce26eec6136ae6282d96ca59632bd47237ecba82824de16e34b3ce6b"
dependencies = [
"proc-macro-error",
"proc-macro2",
@@ -3529,9 +3559,9 @@ dependencies = [
[[package]]
name = "wasmer-types"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7e32ed799fa8c0d96ca9615d9ea8006857a0f0c18e7c2ed8082bd5c63a9ea70"
checksum = "d93ca35c9a184e7d561f496c5d6da835c8309a8f76fcca1384f5800127395735"
dependencies = [
"bytecheck",
"enum-iterator",
@@ -3540,19 +3570,20 @@ dependencies = [
"more-asserts",
"rkyv",
"target-lexicon",
"thiserror 1.0.47",
"thiserror 1.0.48",
]
[[package]]
name = "wasmer-vm"
version = "4.1.2"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0847513cb176b5d62a6f65d6ae474594935e726a10e9e3387177d9cbf8b8cda0"
checksum = "200ff711048c0a2cb045d4984a673a1edea18f1e14f024694a43f4951922998b"
dependencies = [
"backtrace",
"cc",
"cfg-if",
"corosensei",
"crossbeam-queue",
"dashmap",
"derivative",
"enum-iterator",
@@ -3565,7 +3596,7 @@ dependencies = [
"more-asserts",
"region",
"scopeguard",
"thiserror 1.0.47",
"thiserror 1.0.48",
"wasmer-types",
"winapi",
]

View File

@@ -27,6 +27,8 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
@@ -87,8 +89,8 @@ pub struct Daemon {
last_status_print: SystemTime,
insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
stats: Arc<DaemonStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
shutting_down: bool,
insert_rx_weak: WeakReceiver<QueryItem>,
connset_ctrl: CaConnSetCtrl,
connset_status_last: CheckPeriodic,
// TODO should be a stats object?
@@ -100,10 +102,14 @@ impl Daemon {
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let insert_worker_stats = Arc::new(InsertWorkerStats::new());
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap);
let query_item_tx_weak = query_item_tx.downgrade();
@@ -159,9 +165,9 @@ impl Daemon {
opts.scyconf.clone(),
opts.insert_scylla_sessions,
opts.insert_worker_count,
query_item_rx.clone(),
query_item_rx,
insert_worker_opts,
store_stats.clone(),
insert_worker_stats.clone(),
use_rate_limit_queue,
ttls,
)
@@ -214,8 +220,8 @@ impl Daemon {
last_status_print: SystemTime::now(),
insert_workers_jh,
stats: Arc::new(DaemonStats::new()),
insert_worker_stats,
shutting_down: false,
insert_rx_weak: query_item_rx.downgrade(),
connset_ctrl: conn_set_ctrl,
connset_status_last: CheckPeriodic::Waiting(Instant::now()),
insert_workers_running: AtomicU64::new(0),
@@ -231,7 +237,7 @@ impl Daemon {
async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> {
match &self.connset_status_last {
CheckPeriodic::Waiting(since) => {
if *since + Duration::from_millis(2000) < ts1 {
if *since + Duration::from_millis(5000) < ts1 {
debug!("======================================== issue health check CaConn");
self.connset_ctrl.check_health().await?;
self.connset_status_last = CheckPeriodic::Ongoing(ts1);
@@ -242,7 +248,7 @@ impl Daemon {
}
CheckPeriodic::Ongoing(since) => {
let dt = ts1.saturating_duration_since(*since);
if dt > Duration::from_millis(4000) {
if dt > Duration::from_millis(2000) {
error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3);
}
}
@@ -254,7 +260,7 @@ impl Daemon {
if self.shutting_down {
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
let nitems = self
.insert_rx_weak
.query_item_tx_weak
.upgrade()
.map(|x| (x.sender_count(), x.receiver_count(), x.len()));
info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
@@ -592,13 +598,19 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
let daemon = Daemon::new(opts2).await?;
let tx = daemon.tx.clone();
let daemon_stats = daemon.stats().clone();
let connset_cmd_tx = daemon.connset_ctrl.sender().clone();
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let metrics_jh = {
let stats_set = StatsSet::new(daemon_stats);
let fut = netfetch::metrics::metrics_service(opts.api_bind(), dcom, stats_set, metrics_shutdown_rx);
let stats_set = StatsSet::new(
daemon_stats,
daemon.connset_ctrl.stats().clone(),
daemon.insert_worker_stats.clone(),
);
let fut =
netfetch::metrics::metrics_service(opts.api_bind(), dcom, connset_cmd_tx, stats_set, metrics_shutdown_rx);
tokio::task::spawn(fut)
};
@@ -615,9 +627,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
}
thr_msg.trigger_fmt("sent ChannelAdd", &[&i as &_]);
i += 1;
// if i % 100 == 0 {
// debug!("sent {} ChannelAdd", i);
// }
}
debug!("{} configured channels applied", channels.len());
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
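
Note on the timing change above: check_caconn_chans now waits 5 s (instead of 2 s) before issuing a health check and complains after 2 s (instead of 4 s) without a report. A minimal, self-contained sketch of that CheckPeriodic pacing, assuming Instant-based timestamps; on_health_report is a hypothetical name for the transition back to Waiting, which is not part of this hunk:

use std::time::{Duration, Instant};

enum CheckPeriodic {
    Waiting(Instant),
    Ongoing(Instant),
}

impl CheckPeriodic {
    // Returns true when a fresh health check should be issued to the CaConnSet.
    fn tick(&mut self, now: Instant) -> bool {
        match self {
            CheckPeriodic::Waiting(since) => {
                let since = *since;
                if since + Duration::from_millis(5000) < now {
                    *self = CheckPeriodic::Ongoing(now);
                    true
                } else {
                    false
                }
            }
            CheckPeriodic::Ongoing(since) => {
                let dt = now.saturating_duration_since(*since);
                if dt > Duration::from_millis(2000) {
                    eprintln!("health status overdue by {:.0} ms", dt.as_secs_f32() * 1e3);
                }
                false
            }
        }
    }

    // Hypothetical helper: the daemon presumably switches back to Waiting once the
    // CaConnSet reports CaConnSetItem::Healthy (not shown in this hunk).
    fn on_health_report(&mut self, now: Instant) {
        *self = CheckPeriodic::Waiting(now);
    }
}

fn main() {
    let t0 = Instant::now();
    let mut chk = CheckPeriodic::Waiting(t0);
    // Within the first 5 s no check is issued.
    assert!(!chk.tick(t0 + Duration::from_millis(1000)));
    // After more than 5 s a check is issued and the state moves to Ongoing.
    assert!(chk.tick(t0 + Duration::from_millis(5001)));
    chk.on_health_report(t0 + Duration::from_millis(5100));
}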

View File

@@ -10,6 +10,7 @@ err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
batchtools = { path = "../batchtools" }
stats = { path = "../stats" }
series = { path = "../series" }
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
futures-util = "0.3"

View File

@@ -10,7 +10,9 @@ use md5::Digest;
use netpod::Database;
use series::series::Existence;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
@@ -97,10 +99,15 @@ struct Worker {
qu_select: PgStatement,
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
}
impl Worker {
async fn new(db: &Database, batch_rx: Receiver<Vec<ChannelInfoQuery>>) -> Result<Self, Error> {
async fn new(
db: &Database,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
) -> Result<Self, Error> {
let pg = crate::conn::make_pg_client(db).await?;
let sql = concat!(
"with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])",
@@ -125,6 +132,7 @@ impl Worker {
qu_select,
qu_insert,
batch_rx,
stats,
};
Ok(ret)
}
@@ -319,16 +327,14 @@ impl Worker {
channel: r.channel,
series: r.series,
};
trace3!("try to send result for {:?}", item);
// trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
match tokio::time::timeout(Duration::from_millis(2000), fut).await {
Ok(Ok(())) => {}
Ok(Err(_e)) => {
warn!("can not deliver result");
return Err(Error::ChannelError);
}
Err(_) => {
debug!("timeout can not deliver result");
match fut.await {
Ok(()) => {}
Err(_e) => {
//warn!("can not deliver result");
// return Err(Error::ChannelError);
self.stats.res_tx_fail.inc();
}
}
}
@@ -341,6 +347,7 @@ impl Worker {
pub async fn start_lookup_workers(
worker_count: usize,
db: &Database,
stats: Arc<SeriesByChannelStats>,
) -> Result<
(
Sender<ChannelInfoQuery>,
@@ -356,7 +363,7 @@ pub async fn start_lookup_workers(
let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx);
let mut jhs = Vec::new();
for _ in 0..worker_count {
let mut worker = Worker::new(db, batch_rx.clone()).await?;
let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?;
let jh = tokio::task::spawn(async move { worker.work().await });
jhs.push(jh);
}
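
Note: start_lookup_workers now threads a shared Arc<SeriesByChannelStats> into every worker, and a failed result delivery only bumps res_tx_fail instead of aborting the worker (the former 2 s send timeout is gone). A simplified sketch of that fan-out, assuming async-channel and tokio as already used by the crate; Stats and the string payloads stand in for the real SeriesByChannelStats and ChannelInfoQuery types:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Stats {
    res_tx_fail: AtomicU64,
}

async fn worker(
    rx: async_channel::Receiver<String>,
    tx: async_channel::Sender<String>,
    stats: Arc<Stats>,
) {
    while let Ok(req) = rx.recv().await {
        // A delivery failure is counted, not treated as a fatal worker error.
        if tx.send(format!("series for {req}")).await.is_err() {
            stats.res_tx_fail.fetch_add(1, Ordering::AcqRel);
        }
    }
}

#[tokio::main]
async fn main() {
    let stats = Arc::new(Stats::default());
    let (req_tx, req_rx) = async_channel::bounded(64);
    let (res_tx, res_rx) = async_channel::bounded(64);
    for _ in 0..4 {
        tokio::spawn(worker(req_rx.clone(), res_tx.clone(), stats.clone()));
    }
    drop(res_tx);
    req_tx.send("X:CH1".to_string()).await.unwrap();
    println!("{}", res_rx.recv().await.unwrap());
}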

View File

@@ -185,7 +185,6 @@ struct CreatedState {
info_store_msp_last: u32,
}
#[allow(unused)]
#[derive(Clone, Debug)]
enum ChannelState {
Init(ChannelStatusSeriesId),
@@ -392,9 +391,14 @@ impl ConnCommand {
}
}
#[derive(Debug)]
pub struct CheckHealthResult {
pub channel_statuses: BTreeMap<String, ChannelStateInfo>,
}
#[derive(Debug)]
pub enum ConnCommandResultKind {
CheckHealth,
CheckHealth(CheckHealthResult),
}
#[derive(Debug)]
@@ -403,6 +407,17 @@ pub struct ConnCommandResult {
pub kind: ConnCommandResultKind,
}
impl ConnCommandResult {
pub fn id(&self) -> usize {
self.id
}
fn make_id() -> usize {
static ID: AtomicUsize = AtomicUsize::new(0);
ID.fetch_add(1, atomic::Ordering::AcqRel)
}
}
#[derive(Debug)]
pub enum CaConnEventValue {
None,
@@ -562,12 +577,20 @@ impl CaConn {
}
}
// TODO return the result
let mut channel_statuses = BTreeMap::new();
for (k, v) in self.channels.iter() {
let name = self
.name_by_cid(*k)
.map_or_else(|| format!("{k:?}"), ToString::to_string);
let info = v.to_info(name.clone(), self.remote_addr_dbg);
channel_statuses.insert(name, info);
}
let health = CheckHealthResult { channel_statuses };
let res = ConnCommandResult {
id: 0,
kind: ConnCommandResultKind::CheckHealth,
id: ConnCommandResult::make_id(),
kind: ConnCommandResultKind::CheckHealth(health),
};
self.cmd_res_queue.push_back(res);
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_find_channel(&self, pattern: &str) {
@@ -869,7 +892,7 @@ impl CaConn {
fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
if let Some(started) = self.ioc_ping_start {
if started.elapsed() > Duration::from_millis(4000) {
@@ -884,7 +907,7 @@ impl CaConn {
} else {
self.ioc_ping_start = Some(Instant::now());
if let Some(proto) = &mut self.proto {
debug!("ping to {}", self.remote_addr_dbg);
trace2!("ping to {}", self.remote_addr_dbg);
let msg = CaMsg { ty: CaMsgTy::Echo };
proto.push_out(msg);
} else {
@@ -1762,7 +1785,10 @@ impl Stream for CaConn {
self.stats.caconn_poll_count.inc();
let poll_ts1 = Instant::now();
let ret = loop {
self.thr_msg_poll.trigger("CaConn::poll_next");
let qlen = self.insert_item_queue.len();
if qlen >= 200 {
self.thr_msg_poll.trigger_fmt("CaConn::poll_next", &[&qlen]);
}
break if let CaConnState::EndOfStream = self.state {
Ready(None)
} else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
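
Note: ConnCommandResult now carries a unique id generated by make_id from a function-local static AtomicUsize. A self-contained sketch of that pattern, mirroring the code added above:

use std::sync::atomic::{AtomicUsize, Ordering};

fn make_id() -> usize {
    // One counter shared by every caller in the process; fetch_add returns the
    // previous value, so ids start at 0 and are strictly increasing.
    static ID: AtomicUsize = AtomicUsize::new(0);
    ID.fetch_add(1, Ordering::AcqRel)
}

fn main() {
    assert_eq!(make_id(), 0);
    assert_eq!(make_id(), 1);
    assert_eq!(make_id(), 2);
}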

View File

@@ -1,3 +1,5 @@
use super::conn::ChannelStateInfo;
use super::conn::ConnCommandResult;
use super::findioc::FindIocRes;
use super::statemap;
use super::statemap::ChannelState;
@@ -19,6 +21,7 @@ use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicUsize;
use core::fmt;
use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
@@ -32,6 +35,7 @@ use netpod::Database;
use netpod::Shape;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
@@ -108,6 +112,7 @@ pub struct CaConnRes {
state: CaConnState,
sender: Sender<ConnCommand>,
stats: Arc<CaConnStats>,
cmd_queue: VecDeque<ConnCommand>,
// TODO await on jh
jh: JoinHandle<Result<(), Error>>,
}
@@ -147,12 +152,29 @@ pub struct ChannelRemove {
name: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStatusesResponse {
pub channels_ca_conn: BTreeMap<String, ChannelStateInfo>,
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
}
pub struct ChannelStatusesRequest {
pub tx: Sender<ChannelStatusesResponse>,
}
impl fmt::Debug for ChannelStatusesRequest {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ChannelStatusesRequest").finish()
}
}
#[derive(Debug)]
pub enum ConnSetCmd {
ChannelAdd(ChannelAdd),
ChannelRemove(ChannelRemove),
CheckHealth(Instant),
Shutdown,
ChannelStatuses(ChannelStatusesRequest),
}
#[derive(Debug)]
@@ -160,6 +182,10 @@ pub enum CaConnSetEvent {
ConnSetCmd(ConnSetCmd),
}
impl CaConnSetEvent {
// pub fn new_cmd_channel_statuses() -> (Self, Receiver) {}
}
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Error(Error),
@@ -169,10 +195,15 @@ pub enum CaConnSetItem {
pub struct CaConnSetCtrl {
tx: Sender<CaConnSetEvent>,
rx: Receiver<CaConnSetItem>,
stats: Arc<CaConnSetStats>,
jh: JoinHandle<Result<(), Error>>,
}
impl CaConnSetCtrl {
pub fn sender(&self) -> Sender<CaConnSetEvent> {
self.tx.clone()
}
pub fn receiver(&self) -> Receiver<CaConnSetItem> {
self.rx.clone()
}
@@ -211,6 +242,10 @@ impl CaConnSetCtrl {
self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
Ok(())
}
pub fn stats(&self) -> &Arc<CaConnSetStats> {
&self.stats
}
}
#[derive(Debug)]
@@ -239,6 +274,7 @@ pub struct CaConnSet {
local_epics_hostname: String,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
ca_conn_channel_states: BTreeMap<String, ChannelStateInfo>,
connset_inp_rx: Receiver<CaConnSetEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sender: SenderPolling<ChannelInfoQuery>,
@@ -248,6 +284,7 @@ pub struct CaConnSet {
find_ioc_query_queue: VecDeque<IocAddrQuery>,
find_ioc_query_sender: SenderPolling<IocAddrQuery>,
find_ioc_res_rx: Receiver<VecDeque<FindIocRes>>,
storage_insert_tx: Sender<QueryItem>,
storage_insert_queue: VecDeque<QueryItem>,
storage_insert_sender: SenderPolling<QueryItem>,
ca_conn_res_tx: Sender<(SocketAddr, CaConnEvent)>,
@@ -257,11 +294,12 @@ pub struct CaConnSet {
shutdown_stopping: bool,
shutdown_done: bool,
chan_check_next: Option<Channel>,
stats: CaConnSetStats,
stats: Arc<CaConnSetStats>,
ioc_finder_jh: JoinHandle<Result<(), Error>>,
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
thr_msg_poll_1: ThrottleTrace,
thr_msg_storage_len: ThrottleTrace,
did_connset_out_queue: bool,
}
impl CaConnSet {
@@ -272,18 +310,20 @@ impl CaConnSet {
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
) -> CaConnSetCtrl {
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(5000);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(5000);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(5000);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(5000);
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(200);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400);
let (find_ioc_query_tx, ioc_finder_jh) =
super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(5000);
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
let stats = Arc::new(CaConnSetStats::new());
let connset = Self {
backend,
local_epics_hostname,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
ca_conn_channel_states: BTreeMap::new(),
connset_inp_rx,
channel_info_query_queue: VecDeque::new(),
channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()),
@@ -293,6 +333,7 @@ impl CaConnSet {
find_ioc_query_queue: VecDeque::new(),
find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx),
find_ioc_res_rx,
storage_insert_tx: storage_insert_tx.clone(),
storage_insert_queue: VecDeque::new(),
storage_insert_sender: SenderPolling::new(storage_insert_tx),
ca_conn_res_tx,
@@ -300,7 +341,7 @@ impl CaConnSet {
shutdown_stopping: false,
shutdown_done: false,
chan_check_next: None,
stats: CaConnSetStats::new(),
stats: stats.clone(),
connset_out_tx,
connset_out_queue: VecDeque::new(),
// connset_out_sender: SenderPolling::new(connset_out_tx),
@@ -308,17 +349,20 @@ impl CaConnSet {
await_ca_conn_jhs: VecDeque::new(),
thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)),
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
did_connset_out_queue: false,
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
CaConnSetCtrl {
tx: connset_inp_tx,
rx: connset_out_rx,
stats,
jh,
}
}
async fn run(mut this: CaConnSet) -> Result<(), Error> {
debug!("CaConnSet run begin");
loop {
let x = this.next().await;
match x {
@@ -331,6 +375,7 @@ impl CaConnSet {
// this.find_ioc_query_tx.sender_count(),
// this.find_ioc_query_tx.receiver_count()
// );
debug!("CaConnSet EndOfStream");
this.ioc_finder_jh
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
@@ -353,6 +398,7 @@ impl CaConnSet {
// ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1),
ConnSetCmd::Shutdown => self.handle_shutdown(),
ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x),
},
}
}
@@ -361,7 +407,7 @@ impl CaConnSet {
match ev.value {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => todo!(),
CaConnEventValue::ConnCommandResult(_) => todo!(),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
self.storage_insert_queue.push_back(item);
Ok(())
@@ -405,6 +451,7 @@ impl CaConnSet {
since: SystemTime::now(),
}),
});
self.stats.channel_wait_for_status_id.inc();
let item = ChannelInfoQuery {
backend: add.backend,
channel: add.name,
@@ -436,6 +483,7 @@ impl CaConnSet {
},
},
};
self.stats.channel_wait_for_address.inc();
let qu = IocAddrQuery { name: add.name };
self.find_ioc_query_queue.push_back(qu);
} else {
@@ -562,12 +610,44 @@ impl CaConnSet {
self.thr_msg_storage_len
.trigger_fmt("msg", &[&self.storage_insert_sender.len()]);
debug!("TODO handle_check_health");
// Trigger already the next health check, but use the current data that we have.
// TODO try to deliver a command to CaConn
// Add some queue for commands to CaConn to the ress.
// Fail here if that queue gets too long.
// Try to push the commands periodically.
for (_, res) in self.ca_conn_ress.iter_mut() {
let item = ConnCommand::check_health();
res.cmd_queue.push_back(item);
}
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
self.connset_out_queue.push_back(item);
Ok(())
}
fn handle_channel_statuses_req(&mut self, req: ChannelStatusesRequest) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
debug!("handle_channel_statuses_req");
let item = ChannelStatusesResponse {
channels_ca_conn: self.ca_conn_channel_states.clone(),
channels_ca_conn_set: self
.channel_states
.inner()
.iter()
.map(|(k, v)| (k.id().to_string(), v.clone()))
.collect(),
};
if req.tx.try_send(item).is_err() {
self.stats.response_tx_fail.inc();
}
Ok(())
}
fn handle_shutdown(&mut self) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
@@ -585,22 +665,45 @@ impl CaConnSet {
Ok(())
}
fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> {
use crate::ca::conn::ConnCommandResultKind::*;
match res.kind {
CheckHealth(health) => {
// debug!("handle_conn_command_result {addr}");
for (k, v) in health.channel_statuses {
self.ca_conn_channel_states.insert(k, v);
}
Ok(())
}
}
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr}");
trace2!("handle_ca_conn_eos {addr}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
self.stats.ca_conn_eos_ok().inc();
self.await_ca_conn_jhs.push_back((addr, e.jh));
} else {
self.stats.ca_conn_task_eos_non_exist.inc();
self.stats.ca_conn_eos_unexpected().inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
self.remove_status_for_addr(addr)?;
debug!("still CaConn left {}", self.ca_conn_ress.len());
Ok(())
}
fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.ca_conn_channel_states
.retain(|_k, v| SocketAddr::V4(v.addr) != addr);
Ok(())
}
fn ready_for_end_of_stream(&self) -> bool {
if self.ca_conn_ress.len() > 0 {
if !self.shutdown_stopping {
false
} else if self.await_ca_conn_jhs.len() > 1 {
} else if self.ca_conn_ress.len() > 0 {
false
} else if self.await_ca_conn_jhs.len() > 0 {
false
} else {
true
@@ -626,12 +729,14 @@ impl CaConnSet {
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
let ca_conn_res_tx = self.ca_conn_res_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, ca_conn_res_tx, addr));
let tx1 = self.ca_conn_res_tx.clone();
let tx2 = self.storage_insert_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr));
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: conn_tx,
stats: conn_stats,
cmd_queue: VecDeque::new(),
jh,
};
Ok(ca_conn_res)
@@ -639,7 +744,8 @@ impl CaConnSet {
async fn ca_conn_item_merge(
conn: CaConn,
tx: Sender<(SocketAddr, CaConnEvent)>,
tx1: Sender<(SocketAddr, CaConnEvent)>,
tx2: Sender<QueryItem>,
addr: SocketAddr,
) -> Result<(), Error> {
trace2!("ca_conn_consumer begin {}", addr);
@@ -650,7 +756,23 @@ impl CaConnSet {
match item {
Ok(item) => {
stats.conn_item_count.inc();
tx.send((addr, item)).await?;
match item.value {
CaConnEventValue::QueryItem(x) => {
tx2.send(x).await;
}
CaConnEventValue::None => {
tx1.send((addr, item)).await;
}
CaConnEventValue::EchoTimeout => {
tx1.send((addr, item)).await;
}
CaConnEventValue::ConnCommandResult(_) => {
tx1.send((addr, item)).await;
}
CaConnEventValue::EndOfStream => {
tx1.send((addr, item)).await;
}
}
}
Err(e) => {
error!("CaConn gives error: {e:?}");
@@ -659,7 +781,7 @@ impl CaConnSet {
}
}
trace2!("ca_conn_consumer ended {}", addr);
tx.send((
tx1.send((
addr,
CaConnEvent {
ts: Instant::now(),
@@ -932,7 +1054,9 @@ impl CaConnSet {
Ok(())
}
// TODO should use both counters and values
fn update_channel_state_counts(&mut self) -> (u64,) {
return (0,);
let mut unknown_address = 0;
let mut search_pending = 0;
let mut unassigned = 0;
@@ -979,6 +1103,31 @@ impl CaConnSet {
self.stats.channel_assigned.__set(assigned);
(search_pending,)
}
fn try_push_ca_conn_cmds(&mut self) {
// debug!("try_push_ca_conn_cmds");
for (_, v) in self.ca_conn_ress.iter_mut() {
loop {
break if let Some(item) = v.cmd_queue.pop_front() {
match v.sender.try_send(item) {
Ok(()) => continue,
Err(e) => match e {
async_channel::TrySendError::Full(e) => {
self.stats.try_push_ca_conn_cmds_full.inc();
v.cmd_queue.push_front(e);
break;
}
async_channel::TrySendError::Closed(_) => {
// TODO
self.stats.try_push_ca_conn_cmds_closed.inc();
break;
}
},
}
};
}
}
}
}
impl Stream for CaConnSet {
@@ -986,13 +1135,39 @@ impl Stream for CaConnSet {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.stats.poll_fn_begin().inc();
debug!("CaConnSet poll");
loop {
self.thr_msg_poll_1.trigger("CaConnSet");
self.stats.poll_loop_begin().inc();
let n1 = self.channel_info_query_queue.len();
let p2 = self.channel_info_query_sender.len();
let p3 = self.channel_info_res_rx.len();
self.thr_msg_poll_1
.trigger_fmt("CaConnSet channel_info_query_queue", &[&n1, &p2, &p3]);
self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _);
self.stats
.channel_info_query_sender_len
.set(self.channel_info_query_sender.len().unwrap_or(0) as _);
self.stats
.channel_info_res_tx_len
.set(self.channel_info_res_tx.len() as _);
self.stats
.find_ioc_query_sender_len
.set(self.find_ioc_query_sender.len().unwrap_or(0) as _);
self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _);
let mut have_pending = false;
if let Some(item) = self.connset_out_queue.pop_front() {
break Ready(Some(item));
self.try_push_ca_conn_cmds();
if self.did_connset_out_queue {
self.did_connset_out_queue = false;
} else {
if let Some(item) = self.connset_out_queue.pop_front() {
self.did_connset_out_queue = true;
break Ready(Some(item));
}
}
if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() {
@@ -1016,19 +1191,23 @@ impl Stream for CaConnSet {
}
}
}
Pending => {}
Pending => {
have_pending = true;
}
}
}
// TODO should never send from here, track.
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.stats.logic_error().inc();
self.storage_insert_sender.send(item);
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => {
Ready(Ok(())) => {}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
@@ -1046,8 +1225,8 @@ impl Stream for CaConnSet {
}
if self.find_ioc_query_sender.is_sending() {
match self.find_ioc_query_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => {
Ready(Ok(())) => {}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
@@ -1065,8 +1244,8 @@ impl Stream for CaConnSet {
}
if self.channel_info_query_sender.is_sending() {
match self.channel_info_query_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => {
Ready(Ok(())) => {}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
@@ -1079,7 +1258,7 @@ impl Stream for CaConnSet {
let item = match self.find_ioc_res_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_ioc_query_result(x) {
Ok(()) => continue,
Ok(()) => Ready(None),
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
@@ -1095,7 +1274,7 @@ impl Stream for CaConnSet {
let item = match self.ca_conn_res_rx.poll_next_unpin(cx) {
Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) {
Ok(()) => continue,
Ok(()) => Ready(None),
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
@@ -1111,7 +1290,7 @@ impl Stream for CaConnSet {
let item = match self.channel_info_res_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_series_lookup_result(x) {
Ok(()) => continue,
Ok(()) => Ready(None),
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
@@ -1127,7 +1306,7 @@ impl Stream for CaConnSet {
let item = match self.connset_inp_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_event(x) {
Ok(()) => continue,
Ok(()) => Ready(None),
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
@@ -1141,19 +1320,21 @@ impl Stream for CaConnSet {
_ => {}
}
break if have_pending {
if self.shutdown_stopping && self.ready_for_end_of_stream() {
Ready(None)
break if self.ready_for_end_of_stream() {
if have_pending {
self.stats.ready_for_end_of_stream_with_pending().inc();
} else {
Pending
self.stats.ready_for_end_of_stream_no_pending().inc();
}
} else if self.shutdown_stopping && self.ready_for_end_of_stream() {
debug!("nothing to do but shutdown");
Ready(None)
} else {
let e = Error::with_msg_no_trace("connset not pending and not shutdown");
error!("{e}");
Ready(Some(CaConnSetItem::Error(e)))
if have_pending {
self.stats.poll_pending().inc();
Pending
} else {
self.stats.poll_reloop().inc();
continue;
}
};
}
}
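
Note: each CaConnRes now keeps a cmd_queue, and try_push_ca_conn_cmds drains it with try_send so the poll loop never blocks on a slow CaConn: on Full the command is pushed back for the next poll, on Closed it is dropped and counted. A reduced sketch of that drain logic, with Cmd and plain integer counters standing in for ConnCommand and the CaConnSetStats counters:

use std::collections::VecDeque;

#[derive(Debug)]
struct Cmd;

struct ConnEntry {
    sender: async_channel::Sender<Cmd>,
    cmd_queue: VecDeque<Cmd>,
}

fn drain(entry: &mut ConnEntry, full_count: &mut u64, closed_count: &mut u64) {
    while let Some(item) = entry.cmd_queue.pop_front() {
        match entry.sender.try_send(item) {
            Ok(()) => {}
            Err(async_channel::TrySendError::Full(item)) => {
                // Channel to the CaConn is full: keep the command for the next poll.
                entry.cmd_queue.push_front(item);
                *full_count += 1;
                break;
            }
            Err(async_channel::TrySendError::Closed(_)) => {
                // The CaConn is gone; drop the command and count the event.
                *closed_count += 1;
                break;
            }
        }
    }
}

fn main() {
    let (tx, _rx) = async_channel::bounded(1);
    let mut entry = ConnEntry {
        sender: tx,
        cmd_queue: VecDeque::from([Cmd, Cmd]),
    };
    let (mut full, mut closed) = (0, 0);
    drain(&mut entry, &mut full, &mut closed);
    // Only one command fits into the bounded(1) channel; the other stays queued.
    assert_eq!((full, closed), (1, 0));
    assert_eq!(entry.cmd_queue.len(), 1);
}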

View File

@@ -33,23 +33,20 @@ impl CaConnState {
}
}
#[derive(Clone, Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub enum ConnectionStateValue {
Unconnected,
Connected {
//#[serde(with = "serde_Instant")]
since: SystemTime,
},
Connected,
}
#[derive(Clone, Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct ConnectionState {
//#[serde(with = "serde_Instant")]
pub updated: SystemTime,
pub value: ConnectionStateValue,
}
#[derive(Clone, Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
@@ -58,7 +55,7 @@ pub enum WithAddressState {
Assigned(ConnectionState),
}
#[derive(Clone, Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub enum WithStatusSeriesIdStateInner {
UnknownAddress {
since: SystemTime,
@@ -76,12 +73,12 @@ pub enum WithStatusSeriesIdStateInner {
},
}
#[derive(Clone, Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct WithStatusSeriesIdState {
pub inner: WithStatusSeriesIdStateInner,
}
#[derive(Clone, Debug)]
#[derive(Debug, Clone, Serialize)]
pub enum ActiveChannelState {
Init {
since: SystemTime,
@@ -95,18 +92,18 @@ pub enum ActiveChannelState {
},
}
#[derive(Debug)]
#[derive(Debug, Clone, Serialize)]
pub enum ChannelStateValue {
Active(ActiveChannelState),
ToRemove { addr: Option<SocketAddrV4> },
}
#[derive(Debug)]
#[derive(Debug, Clone, Serialize)]
pub struct ChannelState {
pub value: ChannelStateValue,
}
#[derive(Debug)]
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStateMap {
map: BTreeMap<Channel, ChannelState>,
}
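
Note: the extra Serialize derives here appear to exist so the channel state map can be returned as JSON from the new /daqingest/channel/states endpoint (see the metrics change below). A small illustrative example of the same idea, assuming serde with the derive feature as the crate already uses, with heavily simplified stand-ins for ChannelState and its fields:

use serde::Serialize;
use std::collections::BTreeMap;
use std::time::SystemTime;

#[derive(Debug, Clone, Serialize)]
enum ChannelStateValue {
    Active { since: SystemTime },
    ToRemove,
}

#[derive(Debug, Clone, Serialize)]
struct ChannelState {
    value: ChannelStateValue,
}

fn main() {
    let mut map: BTreeMap<String, ChannelState> = BTreeMap::new();
    map.insert(
        "X:CH1".into(),
        ChannelState {
            value: ChannelStateValue::Active { since: SystemTime::now() },
        },
    );
    // serde_json can serialize the whole map once every nested type derives Serialize.
    println!("{}", serde_json::to_string_pretty(&map).unwrap());
}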

View File

@@ -1 +0,0 @@

View File

@@ -2,7 +2,6 @@ pub mod ca;
pub mod conf;
pub mod daemon_common;
pub mod errconv;
pub mod insertworker;
pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;

View File

@@ -1,3 +1,8 @@
use crate::ca::conn::ChannelStateInfo;
use crate::ca::connset::CaConnSetEvent;
use crate::ca::connset::ChannelStatusesRequest;
use crate::ca::connset::ChannelStatusesResponse;
use crate::ca::connset::ConnSetCmd;
use crate::ca::METRICS;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
@@ -10,10 +15,12 @@ use log::*;
use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaConnStatsAgg;
use stats::CaConnStatsAggDiff;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::sync::atomic::Ordering;
@@ -23,11 +30,21 @@ use taskrun::tokio;
pub struct StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
}
impl StatsSet {
pub fn new(daemon: Arc<DaemonStats>) -> Self {
Self { daemon }
pub fn new(
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
) -> Self {
Self {
daemon,
ca_conn_set,
insert_worker_stats,
}
}
}
@@ -102,13 +119,28 @@ async fn channel_state(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -
axum::Json(false)
}
async fn channel_states(
params: HashMap<String, String>,
dcom: Arc<DaemonComm>,
) -> axum::Json<Vec<crate::ca::conn::ChannelStateInfo>> {
let limit = params.get("limit").map(|x| x.parse()).unwrap_or(Ok(40)).unwrap_or(40);
error!("TODO channel_state");
axum::Json(Vec::new())
// axum::Json<ChannelStatusesResponse>
async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSetEvent>) -> String {
let limit = params
.get("limit")
.map(|x| x.parse().ok())
.unwrap_or(None)
.unwrap_or(40);
let (tx2, rx2) = async_channel::bounded(1);
let req = ChannelStatusesRequest { tx: tx2 };
let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req));
// TODO handle error
tx.send(item).await;
let res = rx2.recv().await.unwrap();
match serde_json::to_string(&res) {
Ok(x) => x,
Err(e) => {
error!("Serialize error {e}");
Err::<(), _>(e).unwrap();
panic!();
}
}
// axum::Json(res)
}
async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc<DaemonComm>) -> axum::Json<bool> {
@@ -136,7 +168,7 @@ impl DaemonComm {
}
}
fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, stats_set: StatsSet) -> axum::Router {
use axum::extract;
use axum::routing::get;
use axum::routing::put;
@@ -162,8 +194,12 @@ fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
get({
//
|| async move {
info!("metrics");
let s1 = stats_set.daemon.prometheus();
debug!("metrics");
let mut s1 = stats_set.daemon.prometheus();
let s2 = stats_set.ca_conn_set.prometheus();
let s3 = stats_set.insert_worker_stats.prometheus();
s1.push_str(&s2);
s1.push_str(&s3);
s1
}
}),
@@ -186,7 +222,8 @@ fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
"/daqingest/channel/states",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_states(params, dcom)
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| channel_states(params, tx)
}),
)
.route(
@@ -248,11 +285,12 @@ fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
pub async fn metrics_service(
bind_to: String,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
shutdown_signal: Receiver<u32>,
) {
let addr = bind_to.parse().unwrap();
let router = make_routes(dcom, stats_set).into_make_service();
let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service();
axum::Server::bind(&addr)
.serve(router)
.with_graceful_shutdown(async move {
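
Note: the new channel_states handler talks to the CaConnSet through a per-request reply channel: it creates a bounded(1) channel, ships the sender inside a ChannelStatusesRequest, and awaits the reply before serializing it. A stripped-down sketch of that round trip, with Req/Resp standing in for ChannelStatusesRequest/ChannelStatusesResponse and error handling elided just as the commit still marks with a TODO:

use async_channel::Sender;
use serde::Serialize;

#[derive(Serialize)]
struct Resp {
    channels: Vec<String>,
}

struct Req {
    tx: Sender<Resp>,
}

async fn channel_states(cmd_tx: Sender<Req>) -> String {
    // One reply channel per request; the command carries its sending half.
    let (tx, rx) = async_channel::bounded(1);
    let _ = cmd_tx.send(Req { tx }).await; // TODO handle error, as in the commit
    let res = rx.recv().await.unwrap();
    serde_json::to_string(&res).unwrap()
}

#[tokio::main]
async fn main() {
    let (cmd_tx, cmd_rx) = async_channel::bounded::<Req>(8);
    // Stand-in for the CaConnSet event loop answering the request.
    tokio::spawn(async move {
        while let Ok(req) = cmd_rx.recv().await {
            let _ = req.tx.send(Resp { channels: vec!["X:CH1".into()] }).await;
        }
    });
    println!("{}", channel_states(cmd_tx).await);
}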

View File

@@ -36,6 +36,11 @@ impl ThrottleTrace {
debug!("{} {:?} (count {})", msg, params[0], self.count);
} else if params.len() == 2 {
debug!("{} {:?} {:?} (count {})", msg, params[0], params[1], self.count);
} else if params.len() == 3 {
debug!(
"{} {:?} {:?} {:?} (count {})",
msg, params[0], params[1], params[2], self.count
);
} else {
debug!("{} (count {})", msg, self.count);
}

View File

@@ -139,6 +139,9 @@ impl ConnTimeBin {
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
STRING => {
trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
_ => {
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}

View File

@@ -11,6 +11,7 @@ use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ScyllaConfig;
use stats::CaConnStats;
use stats::InsertWorkerStats;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
@@ -20,26 +21,26 @@ use std::time::Instant;
use taskrun::tokio;
use taskrun::tokio::task::JoinHandle;
fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::iteminsertqueue::Error) {
fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) {
use crate::iteminsertqueue::Error;
match err {
Error::DbOverload => {
stats.store_worker_insert_overload.inc();
stats.db_overload().inc();
}
Error::DbTimeout => {
stats.store_worker_insert_timeout.inc();
stats.db_timeout().inc();
}
Error::DbUnavailable => {
stats.store_worker_insert_unavailable.inc();
stats.db_unavailable().inc();
}
Error::DbError(e) => {
if false {
warn!("db error {e}");
}
stats.store_worker_insert_error.inc();
stats.db_error().inc();
}
Error::QueryError(_) => {
stats.store_worker_insert_error.inc();
stats.query_error().inc();
}
}
}
@@ -75,7 +76,7 @@ async fn rate_limiter_worker(
rate: Arc<AtomicU64>,
inp: Receiver<QueryItem>,
tx: Sender<QueryItem>,
stats: Arc<stats::CaConnStats>,
stats: Arc<stats::InsertWorkerStats>,
) {
let mut ts_forward_last = Instant::now();
let mut ivl_ema = stats::Ema64::with_k(0.00001);
@@ -103,7 +104,7 @@ async fn rate_limiter_worker(
let ivl2 = Duration::from_nanos(ema2.ema() as u64);
if allowed_to_drop && ivl2 < dt_min {
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
stats.store_worker_ratelimit_drop.inc();
stats.ratelimit_drop().inc();
} else {
if tx.send(item).await.is_err() {
break;
@@ -113,7 +114,7 @@ async fn rate_limiter_worker(
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ivl_ema.update(dt_ns.min(MS * 100) as f32);
ts_forward_last = tsnow;
stats.inter_ivl_ema.set(ivl_ema.ema() as u64);
// stats.inter_ivl_ema.set(ivl_ema.ema() as u64);
}
}
}
@@ -123,7 +124,7 @@ async fn rate_limiter_worker(
fn rate_limiter(
inp: Receiver<QueryItem>,
opts: Arc<InsertWorkerOpts>,
stats: Arc<stats::CaConnStats>,
stats: Arc<stats::InsertWorkerStats>,
) -> Receiver<QueryItem> {
let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256));
tokio::spawn(rate_limiter_worker(opts.store_workers_rate.clone(), inp, tx, stats));
@@ -136,7 +137,7 @@ async fn worker(
ttls: Ttls,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
stats: Arc<CaConnStats>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
insert_worker_opts
.insert_workers_running
@@ -146,7 +147,7 @@ async fn worker(
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.store_worker_item_recv.inc();
stats.item_recv.inc();
item
} else {
break;
@@ -155,7 +156,7 @@ async fn worker(
QueryItem::ConnectionStatus(item) => {
match insert_connection_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.connection_status_insert_done.inc();
stats.inserted_connection_status().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -167,7 +168,7 @@ async fn worker(
QueryItem::ChannelStatus(item) => {
match insert_channel_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.channel_status_insert_done.inc();
stats.inserted_channel_status().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -177,22 +178,26 @@ async fn worker(
}
}
QueryItem::Insert(item) => {
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
if true {
stats.inserted_values().inc();
} else {
stats.store_worker_fraction_drop.inc();
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
Ok(_) => {
stats.inserted_values().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
} else {
stats.fraction_drop().inc();
}
i1 += 1;
}
i1 += 1;
}
QueryItem::Mute(item) => {
let values = (
@@ -206,7 +211,7 @@ async fn worker(
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
match qres {
Ok(_) => {
stats.mute_insert_done.inc();
stats.inserted_mute().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -230,7 +235,7 @@ async fn worker(
.await;
match qres {
Ok(_) => {
stats.ivl_insert_done.inc();
stats.inserted_interval().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -252,7 +257,7 @@ async fn worker(
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
match qres {
Ok(_) => {
stats.channel_info_insert_done.inc();
stats.inserted_channel_info().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -281,7 +286,7 @@ async fn worker(
.await;
match qres {
Ok(_) => {
stats.store_worker_insert_binned_done.inc();
stats.inserted_binned().inc();
backoff = backoff_0;
}
Err(e) => {
@@ -305,7 +310,7 @@ pub async fn spawn_scylla_insert_workers(
insert_worker_count: usize,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::CaConnStats>,
store_stats: Arc<stats::InsertWorkerStats>,
use_rate_limit_queue: bool,
ttls: Ttls,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
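
Note: the insert_frac sampling visible in this hunk takes the item counter i1 modulo 1000 and performs the value insert only when the result falls below insert_frac, counting a fraction_drop otherwise. A minimal sketch of that sampling, with the atomics and stat names simplified:

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    // insert_frac is in 0..=1000; 250 keeps roughly a quarter of the value inserts.
    let insert_frac = AtomicU64::new(250);
    let (mut inserted, mut dropped) = (0u64, 0u64);
    for i1 in 0..10_000u64 {
        let frac = insert_frac.load(Ordering::Acquire);
        if i1 % 1000 < frac {
            inserted += 1; // stats.inserted_values().inc() in the real worker
        } else {
            dropped += 1; // stats.fraction_drop().inc()
        }
    }
    assert_eq!((inserted, dropped), (2_500, 7_500));
}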

View File

@@ -13,6 +13,7 @@ use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use series::SeriesId;
use stats::CaConnStats;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
@@ -388,12 +389,12 @@ pub async fn insert_item(
ttl_0d: Duration,
ttl_1d: Duration,
data_store: &DataStore,
stats: &CaConnStats,
stats: &InsertWorkerStats,
) -> Result<(), Error> {
if item.msp_bump {
let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32);
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
stats.inserts_msp.inc();
stats.inserts_msp().inc();
}
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
@@ -408,7 +409,7 @@ pub async fn insert_item(
.scy
.execute(&data_store.qu_insert_series_by_ts_msp, params)
.await?;
stats.inserts_msp_grid.inc();
stats.inserts_msp_grid().inc();
}
use DataValue::*;
match item.val {
@@ -467,7 +468,7 @@ pub async fn insert_item(
}
}
}
stats.inserts_val.inc();
stats.inserts_value().inc();
Ok(())
}
@@ -475,7 +476,7 @@ pub async fn insert_connection_status(
item: ConnectionStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &CaConnStats,
_stats: &InsertWorkerStats,
) -> Result<(), Error> {
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
@@ -497,7 +498,7 @@ pub async fn insert_channel_status(
item: ChannelStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &CaConnStats,
_stats: &InsertWorkerStats,
) -> Result<(), Error> {
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * netpod::timeunits::SEC;

View File

@@ -221,11 +221,60 @@ stats_proc::stats_struct!((
ca_conn_task_join_done_ok,
ca_conn_task_join_done_err,
ca_conn_task_join_err,
ca_conn_task_eos_non_exist,
ca_conn_eos_ok,
ca_conn_eos_unexpected,
response_tx_fail,
try_push_ca_conn_cmds_full,
try_push_ca_conn_cmds_closed,
channel_wait_for_status_id,
channel_wait_for_address,
logic_error,
ready_for_end_of_stream_with_pending,
ready_for_end_of_stream_no_pending,
poll_fn_begin,
poll_loop_begin,
poll_pending,
poll_reloop,
),
values(
storage_insert_tx_len,
channel_info_query_sender_len,
channel_info_res_tx_len,
find_ioc_query_sender_len,
ca_conn_res_tx_len,
),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
// diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
stats_struct(
name(SeriesByChannelStats),
prefix(seriesbychannel),
counters(res_tx_fail, res_tx_timeout,),
),
stats_struct(
name(InsertWorkerStats),
prefix(insert_worker),
counters(
item_recv,
inserted_values,
inserted_connection_status,
inserted_channel_status,
fraction_drop,
inserted_mute,
inserted_interval,
inserted_channel_info,
inserted_binned,
db_overload,
db_timeout,
db_unavailable,
db_error,
query_error,
inserts_msp,
inserts_msp_grid,
inserts_value,
ratelimit_drop,
)
),
));
// #[cfg(DISABLED)]
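
Note: these stats structs feed the /metrics endpoint. Going by the values loop fixed in the stats_proc hunk below, a field f in a struct declared with prefix(p) is rendered as the exposition line daqingest_p_f <value>, and as daqingest_f <value> without a prefix; whether the counters loop follows the exact same scheme is an assumption here. A hand-expanded illustration of what the generated prometheus() body amounts to, using a hypothetical struct and a plain AtomicU64 in place of the macro-generated counter type:

use std::sync::atomic::{AtomicU64, Ordering};

struct DemoStats {
    item_recv: AtomicU64,
}

impl DemoStats {
    fn prometheus(&self) -> String {
        let mut ret = String::new();
        // What the macro would emit for prefix(demo) and a field named item_recv.
        ret.push_str(&format!(
            "daqingest_demo_item_recv {}\n",
            self.item_recv.load(Ordering::SeqCst)
        ));
        ret
    }
}

fn main() {
    let s = DemoStats { item_recv: AtomicU64::new(7) };
    print!("{}", s.prometheus()); // -> daqingest_demo_item_recv 7
}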

View File

@@ -101,14 +101,14 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
}
for x in &st.values {
let n = x.to_string();
let nn = if let Some(pre) = &st.prefix {
format!("{pre}_{n}")
let pre = if let Some(x) = &st.prefix {
format!("_{}", x)
} else {
n.to_string()
String::new()
};
buf.push_str(&format!(
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n",
nn, n
"ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load()));\n",
pre, n, n
));
}
format!(