diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 1266713..db598bb 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" dependencies = [ "gimli 0.27.3", ] @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.71" +version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" [[package]] name = "arc-swap" @@ -136,9 +136,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-channel" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", "event-listener", @@ -164,18 +164,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "async-trait" -version = "0.1.68" +version = "0.1.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" +checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -186,13 +186,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.18" +version = "0.6.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" +checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c" dependencies = [ "async-trait", "axum-core", - "bitflags", + "bitflags 1.3.2", "bytes", "futures-util", "http", @@ -231,15 +231,15 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" dependencies = [ "addr2line", "cc", "cfg-if", "libc", - "miniz_oxide 0.6.2", + "miniz_oxide", "object", "rustc-demangle", ] @@ -282,6 +282,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" + [[package]] name = "bitshuffle" version = "0.0.2" @@ -381,9 +387,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.8" +version = "4.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9394150f5b4273a1763355bd1c2ec54cc5a2593f790587bcd6b2c947cfa9211" +checksum = "8f644d0dac522c8b05ddc39aaaccc5b136d5dc4ff216610c5641e3be5becf56c" dependencies = [ "clap_builder", "clap_derive", @@ -392,13 +398,12 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.8" +version = "4.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a78fbdd3cc2914ddf37ba444114bc7765bbdcb55ec9cbe6fa054f0137400717" +checksum = "af410122b9778e024f9e0fb35682cc09cc3f85cad5e8d3ba8f47a9702df6e73d" dependencies = [ "anstream", "anstyle", - "bitflags", "clap_lex", "once_cell", "strsim", @@ -406,14 +411,14 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.3.2" +version = "4.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cd2b2a819ad6eec39e8f1d6b53001af1e5469f8c177579cdaeb313115b825f" +checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -474,9 +479,9 @@ dependencies = [ [[package]] name = "console-subscriber" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7" +checksum = "d4cf42660ac07fcebed809cfe561dd8730bcd35b075215e6479c516bcd0d11cb" dependencies = [ "console-api", "crossbeam-channel", @@ -527,9 +532,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03e69e28e9f7f77debdedbaafa2866e1de9ba56df55a8bd7cfc724c25a09987c" +checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" dependencies = [ "libc", ] @@ -769,9 +774,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.1" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0558d22a7b463ed0241e993f76f09f30b126687447751a8638587b864e4b3944" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" dependencies = [ "darling_core", "darling_macro", @@ -779,36 +784,36 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.1" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab8bfa2e259f8ee1ce5e97824a3c55ec4404a0d772ca7fa96bf19f0752a046eb" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "darling_macro" -version = "0.20.1" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "dashmap" -version = "5.4.0" +version = "5.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" dependencies = [ "cfg-if", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "lock_api", "once_cell", "parking_lot_core 0.9.8", @@ -963,20 +968,20 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "equivalent" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2b0c2380453a92ea8b6c8e5f64ecaafccddde8ceab55ff7a8ac1029f894569" +checksum = "da96524cc884f6558f1769b6c46686af2fe8e8b4cd253bd5a3cdba8181b8e070" dependencies = [ "serde", ] @@ -996,7 +1001,7 @@ dependencies = [ "serde", "serde_cbor", "serde_json", - "thiserror", + "thiserror 1.0.41", "url", ] @@ -1049,7 +1054,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", - "miniz_oxide 0.7.1", + "miniz_oxide", ] [[package]] @@ -1160,7 +1165,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -1314,18 +1319,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.2.6" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] - -[[package]] -name = "hermit-abi" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" [[package]] name = "hex" @@ -1578,20 +1574,19 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi", "libc", "windows-sys 0.48.0", ] [[package]] name = "is-terminal" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.3.1", - "io-lifetimes", - "rustix", + "hermit-abi", + "rustix 0.38.4", "windows-sys 0.48.0", ] @@ -1614,6 +1609,7 @@ dependencies = [ name = "items_2" version = "0.0.2" dependencies = [ + "bitshuffle", "bytes", "chrono", "crc32fast", @@ -1638,7 +1634,7 @@ dependencies = [ name = "items_proc" version = "0.0.2" dependencies = [ - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -1652,9 +1648,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.6" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" @@ -1689,6 +1685,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" + [[package]] name = "lock_api" version = "0.4.10" @@ -1729,7 +1731,7 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -1738,7 +1740,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -1801,15 +1803,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" -dependencies = [ - "adler", -] - [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1967,11 +1960,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi", "libc", ] @@ -1998,9 +1991,9 @@ dependencies = [ [[package]] name = "object" -version = "0.30.4" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" dependencies = [ "memchr", ] @@ -2017,7 +2010,7 @@ version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "foreign-types", "libc", @@ -2034,7 +2027,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -2118,6 +2111,7 @@ dependencies = [ "chrono", "err", "hex", + "humantime-serde", "netpod", "nom", "num-derive", @@ -2129,9 +2123,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.12" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" [[package]] name = "percent-encoding" @@ -2159,29 +2153,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" [[package]] name = "pin-utils" @@ -2269,9 +2263,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.63" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -2346,9 +2340,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0" dependencies = [ "proc-macro2", ] @@ -2426,7 +2420,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -2435,7 +2429,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -2452,13 +2446,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.8.4" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" +checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.2", + "regex-automata 0.3.3", + "regex-syntax 0.7.4", ] [[package]] @@ -2470,6 +2465,17 @@ dependencies = [ "regex-syntax 0.6.29", ] +[[package]] +name = "regex-automata" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.7.4", +] + [[package]] name = "regex-syntax" version = "0.6.29" @@ -2478,9 +2484,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" [[package]] name = "region" @@ -2488,7 +2494,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76e189c2369884dce920945e2ddf79b3dff49e071a167dd1817fa9c4c00d512e" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", "mach", "winapi", @@ -2562,44 +2568,57 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.37.20" +version = "0.37.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + +[[package]] +name = "rustix" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +dependencies = [ + "bitflags 2.3.3", + "errno", + "libc", + "linux-raw-sys 0.4.3", "windows-sys 0.48.0", ] [[package]] name = "rustversion" -version = "1.0.12" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.13" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "schannel" -version = "0.1.21" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3" +checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" dependencies = [ - "windows-sys 0.42.0", + "windows-sys 0.48.0", ] [[package]] name = "scopeguard" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" @@ -2629,7 +2648,7 @@ dependencies = [ "socket2 0.5.3", "strum", "strum_macros", - "thiserror", + "thiserror 1.0.43", "tokio", "tracing", "uuid", @@ -2651,7 +2670,7 @@ dependencies = [ "num_enum", "scylla-macros", "snap", - "thiserror", + "thiserror 1.0.43", "tokio", "uuid", ] @@ -2705,7 +2724,7 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -2724,9 +2743,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.164" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" +checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" dependencies = [ "serde_derive", ] @@ -2754,20 +2773,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.164" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" +checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] name = "serde_json" -version = "1.0.99" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46266871c240a00b8f503b877622fe33430b3c7d963bdc0f2adc511e54a1eae3" +checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" dependencies = [ "itoa", "ryu", @@ -2776,9 +2795,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.22" +version = "0.9.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "452e67b9c20c37fa79df53201dc03839651086ed9bbe92b3ca585ca9fdaa7d85" +checksum = "bd5f51e3fdb5b9cdd1577e1cb7a733474191b1aca6a72c2e50913241632c1180" dependencies = [ "indexmap 2.0.0", "itoa", @@ -2836,9 +2855,9 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "smallvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" [[package]] name = "snap" @@ -2907,9 +2926,9 @@ dependencies = [ [[package]] name = "stringprep" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -2959,9 +2978,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.22" +version = "2.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efbeae7acf4eabd6bcdcbd11c92f45231ddda7539edc7806bd1a04a03b24616" +checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" dependencies = [ "proc-macro2", "quote", @@ -2982,9 +3001,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "target-lexicon" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1c7f239eb94671427157bd93b3694320f3668d4e1eff08c7285366fd777fac" +checksum = "df8e77cb757a61f51b947ec4a7e3646efd825b73561db1c232a8ccb639e611a0" [[package]] name = "taskrun" @@ -2996,7 +3015,7 @@ dependencies = [ "err", "futures-util", "lazy_static", - "time 0.3.22", + "time 0.3.23", "tokio", "tracing", "tracing-subscriber 0.3.17", @@ -3012,28 +3031,44 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix", + "rustix 0.37.23", "windows-sys 0.48.0", ] [[package]] name = "thiserror" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +version = "1.0.41" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.41", +] + +[[package]] +name = "thiserror" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35fc5b8971143ca348fa6df4f024d4d55264f3468c71ad1c2f365b0a4d58c42" +dependencies = [ + "thiserror-impl 1.0.43", ] [[package]] name = "thiserror-impl" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +version = "1.0.41" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", ] [[package]] @@ -3059,9 +3094,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd" +checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" dependencies = [ "itoa", "serde", @@ -3077,9 +3112,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b" +checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" dependencies = [ "time-core", ] @@ -3110,11 +3145,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ "autocfg", + "backtrace", "bytes", "libc", "mio", @@ -3144,7 +3180,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -3214,9 +3250,9 @@ checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.19.11" +version = "0.19.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266f016b7f039eec8a1a80dfe6156b633d208b9fccca5e4db1d6775b0c4e34a7" +checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" dependencies = [ "indexmap 2.0.0", "toml_datetime", @@ -3303,7 +3339,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", ] [[package]] @@ -3385,7 +3421,7 @@ dependencies = [ "sharded-slab", "smallvec", "thread_local", - "time 0.3.22", + "time 0.3.23", "tracing", "tracing-core", "tracing-log", @@ -3421,9 +3457,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" [[package]] name = "unicode-normalization" @@ -3442,9 +3478,9 @@ checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" [[package]] name = "unsafe-libyaml" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1865806a559042e51ab5414598446a5871b561d21b6764f2eabb0dd481d880a6" +checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa" [[package]] name = "url" @@ -3465,9 +3501,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", ] @@ -3532,7 +3568,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", "wasm-bindgen-shared", ] @@ -3577,7 +3613,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.22", + "syn 2.0.26", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3604,7 +3640,7 @@ dependencies = [ "serde", "serde-wasm-bindgen", "target-lexicon", - "thiserror", + "thiserror 1.0.43", "wasm-bindgen", "wasm-bindgen-downcast", "wasmer-compiler", @@ -3631,7 +3667,7 @@ dependencies = [ "more-asserts", "region", "smallvec", - "thiserror", + "thiserror 1.0.43", "wasmer-types", "wasmer-vm", "wasmparser", @@ -3682,7 +3718,7 @@ dependencies = [ "more-asserts", "rkyv", "target-lexicon", - "thiserror", + "thiserror 1.0.43", ] [[package]] @@ -3707,7 +3743,7 @@ dependencies = [ "more-asserts", "region", "scopeguard", - "thiserror", + "thiserror 1.0.43", "wasmer-types", "winapi", ] @@ -3766,21 +3802,6 @@ dependencies = [ "windows_x86_64_msvc 0.33.0", ] -[[package]] -name = "windows-sys" -version = "0.42.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-sys" version = "0.48.0" @@ -3792,25 +3813,19 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.0" +version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_gnullvm", "windows_aarch64_msvc 0.48.0", "windows_i686_gnu 0.48.0", "windows_i686_msvc 0.48.0", "windows_x86_64_gnu 0.48.0", - "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_gnullvm", "windows_x86_64_msvc 0.48.0", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.0" @@ -3823,12 +3838,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.48.0" @@ -3841,12 +3850,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.48.0" @@ -3859,12 +3862,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.48.0" @@ -3877,24 +3874,12 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" @@ -3907,12 +3892,6 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.48.0" @@ -3921,9 +3900,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.4.7" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0ace3845f0d96209f0375e6d367e3eb87eb65d27d445bdc9f1843a26f39448" +checksum = "81fac9742fd1ad1bd9643b991319f72dd031016d44b77039a26977eb667141e7" dependencies = [ "memchr", ] diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index 17edc0e..89633dd 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -61,6 +61,7 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; use tokio::io::ReadBuf; use tokio::sync::mpsc; +use tracing::Instrument; // TODO move to databuffer-specific crate // TODO duplicate of SfChFetchInfo? @@ -347,8 +348,10 @@ fn start_read5( } } let n = pos - pos_beg; - info!("read5 done {n}"); + debug!("read5 done {n}"); }; + let span = tracing::span!(tracing::Level::INFO, "read5", reqid); + let fut = fut.instrument(span); tokio::task::spawn(fut); Ok(()) } diff --git a/crates/disk/src/eventblobs.rs b/crates/disk/src/eventblobs.rs index dd621c4..6592e10 100644 --- a/crates/disk/src/eventblobs.rs +++ b/crates/disk/src/eventblobs.rs @@ -186,8 +186,7 @@ impl Stream for EventChunkerMultifile { let file = ofs.files.pop().unwrap(); let path = file.path; let msg = format!("handle OFS {:?}", ofs); - debug!("{}", msg); - let item = LogItem::quick(Level::INFO, msg); + let item = LogItem::quick(Level::DEBUG, msg); match file.file { Some(file) => { let inp = Box::pin(crate::file_content_stream( @@ -212,16 +211,12 @@ impl Stream for EventChunkerMultifile { Ready(Some(Ok(StreamItem::Log(item)))) } else if ofs.files.len() == 0 { let msg = format!("handle OFS {:?} NO FILES", ofs); - debug!("{}", msg); - let item = LogItem::quick(Level::INFO, msg); + let item = LogItem::quick(Level::DEBUG, msg); Ready(Some(Ok(StreamItem::Log(item)))) } else { - let msg = format!("handle OFS MERGED timebin {}", ofs.timebin); - info!("{}", msg); - for x in &ofs.files { - info!(" path {:?}", x.path); - } - let item = LogItem::quick(Level::INFO, msg); + let paths: Vec<_> = ofs.files.iter().map(|x| &x.path).collect(); + let msg = format!("handle OFS MERGED timebin {} {:?}", ofs.timebin, paths); + let item = LogItem::quick(Level::DEBUG, msg); let mut chunkers = Vec::new(); for of in ofs.files { if let Some(file) = of.file { @@ -256,7 +251,7 @@ impl Stream for EventChunkerMultifile { Ready(None) => { self.done = true; let item = LogItem::quick( - Level::INFO, + Level::DEBUG, format!( "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}", self.files_count, diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index e2530e2..39155ab 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -96,7 +96,7 @@ impl Drop for EventChunker { warn!("config_mismatch_discard {}", self.config_mismatch_discard); } debug!( - "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}", + "EventChunker-stats {{ decomp_dt_histo: {:?}, item_len_emit_histo: {:?} }}", self.decomp_dt_histo, self.item_len_emit_histo ); } @@ -164,7 +164,7 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, ) -> Self { - info!("{}::{}", Self::self_name(), "from_start"); + debug!("{}::{}", Self::self_name(), "from_start"); let need_min_max = match fetch_info.shape() { Shape::Scalar => 1024 * 8, Shape::Wave(_) => 1024 * 32, @@ -210,7 +210,7 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, ) -> Self { - info!("{}::{}", Self::self_name(), "from_event_boundary"); + debug!("{}::{}", Self::self_name(), "from_event_boundary"); let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand); ret.state = DataFileState::Event; ret.need_min = 4; @@ -440,7 +440,7 @@ impl EventChunker { if discard { self.discard_count += 1; } else { - ret.add_event( + ret.push( ts, pulse, databuf.to_vec(), diff --git a/crates/disk/src/raw/generated.rs b/crates/disk/src/raw/generated.rs index 0bee401..9a28e40 100644 --- a/crates/disk/src/raw/generated.rs +++ b/crates/disk/src/raw/generated.rs @@ -74,7 +74,7 @@ impl EventBlobsGeneratorI32Test00 { } let pulse = ts; let value = (ts / (MS * 100) % 1000) as T; - item.add_event( + item.push( ts, pulse, value.to_be_bytes().to_vec(), @@ -174,7 +174,7 @@ impl EventBlobsGeneratorI32Test01 { } let pulse = ts; let value = (ts / self.dts) as T; - item.add_event( + item.push( ts, pulse, value.to_be_bytes().to_vec(), diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index 0f70338..ea36ca0 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -466,6 +466,12 @@ impl From<&Error> for PublicError { } } +impl fmt::Display for PublicError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.msg) + } +} + pub fn todo() { let bt = backtrace::Backtrace::new(); eprintln!("TODO\n{bt:?}"); diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 17a87c4..0a38618 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -1,3 +1,4 @@ +use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::response; @@ -7,11 +8,8 @@ use bytes::BytesMut; use disk::eventchunker::EventChunkerConf; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; use disk::raw::conn::make_local_event_blobs_stream; -use err::Error; -use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use futures_util::TryStreamExt; use http::Method; use http::StatusCode; use hyper::Body; @@ -28,6 +26,7 @@ use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; +use netpod::Api1WarningStats; use netpod::ByteSize; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; @@ -456,7 +455,7 @@ pub async fn gather_json_2_v1( struct Jres { hosts: Vec, } - let mut a = vec![]; + let mut a = Vec::new(); for tr in spawned { let res = match tr.1.await { Ok(k) => match k { @@ -501,8 +500,6 @@ async fn process_answer(res: Response) -> Result { s1 ))) } else { - //use snafu::IntoError; - //Err(Bad{msg:format!("API error")}.into_error(NoneError)).ctxb(SE!(AddPos)) Ok(JsonValue::String(format!("status {}", pre.status.as_str()))) } } else { @@ -516,15 +513,6 @@ async fn process_answer(res: Response) -> Result { } } -async fn find_ch_conf( - range: NanoRange, - channel: SfDbChannel, - ncc: NodeConfigCached, -) -> Result, Error> { - let ret = nodenet::channelconfig::channel_config(range, channel, &ncc).await?; - Ok(ret) -} - pub struct DataApiPython3DataStream { range: NanoRange, channels: VecDeque, @@ -533,7 +521,6 @@ pub struct DataApiPython3DataStream { current_fetch_info: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, - config_fut: Option, Error>> + Send>>>, disk_io_tune: DiskIoTune, do_decompress: bool, event_count: usize, @@ -543,6 +530,7 @@ pub struct DataApiPython3DataStream { ping_last: Instant, data_done: bool, completed: bool, + stats: Api1WarningStats, } impl DataApiPython3DataStream { @@ -564,7 +552,6 @@ impl DataApiPython3DataStream { current_fetch_info: None, node_config, chan_stream: None, - config_fut: None, disk_io_tune, do_decompress, event_count: 0, @@ -574,9 +561,16 @@ impl DataApiPython3DataStream { ping_last: Instant::now(), data_done: false, completed: false, + stats: Api1WarningStats::new(), } } + fn channel_finished(&mut self) { + self.chan_stream = None; + self.header_out = false; + self.event_count = 0; + } + fn convert_item( b: EventFull, channel: &ChannelTypeConfigGen, @@ -588,7 +582,7 @@ impl DataApiPython3DataStream { let shape = fetch_info.shape(); let mut d = BytesMut::new(); for i1 in 0..b.len() { - const EVIMAX: usize = 6; + const EVIMAX: usize = 20; if *count_events < EVIMAX { debug!( "ev info {}/{} bloblen {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}", @@ -662,57 +656,83 @@ impl DataApiPython3DataStream { Ok(d) } - fn handle_chan_stream_ready(&mut self, item: Sitemty) -> Option> { - match item { - Ok(k) => { - let n = Instant::now(); - if n.duration_since(self.ping_last) >= Duration::from_millis(2000) { - let mut sb = crate::status_board().unwrap(); - sb.mark_alive(self.reqctx.reqid()); - self.ping_last = n; - } - match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => todo!(), - RangeCompletableItem::Data(k) => { - let item = Self::convert_item( - k, - self.current_channel.as_ref().unwrap(), - self.current_fetch_info.as_ref().unwrap(), - self.do_decompress, - &mut self.header_out, - &mut self.event_count, - )?; - todo!() + fn handle_chan_stream_ready(&mut self, item: Sitemty) -> Result { + let ret = match item { + Ok(k) => match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + debug!("sees RangeComplete"); + Ok(BytesMut::new()) + } + RangeCompletableItem::Data(k) => { + self.event_count += k.len(); + if self.events_max != 0 && self.event_count >= self.events_max as usize { + return Err(Error::with_msg_no_trace(format!( + "events_max reached {} {}", + self.event_count, self.events_max + ))); } - }, - StreamItem::Log(k) => todo!(), - StreamItem::Stats(k) => todo!(), + + // NOTE needed because the databuffer actually doesn't write + // the correct shape per event. + let mut k = k; + if let Some(fi) = self.current_fetch_info.as_ref() { + if let Shape::Scalar = fi.shape() { + } else { + k.overwrite_all_shapes(fi.shape()); + } + } + let k = k; + + let item = Self::convert_item( + k, + self.current_channel.as_ref().unwrap(), + self.current_fetch_info.as_ref().unwrap(), + self.do_decompress, + &mut self.header_out, + &mut self.event_count, + )?; + Ok(item) + } + }, + StreamItem::Log(k) => { + let nodeix = k.node_ix; + if k.level == Level::ERROR { + tracing::event!(Level::ERROR, nodeix, message = k.msg); + } else if k.level == Level::WARN { + tracing::event!(Level::WARN, nodeix, message = k.msg); + } else if k.level == Level::INFO { + tracing::event!(Level::INFO, nodeix, message = k.msg); + } else if k.level == Level::DEBUG { + tracing::event!(Level::DEBUG, nodeix, message = k.msg); + } else if k.level == Level::TRACE { + tracing::event!(Level::TRACE, nodeix, message = k.msg); + } else { + tracing::event!(Level::TRACE, nodeix, message = k.msg); + } + Ok(BytesMut::new()) } - } + StreamItem::Stats(k) => { + // + Ok(BytesMut::new()) + } + }, Err(e) => { - error!("DataApiPython3DataStream emit error: {e:?}"); - self.chan_stream = None; - self.current_channel = None; - self.current_fetch_info = None; - self.data_done = true; - let mut sb = crate::status_board().unwrap(); - sb.add_error(self.reqctx.reqid(), e); - if false { - // TODO format as python data api error frame: - let mut buf = BytesMut::with_capacity(1024); - buf.put_slice("".as_bytes()); - Some(Ok(buf)) - } else { - None - } + error!("DataApiPython3DataStream emit error: {e}"); + Err(e.into()) } + }; + let tsnow = Instant::now(); + if tsnow.duration_since(self.ping_last) >= Duration::from_millis(500) { + self.ping_last = tsnow; + let mut sb = crate::status_board().unwrap(); + sb.mark_alive(self.reqctx.reqid()); } + ret } // TODO this stream can currently only handle sf-databuffer type backend anyway. fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> { - self.config_fut = None; let select = EventsSubQuerySelect::new( ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()), self.range.clone().into(), @@ -726,7 +746,7 @@ impl DataApiPython3DataStream { debug!("TODO add timeout option to data api3 download"); // TODO is this a good to place decide this? let stream = if self.node_config.node_config.cluster.is_central_storage { - info!("Set up central storage stream"); + debug!("set up central storage stream"); // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); let s = make_local_event_blobs_stream( @@ -744,11 +764,6 @@ impl DataApiPython3DataStream { let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); Box::pin(s) as Pin> + Send>> }; - let evm = if self.events_max == 0 { - usize::MAX - } else { - self.events_max as usize - }; self.chan_stream = Some(Box::pin(stream)); self.current_fetch_info = Some(fetch_info); Ok(()) @@ -769,63 +784,52 @@ impl Stream for DataApiPython3DataStream { } else { if let Some(stream) = &mut self.chan_stream { match stream.poll_next_unpin(cx) { - Ready(Some(k)) => Ready(self.handle_chan_stream_ready(k)), - Ready(None) => { - self.chan_stream = None; - continue; - } - Pending => Pending, - } - } else if let Some(fut) = &mut self.config_fut { - match fut.poll_unpin(cx) { - Ready(Ok(Some(k))) => match k { - ChannelTypeConfigGen::Scylla(_) => { - let e = Error::with_msg_no_trace("scylla"); + Ready(Some(k)) => match self.handle_chan_stream_ready(k) { + Ok(k) => Ready(Some(Ok(k))), + Err(e) => { error!("{e}"); + self.chan_stream = None; + self.current_channel = None; + self.current_fetch_info = None; self.data_done = true; + let mut sb = crate::status_board().unwrap(); + sb.add_error(self.reqctx.reqid(), e.0.clone()); Ready(Some(Err(e))) } - ChannelTypeConfigGen::SfDatabuffer(k) => match self.handle_config_fut_ready(k) { - Ok(()) => continue, - Err(e) => { - self.config_fut = None; - self.data_done = true; - error!("api1_binary_events error {:?}", e); - Ready(Some(Err(e))) - } - }, }, - Ready(Ok(None)) => { - warn!("logic error"); - self.config_fut = None; + Ready(None) => { + self.channel_finished(); continue; } - Ready(Err(e)) => { - self.data_done = true; - Ready(Some(Err(e))) - } Pending => Pending, } } else { - if let Some(channel) = self.channels.pop_front() { - self.current_channel = Some(channel.clone()); - if false { - self.config_fut = Some(Box::pin(find_ch_conf( - self.range.clone(), - err::todoval(), - self.node_config.clone(), - ))); + if let Some(chconf) = self.channels.pop_front() { + match &chconf { + ChannelTypeConfigGen::Scylla(_) => { + // TODO count + continue; + } + ChannelTypeConfigGen::SfDatabuffer(k) => match self.handle_config_fut_ready(k.clone()) { + Ok(()) => { + self.current_channel = Some(chconf.clone()); + continue; + } + Err(e) => { + error!("api1_binary_events error {:?}", e); + self.stats.subreq_fail += 1; + continue; + } + }, } - self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(Some(channel))))); - continue; } else { self.data_done = true; { let n = Instant::now(); + self.ping_last = n; let mut sb = crate::status_board().unwrap(); sb.mark_alive(self.reqctx.reqid()); - self.ping_last = n; - sb.mark_ok(self.reqctx.reqid()); + sb.mark_done(self.reqctx.reqid()); } continue; } @@ -958,7 +962,14 @@ impl Api1EventsBinaryHandler { } None => { // TODO count in request ctx. + // TODO must already here have the final stats counter container. + // This means, the request status must provide these counters. error!("no config quorum found for {ch:?}"); + let mut sb = crate::status_board().unwrap(); + sb.mark_alive(reqctx.reqid()); + if let Some(e) = sb.get_entry(reqctx.reqid()) { + e.channel_not_found_inc(); + } } } } @@ -1005,7 +1016,7 @@ impl RequestStatusHandler { } } - pub async fn handle(&self, req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, _ncc: &NodeConfigCached) -> Result, Error> { let (head, body) = req.into_parts(); if head.method != Method::GET { return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?); @@ -1024,8 +1035,9 @@ impl RequestStatusHandler { } let _body_data = hyper::body::to_bytes(body).await?; let status_id = &head.uri.path()[Self::path_prefix().len()..]; - info!("RequestStatusHandler status_id {:?}", status_id); - let s = crate::status_board()?.status_as_json(status_id); + debug!("RequestStatusHandler status_id {:?}", status_id); + let status = crate::status_board()?.status_as_json(status_id); + let s = serde_json::to_string(&status)?; let ret = response(StatusCode::OK).body(Body::from(s))?; Ok(ret) } diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 9966048..d91d1be 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,8 +1,8 @@ use crate::bodystream::response; use crate::bodystream::ToPublicResponse; use crate::channelconfig::ch_conf_from_binned; +use crate::err::Error; use crate::response_err; -use err::Error; use http::Method; use http::Request; use http::Response; @@ -21,7 +21,9 @@ use url::Url; async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCached) -> Result, Error> { debug!("{:?}", req); - let reqid = crate::status_board()?.new_status_id(); + let reqid = crate::status_board() + .map_err(|e| Error::with_msg_no_trace(e.to_string()))? + .new_status_id(); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { error!("binned_json: {e:?}"); diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 68bdc04..7a556c1 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -1,8 +1,8 @@ use crate::channelconfig::chconf_from_events_v1; +use crate::err::Error; use crate::response; use crate::response_err; use crate::ToPublicResponse; -use err::Error; use futures_util::stream; use futures_util::TryStreamExt; use http::Method; diff --git a/crates/httpret/src/api4/search.rs b/crates/httpret/src/api4/search.rs index 1b2a544..67e6a93 100644 --- a/crates/httpret/src/api4/search.rs +++ b/crates/httpret/src/api4/search.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; use crate::bodystream::ToPublicResponse; -use err::Error; +use crate::err::Error; use http::Method; use http::Request; use http::Response; diff --git a/crates/httpret/src/api4/status.rs b/crates/httpret/src/api4/status.rs index 63cfc5b..a17d1fa 100644 --- a/crates/httpret/src/api4/status.rs +++ b/crates/httpret/src/api4/status.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; +use crate::err::Error; use crate::ReqCtx; -use crate::RetrievalError; use http::Request; use http::Response; use http::StatusCode; @@ -14,7 +14,7 @@ use std::collections::VecDeque; use std::time::Duration; #[allow(unused)] -async fn table_sizes(node_config: &NodeConfigCached) -> Result { +async fn table_sizes(node_config: &NodeConfigCached) -> Result { let ret = dbconn::table_sizes(node_config).await?; Ok(ret) } @@ -39,12 +39,12 @@ impl StatusNodesRecursive { req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached, - ) -> Result, RetrievalError> { + ) -> Result, Error> { let res = tokio::time::timeout(Duration::from_millis(1200), self.status(req, ctx, node_config)).await; let res = match res { Ok(res) => res, Err(e) => { - let e = RetrievalError::from(e).add_public_msg("see timeout"); + let e = Error::from(e).add_public_msg("see timeout"); return Ok(crate::bodystream::ToPublicResponse::to_public_response(&e)); } }; @@ -67,7 +67,7 @@ impl StatusNodesRecursive { req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, - ) -> Result { + ) -> Result { let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { Some(k) => { @@ -93,7 +93,7 @@ impl StatusNodesRecursive { } None => None, }; - let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}")); + let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e}")); let ret = NodeStatus { name: format!("{}:{}", node_config.node.host, node_config.node.port), version: core::env!("CARGO_PKG_VERSION").into(), diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 1bdb1ef..c08400e 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -1,4 +1,4 @@ -use err::Error; +use crate::err::Error; use futures_util::StreamExt; use http::HeaderMap; use http::Response; @@ -28,6 +28,34 @@ impl ToPublicResponse for Error { } } +impl ToPublicResponse for ::err::Error { + fn to_public_response(&self) -> Response { + use err::Reason; + let e = self.to_public_error(); + let status = match e.reason() { + Some(Reason::BadRequest) => StatusCode::BAD_REQUEST, + Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + let msg = match serde_json::to_string(&e) { + Ok(s) => s, + Err(_) => "can not serialize error".into(), + }; + match response(status) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::from(msg)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e:?}"); + let mut res = Response::new(Body::default()); + *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + res + } + } + } +} + struct BodyStreamWrap(netpod::BodyStream); impl hyper::body::HttpBody for BodyStreamWrap { diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index 7d4fb6c..0da4caf 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; +use crate::err::Error; use crate::ReqCtx; -use err::Error; use futures_util::StreamExt; use http::Method; use http::Request; diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 80ca2d8..7fcc85b 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,7 +1,7 @@ +use crate::err::Error; use crate::response; use crate::ToPublicResponse; use dbconn::create_connection; -use err::Error; use futures_util::StreamExt; use http::Method; use http::Request; diff --git a/crates/httpret/src/download.rs b/crates/httpret/src/download.rs index 6277bd0..5422fc9 100644 --- a/crates/httpret/src/download.rs +++ b/crates/httpret/src/download.rs @@ -1,5 +1,5 @@ +use crate::err::Error; use crate::response; -use crate::RetrievalError; use futures_util::TryStreamExt; use http::Method; use http::StatusCode; @@ -67,11 +67,7 @@ impl DownloadHandler { } } - pub async fn get( - &self, - req: Request, - node_config: &NodeConfigCached, - ) -> Result, RetrievalError> { + pub async fn get(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let p2 = &head.uri.path()[Self::path_prefix().len()..]; let base = match &node_config.node.sf_databuffer { @@ -88,11 +84,7 @@ impl DownloadHandler { Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) } - pub async fn handle( - &self, - req: Request, - node_config: &NodeConfigCached, - ) -> Result, RetrievalError> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() == Method::GET { self.get(req, node_config).await } else { diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index fc5ae3f..6495eb7 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -37,6 +37,10 @@ impl Error { pub fn add_public_msg(self, msg: impl Into) -> Self { Error(self.0.add_public_msg(msg)) } + + pub fn from_to_string(e: E) -> Self { + Self::with_msg_no_trace(e.to_string()) + } } impl fmt::Debug for Error { @@ -93,3 +97,4 @@ impl Convable for http::header::ToStrError {} impl Convable for hyper::Error {} impl Convable for std::array::TryFromSliceError {} impl Convable for err::anyhow::Error {} +impl Convable for crate::RetrievalError {} diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index 370c445..b5829a5 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -1,5 +1,5 @@ +use crate::err::Error; use crate::response; -use crate::RetrievalError; use futures_util::select; use futures_util::FutureExt; use http::Method; @@ -33,7 +33,7 @@ struct GatherHost { inst: String, } -async fn process_answer(res: Response) -> Result { +async fn process_answer(res: Response) -> Result { let (pre, mut body) = res.into_parts(); if pre.status != StatusCode::OK { use hyper::body::HttpBody; @@ -55,14 +55,11 @@ async fn process_answer(res: Response) -> Result k, Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), }; - Ok::<_, RetrievalError>(val) + Ok::<_, Error>(val) } } -pub async fn unused_gather_json_from_hosts( - req: Request, - pathpre: &str, -) -> Result, RetrievalError> { +pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> Result, Error> { let (part_head, part_body) = req.into_parts(); let bodyslice = hyper::body::to_bytes(part_body).await?; let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?; @@ -82,7 +79,7 @@ pub async fn unused_gather_json_from_hosts( let task = tokio::spawn(async move { select! { _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(RetrievalError::with_msg("timeout")) + Err(Error::with_msg_no_trace(format!("timeout"))) } res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) } @@ -115,10 +112,7 @@ pub async fn unused_gather_json_from_hosts( Ok(res) } -pub async fn gather_get_json( - req: Request, - node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, body) = req.into_parts(); let _bodyslice = hyper::body::to_bytes(body).await?; let pathpre = "/api/4/gather/"; @@ -136,7 +130,7 @@ pub async fn gather_get_json( let task = tokio::spawn(async move { select! { _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(RetrievalError::with_msg("timeout")) + Err(Error::with_msg_no_trace(format!("timeout"))) } res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) } @@ -194,23 +188,23 @@ pub async fn gather_get_json_generic( // TODO use deadline instead. // TODO Wait a bit longer compared to remote to receive partial results. timeout: Duration, -) -> Result +) -> Result where SM: Send + 'static, - NT: Fn(String, Response) -> Pin, RetrievalError>> + Send>> + NT: Fn(String, Response) -> Pin, Error>> + Send>> + Send + Sync + Copy + 'static, - FT: Fn(Vec<(Tag, Result, RetrievalError>)>) -> Result, + FT: Fn(Vec<(Tag, Result, Error>)>) -> Result, { // TODO remove magic constant let extra_timeout = Duration::from_millis(3000); if urls.len() != bodies.len() { - return Err(RetrievalError::TextError(format!("unequal numbers of urls and bodies"))); + return Err(Error::with_msg_no_trace(format!("unequal numbers of urls and bodies"))); } if urls.len() != tags.len() { - return Err(RetrievalError::TextError(format!("unequal numbers of urls and tags"))); + return Err(Error::with_msg_no_trace(format!("unequal numbers of urls and tags"))); } let spawned: Vec<_> = urls .into_iter() @@ -240,7 +234,7 @@ where select! { _ = sleep(timeout + extra_timeout).fuse() => { error!("PROXY TIMEOUT"); - Err(RetrievalError::TextError(format!("timeout"))) + Err(Error::with_msg_no_trace(format!("timeout"))) } res = { let client = Client::new(); diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 54e5e92..876856b 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -4,6 +4,7 @@ pub mod bodystream; pub mod channel_status; pub mod channelconfig; pub mod download; +pub mod err; pub mod gather; pub mod prometheus; pub mod proxy; @@ -12,10 +13,11 @@ pub mod settings; use self::bodystream::ToPublicResponse; use crate::bodystream::response; +use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; -use err::thiserror; -use err::ThisError; +use ::err::thiserror; +use ::err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; @@ -32,6 +34,7 @@ use net::SocketAddr; use netpod::is_false; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; +use netpod::CmpZero; use netpod::NodeConfigCached; use netpod::ProxyConfig; use netpod::APP_JSON; @@ -61,7 +64,8 @@ pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; #[derive(Debug, ThisError, Serialize, Deserialize)] pub enum RetrievalError { - Error(#[from] err::Error), + Error(#[from] ::err::Error), + Error2(#[from] crate::err::Error), TextError(String), #[serde(skip)] Hyper(#[from] hyper::Error), @@ -79,6 +83,7 @@ trait IntoBoxedError: std::error::Error {} impl IntoBoxedError for net::AddrParseError {} impl IntoBoxedError for tokio::task::JoinError {} impl IntoBoxedError for api4::databuffer_tools::FindActiveError {} +impl IntoBoxedError for std::string::FromUtf8Error {} impl From for RetrievalError where @@ -89,6 +94,12 @@ where } } +impl ::err::ToErr for RetrievalError { + fn to_err(self) -> ::err::Error { + ::err::Error::with_msg_no_trace(self.to_string()) + } +} + pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { static STATUS_BOARD_INIT: Once = Once::new(); STATUS_BOARD_INIT.call_once(|| { @@ -114,11 +125,11 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { let node_config = node_config.clone(); let addr = conn.remote_addr(); async move { - Ok::<_, RetrievalError>(service_fn({ + Ok::<_, Error>(service_fn({ move |req| { // TODO send to logstash info!( - "REQUEST {:?} - {:?} - {:?} - {:?}", + "http-request {:?} - {:?} - {:?} - {:?}", addr, req.method(), req.uri(), @@ -131,12 +142,15 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { } } }); - Server::bind(&addr).serve(make_service).await?; + Server::bind(&addr) + .serve(make_service) + .await + .map(|e| RetrievalError::TextError(format!("{e:?}")))?; rawjh.await??; Ok(()) } -async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, RetrievalError> { +async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, Error> { match http_service_try(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { @@ -146,13 +160,14 @@ async fn http_service(req: Request, node_config: NodeConfigCached) -> Resu } } +// TODO move this and related stuff to separate module struct Cont { f: Pin>, } impl Future for Cont where - F: Future>, + F: Future>, { type Output = ::Output; @@ -162,13 +177,13 @@ where Ok(k) => k, Err(e) => { error!("Cont catch_unwind {e:?}"); - match e.downcast_ref::() { + match e.downcast_ref::() { Some(e) => { error!("Cont catch_unwind is Error: {e:?}"); } None => {} } - Poll::Ready(Err(RetrievalError::TextError(format!("{e:?}")))) + Poll::Ready(Err(Error::with_msg_no_trace(format!("{e:?}")))) } } } @@ -271,10 +286,7 @@ macro_rules! static_http_api1 { }; } -async fn http_service_try( - req: Request, - node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { use http::HeaderValue; let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); @@ -343,37 +355,37 @@ async fn http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) { - h.handle(req, ctx, &node_config).await + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = StatusBoardAllHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = api4::databuffer_tools::FindActiveHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::IocForChannel::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::events::EventsHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { - h.handle(req, ctx, &node_config).await + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) { - h.handle(req, ctx, &node_config).await + Ok(h.handle(req, ctx, &node_config).await?) } else if path == "/api/4/prebinned" { if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) @@ -417,29 +429,29 @@ async fn http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if let Some(h) = download::DownloadHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) { - h.handle(req, ctx, &node_config).await + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseLocalHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseHistoHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::Api4MapPulse2HttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api1::RequestStatusHandler::handler(&req) { - h.handle(req, &node_config).await + Ok(h.handle(req, &node_config).await?) } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { api_1_docs(path) @@ -670,26 +682,33 @@ async fn update_search_cache( Ok(ret) } -#[derive(Serialize)] +#[derive(Debug, Serialize)] pub struct StatusBoardEntry { #[allow(unused)] #[serde(serialize_with = "instant_serde::ser")] ts_created: SystemTime, #[serde(serialize_with = "instant_serde::ser")] ts_updated: SystemTime, - #[serde(skip_serializing_if = "is_false")] - is_error: bool, - #[serde(skip_serializing_if = "is_false")] - is_ok: bool, + // #[serde(skip_serializing_if = "is_false")] + done: bool, // #[serde(skip_serializing_if = "Vec::is_empty")] - #[serde(skip)] - errors: Vec>, + errors: Vec<::err::Error>, + // TODO make this a better Stats container and remove pub access. + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + error_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + warn_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + channel_not_found: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + subreq_fail: usize, } mod instant_serde { use super::*; use netpod::DATETIME_FMT_3MS; use serde::Serializer; + pub fn ser(x: &SystemTime, ser: S) -> Result { use chrono::LocalResult; let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap(); @@ -713,14 +732,48 @@ impl StatusBoardEntry { Self { ts_created: SystemTime::now(), ts_updated: SystemTime::now(), - is_error: false, - is_ok: false, + done: false, errors: Vec::new(), + error_count: 0, + warn_count: 0, + channel_not_found: 0, + subreq_fail: 0, + } + } + + pub fn warn_inc(&mut self) { + self.warn_count += 1; + } + + pub fn channel_not_found_inc(&mut self) { + self.channel_not_found += 1; + } +} + +#[derive(Debug, Serialize)] +pub struct StatusBoardEntryUser { + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + error_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + warn_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + channel_not_found: usize, + #[serde(skip_serializing_if = "Vec::is_empty")] + errors: Vec<::err::PublicError>, +} + +impl From<&StatusBoardEntry> for StatusBoardEntryUser { + fn from(e: &StatusBoardEntry) -> Self { + Self { + error_count: e.error_count, + warn_count: e.warn_count, + channel_not_found: e.channel_not_found, + errors: e.errors.iter().map(|e| e.to_public_error()).collect(), } } } -#[derive(Serialize)] +#[derive(Debug, Serialize)] pub struct StatusBoard { entries: BTreeMap, } @@ -741,7 +794,7 @@ impl StatusBoard { f.read_exact(&mut buf).unwrap(); let n = u32::from_le_bytes(buf); let s = format!("{:08x}", n); - info!("new_status_id {s}"); + debug!("new_status_id {s}"); self.entries.insert(s.clone(), StatusBoardEntry::new()); s } @@ -757,6 +810,10 @@ impl StatusBoard { } } + pub fn get_entry(&mut self, status_id: &str) -> Option<&mut StatusBoardEntry> { + self.entries.get_mut(status_id) + } + pub fn mark_alive(&mut self, status_id: &str) { match self.entries.get_mut(status_id) { Some(e) => { @@ -768,12 +825,25 @@ impl StatusBoard { } } - pub fn mark_ok(&mut self, status_id: &str) { + pub fn mark_done(&mut self, status_id: &str) { match self.entries.get_mut(status_id) { Some(e) => { e.ts_updated = SystemTime::now(); - if !e.is_error { - e.is_ok = true; + e.done = true; + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn add_error(&mut self, status_id: &str, err: ::err::Error) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + if e.errors.len() < 100 { + e.errors.push(err); + e.error_count += 1; } } None => { @@ -782,51 +852,18 @@ impl StatusBoard { } } - pub fn add_error(&mut self, status_id: &str, error: E) - where - E: Into>, - { - match self.entries.get_mut(status_id) { - Some(e) => { - e.ts_updated = SystemTime::now(); - e.is_error = true; - e.is_ok = false; - e.errors.push(error.into()); - } - None => { - error!("can not find status id {}", status_id); - } - } - } - - pub fn status_as_json(&self, status_id: &str) -> String { - #[derive(Serialize)] - struct StatJs { - #[serde(skip_serializing_if = "Vec::is_empty")] - errors: Vec<::err::PublicError>, - } + pub fn status_as_json(&self, status_id: &str) -> StatusBoardEntryUser { match self.entries.get(status_id) { - Some(e) => { - if e.is_ok { - let js = StatJs { errors: Vec::new() }; - return serde_json::to_string(&js).unwrap(); - } else if e.is_error { - // TODO - // let errors = e.errors.iter().map(|e| (&e.0).into()).collect(); - let errors = vec![err::Error::with_msg_no_trace("TODO convert to user error").into()]; - let js = StatJs { errors }; - return serde_json::to_string(&js).unwrap(); - } else { - warn!("requestStatus for unfinished {status_id}"); - let js = StatJs { errors: Vec::new() }; - return serde_json::to_string(&js).unwrap(); - } - } + Some(e) => e.into(), None => { error!("can not find status id {}", status_id); let e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}")); - let js = StatJs { errors: vec![e.into()] }; - return serde_json::to_string(&js).unwrap(); + StatusBoardEntryUser { + error_count: 1, + warn_count: 0, + channel_not_found: 0, + errors: vec![::err::Error::with_public_msg_no_trace("request-id not found").into()], + } } } } diff --git a/crates/httpret/src/prometheus.rs b/crates/httpret/src/prometheus.rs index 95ce639..2e46700 100644 --- a/crates/httpret/src/prometheus.rs +++ b/crates/httpret/src/prometheus.rs @@ -423,7 +423,7 @@ pub async fn host(bind: SocketAddr) -> Result<(), RetrievalError> { Ok::<_, RetrievalError>(service_fn({ move |req| { info!( - "REQUEST {:?} - {:?} - {:?} - {:?}", + "http-request {:?} - {:?} - {:?} - {:?}", addr, req.method(), req.uri(), diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 145bd81..5630187 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -6,6 +6,7 @@ use crate::api1::channel_search_list_v1; use crate::api1::gather_json_2_v1; use crate::api_1_docs; use crate::api_4_docs; +use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::pulsemap::MapPulseQuery; @@ -14,7 +15,6 @@ use crate::response_err; use crate::Cont; use crate::ReqCtx; use crate::PSI_DAQBUFFER_SERVICE_MARK; -use err::Error; use futures_util::pin_mut; use futures_util::Stream; use http::Method; @@ -66,9 +66,8 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { async move { Ok::<_, Error>(service_fn({ move |req| { - // TODO send to logstash info!( - "REQUEST {:?} - {:?} - {:?} - {:?}", + "http-request {:?} - {:?} - {:?} - {:?}", addr, req.method(), req.uri(), @@ -159,13 +158,13 @@ async fn proxy_http_service_inner( Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { - api_1_docs(path) + Ok(api_1_docs(path)?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if path.starts_with("/api/4/documentation/") { if req.method() == Method::GET { - api_4_docs(path) + Ok(api_4_docs(path)?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } diff --git a/crates/httpret/src/proxy/api1.rs b/crates/httpret/src/proxy/api1.rs index 64d224e..d5e3ef9 100644 --- a/crates/httpret/src/proxy/api1.rs +++ b/crates/httpret/src/proxy/api1.rs @@ -1,8 +1,8 @@ pub mod reqstatus; use crate::bodystream::response; +use crate::err::Error; use crate::ReqCtx; -use err::Error; use http::HeaderValue; use http::Method; use http::Request; diff --git a/crates/httpret/src/proxy/api1/reqstatus.rs b/crates/httpret/src/proxy/api1/reqstatus.rs index 5905ba5..b365793 100644 --- a/crates/httpret/src/proxy/api1/reqstatus.rs +++ b/crates/httpret/src/proxy/api1/reqstatus.rs @@ -1,5 +1,5 @@ use crate::bodystream::response; -use err::Error; +use crate::err::Error; use http::Method; use http::Request; use http::Response; @@ -45,7 +45,7 @@ impl RequestStatusHandler { } let _body_data = hyper::body::to_bytes(body).await?; let status_id = &head.uri.path()[Self::path_prefix().len()..]; - info!("RequestStatusHandler status_id {:?}", status_id); + debug!("RequestStatusHandler status_id {:?}", status_id); let back = { let mut ret = None; @@ -59,7 +59,7 @@ impl RequestStatusHandler { }; if let Some(back) = back { let url_str = format!("{}{}{}", back.url, Self::path_prefix(), status_id); - info!("try to ask {url_str}"); + debug!("try to ask {url_str}"); let req = Request::builder() .method(Method::GET) .uri(url_str) @@ -71,7 +71,7 @@ impl RequestStatusHandler { error!("backend returned error: {head:?}"); Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) } else { - info!("backend returned OK"); + debug!("backend returned OK"); Ok(response(StatusCode::OK).body(body)?) } } else { diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 612b044..4a5a3be 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -1,12 +1,12 @@ pub mod caioclookup; use crate::bodystream::ToPublicResponse; +use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::gather::Tag; use crate::response; use crate::ReqCtx; -use err::Error; use futures_util::Future; use http::Method; use http::Request; diff --git a/crates/httpret/src/proxy/api4/caioclookup.rs b/crates/httpret/src/proxy/api4/caioclookup.rs index ac93a16..105c654 100644 --- a/crates/httpret/src/proxy/api4/caioclookup.rs +++ b/crates/httpret/src/proxy/api4/caioclookup.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; +use crate::err::Error; use crate::ReqCtx; -use err::Error; use http::Request; use http::Response; use http::StatusCode; diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 57015c3..aa39bcf 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -1,3 +1,4 @@ +use crate::err::Error; use crate::response; use async_channel::Receiver; use async_channel::Sender; @@ -6,7 +7,6 @@ use bytes::BufMut; use bytes::BytesMut; use chrono::TimeZone; use chrono::Utc; -use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; diff --git a/crates/httpret/src/settings.rs b/crates/httpret/src/settings.rs index ed3c78a..3b42ccc 100644 --- a/crates/httpret/src/settings.rs +++ b/crates/httpret/src/settings.rs @@ -1,5 +1,5 @@ +use crate::err::Error; use crate::response; -use err::Error; use http::Method; use http::StatusCode; use hyper::Body; diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs index 3e12eef..fb086f4 100644 --- a/crates/items_0/src/streamitem.rs +++ b/crates/items_0/src/streamitem.rs @@ -42,6 +42,7 @@ pub enum StatsItem { EventDataReadStats(EventDataReadStats), RangeFilterStats(RangeFilterStats), DiskStats(DiskStats), + Warnings(), } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index da397c4..9341c84 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -80,7 +80,7 @@ mod decomps_serde { } impl EventFull { - pub fn add_event( + pub fn push( &mut self, ts: u64, pulse: u64, @@ -118,6 +118,13 @@ impl EventFull { self.shapes.truncate(nkeep); self.comps.truncate(nkeep); } + + // NOTE needed because the databuffer actually doesn't write the correct shape per event. + pub fn overwrite_all_shapes(&mut self, shape: &Shape) { + for u in &mut self.shapes { + *u = shape.clone(); + } + } } impl FrameTypeInnerStatic for EventFull { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 3a92bdc..eb9672c 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -69,6 +69,11 @@ impl CmpZero for u32 { *self == 0 } } +impl CmpZero for usize { + fn is_zero(&self) -> bool { + *self == 0 + } +} pub struct BodyStream { //pub receiver: async_channel::Receiver>, @@ -2265,6 +2270,17 @@ impl ReadExactStats { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct Api1WarningStats { + pub subreq_fail: usize, +} + +impl Api1WarningStats { + pub fn new() -> Self { + Self { subreq_fail: 0 } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ByteSize(pub u32); diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 8642c3e..0c66fac 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -237,7 +237,7 @@ async fn events_conn_handler_with_reqid( { let item = LogItem { node_ix: ncc.ix as _, - level: Level::INFO, + level: Level::DEBUG, msg: format!("buf_len_histo: {:?}", buf_len_histo), }; let item: Sitemty = Ok(StreamItem::Log(item)); diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index 3de80b7..2a32b45 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -128,6 +128,7 @@ impl Collect { //total_duration += k.duration; } }, + _ => {} } Ok(()) } @@ -281,6 +282,7 @@ where total_duration += k.duration; } }, + _ => {} } } }, diff --git a/crates/streams/src/needminbuffer.rs b/crates/streams/src/needminbuffer.rs index ccfea61..684d61c 100644 --- a/crates/streams/src/needminbuffer.rs +++ b/crates/streams/src/needminbuffer.rs @@ -40,7 +40,7 @@ impl NeedMinBuffer { // TODO collect somewhere else impl Drop for NeedMinBuffer { fn drop(&mut self) { - debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo); + debug!("NeedMinBuffer-drop {{ buf_len_histo: {:?} }}", self.buf_len_histo); } }