#[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; use std::task::{Context, Poll}; use std::pin::Pin; use crate::EventFull; use futures_core::Stream; use futures_util::{pin_mut, StreamExt, future::ready}; use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*}; use crate::merge::MergeDim1F32Stream; use netpod::BinSpecDimT; use std::sync::Arc; pub trait AggregatorTdim { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool; fn ingest(&mut self, inp: &Self::InputValue); fn result(self) -> Self::OutputValue; } pub trait AggregatableXdim1Bin { type Output: AggregatableXdim1Bin + AggregatableTdim; fn into_agg(self) -> Self::Output; } pub trait AggregatableTdim { type Output: AggregatableXdim1Bin + AggregatableTdim; type Aggregator: AggregatorTdim; fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator; } // dummy impl AggregatableXdim1Bin for () { type Output = (); fn into_agg(self) -> Self::Output { todo!() } } impl AggregatableTdim for () { type Output = (); type Aggregator = (); fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); fn ends_before(&self, inp: &Self::InputValue) -> bool { todo!() } fn ends_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn starts_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn ingest(&mut self, v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } pub struct ValuesDim0 { tss: Vec, values: Vec>, } impl std::fmt::Debug for ValuesDim0 { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last()) } } impl AggregatableXdim1Bin for ValuesDim1 { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { let mut ret = MinMaxAvgScalarEventBatch { tss: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()), }; for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; let mut max = f32::MIN; let mut sum = 0f32; let vals = &self.values[i1]; assert!(vals.len() > 0); for i2 in 0..vals.len() { let v = vals[i2]; //info!("value {} {} {}", i1, i2, v); min = min.min(v); max = max.max(v); sum += v; } if min == f32::MAX { min = f32::NAN; } if max == f32::MIN { max = f32::NAN; } ret.tss.push(ts); ret.mins.push(min); ret.maxs.push(max); ret.avgs.push(sum / vals.len() as f32); } ret } } pub struct ValuesDim1 { pub tss: Vec, pub values: Vec>, } impl ValuesDim1 { pub fn empty() -> Self { Self { tss: vec![], values: vec![], } } } impl std::fmt::Debug for ValuesDim1 { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last()) } } impl AggregatableXdim1Bin for ValuesDim0 { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { let mut ret = MinMaxAvgScalarEventBatch { tss: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()), }; for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; let mut max = f32::MIN; let mut sum = 0f32; let vals = &self.values[i1]; assert!(vals.len() > 0); for i2 in 0..vals.len() { let v = vals[i2]; //info!("value {} {} {}", i1, i2, v); min = min.min(v); max = max.max(v); sum += v; } if min == f32::MAX { min = f32::NAN; } if max == f32::MIN { max = f32::NAN; } ret.tss.push(ts); ret.mins.push(min); ret.maxs.push(max); ret.avgs.push(sum / vals.len() as f32); } ret } } pub struct MinMaxAvgScalarEventBatch { tss: Vec, mins: Vec, maxs: Vec, avgs: Vec, } impl std::fmt::Debug for MinMaxAvgScalarEventBatch { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "MinMaxAvgScalarEventBatch count {}", self.tss.len()) } } impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { self } } impl AggregatableTdim for MinMaxAvgScalarEventBatch { type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) } } pub struct MinMaxAvgScalarEventBatchAggregator { ts1: u64, ts2: u64, count: u64, min: f32, max: f32, sum: f32, } impl MinMaxAvgScalarEventBatchAggregator { pub fn new(ts1: u64, ts2: u64) -> Self { Self { ts1, ts2, min: f32::MAX, max: f32::MIN, sum: 0f32, count: 0, } } } impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { type InputValue = MinMaxAvgScalarEventBatch; type OutputValue = MinMaxAvgScalarBinSingle; fn ends_before(&self, inp: &Self::InputValue) -> bool { match inp.tss.last() { Some(ts) => { *ts < self.ts1 } None => true, } } fn ends_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.last() { Some(ts) => { *ts >= self.ts2 } _ => panic!() } } fn starts_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.first() { Some(ts) => { *ts >= self.ts2 } _ => panic!() } } fn ingest(&mut self, v: &Self::InputValue) { for i1 in 0..v.tss.len() { let ts = v.tss[i1]; if ts < self.ts1 { //info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); continue; } else if ts >= self.ts2 { //info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); continue; } else { //info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); self.min = self.min.min(v.mins[i1]); self.max = self.max.max(v.maxs[i1]); self.sum += v.avgs[i1]; self.count += 1; } } } fn result(self) -> Self::OutputValue { let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let max = if self.max == f32::MIN { f32::NAN } else { self.max }; let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 }; MinMaxAvgScalarBinSingle { ts1: self.ts1, ts2: self.ts2, count: self.count, min, max, avg, } } } pub struct MinMaxAvgScalarBinBatch { ts1s: Vec, ts2s: Vec, counts: Vec, mins: Vec, maxs: Vec, avgs: Vec, } impl std::fmt::Debug for MinMaxAvgScalarBinBatch { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "MinMaxAvgScalarBinBatch count {}", self.ts1s.len()) } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinBatch; fn into_agg(self) -> Self::Output { todo!() } } impl AggregatableTdim for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinBatchAggregator; fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } pub struct MinMaxAvgScalarBinBatchAggregator {} impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { type InputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinSingle; fn ends_before(&self, inp: &Self::InputValue) -> bool { todo!() } fn ends_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn starts_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn ingest(&mut self, v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } pub struct MinMaxAvgScalarBinSingle { ts1: u64, ts2: u64, count: u64, min: f32, max: f32, avg: f32, } impl std::fmt::Debug for MinMaxAvgScalarBinSingle { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.count, self.min, self.max, self.avg) } } impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator; fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; fn into_agg(self) -> Self::Output { self } } pub struct MinMaxAvgScalarBinSingleAggregator {} impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { type InputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinSingle; fn ends_before(&self, inp: &Self::InputValue) -> bool { todo!() } fn ends_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn starts_after(&self, inp: &Self::InputValue) -> bool { todo!() } fn ingest(&mut self, v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } pub struct Dim0F32Stream where S: Stream> { inp: S, } impl Stream for Dim0F32Stream where S: Stream> + Unpin { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let mut ret = ValuesDim1 { tss: vec![], values: vec![], }; use ScalarType::*; for i1 in 0..k.tss.len() { // TODO iterate sibling arrays after single bounds check let ty = &k.scalar_types[i1]; let decomp = k.decomps[i1].as_ref().unwrap(); match ty { F64 => { const BY: usize = 8; // do the conversion // TODO only a scalar! todo!(); let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); let ele_count = n1 / ty.bytes() as usize; let mut j = Vec::with_capacity(ele_count); // this is safe for ints and floats unsafe { j.set_len(ele_count); } let mut p1 = 0; for i1 in 0..ele_count { let u = unsafe { let mut r = [0u8; BY]; std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); f64::from_be_bytes(r) //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1])) }; j[i1] = u as f32; p1 += BY; } ret.tss.push(k.tss[i1]); ret.values.push(j); }, _ => todo!() } } Ready(Some(Ok(todo!()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } } pub trait IntoDim0F32Stream { fn into_dim_0_f32_stream(self) -> Dim0F32Stream where Self: Stream> + Sized; } impl IntoDim0F32Stream for T where T: Stream> { fn into_dim_0_f32_stream(self) -> Dim0F32Stream { Dim0F32Stream { inp: self, } } } pub struct Dim1F32Stream where S: Stream> { inp: S, } impl Stream for Dim1F32Stream where S: Stream> + Unpin { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let mut ret = ValuesDim1 { tss: vec![], values: vec![], }; use ScalarType::*; for i1 in 0..k.tss.len() { // TODO iterate sibling arrays after single bounds check let ty = &k.scalar_types[i1]; let decomp = k.decomps[i1].as_ref().unwrap(); match ty { F64 => { const BY: usize = 8; // do the conversion let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); let ele_count = n1 / ty.bytes() as usize; let mut j = Vec::with_capacity(ele_count); // this is safe for ints and floats unsafe { j.set_len(ele_count); } let mut p1 = 0; for i1 in 0..ele_count { let u = unsafe { let mut r = [0u8; BY]; std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); f64::from_be_bytes(r) //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1])) }; j[i1] = u as f32; p1 += BY; } ret.tss.push(k.tss[i1]); ret.values.push(j); }, _ => todo!() } } Ready(Some(Ok(ret))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } } pub trait IntoDim1F32Stream { fn into_dim_1_f32_stream(self) -> Dim1F32Stream where Self: Stream> + Sized; } impl IntoDim1F32Stream for T where T: Stream> { fn into_dim_1_f32_stream(self) -> Dim1F32Stream { Dim1F32Stream { inp: self, } } } pub trait IntoBinnedXBins1 { type StreamOut; fn into_binned_x_bins_1(self) -> Self::StreamOut where Self: Stream>; } impl IntoBinnedXBins1 for T where T: Stream> + Unpin { type StreamOut = IntoBinnedXBins1DefaultStream; fn into_binned_x_bins_1(self) -> Self::StreamOut { IntoBinnedXBins1DefaultStream { inp: self, } } } pub struct IntoBinnedXBins1DefaultStream where S: Stream> + Unpin, I: AggregatableXdim1Bin { inp: S, } impl Stream for IntoBinnedXBins1DefaultStream where S: Stream> + Unpin, I: AggregatableXdim1Bin { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))), Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } } pub trait IntoBinnedT { type StreamOut: Stream; fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; } impl IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I::Aggregator: Unpin { type StreamOut = IntoBinnedTDefaultStream; fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { IntoBinnedTDefaultStream::new(self, spec) } } pub struct IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { inp: S, aggtor: Option, spec: BinSpecDimT, curbin: u32, left: Option>>>, } impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { pub fn new(inp: S, spec: BinSpecDimT) -> Self { //info!("spec ts {} {}", spec.ts1, spec.ts2); Self { inp, aggtor: None, spec, curbin: 0, left: None, } } } impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I::Aggregator: Unpin { type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; 'outer: loop { let cur = if self.curbin as u64 >= self.spec.count { Ready(None) } else if let Some(k) = self.left.take() { k } else { self.inp.poll_next_unpin(cx) }; break match cur { Ready(Some(Ok(k))) => { if self.aggtor.is_none() { let range = self.spec.get_range(self.curbin); //info!("range: {} {}", range.ts1, range.ts2); self.aggtor = Some(k.aggregator_new(range.beg, range.end)); } let ag = self.aggtor.as_mut().unwrap(); if ag.ends_before(&k) { //info!("ENDS BEFORE"); continue 'outer; } else if ag.starts_after(&k) { //info!("STARTS AFTER"); self.left = Some(Ready(Some(Ok(k)))); self.curbin += 1; Ready(Some(Ok(self.aggtor.take().unwrap().result()))) } else { //info!("INGEST"); ag.ingest(&k); // if this input contains also data after the current bin, then I need to keep // it for the next round. if ag.ends_after(&k) { //info!("ENDS AFTER"); self.left = Some(Ready(Some(Ok(k)))); self.curbin += 1; Ready(Some(Ok(self.aggtor.take().unwrap().result()))) } else { //info!("ENDS WITHIN"); continue 'outer; } } } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => { match self.aggtor.take() { Some(ag) => { Ready(Some(Ok(ag.result()))) } None => { warn!("TODO add trailing bins"); Ready(None) } } }, Pending => Pending, }; } } } pub fn make_test_node(id: u32) -> Node { Node { id, host: "localhost".into(), port: 8800 + id as u16, data_base_path: format!("../tmpdata/node{:02}", id).into(), split: id, ksprefix: "ks".into(), } } #[test] fn agg_x_dim_0() { taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap(); } async fn agg_x_dim_0_inner() { let node = make_test_node(0); let node = Arc::new(node); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { backend: "sf-databuffer".into(), name: "S10BC01-DBAM070:EOM1_T1".into(), }, keyspace: 2, time_bin_size: DAY, array: false, shape: Shape::Scalar, scalar_type: ScalarType::F64, big_endian: true, compression: true, }, timebin: 18723, tb_file_count: 1, buffer_size: 1024 * 4, }; let bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { if let Ok(ref k) = q { //info!("vals: {:?}", k); } q }) .into_binned_x_bins_1() .map(|k| { //info!("after X binning {:?}", k.as_ref().unwrap()); k }) .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) .map(|k| { info!("after T binning {:?}", k.as_ref().unwrap()); k }) .for_each(|k| ready(())); fut1.await; } #[test] fn agg_x_dim_1() { taskrun::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap(); } async fn agg_x_dim_1_inner() { // sf-databuffer // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* // S10BC01-DBAM070:BAM_CH1_NORM let node = make_test_node(0); let node = Arc::new(node); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { backend: "ks".into(), name: "wave1".into(), }, keyspace: 3, time_bin_size: DAY, array: true, shape: Shape::Wave(1024), scalar_type: ScalarType::F64, big_endian: true, compression: true, }, timebin: 0, tb_file_count: 1, buffer_size: 17, }; let bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { if let Ok(ref k) = q { //info!("vals: {:?}", k); } q }) .into_binned_x_bins_1() .map(|k| { //info!("after X binning {:?}", k.as_ref().unwrap()); k }) .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) .map(|k| { info!("after T binning {:?}", k.as_ref().unwrap()); k }) .for_each(|k| ready(())); fut1.await; } #[test] fn merge_0() { taskrun::run(async { merge_0_inner().await; Ok(()) }).unwrap(); } async fn merge_0_inner() { let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { backend: "ks".into(), name: "wave1".into(), }, keyspace: 3, time_bin_size: DAY, array: true, shape: Shape::Wave(17), scalar_type: ScalarType::F64, big_endian: true, compression: true, }, timebin: 0, tb_file_count: 1, buffer_size: 1024 * 8, }; let streams = (0..13).into_iter() .map(|k| { make_test_node(k) }) .map(|node| { let node = Arc::new(node); crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream() }) .collect(); MergeDim1F32Stream::new(streams) .map(|k| { //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); }) .fold(0, |k, q| ready(0)) .await; } pub fn tmp_some_older_things() { let vals = ValuesDim1 { tss: vec![0, 1, 2, 3], values: vec![ vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.], ], }; // I want to distinguish already in the outer part between dim-0 and dim-1 and generate // separate code for these cases... // That means that also the reading chain itself needs to be typed on that. // Need to supply some event-payload converter type which has that type as Output type. let vals2 = vals.into_agg(); // Now the T-binning: /* T-aggregator must be able to produce empty-values of correct type even if we never get a single value of input data. Therefore, it needs the bin range definition. How do I want to drive the system? If I write the T-binner as a Stream, then I also need to pass it the input! Meaning, I need to pass the Stream which produces the actual numbers from disk. readchannel() -> Stream of timestamped byte blobs .to_f32() -> Stream ? indirection to branch on the underlying shape .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level? */ }