WIP on RangeComplete handling

This commit is contained in:
Dominik Werder
2021-05-05 11:50:14 +02:00
parent 6ebb03a2f2
commit e80a6c1688
12 changed files with 252 additions and 250 deletions
+3 -219
View File
@@ -3,8 +3,8 @@ Aggregation and binning support.
*/ */
use super::eventchunker::EventFull; use super::eventchunker::EventFull;
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchAggregator}; use crate::agg::binnedt::AggregatableTdim;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::eventchunker::EventChunkerItem; use crate::eventchunker::EventChunkerItem;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
@@ -23,27 +23,11 @@ pub mod binnedx;
pub mod eventbatch; pub mod eventbatch;
pub mod scalarbinbatch; pub mod scalarbinbatch;
pub trait AggregatorTdim {
type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin;
fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool;
fn starts_after(&self, inp: &Self::InputValue) -> bool;
fn ingest(&mut self, inp: &mut Self::InputValue);
fn result(self) -> Vec<Self::OutputValue>;
}
pub trait AggregatableXdim1Bin { pub trait AggregatableXdim1Bin {
type Output: AggregatableXdim1Bin + AggregatableTdim; type Output: AggregatableXdim1Bin + AggregatableTdim;
fn into_agg(self) -> Self::Output; fn into_agg(self) -> Self::Output;
} }
pub trait AggregatableTdim {
type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<InputValue = Self>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
}
/// Batch of events with a scalar (zero dimensions) numeric value. /// Batch of events with a scalar (zero dimensions) numeric value.
pub struct ValuesDim0 { pub struct ValuesDim0 {
tss: Vec<u64>, tss: Vec<u64>,
@@ -66,21 +50,13 @@ impl std::fmt::Debug for ValuesDim0 {
impl AggregatableXdim1Bin for ValuesDim1 { impl AggregatableXdim1Bin for ValuesDim1 {
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarEventBatch;
fn into_agg(mut self) -> Self::Output { fn into_agg(self) -> Self::Output {
let mut ret = MinMaxAvgScalarEventBatch { let mut ret = MinMaxAvgScalarEventBatch {
tss: Vec::with_capacity(self.tss.len()), tss: Vec::with_capacity(self.tss.len()),
mins: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
}; };
ret.event_data_read_stats.trans(&mut self.event_data_read_stats);
ret.values_extract_stats.trans(&mut self.values_extract_stats);
if self.range_complete_observed {
ret.range_complete_observed = true;
}
for i1 in 0..self.tss.len() { for i1 in 0..self.tss.len() {
let ts = self.tss[i1]; let ts = self.tss[i1];
let mut min = f32::MAX; let mut min = f32::MAX;
@@ -131,9 +107,6 @@ impl ValuesExtractStats {
pub struct ValuesDim1 { pub struct ValuesDim1 {
pub tss: Vec<u64>, pub tss: Vec<u64>,
pub values: Vec<Vec<f32>>, pub values: Vec<Vec<f32>>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
pub range_complete_observed: bool,
} }
impl ValuesDim1 { impl ValuesDim1 {
@@ -141,9 +114,6 @@ impl ValuesDim1 {
Self { Self {
tss: vec![], tss: vec![],
values: vec![], values: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
} }
} }
} }
@@ -169,9 +139,6 @@ impl AggregatableXdim1Bin for ValuesDim0 {
mins: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
}; };
// TODO stats are not yet in ValuesDim0 // TODO stats are not yet in ValuesDim0
err::todoval::<u32>(); err::todoval::<u32>();
@@ -488,13 +455,6 @@ where
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarEventBatchStreamItem {
Values(MinMaxAvgScalarEventBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
}
impl AggregatableXdim1Bin for Dim1F32StreamItem { impl AggregatableXdim1Bin for Dim1F32StreamItem {
type Output = MinMaxAvgScalarEventBatchStreamItem; type Output = MinMaxAvgScalarEventBatchStreamItem;
@@ -508,179 +468,3 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem {
} }
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarBinBatchStreamItem {
Values(MinMaxAvgScalarBinBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
}
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
agg: MinMaxAvgScalarEventBatchAggregator,
event_data_read_stats: EventDataReadStats,
range_complete: bool,
}
impl MinMaxAvgScalarEventBatchStreamItemAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
range_complete: false,
}
}
}
impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
type InputValue = MinMaxAvgScalarEventBatchStreamItem;
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => false,
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => self.range_complete = true,
}
}
fn result(self) -> Vec<Self::OutputValue> {
let mut ret: Vec<Self::OutputValue> = self
.agg
.result()
.into_iter()
.map(MinMaxAvgScalarBinBatchStreamItem::Values)
.collect();
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
if self.range_complete {
ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete);
}
ret
}
}
impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
//<Self as AggregatableTdim>::Aggregator::new(ts1, ts2)
Self::Aggregator::new(ts1, ts2)
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
type Output = MinMaxAvgScalarEventBatchStreamItem;
fn into_agg(self) -> Self::Output {
self
}
}
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
agg: MinMaxAvgScalarBinBatchAggregator,
event_data_read_stats: EventDataReadStats,
range_complete: bool,
}
impl MinMaxAvgScalarBinBatchStreamItemAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
let agg = <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
range_complete: false,
}
}
}
impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
type InputValue = MinMaxAvgScalarBinBatchStreamItem;
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => false,
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => self.range_complete = true,
}
}
fn result(self) -> Vec<Self::OutputValue> {
let mut ret: Vec<Self::OutputValue> = self
.agg
.result()
.into_iter()
.map(MinMaxAvgScalarBinBatchStreamItem::Values)
.collect();
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
if self.range_complete {
ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete);
}
ret
}
}
impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2)
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
fn into_agg(self) -> Self::Output {
self
}
}
+22 -1
View File
@@ -1,4 +1,4 @@
use crate::agg::{AggregatableTdim, AggregatorTdim}; use crate::agg::AggregatableXdim1Bin;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -8,6 +8,23 @@ use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub trait AggregatorTdim {
type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin;
fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool;
fn starts_after(&self, inp: &Self::InputValue) -> bool;
fn ingest(&mut self, inp: &mut Self::InputValue);
fn result(self) -> Vec<Self::OutputValue>;
}
pub trait AggregatableTdim {
type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<InputValue = Self>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
fn is_range_complete(&self) -> bool;
}
pub trait IntoBinnedT { pub trait IntoBinnedT {
type StreamOut: Stream; type StreamOut: Stream;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
@@ -94,6 +111,10 @@ where
}; };
break match cur { break match cur {
Ready(Some(Ok(k))) => { Ready(Some(Ok(k))) => {
// TODO need some trait to know whether the incoming item is a RangeComplete
err::todo();
let ag = self.aggtor.as_mut().unwrap(); let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&k) { if ag.ends_before(&k) {
//info!("ENDS BEFORE"); //info!("ENDS BEFORE");
+102 -11
View File
@@ -1,5 +1,6 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, ValuesExtractStats}; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::agg::AggregatableXdim1Bin;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
@@ -13,9 +14,6 @@ pub struct MinMaxAvgScalarEventBatch {
pub mins: Vec<f32>, pub mins: Vec<f32>,
pub maxs: Vec<f32>, pub maxs: Vec<f32>,
pub avgs: Vec<f32>, pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
pub range_complete_observed: bool,
} }
impl MinMaxAvgScalarEventBatch { impl MinMaxAvgScalarEventBatch {
@@ -25,9 +23,6 @@ impl MinMaxAvgScalarEventBatch {
mins: vec![], mins: vec![],
maxs: vec![], maxs: vec![],
avgs: vec![], avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
} }
} }
@@ -93,14 +88,12 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!( write!(
fmt, fmt,
"MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?} EDS {:?} VXS {:?}", "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}",
self.tss.len(), self.tss.len(),
self.tss, self.tss,
self.mins, self.mins,
self.maxs, self.maxs,
self.avgs, self.avgs,
self.event_data_read_stats,
self.values_extract_stats,
) )
} }
} }
@@ -118,6 +111,9 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
} }
fn is_range_complete(&self) -> bool {
false
}
} }
impl MinMaxAvgScalarEventBatch { impl MinMaxAvgScalarEventBatch {
@@ -258,3 +254,98 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
vec![v] vec![v]
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarEventBatchStreamItem {
Values(MinMaxAvgScalarEventBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
type Output = MinMaxAvgScalarEventBatchStreamItem;
fn into_agg(self) -> Self::Output {
self
}
}
impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
//<Self as AggregatableTdim>::Aggregator::new(ts1, ts2)
Self::Aggregator::new(ts1, ts2)
}
fn is_range_complete(&self) -> bool {
if let MinMaxAvgScalarEventBatchStreamItem::RangeComplete = self {
true
} else {
false
}
}
}
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
agg: MinMaxAvgScalarEventBatchAggregator,
event_data_read_stats: EventDataReadStats,
}
impl MinMaxAvgScalarEventBatchStreamItemAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
}
}
}
impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
type InputValue = MinMaxAvgScalarEventBatchStreamItem;
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => false,
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => (),
}
}
fn result(self) -> Vec<Self::OutputValue> {
let mut ret: Vec<Self::OutputValue> = self
.agg
.result()
.into_iter()
.map(MinMaxAvgScalarBinBatchStreamItem::Values)
.collect();
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
ret
}
}
+102 -2
View File
@@ -1,8 +1,9 @@
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside}; use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
use netpod::NanoRange; use netpod::{EventDataReadStats, NanoRange};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::mem::size_of; use std::mem::size_of;
@@ -188,9 +189,14 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
impl AggregatableTdim for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinBatch; type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarBinBatchAggregator; type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2) MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2)
} }
fn is_range_complete(&self) -> bool {
false
}
} }
pub struct MinMaxAvgScalarBinBatchAggregator { pub struct MinMaxAvgScalarBinBatchAggregator {
@@ -279,3 +285,97 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
vec![v] vec![v]
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarBinBatchStreamItem {
Values(MinMaxAvgScalarBinBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
}
impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2)
}
fn is_range_complete(&self) -> bool {
if let MinMaxAvgScalarBinBatchStreamItem::RangeComplete = self {
true
} else {
false
}
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
fn into_agg(self) -> Self::Output {
self
}
}
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
agg: MinMaxAvgScalarBinBatchAggregator,
event_data_read_stats: EventDataReadStats,
}
impl MinMaxAvgScalarBinBatchStreamItemAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
let agg = <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
}
}
}
impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
type InputValue = MinMaxAvgScalarBinBatchStreamItem;
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => false,
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => (),
}
}
fn result(self) -> Vec<Self::OutputValue> {
let mut ret: Vec<Self::OutputValue> = self
.agg
.result()
.into_iter()
.map(MinMaxAvgScalarBinBatchStreamItem::Values)
.collect();
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
ret
}
}
+1 -1
View File
@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarBinBatchStreamItem; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::cache::{CacheUsage, PreBinnedQuery}; use crate::cache::{CacheUsage, PreBinnedQuery};
use err::Error; use err::Error;
+1 -1
View File
@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::binnedstream::BinnedStream; use crate::binnedstream::BinnedStream;
use crate::cache::pbv::PreBinnedValueByteStream; use crate::cache::pbv::PreBinnedValueByteStream;
use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::channelconfig::{extract_matching_config_entry, read_local_config};
+1 -1
View File
@@ -1,5 +1,5 @@
use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedt::IntoBinnedT;
use crate::agg::MinMaxAvgScalarBinBatchStreamItem; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery}; use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
use crate::frame::makeframe::make_frame; use crate::frame::makeframe::make_frame;
+12 -8
View File
@@ -1,5 +1,5 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, MinMaxAvgScalarEventBatchStreamItem, ValuesDim1}; use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, ValuesDim1};
use crate::eventchunker::EventChunkerItem; use crate::eventchunker::EventChunkerItem;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
@@ -282,14 +282,21 @@ where
} }
if lowest_ix == usize::MAX { if lowest_ix == usize::MAX {
if self.batch.tss.len() != 0 { if self.batch.tss.len() != 0 {
let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
if self.range_complete_observed_all { if self.range_complete_observed_all {
k.range_complete_observed = true; // TODO we don't want to emit range complete here, instead we want to emit potentially
// a RangeComplete at the very end when data and stats are emitted and all inputs finished.
err::todo();
} }
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))"); info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))");
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
break Ready(Some(Ok(ret))); break Ready(Some(Ok(ret)));
} else { } else {
if self.range_complete_observed_all {
// TODO we don't want to emit range complete here, instead we want to emit potentially
// a RangeComplete at the very end when data and stats are emitted and all inputs finished.
err::todo();
}
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)"); info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)");
self.completed = true; self.completed = true;
break Ready(None); break Ready(None);
@@ -314,10 +321,7 @@ where
} }
} }
if self.batch.tss.len() >= self.batch_size { if self.batch.tss.len() >= self.batch_size {
let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
if self.range_complete_observed_all {
k.range_complete_observed = true;
}
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
return Ready(Some(Ok(ret))); return Ready(Some(Ok(ret)));
} }
+1 -1
View File
@@ -5,7 +5,7 @@ Delivers event data (not yet time-binned) from local storage and provides client
to request such data from nodes. to request such data from nodes.
*/ */
use crate::agg::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{make_frame, make_term_frame}; use crate::frame::makeframe::{make_frame, make_term_frame};
use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames; use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames;
+1 -1
View File
@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::decode_frame; use crate::frame::makeframe::decode_frame;
use crate::raw::conn::RawConnOut; use crate::raw::conn::RawConnOut;
+2 -2
View File
@@ -1,6 +1,6 @@
use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::agg::{IntoDim1F32Stream, MinMaxAvgScalarEventBatchStreamItem}; use crate::agg::IntoDim1F32Stream;
use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
+4 -2
View File
@@ -1,7 +1,7 @@
use crate::spawn_test_hosts; use crate::spawn_test_hosts;
use bytes::BytesMut; use bytes::BytesMut;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use disk::agg::MinMaxAvgScalarBinBatchStreamItem; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error; use err::Error;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -52,7 +52,9 @@ async fn get_binned_0_inner() -> Result<(), Error> {
&cluster, &cluster,
) )
.await?; .await?;
return Ok(()); if true {
return Ok(());
}
get_binned_channel( get_binned_channel(
"wave-u16-le-n77", "wave-u16-le-n77",
"1970-01-01T01:11:00.000Z", "1970-01-01T01:11:00.000Z",