From e701c1b379f33bcd431dd01d4c334a1922b19115 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 6 Sep 2024 22:21:15 +0200 Subject: [PATCH] WIP --- crates/streams/src/timebin/cached/reader.rs | 22 ++++++++++++++++++--- crates/streams/src/timebin/fromlayers.rs | 12 +++++++++-- crates/streams/src/timebin/gapfill.rs | 5 ++++- crates/streams/src/timebinnedjson.rs | 2 ++ 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index 185e264..b554ad9 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -5,19 +5,35 @@ use items_2::binsdim0::BinsDim0; use netpod::BinnedRange; use netpod::DtMs; use netpod::TsNano; +use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; +pub struct Reading { + fut: Pin, Box>> + Send>>, +} + +pub trait CacheReadProvider: Send { + fn read(&self) -> Reading; +} + #[derive(Debug, ThisError)] #[cstm(name = "BinCachedReader")] pub enum Error {} -pub struct CachedReader {} +pub struct CachedReader { + cache_read_provider: Box, +} impl CachedReader { - pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Result { - let ret = Self {}; + pub fn new( + series: u64, + bin_len: DtMs, + range: BinnedRange, + cache_read_provider: Box, + ) -> Result { + let ret = Self { cache_read_provider }; Ok(ret) } } diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index 43e1763..24fdeb3 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -1,3 +1,4 @@ +use super::cached::reader::CacheReadProvider; use crate::timebin::grid::find_next_finer_bin_len; use err::thiserror; use err::ThisError; @@ -42,6 +43,7 @@ impl TimeBinnedFromLayers { range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, + cache_read_provider: Box, ) -> Result { info!( "{}::new {:?} {:?} {:?}", @@ -59,7 +61,7 @@ impl TimeBinnedFromLayers { let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { info!("{}::new bin_len in layers", Self::type_name()); - let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers)?; + let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers, cache_read_provider)?; let ret = Self { inp: Box::pin(inp) }; Ok(ret) } else { @@ -69,7 +71,13 @@ impl TimeBinnedFromLayers { // produce from binned sub-stream with additional binner. let range = BinnedRange::from_nano_range(range.to_nano_range(), finer); info!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range); - let inp = super::gapfill::GapFill::new(series, range.clone(), do_time_weight, bin_len_layers)?; + let inp = super::gapfill::GapFill::new( + series, + range.clone(), + do_time_weight, + bin_len_layers, + cache_read_provider, + )?; let inp = super::basic::TimeBinnedStream::new( Box::pin(inp), BinnedRangeEnum::Time(range), diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index db82cb2..3628254 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -1,3 +1,4 @@ +use super::cached::reader::CacheReadProvider; use err::thiserror; use err::ThisError; use futures_util::Stream; @@ -32,10 +33,12 @@ impl GapFill { range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, + cache_read_provider: Box, ) -> Result { // super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?; let inp = - super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range)?.map_err(Error::from); + super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range, cache_read_provider)? + .map_err(Error::from); let ret = Self { inp: Box::pin(inp) }; Ok(ret) } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 4009eda..42bdeee 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -256,11 +256,13 @@ async fn timebinned_stream( // DtMs::from_ms_u64(1000 * 10), ] }; + let cache_read_provider = err::todoval(); let stream = crate::timebin::TimeBinnedFromLayers::new( series, binned_range.binned_range_time(), do_time_weight, bin_len_layers, + cache_read_provider, ) .map_err(Error::from_string)?; let stream = stream.map(|item| {