WIP on dim-0
This commit is contained in:
+178
-15
@@ -63,6 +63,54 @@ impl AggregatorTdim for () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct ValuesDim0 {
|
||||||
|
tss: Vec<u64>,
|
||||||
|
values: Vec<Vec<f32>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for ValuesDim0 {
|
||||||
|
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AggregatableXdim1Bin for ValuesDim1 {
|
||||||
|
type Output = MinMaxAvgScalarEventBatch;
|
||||||
|
|
||||||
|
fn into_agg(self) -> Self::Output {
|
||||||
|
let mut ret = MinMaxAvgScalarEventBatch {
|
||||||
|
tss: Vec::with_capacity(self.tss.len()),
|
||||||
|
mins: Vec::with_capacity(self.tss.len()),
|
||||||
|
maxs: Vec::with_capacity(self.tss.len()),
|
||||||
|
avgs: Vec::with_capacity(self.tss.len()),
|
||||||
|
};
|
||||||
|
for i1 in 0..self.tss.len() {
|
||||||
|
let ts = self.tss[i1];
|
||||||
|
let mut min = f32::MAX;
|
||||||
|
let mut max = f32::MIN;
|
||||||
|
let mut sum = 0f32;
|
||||||
|
let vals = &self.values[i1];
|
||||||
|
assert!(vals.len() > 0);
|
||||||
|
for i2 in 0..vals.len() {
|
||||||
|
let v = vals[i2];
|
||||||
|
//info!("value {} {} {}", i1, i2, v);
|
||||||
|
min = min.min(v);
|
||||||
|
max = max.max(v);
|
||||||
|
sum += v;
|
||||||
|
}
|
||||||
|
if min == f32::MAX { min = f32::NAN; }
|
||||||
|
if max == f32::MIN { max = f32::NAN; }
|
||||||
|
ret.tss.push(ts);
|
||||||
|
ret.mins.push(min);
|
||||||
|
ret.maxs.push(max);
|
||||||
|
ret.avgs.push(sum / vals.len() as f32);
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct ValuesDim1 {
|
pub struct ValuesDim1 {
|
||||||
tss: Vec<u64>,
|
tss: Vec<u64>,
|
||||||
values: Vec<Vec<f32>>,
|
values: Vec<Vec<f32>>,
|
||||||
@@ -74,7 +122,7 @@ impl std::fmt::Debug for ValuesDim1 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AggregatableXdim1Bin for ValuesDim1 {
|
impl AggregatableXdim1Bin for ValuesDim0 {
|
||||||
type Output = MinMaxAvgScalarEventBatch;
|
type Output = MinMaxAvgScalarEventBatch;
|
||||||
|
|
||||||
fn into_agg(self) -> Self::Output {
|
fn into_agg(self) -> Self::Output {
|
||||||
@@ -349,15 +397,89 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
pub struct Dim1F32Stream<S>
|
pub struct Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> {
|
||||||
where S: Stream<Item=Result<EventFull, Error>>
|
|
||||||
{
|
|
||||||
inp: S,
|
inp: S,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Stream for Dim1F32Stream<S>
|
impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin {
|
||||||
where S: Stream<Item=Result<EventFull, Error>> + Unpin
|
type Item = Result<ValuesDim0, Error>;
|
||||||
{
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
use Poll::*;
|
||||||
|
match self.inp.poll_next_unpin(cx) {
|
||||||
|
Ready(Some(Ok(k))) => {
|
||||||
|
let mut ret = ValuesDim1 {
|
||||||
|
tss: vec![],
|
||||||
|
values: vec![],
|
||||||
|
};
|
||||||
|
use ScalarType::*;
|
||||||
|
for i1 in 0..k.tss.len() {
|
||||||
|
// TODO iterate sibling arrays after single bounds check
|
||||||
|
let ty = &k.scalar_types[i1];
|
||||||
|
let decomp = k.decomps[i1].as_ref().unwrap();
|
||||||
|
match ty {
|
||||||
|
F64 => {
|
||||||
|
const BY: usize = 8;
|
||||||
|
// do the conversion
|
||||||
|
|
||||||
|
// TODO only a scalar!
|
||||||
|
todo!();
|
||||||
|
|
||||||
|
let n1 = decomp.len();
|
||||||
|
assert!(n1 % ty.bytes() as usize == 0);
|
||||||
|
let ele_count = n1 / ty.bytes() as usize;
|
||||||
|
let mut j = Vec::with_capacity(ele_count);
|
||||||
|
// this is safe for ints and floats
|
||||||
|
unsafe { j.set_len(ele_count); }
|
||||||
|
let mut p1 = 0;
|
||||||
|
for i1 in 0..ele_count {
|
||||||
|
let u = unsafe {
|
||||||
|
let mut r = [0u8; BY];
|
||||||
|
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||||
|
f64::from_be_bytes(r)
|
||||||
|
//f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
|
||||||
|
};
|
||||||
|
j[i1] = u as f32;
|
||||||
|
p1 += BY;
|
||||||
|
}
|
||||||
|
ret.tss.push(k.tss[i1]);
|
||||||
|
ret.values.push(j);
|
||||||
|
},
|
||||||
|
_ => todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ready(Some(Ok(todo!())))
|
||||||
|
}
|
||||||
|
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||||
|
Ready(None) => Ready(None),
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait IntoDim0F32Stream {
|
||||||
|
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self> where Self: Stream<Item=Result<EventFull, Error>> + Sized;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> IntoDim0F32Stream for T where T: Stream<Item=Result<EventFull, Error>> {
|
||||||
|
|
||||||
|
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
|
||||||
|
Dim0F32Stream {
|
||||||
|
inp: self,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
pub struct Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> {
|
||||||
|
inp: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin {
|
||||||
type Item = Result<ValuesDim1, Error>;
|
type Item = Result<ValuesDim1, Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
@@ -411,17 +533,12 @@ where S: Stream<Item=Result<EventFull, Error>> + Unpin
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub trait IntoDim1F32Stream {
|
pub trait IntoDim1F32Stream {
|
||||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
|
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self> where Self: Stream<Item=Result<EventFull, Error>> + Sized;
|
||||||
where Self: Sized,
|
|
||||||
Self: Stream<Item=Result<EventFull, Error>>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IntoDim1F32Stream for T
|
impl<T> IntoDim1F32Stream for T where T: Stream<Item=Result<EventFull, Error>> {
|
||||||
where T: Stream<Item=Result<EventFull, Error>>
|
|
||||||
{
|
|
||||||
|
|
||||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T>
|
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
|
||||||
{
|
|
||||||
Dim1F32Stream {
|
Dim1F32Stream {
|
||||||
inp: self,
|
inp: self,
|
||||||
}
|
}
|
||||||
@@ -647,6 +764,52 @@ const WEEK: u64 = DAY * 7;
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn agg_x_dim_0() {
|
||||||
|
crate::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn agg_x_dim_0_inner() {
|
||||||
|
let query = netpod::AggQuerySingleChannel {
|
||||||
|
ksprefix: "daq_swissfel".into(),
|
||||||
|
keyspace: 2,
|
||||||
|
channel: netpod::Channel {
|
||||||
|
name: "S10BC01-DBAM070:EOM1_T1".into(),
|
||||||
|
backend: "sf-databuffer".into(),
|
||||||
|
},
|
||||||
|
timebin: 18723,
|
||||||
|
tb_file_count: 1,
|
||||||
|
split: 12,
|
||||||
|
tbsize: 1000 * 60 * 60 * 24,
|
||||||
|
buffer_size: 1024 * 4,
|
||||||
|
};
|
||||||
|
let bin_count = 20;
|
||||||
|
let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
|
||||||
|
let ts2 = ts1 + HOUR * 24;
|
||||||
|
let fut1 = crate::EventBlobsComplete::new(&query)
|
||||||
|
.into_dim_1_f32_stream()
|
||||||
|
//.take(1000)
|
||||||
|
.map(|q| {
|
||||||
|
if let Ok(ref k) = q {
|
||||||
|
//info!("vals: {:?}", k);
|
||||||
|
}
|
||||||
|
q
|
||||||
|
})
|
||||||
|
.into_binned_x_bins_1()
|
||||||
|
.map(|k| {
|
||||||
|
//info!("after X binning {:?}", k.as_ref().unwrap());
|
||||||
|
k
|
||||||
|
})
|
||||||
|
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
|
||||||
|
.map(|k| {
|
||||||
|
info!("after T binning {:?}", k.as_ref().unwrap());
|
||||||
|
k
|
||||||
|
})
|
||||||
|
.for_each(|k| ready(()));
|
||||||
|
fut1.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn agg_x_dim_1() {
|
fn agg_x_dim_1() {
|
||||||
crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
|
crate::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user