From b02b2a7add488c8f403c348d3d34b4ef23bf5429 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 25 Oct 2024 11:54:03 +0200
Subject: [PATCH] WIP

---
 crates/httpret/src/api4/binned.rs            |  13 ++-
 crates/items_0/src/timebin.rs                |   1 +
 crates/items_2/src/binning/container_bins.rs |   4 +
 crates/items_2/src/streams.rs                |   2 -
 crates/netpod/src/netpod.rs                  |   8 +-
 crates/streams/Cargo.toml                    |   1 +
 crates/streams/src/eventsplainreader.rs      |  13 +--
 crates/streams/src/lib.rs                    |   1 +
 crates/streams/src/teststream.rs             |  37 ++++++
 crates/streams/src/timebin/cached/reader.rs  | 114 ++++++++++++++++++-
 crates/streams/src/timebin/fromevents.rs     |  10 --
 crates/streams/src/timebinnedjson.rs         |  25 +---
 12 files changed, 179 insertions(+), 50 deletions(-)
 create mode 100644 crates/streams/src/teststream.rs

diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs
index 6b3c164..444ed69 100644
--- a/crates/httpret/src/api4/binned.rs
+++ b/crates/httpret/src/api4/binned.rs
@@ -140,12 +140,16 @@ async fn binned(
 }
 
 fn make_read_provider(
+    chname: &str,
     scyqueue: Option,
     open_bytes: Pin>,
     ctx: &ReqCtx,
     ncc: &NodeConfigCached,
 ) -> (Arc, Arc) {
-    let events_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
+    let events_read_provider = if chname.starts_with("unittest") {
+        let x = streams::teststream::UnitTestStream::new();
+        Arc::new(x)
+    } else if ncc.node_config.cluster.scylla_lt().is_some() {
         scyqueue
             .clone()
             .map(|qu| ScyllaEventReadProvider::new(qu))
@@ -189,7 +193,6 @@ async fn binned_json_single(
         error!("binned_json: {e:?}");
         Error::BadQuery(e.to_string())
     })?;
-    // TODO handle None case better and return 404
     let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
         .await?
         .ok_or_else(|| Error::ChannelNotFound)?;
@@ -207,7 +210,8 @@ async fn binned_json_single(
     let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
     let open_bytes = Arc::pin(open_bytes);
     let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
-    let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc);
+    let (events_read_provider, cache_read_provider) =
+        make_read_provider(ch_conf.name(), scyqueue, open_bytes2, ctx, ncc);
     let item = streams::timebinnedjson::timebinned_json(
         query,
         ch_conf,
@@ -271,7 +275,8 @@ async fn binned_json_framed(
     let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
     let open_bytes = Arc::pin(open_bytes);
     let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
-    let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc);
+    let (events_read_provider, cache_read_provider) =
+        make_read_provider(ch_conf.name(), scyqueue, open_bytes2, ctx, ncc);
     let stream = streams::timebinnedjson::timebinned_json_framed(
         query,
         ch_conf,
diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs
index 6a8e780..c6ba4d0 100644
--- a/crates/items_0/src/timebin.rs
+++ b/crates/items_0/src/timebin.rs
@@ -112,6 +112,7 @@ pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen
         &self,
     ) -> std::iter::Zip, std::collections::vec_deque::Iter>;
     fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range);
+    fn fix_numerics(&mut self);
     fn to_old_time_binned(&self) -> Box;
 }
 
diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs
index 33c42e7..c4c16b2 100644
--- a/crates/items_2/src/binning/container_bins.rs
+++ b/crates/items_2/src/binning/container_bins.rs
@@ -618,6 +618,10 @@ where
         }
     }
 
+    fn fix_numerics(&mut self) {
+        for ((min, max), avg) in self.mins.iter_mut().zip(self.maxs.iter_mut()).zip(self.avgs.iter_mut()) {}
+    }
+
     fn to_old_time_binned(&self) -> Box {
         try_to_old_time_binned!(u8, self, 0);
         try_to_old_time_binned!(u16, self, 0);
diff --git a/crates/items_2/src/streams.rs b/crates/items_2/src/streams.rs
index 9c2a9e1..2597924 100644
--- a/crates/items_2/src/streams.rs
+++ b/crates/items_2/src/streams.rs
@@ -199,8 +199,6 @@ impl VecStream {
     }
 }
 
