Simple single bin layer

This commit is contained in:
Dominik Werder
2024-11-21 13:51:25 +01:00
parent 1a0d3dc5c9
commit a7cd1977dc
9 changed files with 181 additions and 13 deletions

View File

@@ -30,8 +30,8 @@ items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" }
items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" }
parse = { path = "../daqbuf-parse", package = "daqbuf-parse" }
#[dev-dependencies]
#taskrun =
[dev-dependencies]
tokio = { version = "1", features = ["rt"] }
[features]
wasm_transform = ["wasmer"]

View File

@@ -35,3 +35,10 @@ pub mod transform;
/// Type-checking placeholder: claims to produce a value of any type `T`,
/// but panics via `todo!()` if ever executed. Used to stub out call sites
/// while the real value source is still being written.
fn todoval<T>() -> T {
    todo!()
}
// Crate-local logging facade. Test builds pull the macros from
// `netpod::log_direct`, all other builds from `netpod::log`.
// NOTE(review): presumably `log_direct` emits without the regular logging
// pipeline so test output is visible — confirm against netpod.
mod log {
    #[cfg(test)]
    pub use netpod::log_direct::*;
    #[cfg(not(test))]
    pub use netpod::log::*;
}

View File

@@ -1,5 +1,6 @@
mod collect;
mod events;
mod events_reader;
mod timebin;
use futures_util::stream;

45
src/test/events_reader.rs Normal file
View File

@@ -0,0 +1,45 @@
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::cached::reader::EventsReading;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use netpod::range::evrange::NanoRange;
use query::api4::events::EventsSubQuery;
/// Error type for the test events reader. Currently has no variants:
/// `read` below is infallible, the type exists to satisfy the module's
/// error-handling convention.
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TestEventsReader")] // NOTE(review): `cstm` looks like a project-local derive helper attribute — confirm
pub enum Error {}
/// Test fixture: an `EventsReadProvider` that serves synthetically
/// generated events instead of reading real stored data.
pub struct TestEventsReader {
    // Nanosecond time range the generated events will cover.
    range: NanoRange,
}
impl TestEventsReader {
pub fn new(range: NanoRange) -> Self {
Self { range }
}
}
impl EventsReadProvider for TestEventsReader {
fn read(&self, evq: EventsSubQuery) -> EventsReading {
let stream = items_2::testgen::events_gen::old_events_gen_dim0_f32_v00(self.range.clone());
let stream = stream
.map(|x| {
let x = Box::new(x);
let x = ChannelEvents::Events(x);
let x: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)));
x
})
.chain({
use RangeCompletableItem::*;
use StreamItem::*;
let item1 = Ok(DataItem(RangeComplete));
futures_util::stream::iter([item1])
});
let stream = Box::pin(stream);
let ret = EventsReading::new(stream);
ret
}
}

View File

@@ -1,3 +1,6 @@
// #[cfg(test)]
mod fromlayers;
// use crate::collect::collect;
// use crate::generators::GenerateI32V00;
// use crate::generators::GenerateI32V01;

View File

