From a7cd1977dcc1f217f3f8ab5ac9ac56cac86bc8d6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 21 Nov 2024 13:51:25 +0100 Subject: [PATCH] Simple single bin layer --- Cargo.toml | 4 +- src/lib.rs | 7 +++ src/test.rs | 1 + src/test/events_reader.rs | 45 +++++++++++++++++ src/test/timebin.rs | 3 ++ src/test/timebin/fromlayers.rs | 90 ++++++++++++++++++++++++++++++++++ src/timebin/cached/reader.rs | 26 ++++++++-- src/timebin/fromlayers.rs | 16 ++++-- src/timebin/gapfill.rs | 2 +- 9 files changed, 181 insertions(+), 13 deletions(-) create mode 100644 src/test/events_reader.rs create mode 100644 src/test/timebin/fromlayers.rs diff --git a/Cargo.toml b/Cargo.toml index c13314a..17c180b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,8 @@ items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" } parse = { path = "../daqbuf-parse", package = "daqbuf-parse" } -#[dev-dependencies] -#taskrun = +[dev-dependencies] +tokio = { version = "1", features = ["rt"] } [features] wasm_transform = ["wasmer"] diff --git a/src/lib.rs b/src/lib.rs index a42da1c..3ea898c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,3 +35,10 @@ pub mod transform; fn todoval() -> T { todo!() } + +mod log { + #[cfg(not(test))] + pub use netpod::log::*; + #[cfg(test)] + pub use netpod::log_direct::*; +} diff --git a/src/test.rs b/src/test.rs index a4ad10a..6a44f14 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,5 +1,6 @@ mod collect; mod events; +mod events_reader; mod timebin; use futures_util::stream; diff --git a/src/test/events_reader.rs b/src/test/events_reader.rs new file mode 100644 index 0000000..fca0527 --- /dev/null +++ b/src/test/events_reader.rs @@ -0,0 +1,45 @@ +use crate::timebin::cached::reader::EventsReadProvider; +use crate::timebin::cached::reader::EventsReading; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_2::channelevents::ChannelEvents; +use netpod::range::evrange::NanoRange; +use query::api4::events::EventsSubQuery; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TestEventsReader")] +pub enum Error {} + +pub struct TestEventsReader { + range: NanoRange, +} + +impl TestEventsReader { + pub fn new(range: NanoRange) -> Self { + Self { range } + } +} + +impl EventsReadProvider for TestEventsReader { + fn read(&self, evq: EventsSubQuery) -> EventsReading { + let stream = items_2::testgen::events_gen::old_events_gen_dim0_f32_v00(self.range.clone()); + let stream = stream + .map(|x| { + let x = Box::new(x); + let x = ChannelEvents::Events(x); + let x: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))); + x + }) + .chain({ + use RangeCompletableItem::*; + use StreamItem::*; + let item1 = Ok(DataItem(RangeComplete)); + futures_util::stream::iter([item1]) + }); + let stream = Box::pin(stream); + let ret = EventsReading::new(stream); + ret + } +} diff --git a/src/test/timebin.rs b/src/test/timebin.rs index 7a20ab3..92283f0 100644 --- a/src/test/timebin.rs +++ b/src/test/timebin.rs @@ -1,3 +1,6 @@ +// #[cfg(test)] +mod fromlayers; + // use crate::collect::collect; // use crate::generators::GenerateI32V00; // use crate::generators::GenerateI32V01; diff --git a/src/test/timebin/fromlayers.rs b/src/test/timebin/fromlayers.rs new file mode 100644 index 0000000..0285920 --- /dev/null +++ b/src/test/timebin/fromlayers.rs @@ -0,0 +1,90 @@ +use crate::eventsplainreader::DummyCacheReadProvider; +use crate::log::*; +use crate::test::events_reader::TestEventsReader; +use crate::timebin::fromlayers::TimeBinnedFromLayers; +use futures_util::StreamExt; +use netpod::query::CacheUsage; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use netpod::BinnedRange; +use netpod::ChConf; +use netpod::ChannelTypeConfigGen; +use netpod::DtMs; +use netpod::ReqCtx; +use netpod::ScalarType; +use netpod::SeriesKind; +use netpod::Shape; +use query::api4::events::EventsSubQuery; +use query::api4::events::EventsSubQuerySelect; +use query::api4::events::EventsSubQuerySettings; +use query::transform::TransformQuery; +use std::sync::Arc; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Test")] +pub enum Error { + FromLayers(#[from] crate::timebin::fromlayers::Error), + Msg(String), +} + +async fn timebin_from_layers_inner() -> Result<(), Error> { + let ctx = Arc::new(ReqCtx::for_test()); + let ch_conf = ChannelTypeConfigGen::Scylla(ChConf::new( + "testing", + 123, + SeriesKind::ChannelData, + ScalarType::F32, + Shape::Scalar, + "basictest-f32", + )); + let cache_usage = CacheUsage::Ignore; + let transform_query = TransformQuery::default_time_binned(); + let nano_range = NanoRange { + beg: 1000 * 1000 * 1000 * 1, + end: 1000 * 1000 * 1000 * 2, + }; + let cache_read_provider = Arc::new(DummyCacheReadProvider::new()); + let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone())); + // let one_before_range = true; + // let series_range = SeriesRange::TimeRange(nano_range.clone()); + // let select = EventsSubQuerySelect::new( + // ch_conf.clone(), + // series_range, + // one_before_range, + // transform_query.clone(), + // ); + let settings = EventsSubQuerySettings::default(); + // let reqid = ctx.reqid().into(); + let log_level = "INFO"; + // let query = EventsSubQuery::from_parts(select, settings.clone(), reqid, log_level.into()); + let bin_len_layers = [].into_iter().map(DtMs::from_ms_u64).collect(); + let do_time_weight = true; + let bin_len = DtMs::from_ms_u64(200); + let range = BinnedRange::from_nano_range(nano_range, bin_len); + let mut stream = TimeBinnedFromLayers::new( + ch_conf, + cache_usage, + transform_query, + settings, + log_level.into(), + ctx, + range, + do_time_weight, + bin_len_layers, + cache_read_provider, + events_read_provider, + )?; + while let Some(x) = stream.next().await { + let item = x.map_err(|e| Error::Msg(e.to_string()))?; + trace!("item {:?}", item); + } + Ok(()) +} + +#[test] +fn timebin_from_layers() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(timebin_from_layers_inner()).unwrap() +} diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index b7dd943..7bffac4 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -26,7 +26,6 @@ pub enum Error { Scylla(String), } -#[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } pub fn off_max() -> u64 { @@ -75,12 +74,23 @@ pub trait EventsReadProvider: Send + Sync { } pub struct CacheReading { - fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, + fut: Pin< + Box< + dyn Future, streams::timebin::cached::reader::Error>> + + Send, + >, + >, } impl CacheReading { pub fn new( - fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, + fut: Pin< + Box< + dyn Future< + Output = Result, streams::timebin::cached::reader::Error>, + > + Send, + >, + >, ) -> Self { Self { fut } } @@ -99,7 +109,11 @@ pub struct CacheWriting { } impl CacheWriting { - pub fn new(fut: Pin> + Send>>) -> Self { + pub fn new( + fut: Pin< + Box> + Send>, + >, + ) -> Self { Self { fut } } } @@ -180,7 +194,9 @@ impl Stream for CachedReader { let off2 = off2.min(off_max()); self.ts1next = TsNano::from_ns(self.bin_len.ns() * off2 + div * msp); let offs = off as u32..off2 as u32; - let fut = self.cache_read_provider.read(self.series, self.bin_len, msp, offs); + let fut = self + .cache_read_provider + .read(self.series, self.bin_len, msp, offs); self.reading = Some(Box::pin(fut)); continue; } else { diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index 94af54a..05bc90a 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -6,7 +6,7 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; -use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream; +use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; @@ -68,7 +68,7 @@ impl TimeBinnedFromLayers { if bin_len_layers.contains(&bin_len) { debug!("{}::new bin_len in layers {:?}", Self::type_name(), range); let inp = super::gapfill::GapFill::new( - "FromLayers".into(), + "FromLayers-ongrid".into(), ch_conf.clone(), cache_usage.clone(), transform_query.clone(), @@ -97,7 +97,7 @@ impl TimeBinnedFromLayers { range_finer ); let inp = super::gapfill::GapFill::new( - "FromLayers".into(), + "FromLayers-finergrid".into(), ch_conf.clone(), cache_usage.clone(), transform_query.clone(), @@ -124,8 +124,14 @@ impl TimeBinnedFromLayers { one_before_range, transform_query.clone(), ); - let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone()); - let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; + let evq = EventsSubQuery::from_parts( + select, + sub.clone(), + ctx.reqid().into(), + log_level.clone(), + ); + let inp = + BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; let ret = Self { inp: Box::pin(inp) }; debug!("{}::new setup from events", Self::type_name()); Ok(ret) diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index 58e7473..1a87fc3 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -9,7 +9,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; -use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream; +use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange;