it checks
This commit is contained in:
2
.cargo/config.toml
Normal file
2
.cargo/config.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
[build]
|
||||
rustflags = ["-C", "force-frame-pointers"]
|
||||
@@ -62,7 +62,23 @@ impl AggregatableXdim1Bin for ValuesDim1 {
|
||||
maxs: Vec::with_capacity(self.tss.len()),
|
||||
avgs: Vec::with_capacity(self.tss.len()),
|
||||
};
|
||||
// TODO do the actual binning
|
||||
for i1 in 0..self.tss.len() {
|
||||
let ts = self.tss[i1];
|
||||
let mut min = f32::MAX;
|
||||
let mut max = f32::MIN;
|
||||
let mut sum = 0f32;
|
||||
let vals = &self.values[i1];
|
||||
for i2 in 0..vals.len() {
|
||||
let v = vals[i2];
|
||||
min = min.min(v);
|
||||
max = max.max(v);
|
||||
sum += v;
|
||||
}
|
||||
ret.tss.push(ts);
|
||||
ret.mins.push(min);
|
||||
ret.maxs.push(max);
|
||||
ret.avgs.push(sum / ret.tss.len() as f32);
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
@@ -92,12 +108,24 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
|
||||
impl AggregatableTdim for MinMaxAvgScalarEventBatch {
|
||||
type Output = MinMaxAvgScalarBinBatch;
|
||||
type Aggregator = MinMaxAvgScalarEventBatchAggregator;
|
||||
|
||||
fn aggregator_new(&self) -> Self::Aggregator {
|
||||
todo!()
|
||||
MinMaxAvgScalarEventBatchAggregator::new()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct MinMaxAvgScalarEventBatchAggregator {}
|
||||
pub struct MinMaxAvgScalarEventBatchAggregator {
|
||||
}
|
||||
|
||||
impl MinMaxAvgScalarEventBatchAggregator {
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
|
||||
type OutputValue = MinMaxAvgScalarBinBatch;
|
||||
@@ -254,20 +282,59 @@ pub struct IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Er
|
||||
}
|
||||
|
||||
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Error>> + Unpin, I: AggregatableXdim1Bin {
|
||||
type Item = Result<MinMaxAvgScalarEventBatch, Error>;
|
||||
type Item = Result<I::Output, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))),
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
pub trait IntoBinnedT {
|
||||
type StreamOut: Stream;
|
||||
fn into_binned_t(self) -> Self::StreamOut;
|
||||
}
|
||||
|
||||
impl<T, I> IntoBinnedT for T where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin {
|
||||
//type Bla = <<I as AggregatableTdim>::Aggregator as AggregatorTdim>::OutputValue;
|
||||
type StreamOut = IntoBinnedTDefaultStream<T, I>;
|
||||
|
||||
fn into_binned_t(self) -> Self::StreamOut {
|
||||
IntoBinnedTDefaultStream::new(self)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> {
|
||||
inp: S,
|
||||
}
|
||||
|
||||
impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> {
|
||||
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self {
|
||||
inp: inp,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin {
|
||||
type Item = Result<MinMaxAvgScalarBinSingle, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let ret = MinMaxAvgScalarEventBatch {
|
||||
// TODO fill in the details
|
||||
tss: vec![],
|
||||
mins: vec![],
|
||||
maxs: vec![],
|
||||
avgs: vec![],
|
||||
};
|
||||
Ready(Some(Ok(ret)))
|
||||
todo!()
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
@@ -281,6 +348,7 @@ impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=R
|
||||
|
||||
|
||||
|
||||
|
||||
#[test]
|
||||
fn agg_x_dim_1() {
|
||||
crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
|
||||
@@ -339,10 +407,10 @@ async fn agg_x_dim_1_inner() {
|
||||
})
|
||||
.into_binned_x_bins_1()
|
||||
.map(|k| {
|
||||
let k = k.unwrap();
|
||||
info!("after X binning {:?}", k);
|
||||
info!("after X binning {:?}", k.as_ref().unwrap());
|
||||
k
|
||||
})
|
||||
.into_binned_t()
|
||||
.for_each(|k| ready(()));
|
||||
fut1.await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user