This commit is contained in:
Dominik Werder
2024-09-06 22:21:15 +02:00
parent 2c523810c1
commit e701c1b379
4 changed files with 35 additions and 6 deletions

View File

@@ -5,19 +5,35 @@ use items_2::binsdim0::BinsDim0;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct Reading {
fut: Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, Box<dyn std::error::Error>>> + Send>>,
}
pub trait CacheReadProvider: Send {
fn read(&self) -> Reading;
}
#[derive(Debug, ThisError)]
#[cstm(name = "BinCachedReader")]
pub enum Error {}
pub struct CachedReader {}
pub struct CachedReader {
cache_read_provider: Box<dyn CacheReadProvider>,
}
impl CachedReader {
pub fn new(series: u64, bin_len: DtMs, range: BinnedRange<TsNano>) -> Result<Self, Error> {
let ret = Self {};
pub fn new(
series: u64,
bin_len: DtMs,
range: BinnedRange<TsNano>,
cache_read_provider: Box<dyn CacheReadProvider>,
) -> Result<Self, Error> {
let ret = Self { cache_read_provider };
Ok(ret)
}
}

View File

@@ -1,3 +1,4 @@
use super::cached::reader::CacheReadProvider;
use crate::timebin::grid::find_next_finer_bin_len;
use err::thiserror;
use err::ThisError;
@@ -42,6 +43,7 @@ impl TimeBinnedFromLayers {
range: BinnedRange<TsNano>,
do_time_weight: bool,
bin_len_layers: Vec<DtMs>,
cache_read_provider: Box<dyn CacheReadProvider>,
) -> Result<Self, Error> {
info!(
"{}::new {:?} {:?} {:?}",
@@ -59,7 +61,7 @@ impl TimeBinnedFromLayers {
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
info!("{}::new bin_len in layers", Self::type_name());
let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers)?;
let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers, cache_read_provider)?;
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
} else {
@@ -69,7 +71,13 @@ impl TimeBinnedFromLayers {
// produce from binned sub-stream with additional binner.
let range = BinnedRange::from_nano_range(range.to_nano_range(), finer);
info!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range);
let inp = super::gapfill::GapFill::new(series, range.clone(), do_time_weight, bin_len_layers)?;
let inp = super::gapfill::GapFill::new(
series,
range.clone(),
do_time_weight,
bin_len_layers,
cache_read_provider,
)?;
let inp = super::basic::TimeBinnedStream::new(
Box::pin(inp),
BinnedRangeEnum::Time(range),

View File

@@ -1,3 +1,4 @@
use super::cached::reader::CacheReadProvider;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
@@ -32,10 +33,12 @@ impl GapFill {
range: BinnedRange<TsNano>,
do_time_weight: bool,
bin_len_layers: Vec<DtMs>,
cache_read_provider: Box<dyn CacheReadProvider>,
) -> Result<Self, Error> {
// super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?;
let inp =
super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range)?.map_err(Error::from);
super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range, cache_read_provider)?
.map_err(Error::from);
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
}

View File

@@ -256,11 +256,13 @@ async fn timebinned_stream(
// DtMs::from_ms_u64(1000 * 10),
]
};
let cache_read_provider = err::todoval();
let stream = crate::timebin::TimeBinnedFromLayers::new(
series,
binned_range.binned_range_time(),
do_time_weight,
bin_len_layers,
cache_read_provider,
)
.map_err(Error::from_string)?;
let stream = stream.map(|item| {