-/*impl Unpin for VecStream where T: Unpin {}*/
-
 impl Stream for VecStream
 where
     T: Unpin,
diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index c5dfa18..ec7b208 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -2165,10 +2165,10 @@ const TIME_BIN_THRESHOLDS: [u64; 26] = [
     DAY * 64,
 ];
 
-const TIME_BIN_LEN_CACHE_OPTS: [DtMs; 2] = [
+const TIME_BIN_LEN_CACHE_OPTS: [DtMs; 0] = [
     //
-    DtMs(1000 * 10),
-    DtMs(1000 * 60 * 60),
+    // DtMs(1000 * 10),
+    // DtMs(1000 * 60 * 60),
 ];
 
 pub fn time_bin_len_cache_opts() -> &'static [DtMs] {
@@ -2592,7 +2592,7 @@ impl BinnedRangeEnum {
                 return Ok(ret);
             }
         }
-        Err(Error::with_msg_no_trace("can not find matching pre-binned grid"))
+        Err(Error::with_msg_no_trace("can not find matching binned grid"))
     }
 
     /// Cover at least the given range while selecting the bin width which best fits the requested bin width.
diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml
index 18392b7..19c9407 100644
--- a/crates/streams/Cargo.toml
+++ b/crates/streams/Cargo.toml
@@ -19,6 +19,7 @@ arrayref = "0.3.6"
 crc32fast = "1.3.2"
 byteorder = "1.4.3"
 async-channel = "1.8.0"
+rand_xoshiro = "0.6.0"
 chrono = { version = "0.4.19", features = ["serde"] }
 wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true }
 err = { path = "../err" }
diff --git a/crates/streams/src/eventsplainreader.rs b/crates/streams/src/eventsplainreader.rs
index 3d99e30..29ae922 100644
--- a/crates/streams/src/eventsplainreader.rs
+++ b/crates/streams/src/eventsplainreader.rs
@@ -15,16 +15,11 @@ use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
 
