From 3b062b2f5c6594440924766314824282fc6071ff Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 8 Apr 2021 18:03:31 +0200
Subject: [PATCH] WIP on dim-0

---
 disk/src/agg.rs | 193 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 178 insertions(+), 15 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index b75f784..b9d9690 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -63,6 +63,54 @@ impl AggregatorTdim for () {
 }
 
 
+pub struct ValuesDim0 {
+    tss: Vec<u64>,
+    values: Vec<Vec<f32>>,
+}
+
+impl std::fmt::Debug for ValuesDim0 {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
+    }
+}
+
+impl AggregatableXdim1Bin for ValuesDim1 {
+    type Output = MinMaxAvgScalarEventBatch;
+
+    fn into_agg(self) -> Self::Output {
+        let mut ret = MinMaxAvgScalarEventBatch {
+            tss: Vec::with_capacity(self.tss.len()),
+            mins: Vec::with_capacity(self.tss.len()),
+            maxs: Vec::with_capacity(self.tss.len()),
+            avgs: Vec::with_capacity(self.tss.len()),
+        };
+        for i1 in 0..self.tss.len() {
+            let ts = self.tss[i1];
+            let mut min = f32::MAX;
+            let mut max = f32::MIN;
+            let mut sum = 0f32;
+            let vals = &self.values[i1];
+            assert!(vals.len() > 0);
+            for i2 in 0..vals.len() {
+                let v = vals[i2];
+                //info!("value {} {} {}", i1, i2, v);
+                min = min.min(v);
+                max = max.max(v);
+                sum += v;
+            }
+            if min == f32::MAX { min = f32::NAN; }
+            if max == f32::MIN { max = f32::NAN; }
+            ret.tss.push(ts);
+            ret.mins.push(min);
+            ret.maxs.push(max);
+            ret.avgs.push(sum / vals.len() as f32);
+        }
+        ret
+    }
+
+}
+
+
 pub struct ValuesDim1 {
     tss: Vec<u64>,
     values: Vec<Vec<f32>>,
@@ -74,7 +122,7 @@ impl std::fmt::Debug for ValuesDim1 {
     }
 }
 
-impl AggregatableXdim1Bin for ValuesDim1 {
+impl AggregatableXdim1Bin for ValuesDim0 {
     type Output = MinMaxAvgScalarEventBatch;
 
     fn into_agg(self) -> Self::Output {
@@ -349,15 +397,89 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
 
 
 
-pub struct Dim1F32Stream<S>
-where S: Stream<Item = Result<EventFull, Error>>
-{
+pub struct Dim0F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
     inp: S,
 }
 
-impl<S> Stream for Dim1F32Stream<S>
-where S: Stream<Item = Result<EventFull, Error>> + Unpin
-{
+impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> + Unpin {
+    type Item = Result<ValuesDim0, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        match self.inp.poll_next_unpin(cx) {
+            Ready(Some(Ok(k))) => {
+                let mut ret = ValuesDim1 {
+                    tss: vec![],
+                    values: vec![],
+                };
+                use ScalarType::*;
+                for i1 in 0..k.tss.len() {
+                    // TODO iterate sibling arrays after single bounds check
+                    let ty = &k.scalar_types[i1];
+                    let decomp = k.decomps[i1].as_ref().unwrap();
+                    match ty {
+                        F64 => {
+                            const BY: usize = 8;
+                            // do the conversion
+
+                            // TODO only a scalar!
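+                            // Sketch of the intended dim-0 case (an assumption, not wired up yet):
+                            // if a dim-0 event blob carries exactly one big-endian f64, the decode
+                            // could look roughly like this instead of the waveform loop below:
+                            //   let mut r = [0u8; BY];
+                            //   r.copy_from_slice(&decomp[..BY]);
+                            //   ret.tss.push(k.tss[i1]);
+                            //   ret.values.push(vec![f64::from_be_bytes(r) as f32]);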
+                            todo!();
+
+                            let n1 = decomp.len();
+                            assert!(n1 % ty.bytes() as usize == 0);
+                            let ele_count = n1 / ty.bytes() as usize;
+                            let mut j = Vec::with_capacity(ele_count);
+                            // this is safe for ints and floats: every element is overwritten below
+                            unsafe { j.set_len(ele_count); }
+                            let mut p1 = 0;
+                            for i1 in 0..ele_count {
+                                let u = unsafe {
+                                    let mut r = [0u8; BY];
+                                    std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
+                                    f64::from_be_bytes(r)
+                                    //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
+                                };
+                                j[i1] = u as f32;
+                                p1 += BY;
+                            }
+                            ret.tss.push(k.tss[i1]);
+                            ret.values.push(j);
+                        },
+                        _ => todo!()
+                    }
+                }
+                Ready(Some(Ok(todo!())))
+            }
+            Ready(Some(Err(e))) => Ready(Some(Err(e))),
+            Ready(None) => Ready(None),
+            Pending => Pending,
+        }
+    }
+
+}
+
+pub trait IntoDim0F32Stream {
+    fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self> where Self: Stream<Item = Result<EventFull, Error>> + Sized;
+}
+
+impl<T> IntoDim0F32Stream for T where T: Stream<Item = Result<EventFull, Error>> {
+
+    fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
+        Dim0F32Stream {
+            inp: self,
+        }
+    }
+
+}
+
+
+
+
+pub struct Dim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
+    inp: S,
+}
+
+impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> + Unpin {
     type Item = Result<ValuesDim1, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -411,17 +533,12 @@ where S: Stream<Item = Result<EventFull, Error>> + Unpin
 }
 
 pub trait IntoDim1F32Stream {
-    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
-    where Self: Sized,
-    Self: Stream<Item = Result<EventFull, Error>>;
+    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self> where Self: Stream<Item = Result<EventFull, Error>> + Sized;
 }
 
-impl<T> IntoDim1F32Stream for T
-    where T: Stream<Item = Result<EventFull, Error>>
-{
+impl<T> IntoDim1F32Stream for T where T: Stream<Item = Result<EventFull, Error>> {
 
-    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T>
-    {
+    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
         Dim1F32Stream {
             inp: self,
         }
@@ -647,6 +764,52 @@ const WEEK: u64 = DAY * 7;
 
 
 
+#[test]
+fn agg_x_dim_0() {
+    crate::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
+}
+
+async fn agg_x_dim_0_inner() {
+    let query = netpod::AggQuerySingleChannel {
+        ksprefix: "daq_swissfel".into(),
+        keyspace: 2,
+        channel: netpod::Channel {
+            name: "S10BC01-DBAM070:EOM1_T1".into(),
+            backend: "sf-databuffer".into(),
+        },
+        timebin: 18723,
+        tb_file_count: 1,
+        split: 12,
+        tbsize: 1000 * 60 * 60 * 24,
+        buffer_size: 1024 * 4,
+    };
+    let bin_count = 20;
+    let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
+    let ts2 = ts1 + HOUR * 24;
+    let fut1 = crate::EventBlobsComplete::new(&query)
+        .into_dim_1_f32_stream()
+        //.take(1000)
+        .map(|q| {
+            if let Ok(ref k) = q {
+                //info!("vals: {:?}", k);
+            }
+            q
+        })
+        .into_binned_x_bins_1()
+        .map(|k| {
+            //info!("after X binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+        .map(|k| {
+            info!("after T binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .for_each(|k| ready(()));
+    fut1.await;
+}
+
+
 #[test]
 fn agg_x_dim_1() {
     crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();