diff --git a/Cargo.toml b/Cargo.toml index ab04846..7dd8692 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ crc32fast = "1.4.2" byteorder = "1.5.0" async-channel = "2.3.1" rand_xoshiro = "0.6.0" +itertools = "0.13.0" +humantime = "2.1.0" http = "1" http-body = "1" http-body-util = "0.1.0" diff --git a/src/eventsplainreader.rs b/src/eventsplainreader.rs index bb08bda..8cc2fb1 100644 --- a/src/eventsplainreader.rs +++ b/src/eventsplainreader.rs @@ -1,26 +1,35 @@ +use crate::log::*; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::cached::reader::CacheReadProvider; +use crate::timebin::cached::reader::CacheReading; use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::cached::reader::EventsReading; +use crate::timebin::cached::reader::PrebinnedPartitioning; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::Sitemty; +use items_0::timebin::BinningggContainerBinsDyn; +use items_2::binning::container_bins::ContainerBins; use items_2::channelevents::ChannelEvents; +use netpod::DtMs; use netpod::ReqCtx; +use netpod::TsNano; use query::api4::events::EventsSubQuery; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "EventsPlainReader")] -pub enum Error { - Timebinned(#[from] crate::timebinnedjson::Error), -} +autoerr::create_error_v1!( + name(Error, "EventsPlainReader"), + enum variants { + Timebinned(#[from] crate::timebinnedjson::Error), + }, +); type ChEvsBox = Pin> + Send>>; @@ -118,13 +127,63 @@ impl CacheReadProvider for DummyCacheReadProvider { let stream = futures_util::future::ready(Ok(None)); crate::timebin::cached::reader::CacheReading::new(Box::pin(stream)) } +} - fn write( - &self, - _series: u64, - _bins: items_0::timebin::BinsBoxed, - ) -> crate::timebin::cached::reader::CacheWriting { - let fut = futures_util::future::ready(Ok(())); - crate::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) +pub fn test_bins_gen_dim0_f32_v00( + bin_len: DtMs, + msp: u64, + offs: Range, +) -> ContainerBins { + trace!("test_bins_gen_dim0_f32_v00"); + let partt = PrebinnedPartitioning::try_from(bin_len).unwrap(); + let mut off = offs.start; + type T = f32; + let mut c = ContainerBins::::new(); + loop { + if off >= offs.end { + break; + } + let ts1 = TsNano::from_ns(partt.msp_div().ns() * msp + partt.bin_len().ns() * off as u64); + let ts2 = ts1.add_dt_nano(partt.bin_len().dt_ns()); + off += 1; + if (ts1.ns() / 1000000000) % 5 < 2 { + continue; + } + let cnt = 55; + let min = 42.0; + let max = 46.0; + let agg = 44.0; + let lst = 43.0; + let fnl = true; + c.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl); + } + c +} + +pub struct TestCacheReadProvider {} + +impl TestCacheReadProvider { + pub fn new() -> Self { + Self {} + } +} + +impl CacheReadProvider for TestCacheReadProvider { + fn read(&self, series: u64, bin_len: DtMs, msp: u64, offs: Range) -> CacheReading { + trace!("TestCacheReadProvider series {}", series); + if series == 123 { + if bin_len == DtMs::from_ms_u64(1000) { + let bins = test_bins_gen_dim0_f32_v00(bin_len, msp, offs); + let x: Box = Box::new(bins); + let fut = futures_util::future::ready(Ok(Some(x))); + CacheReading::new(Box::pin(fut)) + } else { + let fut = futures_util::future::ready(Ok(None)); + CacheReading::new(Box::pin(fut)) + } + } else { + let fut = futures_util::future::ready(Ok(None)); + CacheReading::new(Box::pin(fut)) + } } } diff --git a/src/lib.rs b/src/lib.rs index 6e35e58..c2b452e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,8 +38,5 @@ fn todoval() -> T { } mod log { - #[cfg(not(test))] - pub use netpod::log::*; - #[cfg(test)] - pub use netpod::log_direct::*; + pub use netpod::log_macros_branch::*; } diff --git a/src/test/events_reader.rs b/src/test/events_reader.rs index c7f5a6d..26e517a 100644 --- a/src/test/events_reader.rs +++ b/src/test/events_reader.rs @@ -9,25 +9,24 @@ use items_2::channelevents::ChannelEvents; use netpod::range::evrange::NanoRange; use query::api4::events::EventsSubQuery; -pub struct TestEventsReader { +pub struct TestEventsReaderTy { range: NanoRange, gen: GEN, } -impl TestEventsReader { +impl TestEventsReaderTy { pub fn new(range: NanoRange, gen: GEN) -> Self { Self { range, gen } } } -impl EventsReadProvider for TestEventsReader +impl EventsReadProvider for TestEventsReaderTy where GEN: Fn(NanoRange) -> IT + Send + Sync, IT: Iterator> + Send + 'static, TY: EventValueType, { fn read(&self, evq: EventsSubQuery) -> EventsReading { - // let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone()); let iter = (self.gen)(self.range.clone()); let iter = iter .map(|x| { @@ -47,3 +46,26 @@ where ret } } + +pub struct TestEventsReadProvider {} + +impl TestEventsReadProvider { + pub fn new() -> Self { + Self {} + } +} + +impl EventsReadProvider for TestEventsReadProvider { + fn read(&self, evq: EventsSubQuery) -> EventsReading { + let range: NanoRange = evq.range().try_into().unwrap(); + if evq.ch_conf().series() == Some(123) && evq.name() == "test-reader-dim0-f32-00" { + let gen = TestEventsReaderTy::new( + range, + items_2::testgen::events_gen::new_events_gen_dim0_f32_v00, + ); + gen.read(evq) + } else { + panic!() + } + } +} diff --git a/src/test/timebin/fromlayers.rs b/src/test/timebin/fromlayers.rs index 88099c3..d680c6e 100644 --- a/src/test/timebin/fromlayers.rs +++ b/src/test/timebin/fromlayers.rs @@ -1,9 +1,14 @@ use crate::eventsplainreader::DummyCacheReadProvider; +use crate::eventsplainreader::TestCacheReadProvider; use crate::log::*; -use crate::test::events_reader::TestEventsReader; +use crate::test::events_reader::TestEventsReadProvider; use crate::timebin::fromlayers::TimeBinnedFromLayers; use crate::timebin::opts::BinningOptions; use futures_util::StreamExt; +use items_0::on_sitemty_data; +use items_0::streamitem::sitem_data; +use items_0::timebin::BinningggContainerBinsDyn; +use items_2::binning::container_bins::ContainerBins; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; @@ -15,41 +20,41 @@ use netpod::ReqCtx; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; +use netpod::TsNano; use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; use std::sync::Arc; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Test")] -pub enum Error { - FromLayers(#[from] crate::timebin::fromlayers::Error), - Msg(String), -} +autoerr::create_error_v1!( + name(Error, "Test"), + enum variants { + FromLayers(#[from] crate::timebin::fromlayers::Error), + Msg(String), + }, +); -async fn timebin_from_layers_inner() -> Result<(), Error> { +async fn timebin_from_layers_00_inner() -> Result<(), Error> { let ctx = Arc::new(ReqCtx::for_test()); let ch_conf = ChannelTypeConfigGen::Scylla(ChConf::new( - "testing", + "test", 123, SeriesKind::ChannelData, ScalarType::F32, Shape::Scalar, - "basictest-f32", + "test-reader-dim0-f32-00", )); - let cache_usage = CacheUsage::Ignore; - let binning_opts: BinningOptions = todo!(); + let binning_opts = BinningOptions::default(); let transform_query = TransformQuery::default_time_binned(); let nano_range = NanoRange { - beg: 1000 * 1000 * 1000 * 1, - end: 1000 * 1000 * 1000 * 2, + beg: 1000 * 1000 * 1000 * 10, + end: 1000 * 1000 * 1000 * 20, }; - let cache_read_provider = Arc::new(DummyCacheReadProvider::new()); - let events_read_provider = Arc::new(TestEventsReader::new( - nano_range.clone(), - items_2::testgen::events_gen::new_events_gen_dim1_f32_v00, - )); + let bin_len = DtMs::from_ms_u64(1000); + let range = BinnedRange::from_nano_range(nano_range, bin_len); + let cache_read_provider = Arc::new(TestCacheReadProvider::new()); + let events_read_provider = Arc::new(TestEventsReadProvider::new()); // let one_before_range = true; // let series_range = SeriesRange::TimeRange(nano_range.clone()); // let select = EventsSubQuerySelect::new( @@ -59,13 +64,9 @@ async fn timebin_from_layers_inner() -> Result<(), Error> { // transform_query.clone(), // ); let settings = EventsSubQuerySettings::default(); - // let reqid = ctx.reqid().into(); let log_level = "INFO"; - // let query = EventsSubQuery::from_parts(select, settings.clone(), reqid, log_level.into()); - let bin_len_layers = [].into_iter().map(DtMs::from_ms_u64).collect(); + let bin_len_layers = [1000].into_iter().map(DtMs::from_ms_u64).collect(); let do_time_weight = true; - let bin_len = DtMs::from_ms_u64(200); - let range = BinnedRange::from_nano_range(nano_range, bin_len); let mut stream = TimeBinnedFromLayers::new( ch_conf, binning_opts, @@ -79,19 +80,82 @@ async fn timebin_from_layers_inner() -> Result<(), Error> { cache_read_provider, events_read_provider, )?; - while let Some(x) = stream.next().await { - let item = x.map_err(|e| Error::Msg(e.to_string()))?; - trace!("item {:?}", item); + let mut exp = ContainerBins::new(); + let s = r" + 10s 11s 100 10000.1 10990.1 10495.1 10990.1 + 11s 12s 100 11000.1 11990.1 11495.1 11990.1 + 12s 13s 55 42.0 46.0 44.0 43.0 + 13s 14s 55 42.0 46.0 44.0 43.0 + 14s 15s 55 42.0 46.0 44.0 43.0 + 15s 16s 100 15000.1 15990.1 15495.1 15990.1 + 16s 17s 100 16000.1 16990.1 16495.1 16990.1 + 17s 18s 55 42.0 46.0 44.0 43.0 + 18s 19s 55 42.0 46.0 44.0 43.0 + 19s 20s 55 42.0 46.0 44.0 43.0 + "; + for line in s.split("\n") { + let a: Vec<_> = line.split(" ").filter(|&x| x != " " && x != "").collect(); + info!("len {} {:?}", a.len(), a); + if a.len() == 7 { + let dt1 = humantime::parse_duration(&a[0]).unwrap(); + let dt2 = humantime::parse_duration(&a[1]).unwrap(); + exp.push_back( + TsNano::from_ms(dt1.as_millis() as u64), + TsNano::from_ms(dt2.as_millis() as u64), + a[2].parse().unwrap(), + a[3].parse().unwrap(), + a[4].parse().unwrap(), + a[5].parse().unwrap(), + a[6].parse().unwrap(), + true, + ); + } } + info!("PARSED EXP len {}", exp.len()); + let mut cbins = ContainerBins::new(); + while let Some(item) = stream.next().await { + let _ = on_sitemty_data!(item, |x: Box| { + if let Some(bins) = x.as_any_ref().downcast_ref::>() { + for (&ts1, &ts2, &cnt, min, max, agg, lst, &fnl) in itertools::izip!( + bins.ts1s_iter(), + bins.ts2s_iter(), + bins.cnts_iter(), + bins.mins_iter(), + bins.maxs_iter(), + bins.aggs_iter(), + bins.lsts_iter(), + bins.fnls_iter(), + ) { + trace!("========="); + trace!( + "{:?} {:5} {:8.2} {:8.2} {:8.2} {:8.2}", + ts1, + cnt, + min, + max, + agg, + lst, + ); + trace!("========="); + cbins.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl); + } + } else { + panic!("expect f32 bins") + } + sitem_data(x) + }); + } + let cmp = items_2::binning::container_bins::compare_boxed_f32(&exp, &cbins); + assert_eq!(cmp, true); Ok(()) } #[test] -fn timebin_from_layers() { +fn timebin_from_layers_00() { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - rt.block_on(timebin_from_layers_inner()).unwrap() + rt.block_on(timebin_from_layers_00_inner()).unwrap() } async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { @@ -112,10 +176,7 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { end: 1000 * 1000 * 1000 * 2, }; let cache_read_provider = Arc::new(DummyCacheReadProvider::new()); - let events_read_provider = Arc::new(TestEventsReader::new( - nano_range.clone(), - items_2::testgen::events_gen::new_events_gen_dim1_f32_v00, - )); + let events_read_provider = Arc::new(TestEventsReadProvider::new()); // let one_before_range = true; // let series_range = SeriesRange::TimeRange(nano_range.clone()); // let select = EventsSubQuerySelect::new( diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index 0dd30bc..17359b6 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -174,7 +174,6 @@ impl Future for CacheWriting { pub trait CacheReadProvider: Send + Sync { fn read(&self, series: u64, bin_len: DtMs, msp: u64, offs: Range) -> CacheReading; - fn write(&self, series: u64, bins: BinsBoxed) -> CacheWriting; } pub struct CachedReader { diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index a2a8cc8..de6ad94 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -6,6 +6,7 @@ use crate::timebin::fromevents::BinnedFromEvents; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_data; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; @@ -184,7 +185,10 @@ impl GapFill { Ok(()) } - fn handle_bins(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result { + fn handle_bins( + mut self: Pin<&mut Self>, + bins: BinsBoxed, + ) -> Result<(BinsBoxed, BinsBoxed), Error> { trace_handle!("{} handle_bins {}", self.dbgname, bins); // TODO could use an interface to iterate over opaque bin items that only expose // edge and count information with all remaining values opaque. @@ -206,13 +210,12 @@ impl GapFill { let mut ret = bins.empty(); let mut bins = bins; bins.drain_into(ret.as_mut(), 0..i); - self.inp_buf = Some(bins); let range = NanoRange { beg: last.ns(), end: ts1.ns(), }; self.setup_sub(range)?; - return Ok(ret); + return Ok((bins, ret)); } else { // nothing to do } @@ -228,11 +231,13 @@ impl GapFill { end: ts1.ns(), }; self.setup_sub(range)?; - return Ok(bins.empty()); + let empty = bins.empty(); + return Ok((bins, empty)); } self.last_bin_ts2 = Some(ts2); } - Ok(bins) + let empty = bins.empty(); + Ok((empty, bins)) } fn setup_inp_finer( @@ -401,7 +406,7 @@ impl Stream for GapFill { ); self.inp_finer = None; if let Some(j) = self.last_bin_ts2 { - if j.ns() != exp_finer_range.end() { + if j.ns() < exp_finer_range.end() { trace_handle!( "{} inp_finer Ready(None) last_bin_ts2 {:?} exp_finer_range {:?}", self.dbgname, @@ -419,44 +424,57 @@ impl Stream for GapFill { ); continue; } + } else if j.ns() > exp_finer_range.end() { + continue; } else { continue; } - } else if self.inp_finer_fills_gap { - error!( - "{} inp_finer Ready(None) last_bin_ts2 {:?}", - self.dbgname, self.last_bin_ts2 - ); - Ready(Some(sitem_err_from_string( - "finer input delivered nothing, received nothing at all so far", - ))) } else { + warn!( + "-----------------------------------------------------------------" + ); warn!( "{} inp_finer Ready(None) last_bin_ts2 {:?}", self.dbgname, self.last_bin_ts2 ); - continue; + if self.inp_finer_fills_gap { + error!( + "{} inp_finer Ready(None) last_bin_ts2 {:?} inp_finer_fills_gap {}", + self.dbgname, self.last_bin_ts2,self.inp_finer_fills_gap + ); + Ready(Some(sitem_err_from_string( + "finer input delivered nothing, received nothing at all so far", + ))) + } else { + warn!( + "{} inp_finer Ready(None) last_bin_ts2 {:?}", + self.dbgname, self.last_bin_ts2 + ); + continue; + } } } Pending => Pending, } } else if let Some(x) = self.inp_buf.take() { - match self.as_mut().handle_bins_finer(x) { - Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( - x, - ))))), + match self.as_mut().handle_bins(x) { + Ok((keep, item)) => { + if keep.len() != 0 { + self.inp_buf = Some(keep); + } + let item = sitem_data(item); + Ready(Some(item)) + } Err(e) => Ready(Some(sitem_err_from_string(e))), } } else if let Some(inp) = self.inp.as_mut() { match inp.poll_next_unpin(cx) { Ready(Some(Ok(x))) => match x { StreamItem::DataItem(RangeCompletableItem::Data(x)) => { - match self.as_mut().handle_bins(x) { - Ok(x) => Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::Data(x), - )))), - Err(e) => Ready(Some(sitem_err_from_string(e))), + if x.len() != 0 { + self.inp_buf = Some(x); } + continue; } StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { self.inp_range_final = true; diff --git a/src/timebin/opts.rs b/src/timebin/opts.rs index 1e3aaad..40b37f5 100644 --- a/src/timebin/opts.rs +++ b/src/timebin/opts.rs @@ -10,6 +10,24 @@ pub struct BinningOptions { } impl BinningOptions { + pub fn default() -> Self { + Self { + cache_usage: CacheUsage::Read, + allow_from_events: true, + allow_from_prebinned: true, + allow_rebin: true, + } + } + + pub fn testing_no_events() -> Self { + Self { + cache_usage: CacheUsage::Read, + allow_from_events: false, + allow_from_prebinned: true, + allow_rebin: true, + } + } + pub fn cache_usage(&self) -> &CacheUsage { &self.cache_usage }