+type ChEvsBox = Pin> + Send>>;
+
 enum StreamState {
-    Opening(
-        Pin<
-            Box<
-                dyn Future> + Send>>, ::err::Error>>
-                    + Send,
-            >,
-        >,
-    ),
-    Reading(Pin> + Send>>),
+    Opening(Pin> + Send>>),
+    Reading(ChEvsBox),
 }
 
 struct InnerStream {
diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs
index a4233df..605ce96 100644
--- a/crates/streams/src/lib.rs
+++ b/crates/streams/src/lib.rs
@@ -22,6 +22,7 @@ pub mod slidebuf;
 pub mod tcprawclient;
 #[cfg(test)]
 pub mod test;
+pub mod teststream;
 pub mod timebin;
 pub mod timebinnedjson;
 pub mod transform;
diff --git a/crates/streams/src/teststream.rs b/crates/streams/src/teststream.rs
new file mode 100644
index 0000000..d1c85f6
--- /dev/null
+++ b/crates/streams/src/teststream.rs
@@ -0,0 +1,37 @@
+use crate::timebin::cached::reader::EventsReadProvider;
+use crate::timebin::cached::reader::EventsReading;
+use futures_util::Stream;
+use items_0::streamitem::Sitemty;
+use items_2::channelevents::ChannelEvents;
+use netpod::range::evrange::SeriesRange;
+use query::api4::events::EventsSubQuery;
+use rand_xoshiro::rand_core::SeedableRng;
+use rand_xoshiro::Xoshiro128PlusPlus;
+use std::pin::Pin;
+
+fn make_stream(chname: &str, range: &SeriesRange) -> Pin> + Send>> {
+    if chname == "unittest;scylla;cont;scalar;f32" {
+        let e = ::err::Error::with_msg_no_trace(format!("unknown channel {chname}"));
+        let ret = futures_util::stream::iter([Err(e)]);
+        Box::pin(ret)
+    } else {
+        let e = ::err::Error::with_msg_no_trace(format!("unknown channel {chname}"));
+        let ret = futures_util::stream::iter([Err(e)]);
+        Box::pin(ret)
+    }
+}
+
+pub struct UnitTestStream {}
+
+impl UnitTestStream {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl EventsReadProvider for UnitTestStream {
+    fn read(&self, evq: EventsSubQuery) -> EventsReading {
+        let stream = make_stream(evq.name(), evq.range());
+        EventsReading::new(stream)
+    }
+}
diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs
index 099169b..c25cfc3 100644
--- a/crates/streams/src/timebin/cached/reader.rs
+++ b/crates/streams/src/timebin/cached/reader.rs
@@ -7,9 +7,11 @@ use futures_util::StreamExt;
 use items_0::streamitem::Sitemty;
 use items_0::timebin::BinsBoxed;
 use items_2::channelevents::ChannelEvents;
+use items_2::eventsdim0::EventsDim0;
 use netpod::log::*;
 use netpod::BinnedRange;
 use netpod::DtMs;
+use netpod::EnumVariant;
 use netpod::TsNano;
 use query::api4::events::EventsSubQuery;
 use std::future::Future;
@@ -44,7 +46,117 @@ impl Stream for EventsReading {
     type Item = Sitemty;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> {
-        self.stream.poll_next_unpin(cx)
+        use Poll::*;
+        match self.stream.poll_next_unpin(cx) {
+            Ready(Some(item)) => {
+                use items_0::streamitem::RangeCompletableItem::*;
+                use items_0::streamitem::StreamItem::*;
+                match &item {
+                    Ok(DataItem(Data(cevs))) => match cevs {
+                        ChannelEvents::Events(evs) => {
+                            if let Some(evs) = evs.as_any_ref().downcast_ref::>() {
+                                use items_0::Empty;
+                                let mut dst = EventsDim0::::empty();
+                                for ((&ts, &pulse), val) in evs
+                                    .tss()
+                                    .iter()
+                                    .zip(evs.pulses.iter())
+                                    .zip(evs.private_values_ref().iter())
+                                {
+                                    dst.push_back(ts, pulse, val.ix());
+                                }
+                                let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                Ready(Some(item))
+                            } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() {
+                                use items_0::Empty;
+                                let mut dst = EventsDim0::::empty();
+                                for ((&ts, &pulse), &val) in evs
+                                    .tss()
+                                    .iter()
+                                    .zip(evs.pulses.iter())
+                                    .zip(evs.private_values_ref().iter())
+                                {
+                                    dst.push_back(ts, pulse, val as u8);
+                                }
+                                let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                Ready(Some(item))
+                            } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() {
+                                use items_0::Empty;
+                                let mut dst = EventsDim0::::empty();
+                                for ((&ts, &pulse), _) in evs
+                                    .tss()
+                                    .iter()
+                                    .zip(evs.pulses.iter())
+                                    .zip(evs.private_values_ref().iter())
+                                {
+                                    dst.push_back(ts, pulse, 1);
+                                }
+                                let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                Ready(Some(item))
+                            } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() {
+                                let buf = std::fs::read("evmod").unwrap_or(Vec::new());
+                                let s = String::from_utf8_lossy(&buf);
+                                if s.contains("u8") {
+                                    use items_0::Empty;
+                                    let mut dst = EventsDim0::::empty();
+                                    for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
+                                        let v = (val * 1e6) as u8;
+                                        dst.push_back(*ts, 0, v);
+                                    }
+                                    let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                    Ready(Some(item))
+                                } else if s.contains("i16") {
+                                    use items_0::Empty;
+                                    let mut dst = EventsDim0::::empty();
+                                    for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
+                                        let v = (val * 1e6) as i16 - 50;
+                                        dst.push_back(*ts, 0, v);
+                                    }
+                                    let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                    Ready(Some(item))
+                                } else if s.contains("bool") {
+                                    use items_0::Empty;
+                                    let mut dst = EventsDim0::::empty();
+                                    for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
+                                        let g = u64::from_ne_bytes(val.to_ne_bytes());
+                                        let val = g % 2 == 0;
+                                        dst.push_back(*ts, 0, val);
+                                    }
+                                    let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                    Ready(Some(item))
+                                } else if s.contains("enum") {
+                                    use items_0::Empty;
+                                    let mut dst = EventsDim0::::empty();
+                                    for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
+                                        let buf = val.to_ne_bytes();
+                                        let h = buf[0] ^ buf[1] ^ buf[2] ^ buf[3] ^ buf[4] ^ buf[5] ^ buf[6] ^ buf[7];
+                                        dst.push_back(*ts, 0, EnumVariant::new(h as u16, h.to_string()));
+                                    }
+                                    let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                    Ready(Some(item))
+                                } else if s.contains("string") {
+                                    use items_0::Empty;
+                                    let mut dst = EventsDim0::::empty();
+                                    for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
+                                        dst.push_back(*ts, 0, val.to_string());
+                                    }
+                                    let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
+                                    Ready(Some(item))
+                                } else {
+                                    Ready(Some(item))
+                                }
+                            } else {
+                                Ready(Some(item))
+                            }
+                        }
+                        ChannelEvents::Status(conn_status_event) => Ready(Some(item)),
+                    },
+                    _ => Ready(Some(item)),
+                }
+            }
+            Ready(None) => Ready(None),
+            Pending => Pending,
+        }
     }
 }
diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs
index f4e43da..a8a5961 100644
--- a/crates/streams/src/timebin/fromevents.rs
+++ b/crates/streams/src/timebin/fromevents.rs
@@ -39,16 +39,6 @@ impl BinnedFromEvents {
             panic!();
         }
         let stream = read_provider.read(evq);
-        // let stream = stream.map(|x| {
-        //     let x = items_0::try_map_sitemty_data!(x, |x| match x {
-        //         ChannelEvents::Events(x) => {
-        //             let x = x.to_dim0_f32_for_binning();
-        //             Ok(ChannelEvents::Events(x))
-        //         }
-        //         ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
-        //     });
-        //     x
-        // });
         let stream = if do_time_weight {
             let stream = Box::pin(stream);
             items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream::new(range, stream)
diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs
index 5da0399..b883faf 100644
--- a/crates/streams/src/timebinnedjson.rs
+++ b/crates/streams/src/timebinnedjson.rs
@@ -246,16 +246,6 @@ async fn timebinnable_stream_sf_databuffer_binnable_box(
         open_bytes,
     )
     .await?;
-    // let stream = stream.map(|x| x);
-    // let stream = stream.map(|x| ChannelEvents::Events(x));
-
-    // let stream = stream.map(move |k| {
-    //     on_sitemty_data!(k, |k| {
-    //         let k: Box = Box::new(k);
-    //         Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
-    //     })
-    // });
-
     let stream = PlainEventStream::new(stream);
     let stream = EventsToTimeBinnable::new(stream);
     let stream = Box::pin(stream);
@@ -376,9 +366,10 @@ async fn timebinned_stream(
     events_read_provider: Arc,
 ) -> Result>> + Send>>, Error> {
     use netpod::query::CacheUsage;
-    let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache);
+    let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore);
     match cache_usage.clone() {
         CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore => {
+            debug!("BINNING NEW METHOD");
             debug!(
                 "timebinned_stream caching {:?} subgrids {:?}",
                 query,
@@ -407,18 +398,11 @@ async fn timebinned_stream(
                 events_read_provider,
             )
             .map_err(Error::from_string)?;
-            // let stream = stream.map(|item| {
-            //     on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| {
-            //         let ret = k.to_old_time_binned();
-            //         Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
-            //     })
-            // });
             let stream = stream.map(|item| {
                 use items_0::timebin::BinningggContainerBinsDyn;
-                on_sitemty_data!(item, |x: Box| {
-                    let g = x.new_collector();
+                on_sitemty_data!(item, |mut x: Box| {
+                    x.fix_numerics();
                     let ret = Box::new(x) as Box;
-                    // let ret = x as Box;
                     Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
                 })
             });
@@ -426,6 +410,7 @@
             Ok(stream)
         }
         _ => {
+            debug!("BINNING OLD METHOD");
             let range = binned_range.binned_range_time().to_nano_range();
             let do_time_weight = true;
             let one_before_range = true;