From 309c995ad7c060f6593a1dfd5d02bdab5ba9bfdc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 8 Dec 2024 07:39:33 +0100 Subject: [PATCH] WIP cache read --- Cargo.toml | 1 + src/test/timebin/fromlayers.rs | 7 ++-- src/timebin.rs | 2 +- src/timebin/cached/reader.rs | 42 ++++++++++-------------- src/timebin/fromlayers.rs | 24 +++++++------- src/timebin/gapfill.rs | 59 +++++++++++++++++----------------- src/timebin/opts.rs | 40 +++++++++++++++++++++++ src/timebin/timebin.rs | 1 - src/timebinnedjson.rs | 3 +- 9 files changed, 106 insertions(+), 73 deletions(-) create mode 100644 src/timebin/opts.rs delete mode 100644 src/timebin/timebin.rs diff --git a/Cargo.toml b/Cargo.toml index 17c180b..ab04846 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ http = "1" http-body = "1" http-body-util = "0.1.0" thiserror = "=0.0.1" +autoerr = "0.0.3" chrono = { version = "0.4.38", features = ["serde"] } wasmer = { version = "5.0.1", default-features = false, features = ["sys", "cranelift"], optional = true } netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/src/test/timebin/fromlayers.rs b/src/test/timebin/fromlayers.rs index c4ce2cb..00d8791 100644 --- a/src/test/timebin/fromlayers.rs +++ b/src/test/timebin/fromlayers.rs @@ -2,6 +2,7 @@ use crate::eventsplainreader::DummyCacheReadProvider; use crate::log::*; use crate::test::events_reader::TestEventsReader; use crate::timebin::fromlayers::TimeBinnedFromLayers; +use crate::timebin::opts::BinningOptions; use futures_util::StreamExt; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; @@ -38,6 +39,7 @@ async fn timebin_from_layers_inner() -> Result<(), Error> { "basictest-f32", )); let cache_usage = CacheUsage::Ignore; + let binning_opts: BinningOptions = todo!(); let transform_query = TransformQuery::default_time_binned(); let nano_range = NanoRange { beg: 1000 * 1000 * 1000 * 1, @@ -63,7 +65,7 @@ async fn timebin_from_layers_inner() -> Result<(), Error> { let range = BinnedRange::from_nano_range(nano_range, bin_len); let mut stream = TimeBinnedFromLayers::new( ch_conf, - cache_usage, + binning_opts, transform_query, settings, log_level.into(), @@ -100,6 +102,7 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { "basictest-f32", )); let cache_usage = CacheUsage::Ignore; + let binning_opts: BinningOptions = todo!(); let transform_query = TransformQuery::default_time_binned(); let nano_range = NanoRange { beg: 1000 * 1000 * 1000 * 1, @@ -125,7 +128,7 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { let range = BinnedRange::from_nano_range(nano_range, bin_len); let mut stream = TimeBinnedFromLayers::new( ch_conf, - cache_usage, + binning_opts, transform_query, settings, log_level.into(), diff --git a/src/timebin.rs b/src/timebin.rs index 0c87f2e..c9e5b9d 100644 --- a/src/timebin.rs +++ b/src/timebin.rs @@ -1,7 +1,7 @@ pub mod cached; pub mod fromevents; pub mod fromlayers; -pub mod timebin; +pub mod opts; mod basic; mod gapfill; diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index 7bffac4..7b3bf7f 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -1,11 +1,11 @@ use crate as streams; +use crate::log::*; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; use items_2::channelevents::ChannelEvents; -use netpod::log::*; use netpod::BinnedRange; use netpod::DtMs; use netpod::TsNano; @@ -17,17 +17,22 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "BinCachedReader")] -pub enum Error { - TodoImpl, - ChannelSend, - ChannelRecv, - Scylla(String), -} +autoerr::create_error_v1!( + name(Error, "BinCachedReader"), + enum variants { + TodoImpl, + ChannelSend, + ChannelRecv, + Scylla(String), + }, +); macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +pub type BinsReadResErr = streams::timebin::cached::reader::Error; +pub type BinsReadRes = Result, BinsReadResErr>; +pub type BinsReadFutBoxed = Pin + Send>>; + pub fn off_max() -> u64 { 1000 } @@ -74,30 +79,17 @@ pub trait EventsReadProvider: Send + Sync { } pub struct CacheReading { - fut: Pin< - Box< - dyn Future, streams::timebin::cached::reader::Error>> - + Send, - >, - >, + fut: BinsReadFutBoxed, } impl CacheReading { - pub fn new( - fut: Pin< - Box< - dyn Future< - Output = Result, streams::timebin::cached::reader::Error>, - > + Send, - >, - >, - ) -> Self { + pub fn new(fut: BinsReadFutBoxed) -> Self { Self { fut } } } impl Future for CacheReading { - type Output = Result, streams::timebin::cached::reader::Error>; + type Output = BinsReadRes; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.fut.poll_unpin(cx) diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index 22ca79f..d5a513b 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -1,5 +1,6 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; +use super::opts::BinningOptions; use crate::log::*; use crate::timebin::fromevents::BinnedFromEvents; use crate::timebin::gapfill::GapFill; @@ -9,7 +10,6 @@ use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; -use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::ChannelTypeConfigGen; @@ -27,14 +27,14 @@ use std::task::Poll; macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "TimeBinnedFromLayers")] -pub enum Error { - GapFill(#[from] super::gapfill::Error), - BinnedFromEvents(#[from] super::fromevents::Error), - #[error("FinerGridMismatch({0}, {1})")] - FinerGridMismatch(DtMs, DtMs), -} +autoerr::create_error_v1!( + name(Error, "TimeBinnedFromLayers"), + enum variants { + GapFill(#[from] super::gapfill::Error), + BinnedFromEvents(#[from] super::fromevents::Error), + FinerGridMismatch(DtMs, DtMs), + }, +); type BoxedInput = Pin> + Send>>; @@ -49,7 +49,7 @@ impl TimeBinnedFromLayers { pub fn new( ch_conf: ChannelTypeConfigGen, - cache_usage: CacheUsage, + binning_opts: BinningOptions, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -73,7 +73,7 @@ impl TimeBinnedFromLayers { let inp = GapFill::new( "FromLayers-ongrid".into(), ch_conf.clone(), - cache_usage.clone(), + binning_opts.clone(), transform_query.clone(), sub.clone(), log_level.clone(), @@ -107,7 +107,7 @@ impl TimeBinnedFromLayers { let inp = GapFill::new( "FromLayers-finergrid".into(), ch_conf.clone(), - cache_usage.clone(), + binning_opts.clone(), transform_query.clone(), sub.clone(), log_level.clone(), diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index b95b5f9..40c60f2 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -1,5 +1,6 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; +use super::opts::BinningOptions; use crate::log::*; use crate::timebin::fromevents::BinnedFromEvents; use futures_util::FutureExt; @@ -11,7 +12,6 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; -use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; @@ -28,30 +28,27 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -#[allow(unused)] macro_rules! debug_init { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } -#[allow(unused)] macro_rules! debug_setup { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } -#[allow(unused)] -macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } +macro_rules! trace_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "BinCachedGapFill")] -pub enum Error { - CacheReader(#[from] super::cached::reader::Error), - #[error("GapFromFiner({0}, {1}, {2})")] - GapFromFiner(TsNano, TsNano, DtMs), - #[error("MissingBegFromFiner({0}, {1}, {2})")] - MissingBegFromFiner(TsNano, TsNano, DtMs), - #[error("InputBeforeRange({0}, {1})")] - InputBeforeRange(NanoRange, BinnedRange), - EventsReader(#[from] super::fromevents::Error), -} +autoerr::create_error_v1!( + name(Error, "BinCachedGapFill"), + enum variants { + CacheReader(#[from] super::cached::reader::Error), + // #[error("GapFromFiner({0}, {1}, {2})")] + GapFromFiner(TsNano, TsNano, DtMs), + // #[error("MissingBegFromFiner({0}, {1}, {2})")] + MissingBegFromFiner(TsNano, TsNano, DtMs), + // #[error("InputBeforeRange({0}, {1})")] + InputBeforeRange(NanoRange, BinnedRange), + EventsReader(#[from] super::fromevents::Error), + }, +); type Input = Pin> + Send>>; @@ -60,7 +57,7 @@ type Input = Pin> + Send>>; pub struct GapFill { dbgname: String, ch_conf: ChannelTypeConfigGen, - cache_usage: CacheUsage, + binning_opts: BinningOptions, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -90,7 +87,7 @@ impl GapFill { pub fn new( dbgname_parent: String, ch_conf: ChannelTypeConfigGen, - cache_usage: CacheUsage, + binning_opts: BinningOptions, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -103,7 +100,7 @@ impl GapFill { ) -> Result { let dbgname = format!("{}--[{}]", dbgname_parent, range); debug_init!("new dbgname {}", dbgname); - let inp = if cache_usage.is_cache_read() { + let inp = if binning_opts.cache_usage().is_cache_read() { let series = ch_conf.series().expect("series id for cache read"); let stream = super::cached::reader::CachedReader::new( series, @@ -122,7 +119,7 @@ impl GapFill { let ret = Self { dbgname, ch_conf, - cache_usage, + binning_opts, transform_query, sub, log_level, @@ -173,13 +170,14 @@ impl GapFill { .get_or_insert_with(|| bins.empty()); bins2.drain_into(dst.as_mut(), 0..bins2.len()); } - if self.cache_usage.is_cache_write() { + if self.binning_opts.cache_usage().is_cache_write() { self.cache_write_intermediate()?; - } // TODO make sure that input does not send "made-up" empty future bins. - // On the other hand, if the request is over past range, but the channel was silent ever since? - // Then we should in principle know that from is-alive status checking. - // So, until then, allow made-up bins? - // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. + } + // TODO make sure that input does not send "made-up" empty future bins. + // On the other hand, if the request is over past range, but the channel was silent ever since? + // Then we should in principle know that from is-alive status checking. + // So, until then, allow made-up bins? + // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. Ok(bins) } @@ -265,7 +263,7 @@ impl GapFill { let inp_finer = GapFill::new( self.dbgname.clone(), self.ch_conf.clone(), - self.cache_usage.clone(), + self.binning_opts.clone(), self.transform_query.clone(), self.sub.clone(), self.log_level.clone(), @@ -338,6 +336,7 @@ impl GapFill { fn cache_write_intermediate(self: Pin<&mut Self>) -> Result<(), Error> { // TODO See cache_write_on_end + trace_cache!("maybe write to cache"); Ok(()) } } @@ -377,7 +376,7 @@ impl Stream for GapFill { trace_handle!("{} RECV RANGE FINAL", self.dbgname); self.inp_finer_range_final = true; self.inp_finer_range_final_cnt += 1; - if self.cache_usage.is_cache_write() { + if self.binning_opts.cache_usage().is_cache_write() { match self.as_mut().cache_write_on_end() { Ok(()) => continue, Err(e) => Ready(Some(sitem_err_from_string(e))), diff --git a/src/timebin/opts.rs b/src/timebin/opts.rs new file mode 100644 index 0000000..1e3aaad --- /dev/null +++ b/src/timebin/opts.rs @@ -0,0 +1,40 @@ +use netpod::query::CacheUsage; +use query::api4::binned::BinnedQuery; + +#[derive(Debug, Clone)] +pub struct BinningOptions { + cache_usage: CacheUsage, + allow_from_events: bool, + allow_from_prebinned: bool, + allow_rebin: bool, +} + +impl BinningOptions { + pub fn cache_usage(&self) -> &CacheUsage { + &self.cache_usage + } + + pub fn allow_from_events(&self) -> bool { + self.allow_from_events + } + + pub fn allow_from_prebinned(&self) -> bool { + self.allow_from_prebinned + } + + pub fn allow_rebin(&self) -> bool { + self.allow_rebin + } +} + +impl From<&BinnedQuery> for BinningOptions { + fn from(value: &BinnedQuery) -> Self { + let cache_usage = value.cache_usage().unwrap_or(CacheUsage::Ignore); + Self { + cache_usage, + allow_from_events: value.allow_from_events().unwrap_or(true), + allow_from_prebinned: value.allow_from_prebinned().unwrap_or(true), + allow_rebin: value.allow_rebin().unwrap_or(true), + } + } +} diff --git a/src/timebin/timebin.rs b/src/timebin/timebin.rs deleted file mode 100644 index 8b13789..0000000 --- a/src/timebin/timebin.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index b2901f5..e27ed6f 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -265,7 +265,6 @@ async fn timebinned_stream( events_read_provider: Arc, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; - let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore); let do_time_weight = true; let bin_len_layers = if let Some(subgrids) = query.subgrids() { subgrids @@ -277,7 +276,7 @@ async fn timebinned_stream( }; let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new( ch_conf, - cache_usage, + (&query).into(), query.transform().clone(), EventsSubQuerySettings::from(&query), query.log_level().into(),