diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index b554ad9..ec15ffd 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -1,5 +1,6 @@ use err::thiserror; use err::ThisError; +use futures_util::FutureExt; use futures_util::Stream; use items_2::binsdim0::BinsDim0; use netpod::BinnedRange; @@ -14,6 +15,14 @@ pub struct Reading { fut: Pin, Box>> + Send>>, } +impl Future for Reading { + type Output = Result, Box>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.fut.poll_unpin(cx) + } +} + pub trait CacheReadProvider: Send { fn read(&self) -> Reading; } diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index 24fdeb3..3ed6246 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -52,12 +52,6 @@ impl TimeBinnedFromLayers { range, bin_len_layers ); - // cases: - // if this bin_len is a cachable bin_len: - // - have to attempt to read from cache. - // expect to read bins in a stream (randomize to small max len for testing). - // also, if this bin_len is a cachable bin_len: - // must produce bins missing in cache from separate stream. let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { info!("{}::new bin_len in layers", Self::type_name()); diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index 3628254..4703c54 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -64,6 +64,7 @@ impl Stream for GapFill { // It does not attempt to read the given bin-len from a cache, because we just did attempt that. // It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!) // Then it finds the next cacheable - Ready(None) + // Ready(None) + todo!("poll the already created cached reader, detect and fill in gaps, send off to cache-write") } }