@@ -0,0 +1,90 @@
use crate::eventsplainreader::DummyCacheReadProvider;
use crate::log::*;
use crate::test::events_reader::TestEventsReader;
use crate::timebin::fromlayers::TimeBinnedFromLayers;
use futures_util::StreamExt;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::ChConf;
use netpod::ChannelTypeConfigGen;
use netpod::DtMs;
use netpod::ReqCtx;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::transform::TransformQuery;
use std::sync::Arc;
/// Errors surfaced by the `timebin_from_layers` test below.
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Test")] // NOTE(review): `cstm` looks like a project-local derive helper attribute — confirm
pub enum Error {
    // Setup failure propagated from `TimeBinnedFromLayers::new`.
    FromLayers(#[from] crate::timebin::fromlayers::Error),
    // Catch-all for item-level errors encountered while draining the stream.
    Msg(String),
}
async fn timebin_from_layers_inner() -> Result<(), Error> {
let ctx = Arc::new(ReqCtx::for_test());
let ch_conf = ChannelTypeConfigGen::Scylla(ChConf::new(
"testing",
123,
SeriesKind::ChannelData,
ScalarType::F32,
Shape::Scalar,
"basictest-f32",
));
let cache_usage = CacheUsage::Ignore;
let transform_query = TransformQuery::default_time_binned();
let nano_range = NanoRange {
beg: 1000 * 1000 * 1000 * 1,
end: 1000 * 1000 * 1000 * 2,
};
let cache_read_provider = Arc::new(DummyCacheReadProvider::new());
let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone()));
// let one_before_range = true;
// let series_range = SeriesRange::TimeRange(nano_range.clone());
// let select = EventsSubQuerySelect::new(
// ch_conf.clone(),
// series_range,
// one_before_range,
// transform_query.clone(),
// );
let settings = EventsSubQuerySettings::default();
// let reqid = ctx.reqid().into();
let log_level = "INFO";
// let query = EventsSubQuery::from_parts(select, settings.clone(), reqid, log_level.into());
let bin_len_layers = [].into_iter().map(DtMs::from_ms_u64).collect();
let do_time_weight = true;
let bin_len = DtMs::from_ms_u64(200);
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut stream = TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
transform_query,
settings,
log_level.into(),
ctx,
range,
do_time_weight,
bin_len_layers,
cache_read_provider,
events_read_provider,
)?;
while let Some(x) = stream.next().await {
let item = x.map_err(|e| Error::Msg(e.to_string()))?;
trace!("item {:?}", item);
}
Ok(())
}
#[test]
fn timebin_from_layers() {
    // A current-thread runtime suffices: everything under test is in-memory.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    runtime.block_on(timebin_from_layers_inner()).unwrap()
}

View File

@@ -26,7 +26,6 @@ pub enum Error {
Scylla(String),
}
#[allow(unused)]
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
pub fn off_max() -> u64 {
@@ -75,12 +74,23 @@ pub trait EventsReadProvider: Send + Sync {
}
pub struct CacheReading {
fut: Pin<Box<dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>> + Send>>,
fut: Pin<
Box<
dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>>
+ Send,
>,
>,
}
impl CacheReading {
pub fn new(
fut: Pin<Box<dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>> + Send>>,
fut: Pin<
Box<
dyn Future<
Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>,
> + Send,
>,
>,
) -> Self {
Self { fut }
}
@@ -99,7 +109,11 @@ pub struct CacheWriting {
}
impl CacheWriting {
pub fn new(fut: Pin<Box<dyn Future<Output = Result<(), streams::timebin::cached::reader::Error>> + Send>>) -> Self {
pub fn new(
fut: Pin<
Box<dyn Future<Output = Result<(), streams::timebin::cached::reader::Error>> + Send>,
>,
) -> Self {
Self { fut }
}
}
@@ -180,7 +194,9 @@ impl Stream for CachedReader {
let off2 = off2.min(off_max());
self.ts1next = TsNano::from_ns(self.bin_len.ns() * off2 + div * msp);
let offs = off as u32..off2 as u32;
let fut = self.cache_read_provider.read(self.series, self.bin_len, msp, offs);
let fut = self
.cache_read_provider
.read(self.series, self.bin_len, msp, offs);
self.reading = Some(Box::pin(fut));
continue;
} else {

View File

@@ -6,7 +6,7 @@ use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::SeriesRange;
@@ -68,7 +68,7 @@ impl TimeBinnedFromLayers {
if bin_len_layers.contains(&bin_len) {
debug!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = super::gapfill::GapFill::new(
"FromLayers".into(),
"FromLayers-ongrid".into(),
ch_conf.clone(),
cache_usage.clone(),
transform_query.clone(),
@@ -97,7 +97,7 @@ impl TimeBinnedFromLayers {
range_finer
);
let inp = super::gapfill::GapFill::new(
"FromLayers".into(),
"FromLayers-finergrid".into(),
ch_conf.clone(),
cache_usage.clone(),
transform_query.clone(),
@@ -124,8 +124,14 @@ impl TimeBinnedFromLayers {
one_before_range,
transform_query.clone(),
);
let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone());
let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?;
let evq = EventsSubQuery::from_parts(
select,
sub.clone(),
ctx.reqid().into(),
log_level.clone(),
);
let inp =
BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?;
let ret = Self { inp: Box::pin(inp) };
debug!("{}::new setup from events", Self::type_name());
Ok(ret)

View File

@@ -9,7 +9,7 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;