diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..3530dfa --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["-C", "force-frame-pointers"] diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 4c046f7..e442a5f 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -62,7 +62,23 @@ impl AggregatableXdim1Bin for ValuesDim1 { maxs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()), }; - // TODO do the actual binning + for i1 in 0..self.tss.len() { + let ts = self.tss[i1]; + let mut min = f32::MAX; + let mut max = f32::MIN; + let mut sum = 0f32; + let vals = &self.values[i1]; + for i2 in 0..vals.len() { + let v = vals[i2]; + min = min.min(v); + max = max.max(v); + sum += v; + } + ret.tss.push(ts); + ret.mins.push(min); + ret.maxs.push(max); + ret.avgs.push(sum / ret.tss.len() as f32); + } ret } @@ -92,12 +108,24 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { impl AggregatableTdim for MinMaxAvgScalarEventBatch { type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; + fn aggregator_new(&self) -> Self::Aggregator { - todo!() + MinMaxAvgScalarEventBatchAggregator::new() } + } -pub struct MinMaxAvgScalarEventBatchAggregator {} +pub struct MinMaxAvgScalarEventBatchAggregator { +} + +impl MinMaxAvgScalarEventBatchAggregator { + + pub fn new() -> Self { + Self { + } + } + +} impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { type OutputValue = MinMaxAvgScalarBinBatch; @@ -254,20 +282,59 @@ pub struct IntoBinnedXBins1DefaultStream where S: Stream Stream for IntoBinnedXBins1DefaultStream where S: Stream> + Unpin, I: AggregatableXdim1Bin { - type Item = Result; + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))), + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + } + } + +} + + + +pub trait IntoBinnedT { + type StreamOut: Stream; + fn into_binned_t(self) -> Self::StreamOut; +} + +impl IntoBinnedT for T where I: AggregatableTdim, T: Stream> + Unpin { + //type Bla = <::Aggregator as AggregatorTdim>::OutputValue; + type StreamOut = IntoBinnedTDefaultStream; + + fn into_binned_t(self) -> Self::StreamOut { + IntoBinnedTDefaultStream::new(self) + } + +} + +pub struct IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { + inp: S, +} + +impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { + + pub fn new(inp: S) -> Self { + Self { + inp: inp, + } + } + +} + +impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim, T: Stream> + Unpin { + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { - let ret = MinMaxAvgScalarEventBatch { - // TODO fill in the details - tss: vec![], - mins: vec![], - maxs: vec![], - avgs: vec![], - }; - Ready(Some(Ok(ret))) + todo!() } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), @@ -281,6 +348,7 @@ impl Stream for IntoBinnedXBins1DefaultStream where S: Stream