diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 592abf8..c81bbc4 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -501,7 +501,10 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem { fn into_agg(self) -> Self::Output { match self { Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()), - _ => panic!(), + Dim1F32StreamItem::EventDataReadStats(stats) => { + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) + } + Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete, } } } @@ -516,6 +519,7 @@ pub enum MinMaxAvgScalarBinBatchStreamItem { pub struct MinMaxAvgScalarEventBatchStreamItemAggregator { agg: MinMaxAvgScalarEventBatchAggregator, event_data_read_stats: EventDataReadStats, + range_complete: bool, } impl MinMaxAvgScalarEventBatchStreamItemAggregator { @@ -524,6 +528,7 @@ impl MinMaxAvgScalarEventBatchStreamItemAggregator { Self { agg, event_data_read_stats: EventDataReadStats::new(), + range_complete: false, } } } @@ -535,21 +540,21 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { fn ends_before(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals), - _ => todo!(), + _ => false, } } fn ends_after(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals), - _ => todo!(), + _ => false, } } fn starts_after(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals), - _ => todo!(), + _ => false, } } @@ -557,7 +562,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals), MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats), - MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(), + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => self.range_complete = true, } } @@ -571,6 +576,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats( self.event_data_read_stats, )); + if self.range_complete { + ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete); + } ret } } @@ -596,6 +604,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem { pub struct MinMaxAvgScalarBinBatchStreamItemAggregator { agg: MinMaxAvgScalarBinBatchAggregator, event_data_read_stats: EventDataReadStats, + range_complete: bool, } impl MinMaxAvgScalarBinBatchStreamItemAggregator { @@ -604,6 +613,7 @@ impl MinMaxAvgScalarBinBatchStreamItemAggregator { Self { agg, event_data_read_stats: EventDataReadStats::new(), + range_complete: false, } } } @@ -615,21 +625,21 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { fn ends_before(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals), - _ => todo!(), + _ => false, } } fn ends_after(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals), - _ => todo!(), + _ => false, } } fn starts_after(&self, inp: &Self::InputValue) -> bool { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals), - _ => todo!(), + _ => false, } } @@ -637,7 +647,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals), MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats), - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(), + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => self.range_complete = true, } } @@ -651,6 +661,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats( self.event_data_read_stats, )); + if self.range_complete { + ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete); + } ret } } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index b983833..535df6e 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -143,8 +143,11 @@ impl PreBinnedValueStream { let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone()); let s2 = s1.into_binned_t(range).map(|k| match k { Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => Ok(PreBinnedItem::Batch(k)), + Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) => Ok(PreBinnedItem::RangeComplete), + Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => { + Ok(PreBinnedItem::EventDataReadStats(stats)) + } Err(e) => Err(e), - _ => todo!(), }); self.fut2 = Some(Box::pin(s2)); } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 77e09c9..aa5f21d 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -160,6 +160,8 @@ where ts_last_emit: u64, range_complete_observed: Vec, range_complete_observed_all: bool, + range_complete_observed_all_emitted: bool, + batch_size: usize, } impl MergedMinMaxAvgScalarStream @@ -182,6 +184,8 @@ where ts_last_emit: 0, range_complete_observed: vec![false; n], range_complete_observed_all: false, + range_complete_observed_all_emitted: false, + batch_size: 64, } } } @@ -202,33 +206,41 @@ where self.completed = true; return Ready(None); } + if self.range_complete_observed_all { + error!("MERGER NOTE range_complete_observed_all"); + if self.range_complete_observed_all_emitted { + // TODO something to do? Returning None is maybe too early if there is stats left. + } else { + error!("MERGER NOTE range_complete_observed_all EMIT NOW"); + self.range_complete_observed_all_emitted = true; + return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete))); + } + } // can only run logic if all streams are either finished, errored or have some current value. for i1 in 0..self.inps.len() { match self.current[i1] { MergedMinMaxAvgScalarStreamCurVal::None => { match self.inps[i1].poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - match k { - MinMaxAvgScalarEventBatchStreamItem::Values(vals) => { - self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals); - } - MinMaxAvgScalarEventBatchStreamItem::RangeComplete => { - self.range_complete_observed[i1] = true; - let d = self.range_complete_observed.iter().filter(|&&k| k).count(); - if d == self.range_complete_observed.len() { - self.range_complete_observed_all = true; - info!("\n\n:::::: range_complete d {} COMPLETE", d); - } else { - info!("\n\n:::::: range_complete d {}", d); - } - // TODO what else to do here? - todo!(); - } - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => { - todo!(); - } + Ready(Some(Ok(k))) => match k { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => { + self.ixs[i1] = 0; + self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals); } - } + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => { + self.range_complete_observed[i1] = true; + let d = self.range_complete_observed.iter().filter(|&&k| k).count(); + if d == self.range_complete_observed.len() { + self.range_complete_observed_all = true; + info!("\n\n:::::: range_complete d {} COMPLETE", d); + } else { + info!("\n\n:::::: range_complete d {}", d); + } + continue 'outer; + } + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => { + todo!(); + } + }, Ready(Some(Err(e))) => { // TODO emit this error, consider this stream as done, anything more to do here? //self.current[i1] = CurVal::Err(e); @@ -239,7 +251,6 @@ where self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish; } Pending => { - // TODO is this behaviour correct? return Pending; } } @@ -290,21 +301,25 @@ where self.batch.tss.push(lowest_ts); let rix = self.ixs[lowest_ix]; let z = match &self.current[lowest_ix] { - MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix]), + MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()), _ => panic!(), }; self.batch.mins.push(z.0); self.batch.maxs.push(z.1); self.batch.avgs.push(z.2); self.ixs[lowest_ix] += 1; + if self.ixs[lowest_ix] >= z.3 { + self.ixs[lowest_ix] = 0; + self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None; + } } - if self.batch.tss.len() >= 64 { + if self.batch.tss.len() >= self.batch_size { let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); if self.range_complete_observed_all { k.range_complete_observed = true; } let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - break Ready(Some(Ok(ret))); + return Ready(Some(Ok(ret))); } } } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 114baaa..385b8bc 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -52,11 +52,12 @@ async fn get_binned_0_inner() -> Result<(), Error> { &cluster, ) .await?; + return Ok(()); get_binned_channel( "wave-u16-le-n77", "1970-01-01T01:11:00.000Z", - "1970-01-01T02:12:00.000Z", - 4, + "1970-01-01T01:40:00.000Z", + 7, &cluster, ) .await?; @@ -142,7 +143,7 @@ where match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => { - info!("TEST GOT ITEM {:?}", item); + info!("TEST GOT ITEM {:?}\n", item); Some(Ok(item)) } Err(e) => {