This commit is contained in:
Dominik Werder
2024-09-06 22:11:57 +02:00
parent f5909ea03c
commit 2c523810c1
6 changed files with 70 additions and 38 deletions

View File

@@ -1760,11 +1760,22 @@ impl DtMs {
1000000 * self.0
}
pub const fn dt_ns(&self) -> DtNano {
DtNano::from_ms(self.0)
}
pub const fn to_i64(&self) -> i64 {
self.0 as i64
}
}
impl fmt::Display for DtMs {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let dur = Duration::from_millis(self.ms());
write!(fmt, "{}", humantime::format_duration(dur))
}
}
#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct TsNano(u64);
@@ -1866,6 +1877,10 @@ impl TsNano {
TsMs::from_ms_u64(self.ms())
}
pub const fn to_dt_ms(self) -> DtMs {
DtMs::from_ms_u64(self.ms())
}
pub fn from_system_time(st: SystemTime) -> Self {
let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
let x = tsunix.as_secs() * 1_000_000_000 + tsunix.subsec_nanos() as u64;
@@ -2378,6 +2393,16 @@ impl BinnedRange<TsNano> {
pub fn to_nano_range(&self) -> NanoRange {
self.full_range()
}
pub fn from_nano_range(range: NanoRange, bin_len: DtMs) -> Self {
let off1 = range.beg() / bin_len.ns();
let off2 = (bin_len.ns() - 1 + range.end()) / bin_len.ns();
Self {
bin_len: TsNano::from_ns(bin_len.ns()),
bin_off: off1,
bin_cnt: off2 - off1,
}
}
}
impl<T> BinnedRange<T>

View File

@@ -16,8 +16,9 @@ pub enum Error {}
pub struct CachedReader {}
impl CachedReader {
pub fn new(series: u64, bin_len: DtMs, range: BinnedRange<TsNano>) -> Self {
todo!()
pub fn new(series: u64, bin_len: DtMs, range: BinnedRange<TsNano>) -> Result<Self, Error> {
let ret = Self {};
Ok(ret)
}
}
@@ -25,6 +26,7 @@ impl Stream for CachedReader {
type Item = Result<BinsDim0<f32>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
use Poll::*;
Ready(None)
}
}

View File

@@ -4,6 +4,7 @@ use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -57,7 +58,8 @@ impl TimeBinnedFromLayers {
// must produce bins missing in cache from separate stream.
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
let inp = super::gapfill::GapFill::new(series, bin_len, range)?;
info!("{}::new bin_len in layers", Self::type_name());
let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers)?;
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
} else {
@@ -65,15 +67,9 @@ impl TimeBinnedFromLayers {
Some(finer) => {
// TODO
// produce from binned sub-stream with additional binner.
let inp = super::gapfill::GapFill::new(series, bin_len, range.clone())?
// .map(|item| {
// let ret = match item {
// Ok(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))),
// Err(e) => Err(::err::Error::from_string(e)),
// };
// ret
// })
;
let range = BinnedRange::from_nano_range(range.to_nano_range(), finer);
info!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range);
let inp = super::gapfill::GapFill::new(series, range.clone(), do_time_weight, bin_len_layers)?;
let inp = super::basic::TimeBinnedStream::new(
Box::pin(inp),
BinnedRangeEnum::Time(range),
@@ -83,6 +79,7 @@ impl TimeBinnedFromLayers {
Ok(ret)
}
None => {
info!("{}::new NO next finer", Self::type_name());
// TODO
// produce from events
todo!()

View File

@@ -1,6 +1,7 @@
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::TryStreamExt;
use items_0::streamitem::Sitemty;
use items_2::binsdim0::BinsDim0;
use netpod::BinnedRange;
@@ -12,16 +13,31 @@ use std::task::Poll;
#[derive(Debug, ThisError)]
#[cstm(name = "BinCachedGapFill")]
pub enum Error {}
pub enum Error {
CacheReader(#[from] super::cached::reader::Error),
}
type INP = Pin<Box<dyn Stream<Item = Result<BinsDim0<f32>, Error>> + Send>>;
// Try to read from cache for the given bin len.
// For gaps in the stream, construct an alternative input from finer bin len with a binner.
pub struct GapFill {}
pub struct GapFill {
inp: INP,
}
impl GapFill {
pub fn new(series: u64, bin_len: DtMs, range: BinnedRange<TsNano>) -> Result<Self, Error> {
// TODO assert that the requested bin_len is a cacheable length.
todo!()
// bin_len of the given range must be a cacheable bin_len.
pub fn new(
series: u64,
range: BinnedRange<TsNano>,
do_time_weight: bool,
bin_len_layers: Vec<DtMs>,
) -> Result<Self, Error> {
// super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?;
let inp =
super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range)?.map_err(Error::from);
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
}
}
@@ -29,6 +45,7 @@ impl Stream for GapFill {
type Item = Sitemty<BinsDim0<f32>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// When do we detect a gap:
// - when the current item poses a gap to the last.
// - when we see EOS before the requested range is filled.
@@ -44,6 +61,6 @@ impl Stream for GapFill {
// It does not attempt to read the given bin-len from a cache, because we just did attempt that.
// It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!)
// Then it finds the next cacheable
todo!()
Ready(None)
}
}

View File

@@ -3,5 +3,10 @@ use netpod::DtMs;
// Find the next finer bin len from the passed list.
// The list is assumed to be sorted ascending, meaning finer bin len first.
pub fn find_next_finer_bin_len(bin_len: DtMs, layers: &[DtMs]) -> Option<DtMs> {
todo!("find_next_finer_bin_len")
for l in layers.iter().rev() {
if *l < bin_len {
return Some(*l);
}
}
None
}

View File

@@ -263,25 +263,11 @@ async fn timebinned_stream(
bin_len_layers,
)
.map_err(Error::from_string)?;
// Possible to simplify these kind of seemingly simple type conversions?
let stream = stream.map(|item| match item {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => Ok(StreamItem::DataItem(
RangeCompletableItem::Data(Box::new(k) as Box<dyn TimeBinned>),
)),
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)),
Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)),
Err(e) => Err(e),
let stream = stream.map(|item| {
on_sitemty_data!(item, |k| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
Box::new(k) as Box<dyn TimeBinned>
))))
});
// let stream = stream.map(|item| match item {
// Ok(k) => {
// let k = Box::new(k) as Box<dyn TimeBinned>;
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
// }
// Err(e) => Err(::err::Error::from_string(e)),
// });
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
Ok(stream)
}