diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index a09ab4c..81a9b2a 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -1760,11 +1760,22 @@ impl DtMs {
         1000000 * self.0
     }
 
+    pub const fn dt_ns(&self) -> DtNano {
+        DtNano::from_ms(self.0)
+    }
+
     pub const fn to_i64(&self) -> i64 {
         self.0 as i64
    }
 }
 
+impl fmt::Display for DtMs {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        let dur = Duration::from_millis(self.ms());
+        write!(fmt, "{}", humantime::format_duration(dur))
+    }
+}
+
 #[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
 pub struct TsNano(u64);
 
@@ -1866,6 +1877,10 @@ impl TsNano {
         TsMs::from_ms_u64(self.ms())
     }
 
+    pub const fn to_dt_ms(self) -> DtMs {
+        DtMs::from_ms_u64(self.ms())
+    }
+
     pub fn from_system_time(st: SystemTime) -> Self {
         let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
         let x = tsunix.as_secs() * 1_000_000_000 + tsunix.subsec_nanos() as u64;
@@ -2378,6 +2393,16 @@ impl BinnedRange {
     pub fn to_nano_range(&self) -> NanoRange {
         self.full_range()
     }
+
+    pub fn from_nano_range(range: NanoRange, bin_len: DtMs) -> Self {
+        let off1 = range.beg() / bin_len.ns();
+        let off2 = (bin_len.ns() - 1 + range.end()) / bin_len.ns();
+        Self {
+            bin_len: TsNano::from_ns(bin_len.ns()),
+            bin_off: off1,
+            bin_cnt: off2 - off1,
+        }
+    }
 }
 
 impl BinnedRange
diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs
index 66b7a6e..185e264 100644
--- a/crates/streams/src/timebin/cached/reader.rs
+++ b/crates/streams/src/timebin/cached/reader.rs
@@ -16,8 +16,9 @@ pub enum Error {}
 pub struct CachedReader {}
 
 impl CachedReader {
-    pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Self {
-        todo!()
+    pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Result<Self, Error> {
+        let ret = Self {};
+        Ok(ret)
     }
 }
 
@@ -25,6 +26,7 @@ impl Stream for CachedReader {
     type Item = Result<BinsDim0<f32>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        todo!()
+        use Poll::*;
+        Ready(None)
     }
 }
diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs
index 2fd75cb..43e1763 100644
--- a/crates/streams/src/timebin/fromlayers.rs
+++ b/crates/streams/src/timebin/fromlayers.rs
@@ -4,6 +4,7 @@ use err::ThisError;
 use futures_util::Stream;
 use futures_util::StreamExt;
 use futures_util::TryStreamExt;
+use items_0::on_sitemty_data;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
@@ -57,7 +58,8 @@ impl TimeBinnedFromLayers {
         // must produce bins missing in cache from separate stream.
         let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
         if bin_len_layers.contains(&bin_len) {
-            let inp = super::gapfill::GapFill::new(series, bin_len, range)?;
+            info!("{}::new bin_len in layers", Self::type_name());
+            let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers)?;
             let ret = Self { inp: Box::pin(inp) };
             Ok(ret)
         } else {
@@ -65,15 +67,9 @@
             Some(finer) => {
                 // TODO
                 // produce from binned sub-stream with additional binner.
-                let inp = super::gapfill::GapFill::new(series, bin_len, range.clone())?
-                // .map(|item| {
-                //     let ret = match item {
-                //         Ok(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))),
-                //         Err(e) => Err(::err::Error::from_string(e)),
-                //     };
-                //     ret
-                // })
-                ;
+                let range = BinnedRange::from_nano_range(range.to_nano_range(), finer);
+                info!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range);
+                let inp = super::gapfill::GapFill::new(series, range.clone(), do_time_weight, bin_len_layers)?;
                 let inp = super::basic::TimeBinnedStream::new(
                     Box::pin(inp),
                     BinnedRangeEnum::Time(range),
@@ -83,6 +79,7 @@
                 Ok(ret)
             }
             None => {
+                info!("{}::new NO next finer", Self::type_name());
                 // TODO
                 // produce from events
                 todo!()
diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs
index 27f62de..db82cb2 100644
--- a/crates/streams/src/timebin/gapfill.rs
+++ b/crates/streams/src/timebin/gapfill.rs
@@ -1,6 +1,7 @@
 use err::thiserror;
 use err::ThisError;
 use futures_util::Stream;
+use futures_util::TryStreamExt;
 use items_0::streamitem::Sitemty;
 use items_2::binsdim0::BinsDim0;
 use netpod::BinnedRange;
@@ -12,16 +13,31 @@ use std::task::Poll;
 
 #[derive(Debug, ThisError)]
 #[cstm(name = "BinCachedGapFill")]
-pub enum Error {}
+pub enum Error {
+    CacheReader(#[from] super::cached::reader::Error),
+}
+
+type INP = Pin<Box<dyn Stream<Item = Result<BinsDim0<f32>, Error>> + Send>>;
 
 // Try to read from cache for the given bin len.
 // For gaps in the stream, construct an alternative input from finer bin len with a binner.
-pub struct GapFill {}
+pub struct GapFill {
+    inp: INP,
+}
 
 impl GapFill {
-    pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Result<Self, Error> {
-        // TODO assert that the requested bin_len is a cacheable length.
-        todo!()
+    // bin_len of the given range must be a cacheable bin_len.
+    pub fn new(
+        series: u64,
+        range: BinnedRange,
+        do_time_weight: bool,
+        bin_len_layers: Vec<DtMs>,
+    ) -> Result<Self, Error> {
+        // super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?;
+        let inp =
+            super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range)?.map_err(Error::from);
+        let ret = Self { inp: Box::pin(inp) };
+        Ok(ret)
     }
 }
 
@@ -29,6 +45,7 @@ impl Stream for GapFill {
     type Item = Sitemty<BinsDim0<f32>>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
         // When do we detect a gap:
         // - when the current item poses a gap to the last.
         // - when we see EOS before the requested range is filled.
@@ -44,6 +61,6 @@
         // It does not attempt to read the given bin-len from a cache, because we just did attempt that.
         // It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!)
         // Then it finds the next cacheable
-        todo!()
+        Ready(None)
     }
 }
diff --git a/crates/streams/src/timebin/grid.rs b/crates/streams/src/timebin/grid.rs
index ad2cc13..bd6e333 100644
--- a/crates/streams/src/timebin/grid.rs
+++ b/crates/streams/src/timebin/grid.rs
@@ -3,5 +3,10 @@ use netpod::DtMs;
 // Find the next finer bin len from the passed list.
 // The list is assumed to be sorted ascending, meaning finer bin len first.
 pub fn find_next_finer_bin_len(bin_len: DtMs, layers: &[DtMs]) -> Option<DtMs> {
-    todo!("find_next_finer_bin_len")
+    for l in layers.iter().rev() {
+        if *l < bin_len {
+            return Some(*l);
+        }
+    }
+    None
 }
diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs
index 87eb985..4009eda 100644
--- a/crates/streams/src/timebinnedjson.rs
+++ b/crates/streams/src/timebinnedjson.rs
@@ -263,25 +263,11 @@ async fn timebinned_stream(
         bin_len_layers,
     )
     .map_err(Error::from_string)?;
-    // Possible to simplify these kind of seemingly simple type conversions?
-    let stream = stream.map(|item| match item {
-        Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => Ok(StreamItem::DataItem(
-            RangeCompletableItem::Data(Box::new(k) as Box<dyn TimeBinned>),
-        )),
-        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
-            Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
-        }
-        Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)),
-        Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)),
-        Err(e) => Err(e),
+    let stream = stream.map(|item| {
+        on_sitemty_data!(item, |k| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
+            Box::new(k) as Box<dyn TimeBinned>
+        ))))
     });
-    // let stream = stream.map(|item| match item {
-    //     Ok(k) => {
-    //         let k = Box::new(k) as Box<dyn TimeBinned>;
-    //         Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
-    //     }
-    //     Err(e) => Err(::err::Error::from_string(e)),
-    // });
     let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
     Ok(stream)
 }
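
Note on the new BinnedRange::from_nano_range above: it floors the range begin and ceils the range end onto the bin grid, so a partially covered first or last bin is still included, and bin_cnt is the number of grid bins between those two boundaries. Below is a minimal standalone sketch of that arithmetic (not part of the patch), using plain u64 nanosecond values instead of the crate's NanoRange/DtMs/TsNano types; the helper name and the numbers are illustrative only.

// Sketch only: mirrors the floor/ceil arithmetic of BinnedRange::from_nano_range
// on plain integers. `beg` and `end` are nanoseconds since epoch, `bin_len_ns`
// is the bin length in nanoseconds.
fn bin_off_and_cnt(beg: u64, end: u64, bin_len_ns: u64) -> (u64, u64) {
    // Index of the first bin that starts at or before `beg` (floor division).
    let off1 = beg / bin_len_ns;
    // Index of the first bin boundary at or after `end` (ceiling division),
    // so a partially covered last bin is still counted.
    let off2 = (bin_len_ns - 1 + end) / bin_len_ns;
    (off1, off2 - off1)
}

fn main() {
    // A range from 1.5 s to 4.0 s binned with 1 s bins covers bins 1, 2 and 3.
    let (off, cnt) = bin_off_and_cnt(1_500_000_000, 4_000_000_000, 1_000_000_000);
    assert_eq!((off, cnt), (1, 3));
    println!("bin_off={} bin_cnt={}", off, cnt);
}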
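
Note on grid::find_next_finer_bin_len: because the layer list is sorted ascending (finest bin length first), scanning it in reverse yields the coarsest cacheable layer that is still strictly finer than the requested bin length, or None when the request is already at or below the finest layer. A small standalone sketch of that lookup follows; the Ms wrapper is a stand-in for the crate's DtMs, and the layer values are illustrative only.

// Stand-in for netpod::DtMs, only for this example.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Ms(u64);

// Same strategy as grid::find_next_finer_bin_len: the layer list is sorted
// ascending, so the first entry seen in reverse order that is strictly
// smaller than `bin_len` is the coarsest layer finer than the request.
fn next_finer(bin_len: Ms, layers: &[Ms]) -> Option<Ms> {
    layers.iter().rev().copied().find(|l| *l < bin_len)
}

fn main() {
    // Cache layers of 1 s, 1 min and 1 h, ascending, finest first.
    let layers = [Ms(1_000), Ms(60_000), Ms(3_600_000)];
    // A request for 10 min bins falls back to the 1 min layer.
    assert_eq!(next_finer(Ms(600_000), &layers), Some(Ms(60_000)));
    // A request for 1 s bins has no finer cached layer.
    assert_eq!(next_finer(Ms(1_000), &layers), None);
    println!("{:?}", next_finer(Ms(600_000), &layers));
}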
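
Note on the GapFill wiring: CachedReader yields Result items carrying the reader's own error type, and GapFill::new adapts them with TryStreamExt::map_err before boxing the stream behind the INP alias. The sketch below shows that pattern in isolation over plain integers; ReaderError, Error, Inp and wire_up are stand-in names, and futures_executor is assumed only to drive the example.

use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use std::pin::Pin;

// Stand-ins for the two error types involved.
#[derive(Debug)]
struct ReaderError;

#[derive(Debug)]
enum Error {
    CacheReader(ReaderError),
}

impl From<ReaderError> for Error {
    fn from(e: ReaderError) -> Self {
        Error::CacheReader(e)
    }
}

// Analogous to the INP alias in gapfill.rs, but over plain u32 items.
type Inp = Pin<Box<dyn Stream<Item = Result<u32, Error>> + Send>>;

fn wire_up() -> Inp {
    // Stand-in for CachedReader: a ready-made stream of Result items
    // carrying the reader's own error type.
    let reader = stream::iter(vec![Ok::<u32, ReaderError>(1), Err(ReaderError)]);
    // Same pattern as GapFill::new: adapt the error type with map_err,
    // then erase the concrete stream type behind Pin<Box<dyn Stream>>.
    Box::pin(reader.map_err(Error::from))
}

fn main() {
    futures_executor::block_on(async {
        let mut inp = wire_up();
        while let Some(item) = inp.next().await {
            println!("{:?}", item);
        }
    });
}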