From 8938e55f86de4d82feb591b797c0b6cde6dc3625 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 27 Jun 2023 17:02:37 +0200 Subject: [PATCH] WIP --- .cargo/cargo-lock | 168 +++++---- disk/src/channelconfig.rs | 48 ++- disk/src/eventblobs.rs | 1 + disk/src/eventchunker.rs | 424 ++++++++++++----------- disk/src/merge/mergedblobsfromremotes.rs | 3 +- disk/src/raw/conn.rs | 8 +- httpclient/src/httpclient.rs | 28 +- httpclient/src/lib.rs | 1 + httpret/src/api1.rs | 47 ++- httpret/src/api4/binned.rs | 5 +- httpret/src/api4/events.rs | 6 +- httpret/src/channel_status.rs | 5 +- httpret/src/channelconfig.rs | 83 ++++- httpret/src/httpret.rs | 2 + netpod/src/netpod.rs | 4 +- nodenet/src/channelconfig.rs | 66 ++-- nodenet/src/configquorum.rs | 20 +- parse/src/channelconfig.rs | 62 ++-- streams/src/tcprawclient.rs | 1 - 19 files changed, 597 insertions(+), 385 deletions(-) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 5dc0386..1266713 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -78,15 +78,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d" +checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" [[package]] name = "anstyle-parse" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e765fd216e48e067936442276d1d57399e37bce53c264d6fefbe298080cb57ee" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" dependencies = [ "utf8parse", ] @@ -164,7 +164,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -175,7 +175,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -381,9 +381,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.4" +version = "4.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed" +checksum = "d9394150f5b4273a1763355bd1c2ec54cc5a2593f790587bcd6b2c947cfa9211" dependencies = [ "clap_builder", "clap_derive", @@ -392,9 +392,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.4" +version = "4.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636" +checksum = "9a78fbdd3cc2914ddf37ba444114bc7765bbdcb55ec9cbe6fa054f0137400717" dependencies = [ "anstream", "anstyle", @@ -413,7 +413,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -587,8 +587,8 @@ checksum = "624b54323b06e675293939311943ba82d323bb340468ce1889be5da7932c8d73" dependencies = [ "cranelift-entity", "fxhash", - "hashbrown", - "indexmap", + "hashbrown 0.12.3", + "indexmap 1.9.3", "log", "smallvec", ] @@ -753,7 +753,7 @@ dependencies = [ "items_2", "lazy_static", "netpod", - "nom", + "parse", "query", "rmp-serde", "serde", @@ -787,7 +787,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -798,7 +798,7 @@ checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" dependencies = [ "darling_core", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -808,7 +808,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if", - "hashbrown", + "hashbrown 0.12.3", "lock_api", "once_cell", "parking_lot_core 0.9.8", @@ -963,9 +963,15 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] +[[package]] +name = "equivalent" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" + [[package]] name = "erased-serde" version = "0.3.25" @@ -1154,7 +1160,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -1224,7 +1230,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d" dependencies = [ "fallible-iterator", - "indexmap", + "indexmap 1.9.3", "stable_deref_trait", ] @@ -1236,9 +1242,9 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" [[package]] name = "h2" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782" +checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049" dependencies = [ "bytes", "fnv", @@ -1246,7 +1252,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 1.9.3", "slab", "tokio", "tokio-util", @@ -1272,6 +1278,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + [[package]] name = "hdrhistogram" version = "7.5.2" @@ -1404,6 +1416,7 @@ dependencies = [ "err", "futures-util", "http", + "httpclient", "hyper", "items_0", "items_2", @@ -1444,9 +1457,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.26" +version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ "bytes", "futures-channel", @@ -1537,7 +1550,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", +] + +[[package]] +name = "indexmap" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +dependencies = [ + "equivalent", + "hashbrown 0.14.0", ] [[package]] @@ -1615,7 +1638,7 @@ dependencies = [ name = "items_proc" version = "0.0.2" dependencies = [ - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -1656,9 +1679,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.146" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "linux-raw-sys" @@ -1863,6 +1886,7 @@ dependencies = [ "err", "futures-util", "hex", + "httpclient", "items_0", "items_2", "netpod", @@ -1989,9 +2013,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "openssl" -version = "0.10.54" +version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69b3f656a17a6cbc115b5c7a40c616947d213ba182135b014d6051b73ab6f019" +checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ "bitflags", "cfg-if", @@ -2010,7 +2034,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -2021,9 +2045,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.88" +version = "0.9.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ce0f250f34a308dcfdbb351f511359857d4ed2134ba715a4eadd46e1ffd617" +checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" dependencies = [ "cc", "libc", @@ -2117,18 +2141,18 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "phf" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ "phf_shared", ] [[package]] name = "phf_shared" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ "siphasher", ] @@ -2150,7 +2174,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -2245,9 +2269,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.60" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" dependencies = [ "unicode-ident", ] @@ -2487,8 +2511,8 @@ checksum = "0200c8230b013893c0b2d6213d6ec64ed2b9be2e0e016682b7224ff82cff5c58" dependencies = [ "bitvec", "bytecheck", - "hashbrown", - "indexmap", + "hashbrown 0.12.3", + "indexmap 1.9.3", "ptr_meta", "rend", "rkyv_derive", @@ -2736,14 +2760,14 @@ checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] name = "serde_json" -version = "1.0.97" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdf3bf93142acad5821c99197022e170842cdbc1c30482b98750c688c640842a" +checksum = "46266871c240a00b8f503b877622fe33430b3c7d963bdc0f2adc511e54a1eae3" dependencies = [ "itoa", "ryu", @@ -2752,11 +2776,11 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.21" +version = "0.9.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9d684e3ec7de3bf5466b32bd75303ac16f0736426e5a4e0d6e489559ce1249c" +checksum = "452e67b9c20c37fa79df53201dc03839651086ed9bbe92b3ca585ca9fdaa7d85" dependencies = [ - "indexmap", + "indexmap 2.0.0", "itoa", "ryu", "serde", @@ -2935,9 +2959,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.18" +version = "2.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +checksum = "2efbeae7acf4eabd6bcdcbd11c92f45231ddda7539edc7806bd1a04a03b24616" dependencies = [ "proc-macro2", "quote", @@ -2958,9 +2982,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "target-lexicon" -version = "0.12.7" +version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd1ba337640d60c3e96bc6f0638a939b9c9a7f2c316a1598c279828b3d1dc8c5" +checksum = "1b1c7f239eb94671427157bd93b3694320f3668d4e1eff08c7285366fd777fac" [[package]] name = "taskrun" @@ -3009,7 +3033,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -3120,7 +3144,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -3184,17 +3208,17 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.19.10" +version = "0.19.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +checksum = "266f016b7f039eec8a1a80dfe6156b633d208b9fccca5e4db1d6775b0c4e34a7" dependencies = [ - "indexmap", + "indexmap 2.0.0", "toml_datetime", "winnow", ] @@ -3235,7 +3259,7 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap", + "indexmap 1.9.3", "pin-project", "pin-project-lite", "rand", @@ -3273,13 +3297,13 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", ] [[package]] @@ -3441,9 +3465,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.3.4" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81" +checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" dependencies = [ "getrandom", ] @@ -3508,7 +3532,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", "wasm-bindgen-shared", ] @@ -3553,7 +3577,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.22", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3573,7 +3597,7 @@ dependencies = [ "bytes", "cfg-if", "derivative", - "indexmap", + "indexmap 1.9.3", "js-sys", "more-asserts", "rustc-demangle", @@ -3654,7 +3678,7 @@ dependencies = [ "bytecheck", "enum-iterator", "enumset", - "indexmap", + "indexmap 1.9.3", "more-asserts", "rkyv", "target-lexicon", @@ -3675,7 +3699,7 @@ dependencies = [ "derivative", "enum-iterator", "fnv", - "indexmap", + "indexmap 1.9.3", "lazy_static", "libc", "mach", @@ -3694,7 +3718,7 @@ version = "0.95.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2ea896273ea99b15132414be1da01ab0d8836415083298ecaffbe308eaac87a" dependencies = [ - "indexmap", + "indexmap 1.9.3", "url", ] diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index f5da346..99e5d9b 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,5 +1,7 @@ use crate::SfDbChConf; -use err::Error; +use err::thiserror; +#[allow(unused)] +use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::NodeConfigCached; use netpod::SfDbChannel; @@ -7,13 +9,43 @@ use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; use parse::channelconfig::ChannelConfigs; use parse::channelconfig::ConfigEntry; +use parse::channelconfig::ConfigParseError; +use std::fmt; + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + ParseError(ConfigParseError), + NotFound, + Error, +} + +impl fmt::Display for ConfigError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "ConfigError::{self:?}") + } +} + +impl From for ConfigError { + fn from(value: ConfigParseError) -> Self { + match value { + ConfigParseError::FileNotFound => ConfigError::NotFound, + x => ConfigError::ParseError(x), + } + } +} pub async fn config_entry_best_match( range: &NanoRange, channel: SfDbChannel, node_config: &NodeConfigCached, -) -> Result, Error> { - let channel_config = read_local_config(channel.clone(), node_config.clone()).await?; +) -> Result, ConfigError> { + let channel_config = match read_local_config(channel.clone(), node_config.clone()).await { + Ok(x) => x, + Err(e) => match e { + ConfigParseError::FileNotFound => return Ok(None), + e => return Err(e.into()), + }, + }; let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, Err(e) => return Err(e)?, @@ -24,7 +56,10 @@ pub async fn config_entry_best_match( } } -pub async fn channel_configs(channel: SfDbChannel, node_config: &NodeConfigCached) -> Result { +pub async fn channel_configs( + channel: SfDbChannel, + node_config: &NodeConfigCached, +) -> Result { read_local_config(channel.clone(), node_config.clone()).await } @@ -32,14 +67,15 @@ pub async fn channel_config_best_match( range: NanoRange, channel: SfDbChannel, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, ConfigError> { let best = config_entry_best_match(&range, channel.clone(), node_config).await?; match best { None => Ok(None), Some(entry) => { let shape = match entry.to_shape() { Ok(k) => k, - Err(e) => return Err(e)?, + // TODO pass error to caller + Err(_e) => return Err(ConfigError::Error)?, }; let channel_config = SfDbChConf { channel: channel.clone(), diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 4b6ed4f..bf7ab87 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -103,6 +103,7 @@ impl Stream for EventChunkerMultifile { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix); let _spg = span1.enter(); + info!("EventChunkerMultifile poll_next"); use Poll::*; 'outer: loop { break if let Some(item) = self.log_queue.pop_front() { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 1c22e80..fcbaae7 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -1,6 +1,7 @@ use bitshuffle::bitshuffle_decompress; use bytes::Buf; use bytes::BytesMut; +use err::thiserror; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -20,6 +21,7 @@ use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::Shape; use parse::channelconfig::CompressionMethod; +use std::io::Cursor; use std::path::PathBuf; use std::pin::Pin; use std::task::Context; @@ -29,11 +31,46 @@ use streams::dtflags::*; use streams::filechunkread::FileChunkRead; use streams::needminbuffer::NeedMinBuffer; +#[derive(Debug, thiserror::Error)] +pub enum DataParseError { + #[error("DataFrameLengthMismatch")] + DataFrameLengthMismatch, + #[error("FileHeaderTooShort")] + FileHeaderTooShort, + #[error("BadVersionTag")] + BadVersionTag, + #[error("HeaderTooLarge")] + HeaderTooLarge, + #[error("Utf8Error")] + Utf8Error, + #[error("EventTooShort")] + EventTooShort, + #[error("EventTooLong")] + EventTooLong, + #[error("TooManyBeforeRange")] + TooManyBeforeRange, + #[error("EventWithOptional")] + EventWithOptional, + #[error("BadTypeIndex")] + BadTypeIndex, + #[error("WaveShapeWithoutEventArray")] + WaveShapeWithoutEventArray, + #[error("ShapedWithoutDims")] + ShapedWithoutDims, + #[error("TooManyDims")] + TooManyDims, + #[error("UnknownCompression")] + UnknownCompression, + #[error("BadCompresionBlockSize")] + BadCompresionBlockSize, +} + pub struct EventChunker { inp: NeedMinBuffer, state: DataFileState, need_min: u32, fetch_info: SfChFetchInfo, + need_min_max: u32, errored: bool, completed: bool, range: NanoRange, @@ -44,16 +81,17 @@ pub struct EventChunker { final_stats_sent: bool, parsed_bytes: u64, dbg_path: PathBuf, - max_ts: u64, + last_ts: u64, expand: bool, do_decompress: bool, decomp_dt_histo: HistoLog2, item_len_emit_histo: HistoLog2, seen_before_range_count: usize, seen_after_range_count: usize, - unordered_warn_count: usize, - repeated_ts_warn_count: usize, + unordered_count: usize, + repeated_ts_count: usize, config_mismatch_discard: usize, + discard_count: usize, } impl Drop for EventChunker { @@ -90,7 +128,68 @@ impl EventChunkerConf { } } +fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo) -> bool { + match fetch_info.shape() { + Shape::Scalar => { + if *is_array { + false + } else { + true + } + } + Shape::Wave(dim1count) => { + if (*dim1count as u64) != *ele_count { + false + } else { + true + } + } + Shape::Image(n1, n2) => { + let nt = (*n1 as u64) * (*n2 as u64); + if nt != *ele_count { + false + } else { + true + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DecompError { + #[error("Error")] + Error, +} + +fn decompress(databuf: &[u8], type_size: u32, ele_count: u64) -> Result, DecompError> { + if databuf.len() < 13 { + return Err(DecompError::Error); + } + let ts1 = Instant::now(); + let decomp_bytes = type_size as u64 * ele_count; + let mut decomp = vec![0; decomp_bytes as usize]; + let ele_size = type_size; + // TODO limit the buf slice range + match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as usize, ele_size as usize, 0) { + Ok(c1) => { + if 12 + c1 != databuf.len() {} + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + // TODO analyze the histo + //self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); + Ok(decomp) + } + Err(e) => { + return Err(DecompError::Error); + } + } +} + impl EventChunker { + pub fn self_name() -> &'static str { + std::any::type_name::() + } + // TODO `expand` flag usage pub fn from_start( inp: Pin> + Send>>, @@ -101,13 +200,24 @@ impl EventChunker { expand: bool, do_decompress: bool, ) -> Self { - info!("EventChunker::{} do_decompress {}", "from_start", do_decompress); + info!( + "{}::{} do_decompress {}", + Self::self_name(), + "from_start", + do_decompress + ); + let need_min_max = match fetch_info.shape() { + Shape::Scalar => 1024 * 8, + Shape::Wave(_) => 1024 * 32, + Shape::Image(_, _) => 1024 * 1024 * 40, + }; let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); Self { inp, state: DataFileState::FileHeader, need_min: 6, + need_min_max, fetch_info, errored: false, completed: false, @@ -119,16 +229,17 @@ impl EventChunker { final_stats_sent: false, parsed_bytes: 0, dbg_path, - max_ts: 0, + last_ts: 0, expand, do_decompress, decomp_dt_histo: HistoLog2::new(8), item_len_emit_histo: HistoLog2::new(0), seen_before_range_count: 0, seen_after_range_count: 0, - unordered_warn_count: 0, - repeated_ts_warn_count: 0, + unordered_count: 0, + repeated_ts_count: 0, config_mismatch_discard: 0, + discard_count: 0, } } @@ -143,8 +254,10 @@ impl EventChunker { do_decompress: bool, ) -> Self { info!( - "EventChunker::{} do_decompress {}", - "from_event_boundary", do_decompress + "{}::{} do_decompress {}", + Self::self_name(), + "from_event_boundary", + do_decompress ); let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress); ret.state = DataFileState::Event; @@ -154,13 +267,17 @@ impl EventChunker { } fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { - span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) + span!(Level::INFO, "EventChunker::parse_buf") + .in_scope(|| self.parse_buf_inner(buf)) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))) } - fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { + fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { + use byteorder::ReadBytesExt; + use byteorder::BE; + info!("parse_buf_inner buf len {}", buf.len()); let mut ret = EventFull::empty(); let mut parsed_bytes = 0; - use byteorder::{ReadBytesExt, BE}; loop { if (buf.len() as u32) < self.need_min { break; @@ -168,16 +285,16 @@ impl EventChunker { match self.state { DataFileState::FileHeader => { if buf.len() < 6 { - Err(Error::with_msg("need min 6 for FileHeader"))?; + return Err(DataParseError::FileHeaderTooShort); } - let mut sl = std::io::Cursor::new(buf.as_ref()); + let mut sl = Cursor::new(buf.as_ref()); let fver = sl.read_i16::().unwrap(); if fver != 0 { - Err(Error::with_msg("unexpected data file version"))?; + return Err(DataParseError::BadVersionTag); } let len = sl.read_i32::().unwrap(); - if len <= 0 || len >= 128 { - Err(Error::with_msg("large channel header len"))?; + if len <= 0 || len >= 512 { + return Err(DataParseError::HeaderTooLarge); } let totlen = len as usize + 2; if buf.len() < totlen { @@ -187,9 +304,10 @@ impl EventChunker { sl.advance(len as usize - 8); let len2 = sl.read_i32::().unwrap(); if len != len2 { - Err(Error::with_msg("channel header len mismatch"))?; + return Err(DataParseError::DataFrameLengthMismatch); } - String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?; + let _ = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()) + .map_err(|_| DataParseError::Utf8Error); self.state = DataFileState::Event; self.need_min = 4; buf.advance(totlen); @@ -198,58 +316,62 @@ impl EventChunker { } DataFileState::Event => { let p0 = 0; - let mut sl = std::io::Cursor::new(buf.as_ref()); + let mut sl = Cursor::new(buf.as_ref()); let len = sl.read_i32::().unwrap(); - if len < 20 || len > 1024 * 1024 * 20 { - Err(Error::with_msg("unexpected large event chunk"))?; + if len < 20 { + return Err(DataParseError::EventTooShort); + } + match self.fetch_info.shape() { + Shape::Scalar if len > 512 => return Err(DataParseError::EventTooLong), + Shape::Wave(_) if len > 8 * 1024 * 256 => return Err(DataParseError::EventTooLong), + Shape::Image(_, _) if len > 1024 * 1024 * 40 => return Err(DataParseError::EventTooLong), + _ => {} } let len = len as u32; if (buf.len() as u32) < len { self.need_min = len as u32; break; } else { - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len1b = sl.read_i32::().unwrap(); - assert!(len == len1b as u32); + let mut discard = false; let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; - if ts == self.max_ts { - if self.repeated_ts_warn_count < 20 { + if ts == self.last_ts { + self.repeated_ts_count += 1; + if self.repeated_ts_count < 20 { let msg = format!( - "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.repeated_ts_warn_count, + "EventChunker repeated event ts ix {} ts {}.{} last_ts {}.{} config {:?} path {:?}", + self.repeated_ts_count, ts / SEC, ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, + self.last_ts / SEC, + self.last_ts % SEC, self.fetch_info.shape(), self.dbg_path ); warn!("{}", msg); - self.repeated_ts_warn_count += 1; } } - if ts < self.max_ts { - if self.unordered_warn_count < 20 { + if ts < self.last_ts { + discard = true; + self.unordered_count += 1; + if self.unordered_count < 20 { let msg = format!( - "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.unordered_warn_count, + "EventChunker unordered event ix {} ts {}.{} last_ts {}.{} config {:?} path {:?}", + self.unordered_count, ts / SEC, ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, + self.last_ts / SEC, + self.last_ts % SEC, self.fetch_info.shape(), self.dbg_path ); warn!("{}", msg); - self.unordered_warn_count += 1; - let e = Error::with_public_msg_no_trace(msg); - return Err(e); } } - self.max_ts = ts; + self.last_ts = ts; if ts >= self.range.end { + discard = true; self.seen_after_range_count += 1; if !self.expand || self.seen_after_range_count >= 2 { self.seen_beyond_range = true; @@ -258,10 +380,12 @@ impl EventChunker { } } if ts < self.range.beg { + discard = true; self.seen_before_range_count += 1; - if self.seen_before_range_count > 1 { + if self.seen_before_range_count < 20 { let msg = format!( - "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", + "seen before range: {} event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", + self.seen_before_range_count, ts / SEC, ts % SEC, self.range.beg / SEC, @@ -273,8 +397,23 @@ impl EventChunker { self.dbg_path ); warn!("{}", msg); - let e = Error::with_public_msg(msg); - Err(e)?; + } + if self.seen_before_range_count > 100 { + let msg = format!( + "too many seen before range: {} event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", + self.seen_before_range_count, + ts / SEC, + ts % SEC, + self.range.beg / SEC, + self.range.beg % SEC, + self.range.end / SEC, + self.range.end % SEC, + pulse, + self.fetch_info.shape(), + self.dbg_path + ); + error!("{}", msg); + return Err(DataParseError::TooManyBeforeRange); } } let _ioc_ts = sl.read_i64::().unwrap(); @@ -282,33 +421,34 @@ impl EventChunker { let severity = sl.read_i8().unwrap(); let optional = sl.read_i32::().unwrap(); if status != 0 { - Err(Error::with_msg(format!("status != 0: {}", status)))?; + // return Err(DataParseError::UnexpectedStatus); + // TODO count } if severity != 0 { - Err(Error::with_msg(format!("severity != 0: {}", severity)))?; + // return Err(DataParseError::TooManyBeforeRange); + // TODO count } if optional != -1 { - Err(Error::with_msg(format!("optional != -1: {}", optional)))?; + return Err(DataParseError::EventWithOptional); } let type_flags = sl.read_u8().unwrap(); let type_index = sl.read_u8().unwrap(); if type_index > 13 { - Err(Error::with_msg(format!("type_index: {}", type_index)))?; + return Err(DataParseError::BadTypeIndex); } - let scalar_type = ScalarType::from_dtype_index(type_index)?; + let scalar_type = + ScalarType::from_dtype_index(type_index).map_err(|_| DataParseError::BadTypeIndex)?; let is_compressed = type_flags & COMPRESSION != 0; let is_array = type_flags & ARRAY != 0; let is_big_endian = type_flags & BIG_ENDIAN != 0; let is_shaped = type_flags & SHAPE != 0; if let Shape::Wave(_) = self.fetch_info.shape() { if !is_array { - Err(Error::with_msg(format!("dim1 but not array {:?}", self.fetch_info)))?; + return Err(DataParseError::WaveShapeWithoutEventArray); } } let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; - assert!(compression_method <= 0); - assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); let mut shape_lens = [0, 0, 0, 0]; for i1 in 0..shape_dim { shape_lens[i1 as usize] = sl.read_u32::().unwrap(); @@ -319,8 +459,14 @@ impl EventChunker { Shape::Wave(shape_lens[0]) } else if shape_dim == 2 { Shape::Image(shape_lens[0], shape_lens[1]) + } else if shape_dim == 0 { + discard = true; + // return Err(DataParseError::ShapedWithoutDims); + Shape::Scalar } else { - err::todoval() + discard = true; + // return Err(DataParseError::TooManyDims); + Shape::Scalar } } else { Shape::Scalar @@ -330,14 +476,16 @@ impl EventChunker { if compression_method == 0 { Some(CompressionMethod::BitshuffleLZ4) } else { - err::todoval() + return Err(DataParseError::UnknownCompression); } } else { None }; let p1 = sl.position(); - let k1 = len as u64 - (p1 - p0) - 4; - if is_compressed { + let n1 = p1 - p0; + let n2 = len as u64 - n1 - 4; + let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref(); + if false && is_compressed { //debug!("event ts {} is_compressed {}", ts, is_compressed); let value_bytes = sl.read_u64::().unwrap(); let block_size = sl.read_u32::().unwrap(); @@ -353,128 +501,22 @@ impl EventChunker { assert!(value_bytes < 1024 * 1024 * 20); } } - assert!(block_size <= 1024 * 32); + if block_size > 1024 * 32 { + return Err(DataParseError::BadCompresionBlockSize); + } let type_size = scalar_type.bytes() as u32; - let ele_count = value_bytes / type_size as u64; - let ele_size = type_size; - let config_matches = match self.fetch_info.shape() { - Shape::Scalar => { - if is_array { - if false { - error!( - "channel config mismatch {:?} {:?} {:?} {:?}", - self.fetch_info, is_array, ele_count, self.dbg_path, - ); - } - if false { - return Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but we find event is_array", - self.fetch_info, - ))); - } - false - } else { - true - } - } - Shape::Wave(dim1count) => { - if *dim1count != ele_count as u32 { - if false { - error!( - "channel config mismatch {:?} {:?} {:?} {:?}", - self.fetch_info, is_array, ele_count, self.dbg_path, - ); - } - if false { - return Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.fetch_info, ele_count, - ))); - } - false - } else { - true - } - } - Shape::Image(n1, n2) => { - let nt = (*n1 as usize) * (*n2 as usize); - if nt != ele_count as usize { - if false { - error!( - "channel config mismatch {:?} {:?} {:?} {:?}", - self.fetch_info, is_array, ele_count, self.dbg_path, - ); - } - if false { - return Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.fetch_info, ele_count, - ))); - } - false - } else { - true - } - } - }; - if config_matches { - let data = buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].as_ref(); - let decomp = { - if self.do_decompress { - assert!(data.len() > 12); - let ts1 = Instant::now(); - let decomp_bytes = (type_size * ele_count as u32) as usize; - let mut decomp = vec![0; decomp_bytes]; - // TODO limit the buf slice range - match bitshuffle_decompress( - &data[12..], - &mut decomp, - ele_count as usize, - ele_size as usize, - 0, - ) { - Ok(c1) => { - assert!(c1 as u64 + 12 == k1); - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - // TODO analyze the histo - self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); - Some(decomp) - } - Err(e) => { - return Err(Error::with_msg(format!("decompression failed {:?}", e)))?; - } - } - } else { - None - } - }; - ret.add_event( - ts, - pulse, - Some(data.to_vec()), - decomp, - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - shape_this, - comp_this, - ); - } else { - self.config_mismatch_discard += 1; - } + let _ele_count = value_bytes / type_size as u64; + let _ele_size = type_size; + } + if discard { + self.discard_count += 1; } else { - if len < p1 as u32 + 4 { - let msg = format!("uncomp len: {} p1: {}", len, p1); - Err(Error::with_msg(msg))?; - } - let vlen = len - p1 as u32 - 4; - let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize]; ret.add_event( ts, pulse, - Some(data.to_vec()), - Some(data.to_vec()), - ScalarType::from_dtype_index(type_index)?, + Some(databuf.to_vec()), + None, + scalar_type, is_big_endian, shape_this, comp_this, @@ -552,31 +594,13 @@ impl Stream for EventChunker { // TODO gather stats about this: self.inp.put_back(fcr); } - match self.fetch_info.shape() { - Shape::Scalar => { - if self.need_min > 1024 * 8 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Wave(_) => { - if self.need_min > 1024 * 32 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Image(_, _) => { - if self.need_min > 1024 * 1024 * 20 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } + if self.need_min > self.need_min_max { + let msg = format!( + "spurious EventChunker asks for need_min {} max {}", + self.need_min, self.need_min_max + ); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); } let x = self.need_min; self.inp.set_need_min(x); diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 5c9c780..5f05592 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -7,6 +7,7 @@ use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; use netpod::Cluster; +use netpod::SfChFetchInfo; use query::api4::events::EventsSubQuery; use std::future::Future; use std::pin::Pin; @@ -27,7 +28,7 @@ pub struct MergedBlobsFromRemotes { impl MergedBlobsFromRemotes { pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self { - debug!("MergedBlobsFromRemotes evq {:?}", subq); + debug!("MergedBlobsFromRemotes subq {:?}", subq); let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone()); diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 8236cfd..352b7af 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -91,7 +91,7 @@ pub async fn make_event_pipe( pub fn make_local_event_blobs_stream( range: NanoRange, - fetch_info: &SfChFetchInfo, + fetch_info: SfChFetchInfo, expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, @@ -127,7 +127,7 @@ pub fn make_local_event_blobs_stream( pub fn make_remote_event_blobs_stream( range: NanoRange, - fetch_info: &SfChFetchInfo, + fetch_info: SfChFetchInfo, expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, @@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real( let pipe = if do_local { let event_blobs = make_local_event_blobs_stream( range.try_into()?, - fetch_info, + fetch_info.clone(), expand, false, event_chunker_conf, @@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real( } else { let event_blobs = make_remote_event_blobs_stream( range.try_into()?, - fetch_info, + fetch_info.clone(), expand, true, event_chunker_conf, diff --git a/httpclient/src/httpclient.rs b/httpclient/src/httpclient.rs index 7231474..9695e5c 100644 --- a/httpclient/src/httpclient.rs +++ b/httpclient/src/httpclient.rs @@ -40,7 +40,12 @@ impl ErrConv for Result { impl Convable for http::Error {} impl Convable for hyper::Error {} -pub async fn http_get(url: Url, accept: &str) -> Result { +pub struct HttpResponse { + pub head: http::response::Parts, + pub body: Bytes, +} + +pub async fn http_get(url: Url, accept: &str) -> Result { let req = Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -49,22 +54,11 @@ pub async fn http_get(url: Url, accept: &str) -> Result { .ec()?; let client = hyper::Client::new(); let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body).await.ec()?; - let s = String::from_utf8_lossy(&buf); - return Err(Error::with_msg(format!( - concat!( - "Server error {:?}\n", - "---------------------- message from http body:\n", - "{}\n", - "---------------------- end of http body", - ), - head, s - ))); - } - let body = hyper::body::to_bytes(res.into_body()).await.ec()?; - Ok(body) + let (head, body) = res.into_parts(); + info!("http_get head {head:?}"); + let body = hyper::body::to_bytes(body).await.ec()?; + let ret = HttpResponse { head, body }; + Ok(ret) } pub async fn http_post(url: Url, accept: &str, body: String) -> Result { diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs index fa9aa4b..103cd3c 100644 --- a/httpclient/src/lib.rs +++ b/httpclient/src/lib.rs @@ -1,4 +1,5 @@ pub mod httpclient; pub use crate::httpclient::*; +pub use http; pub use url; diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index aed273e..a063823 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -51,6 +51,7 @@ use query::transform::TransformQuery; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; +use std::any; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; @@ -518,7 +519,7 @@ async fn find_ch_conf( range: NanoRange, channel: SfDbChannel, ncc: NodeConfigCached, -) -> Result { +) -> Result, Error> { let ret = nodenet::channelconfig::channel_config(range, channel, &ncc).await?; Ok(ret) } @@ -530,7 +531,7 @@ pub struct DataApiPython3DataStream { current_channel: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, - config_fut: Option> + Send>>>, + config_fut: Option, Error>> + Send>>>, disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] @@ -676,7 +677,6 @@ impl DataApiPython3DataStream { // TODO this stream can currently only handle sf-databuffer type backend anyway. fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> { self.config_fut = None; - debug!("found channel_config {:?}", fetch_info); let select = EventsSubQuerySelect::new( ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()), self.range.clone().into(), @@ -695,7 +695,7 @@ impl DataApiPython3DataStream { let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); let s = make_local_event_blobs_stream( self.range.clone(), - &fetch_info, + fetch_info.clone(), one_before, self.do_decompress, event_chunker_conf, @@ -704,7 +704,7 @@ impl DataApiPython3DataStream { )?; Box::pin(s) as Pin> + Send>> } else { - debug!("Set up merged remote stream"); + debug!("set up merged remote stream {}", fetch_info.name()); let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); Box::pin(s) as Pin> + Send>> }; @@ -762,7 +762,7 @@ impl Stream for DataApiPython3DataStream { } } else if let Some(fut) = &mut self.config_fut { match fut.poll_unpin(cx) { - Ready(Ok(k)) => match k { + Ready(Ok(Some(k))) => match k { ChannelTypeConfigGen::Scylla(_) => { let e = Error::with_msg_no_trace("scylla"); error!("{e}"); @@ -779,6 +779,11 @@ impl Stream for DataApiPython3DataStream { } }, }, + Ready(Ok(None)) => { + warn!("logic error"); + self.config_fut = None; + continue; + } Ready(Err(e)) => { self.data_done = true; Ready(Some(Err(e))) @@ -795,7 +800,7 @@ impl Stream for DataApiPython3DataStream { self.node_config.clone(), ))); } - self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(channel)))); + self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(Some(channel))))); continue; } else { self.data_done = true; @@ -858,7 +863,7 @@ impl Api1EventsBinaryHandler { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); let body_data = hyper::body::to_bytes(body).await?; - if body_data.len() < 512 && body_data.first() == Some(&"{".as_bytes()[0]) { + if body_data.len() < 1024 * 2 && body_data.first() == Some(&"{".as_bytes()[0]) { info!("request body_data string: {}", String::from_utf8_lossy(&body_data)); } let qu = match serde_json::from_slice::(&body_data) { @@ -895,9 +900,10 @@ impl Api1EventsBinaryHandler { span: tracing::Span, ncc: &NodeConfigCached, ) -> Result, Error> { + let self_name = any::type_name::(); // TODO this should go to usage statistics: info!( - "Handle Api1Query {:?} {} {:?}", + "{self_name} {:?} {} {:?}", qu.range(), qu.channels().len(), qu.channels().first() @@ -905,7 +911,7 @@ impl Api1EventsBinaryHandler { let settings = EventsSubQuerySettings::from(&qu); let beg_date = qu.range().beg().clone(); let end_date = qu.range().end().clone(); - trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date); + trace!("{self_name} beg_date {:?} end_date {:?}", beg_date, end_date); //let url = Url::parse(&format!("dummy:{}", req.uri()))?; //let query = PlainEventsBinaryQuery::from_url(&url)?; if accept.contains(APP_OCTET) || accept.contains(ACCEPT_ALL) { @@ -916,12 +922,25 @@ impl Api1EventsBinaryHandler { let backend = &ncc.node_config.cluster.backend; // TODO ask for channel config quorum for all channels up front. //httpclient::http_get(url, accept); + let ts1 = Instant::now(); let mut chans = Vec::new(); for ch in qu.channels() { + info!("try to find config quorum for {ch:?}"); let ch = SfDbChannel::from_name(backend, ch.name()); - let ch_conf = nodenet::configquorum::find_config_basics_quorum(ch, range.clone().into(), ncc).await?; - chans.push(ch_conf); + let ch_conf = + nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ncc).await?; + match ch_conf { + Some(x) => { + chans.push(x); + } + None => { + error!("no config quorum found for {ch:?}"); + } + } } + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1).as_millis(); + info!("{self_name} configs fetched in {} ms", dt); // TODO use a better stream protocol with built-in error delivery. let status_id = super::status_board()?.new_status_id(); let s = DataApiPython3DataStream::new( @@ -942,8 +961,8 @@ impl Api1EventsBinaryHandler { Ok(ret) } else { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg(format!("Unsupported Accept: {}", accept)); - error!("{e}"); + let e = Error::with_public_msg(format!("{self_name} unsupported Accept: {}", accept)); + error!("{self_name} {e}"); Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) } } diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 738e660..f6c787b 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -27,7 +27,10 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache let msg = format!("can not parse query: {}", e.msg()); e.add_public_msg(msg) })?; - let ch_conf = ch_conf_from_binned(&query, node_config).await?; + // TODO handle None case better and return 404 + let ch_conf = ch_conf_from_binned(&query, node_config) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; let span1 = span!( Level::INFO, "httpret::binned", diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index 092ff79..0147467 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -93,7 +93,11 @@ async fn plain_events_json( let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; info!("plain_events_json query {query:?}"); - let ch_conf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?; + // TODO handle None case better and return 404 + let ch_conf = chconf_from_events_v1(&query, node_config) + .await + .map_err(Error::from)? + .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("plain_events_json chconf_from_events_v1: {ch_conf:?}"); let item = streams::plaineventsjson::plain_events_json(&query, ch_conf, &node_config.node_config.cluster).await; let item = match item { diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 0b6cbbc..0da4caf 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -149,8 +149,9 @@ impl ChannelStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let chconf = - nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; + let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?; let do_one_before_range = true; match chconf { ChannelTypeConfigGen::Scylla(ch_conf) => { diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 1cf3625..8ea5873 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -36,18 +36,24 @@ use url::Url; pub async fn chconf_from_events_v1( q: &PlainEventsQuery, ncc: &NodeConfigCached, -) -> Result { +) -> Result, Error> { let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } -pub async fn chconf_from_prebinned(q: &PreBinnedQuery, ncc: &NodeConfigCached) -> Result { +pub async fn chconf_from_prebinned( + q: &PreBinnedQuery, + ncc: &NodeConfigCached, +) -> Result, Error> { let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?; Ok(ret) } -pub async fn ch_conf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { +pub async fn ch_conf_from_binned( + q: &BinnedQuery, + ncc: &NodeConfigCached, +) -> Result, Error> { let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } @@ -91,16 +97,24 @@ impl ChannelConfigHandler { req: Request, node_config: &NodeConfigCached, ) -> Result, Error> { - info!("channel_config"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; - info!("channel_config for q {q:?}"); let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; - let res: ChannelConfigResponse = conf.into(); - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; - Ok(ret) + match conf { + Some(conf) => { + let res: ChannelConfigResponse = conf.into(); + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&res)?))?; + Ok(ret) + } + None => { + let ret = response(StatusCode::NOT_FOUND) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::empty())?; + Ok(ret) + } + } } } @@ -126,7 +140,7 @@ impl ChannelConfigsHandler { match self.channel_configs(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { - warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); + warn!("got error from channel_config: {e}"); Ok(e.to_public_response()) } } @@ -151,6 +165,53 @@ impl ChannelConfigsHandler { } } +pub struct ChannelConfigQuorumHandler {} + +impl ChannelConfigQuorumHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/channel/config/quorum" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + match self.channel_config_quorum(req, &node_config).await { + Ok(k) => Ok(k), + Err(e) => { + warn!("from channel_config_quorum: {e}"); + Ok(e.to_public_response()) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn channel_config_quorum(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + info!("channel_config_quorum"); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ChannelConfigQuery::from_url(&url)?; + info!("channel_config_quorum for q {q:?}"); + let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ncc).await?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&ch_confs)?))?; + Ok(ret) + } +} + trait ErrConv { fn err_conv(self) -> Result; } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 9de47d3..3b00f35 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -317,6 +317,8 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index e22e876..a9804bd 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -315,7 +315,7 @@ impl ScalarType { F32 => 4, F64 => 8, BOOL => 1, - STRING => 0, + STRING => 1, } } @@ -1252,7 +1252,7 @@ mod dt_nano_serde { } } -#[derive(Clone, PartialEq, PartialOrd)] +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct TsNano(pub u64); mod ts_nano_ser { diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index 8f98a4d..8fa1672 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -98,32 +98,37 @@ pub async fn channel_config( range: NanoRange, channel: SfDbChannel, ncc: &NodeConfigCached, -) -> Result { +) -> Result, Error> { if channel.backend() == TEST_BACKEND { - channel_config_test_backend(channel) + Ok(Some(channel_config_test_backend(channel)?)) } else if ncc.node_config.cluster.scylla.is_some() { debug!("try to get ChConf for scylla type backend"); let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) .await .map_err(Error::from)?; - Ok(ChannelTypeConfigGen::Scylla(ret)) + Ok(Some(ChannelTypeConfigGen::Scylla(ret))) } else if ncc.node.sf_databuffer.is_some() { debug!("channel_config channel {channel:?}"); - let config = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc) - .await? - .ok_or_else(|| Error::with_msg_no_trace("config entry not found"))?; - debug!("channel_config config {config:?}"); - let ret = SfChFetchInfo::new( - config.channel.backend(), - config.channel.name(), - config.keyspace, - config.time_bin_size, - config.byte_order, - config.scalar_type, - config.shape, - ); - let ret = ChannelTypeConfigGen::SfDatabuffer(ret); - Ok(ret) + let k = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc) + .await + .map_err(|e| Error::from(e.to_string()))?; + match k { + Some(config) => { + debug!("channel_config config {config:?}"); + let ret = SfChFetchInfo::new( + config.channel.backend(), + config.channel.name(), + config.keyspace, + config.time_bin_size, + config.byte_order, + config.scalar_type, + config.shape, + ); + let ret = ChannelTypeConfigGen::SfDatabuffer(ret); + Ok(Some(ret)) + } + None => Ok(None), + } } else { return Err( Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend())) @@ -144,7 +149,9 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re Ok(vec![ChannelTypeConfigGen::Scylla(ret)]) } else if ncc.node.sf_databuffer.is_some() { debug!("channel_config channel {channel:?}"); - let configs = disk::channelconfig::channel_configs(channel.clone(), ncc).await?; + let configs = disk::channelconfig::channel_configs(channel.clone(), ncc) + .await + .map_err(|e| Error::from(e.to_string()))?; let a = configs; let mut configs = Vec::new(); for config in a.entries { @@ -169,11 +176,26 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re } } -pub async fn http_get_channel_config(qu: ChannelConfigQuery, baseurl: Url) -> Result { +pub async fn http_get_channel_config( + qu: ChannelConfigQuery, + baseurl: Url, +) -> Result, Error> { let url = baseurl; let mut url = url.join("channel/config").unwrap(); qu.append_to_url(&mut url); let res = httpclient::http_get(url, APP_JSON).await?; - let ret: ChannelConfigResponse = serde_json::from_slice(&res)?; - Ok(ret) + use httpclient::http::StatusCode; + if res.head.status == StatusCode::NOT_FOUND { + Ok(None) + } else if res.head.status == StatusCode::OK { + let ret: ChannelConfigResponse = serde_json::from_slice(&res.body)?; + Ok(Some(ret)) + } else { + let b = &res.body; + let s = String::from_utf8_lossy(&b[0..b.len().min(256)]); + Err(Error::with_msg_no_trace(format!( + "http_get_channel_config {} {}", + res.head.status, s + ))) + } } diff --git a/nodenet/src/configquorum.rs b/nodenet/src/configquorum.rs index cc260bc..fac370a 100644 --- a/nodenet/src/configquorum.rs +++ b/nodenet/src/configquorum.rs @@ -1,5 +1,6 @@ use crate::channelconfig::http_get_channel_config; use err::Error; +use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::ChConf; use netpod::ChannelConfigQuery; @@ -10,6 +11,7 @@ use netpod::NodeConfigCached; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use std::collections::BTreeMap; +use std::time::Duration; fn decide_sf_ch_config_quorum(inp: Vec) -> Result, Error> { let mut histo = BTreeMap::new(); @@ -60,9 +62,12 @@ async fn find_sf_ch_config_quorum( // TODO expand: false, }; - let res = http_get_channel_config(qu, node.baseurl()).await?; + let res = tokio::time::timeout(Duration::from_millis(4000), http_get_channel_config(qu, node.baseurl())) + .await + .map_err(|_| Error::with_msg_no_trace("timeout"))??; all.push(res); } + let all: Vec<_> = all.into_iter().filter_map(|x| x).collect(); let qu = decide_sf_ch_config_quorum(all)?; match qu { Some(item) => match item { @@ -79,17 +84,18 @@ pub async fn find_config_basics_quorum( channel: SfDbChannel, range: SeriesRange, ncc: &NodeConfigCached, -) -> Result { +) -> Result, Error> { if let Some(_cfg) = &ncc.node.sf_databuffer { - let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc) - .await? - .ok_or_else(|| Error::with_msg_no_trace("no config found at all"))?; - Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) + match find_sf_ch_config_quorum(channel, range, ncc).await? { + Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))), + None => Ok(None), + } } else if let Some(_cfg) = &ncc.node_config.cluster.scylla { + // TODO let called function allow to return None instead of error-not-found let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) .await .map_err(Error::from)?; - Ok(ChannelTypeConfigGen::Scylla(ret)) + Ok(Some(ChannelTypeConfigGen::Scylla(ret))) } else { Err(Error::with_msg_no_trace( "find_config_basics_quorum not supported backend", diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 952ae24..346bd63 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -1,3 +1,4 @@ +use err::thiserror; use err::Error; use netpod::log::*; use netpod::range::evrange::NanoRange; @@ -27,6 +28,17 @@ use tokio::io::ErrorKind; const TEST_BACKEND: &str = "testbackend-00"; +#[derive(Debug, thiserror::Error)] +#[error("ConfigParseError")] +pub enum ConfigParseError { + NotSupportedOnNode, + FileNotFound, + PermissionDenied, + IO, + ParseError, + NotSupported, +} + #[derive(Debug)] pub struct NErr { msg: String, @@ -306,41 +318,40 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } -async fn read_local_config_real(channel: SfDbChannel, ncc: &NodeConfigCached) -> Result { +async fn read_local_config_real( + channel: SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { let path = ncc .node .sf_databuffer .as_ref() - .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .ok_or_else(|| ConfigParseError::NotSupportedOnNode)? .data_base_path .join("config") .join(channel.name()) .join("latest") .join("00000_Config"); - // TODO use commonio here to wrap the error conversion - let buf = match tokio::fs::read(&path).await { - Ok(k) => k, + match tokio::fs::read(&path).await { + Ok(buf) => { + let config = parse_config(&buf).map_err(|_| ConfigParseError::ParseError)?; + Ok(config.1) + } Err(e) => match e.kind() { - ErrorKind::NotFound => { - let bt = err::bt::Backtrace::new(); - error!("{bt:?}"); - return Err(Error::with_public_msg(format!( - "databuffer channel config file not found for channel {channel:?} at {path:?}" - ))); + ErrorKind::NotFound => Err(ConfigParseError::FileNotFound), + ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied), + e => { + error!("read_local_config_real {e:?}"); + Err(ConfigParseError::IO) } - ErrorKind::PermissionDenied => { - return Err(Error::with_public_msg(format!( - "databuffer channel config file permission denied for channel {channel:?} at {path:?}" - ))) - } - _ => return Err(e.into()), }, - }; - let config = parse_config(&buf).map_err(NErr::from)?; - Ok(config.1) + } } -async fn read_local_config_test(channel: SfDbChannel, ncc: &NodeConfigCached) -> Result { +async fn read_local_config_test( + channel: SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { if channel.name() == "test-gen-i32-dim0-v00" { let ret = ChannelConfigs { format_version: 0, @@ -402,12 +413,15 @@ async fn read_local_config_test(channel: SfDbChannel, ncc: &NodeConfigCached) -> }; Ok(ret) } else { - Err(Error::with_msg_no_trace(format!("unknown test channel {channel:?}"))) + Err(ConfigParseError::NotSupported) } } // TODO can I take parameters as ref, even when used in custom streams? -pub async fn read_local_config(channel: SfDbChannel, ncc: NodeConfigCached) -> Result { +pub async fn read_local_config( + channel: SfDbChannel, + ncc: NodeConfigCached, +) -> Result { if channel.backend() == TEST_BACKEND { read_local_config_test(channel, &ncc).await } else { @@ -436,7 +450,7 @@ impl<'a> MatchingConfigEntry<'a> { pub fn extract_matching_config_entry<'a>( range: &NanoRange, channel_config: &'a ChannelConfigs, -) -> Result, Error> { +) -> Result, ConfigParseError> { let mut a: Vec<_> = channel_config.entries.iter().enumerate().map(|(i, x)| (i, x)).collect(); a.sort_unstable_by_key(|(_, x)| x.ts.ns()); diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 45b86b9..401d01b 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -17,7 +17,6 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::make_term_frame; use netpod::log::*; -use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::Node; use query::api4::events::EventsSubQuery;