WIP checks but has many todo panics
This commit is contained in:
285
disk/src/agg.rs
285
disk/src/agg.rs
@@ -4,11 +4,13 @@ Aggregation and binning support.
|
||||
|
||||
use super::eventchunker::EventFull;
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
use crate::eventchunker::EventChunkerItem;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::ScalarType;
|
||||
use netpod::{EventDataReadStats, NanoRange};
|
||||
use netpod::{Node, ScalarType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -332,19 +334,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Dim1F32Stream<S>
|
||||
where
|
||||
S: Stream,
|
||||
{
|
||||
pub struct Dim1F32Stream<S> {
|
||||
inp: S,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<S> Dim1F32Stream<S>
|
||||
where
|
||||
S: Stream,
|
||||
{
|
||||
impl<S> Dim1F32Stream<S> {
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self {
|
||||
inp,
|
||||
@@ -352,13 +348,82 @@ where
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn process_event_data(&mut self, k: &EventFull) -> Result<ValuesDim1, Error> {
|
||||
let mut ret = ValuesDim1::empty();
|
||||
use ScalarType::*;
|
||||
for i1 in 0..k.tss.len() {
|
||||
// TODO iterate sibling arrays after single bounds check
|
||||
let ty = &k.scalar_types[i1];
|
||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
||||
match ty {
|
||||
U16 => {
|
||||
const BY: usize = 2;
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
let mut p1 = 0;
|
||||
for _ in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
u16::from_be_bytes(r)
|
||||
};
|
||||
j.push(u as f32);
|
||||
p1 += BY;
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
}
|
||||
F64 => {
|
||||
const BY: usize = 8;
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
unsafe {
|
||||
j.set_len(ele_count);
|
||||
}
|
||||
let mut p1 = 0;
|
||||
for i1 in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
f64::from_be_bytes(r)
|
||||
//f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
|
||||
};
|
||||
j[i1] = u as f32;
|
||||
p1 += BY;
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
}
|
||||
_ => {
|
||||
let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
|
||||
self.errored = true;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Dim1F32StreamItem {
|
||||
Values(ValuesDim1),
|
||||
RangeComplete,
|
||||
EventDataReadStats(EventDataReadStats),
|
||||
}
|
||||
|
||||
impl<S> Stream for Dim1F32Stream<S>
|
||||
where
|
||||
S: Stream<Item = Result<EventFull, Error>> + Unpin,
|
||||
S: Stream<Item = Result<EventChunkerItem, Error>> + Unpin,
|
||||
{
|
||||
type Item = Result<ValuesDim1, Error>;
|
||||
type Item = Result<Dim1F32StreamItem, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
@@ -372,72 +437,28 @@ where
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let inst1 = Instant::now();
|
||||
let mut ret = ValuesDim1::empty();
|
||||
use ScalarType::*;
|
||||
if k.end_of_range_observed {
|
||||
ret.range_complete_observed = true;
|
||||
}
|
||||
for i1 in 0..k.tss.len() {
|
||||
// TODO iterate sibling arrays after single bounds check
|
||||
let ty = &k.scalar_types[i1];
|
||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
||||
match ty {
|
||||
U16 => {
|
||||
const BY: usize = 2;
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
let mut p1 = 0;
|
||||
for _ in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
u16::from_be_bytes(r)
|
||||
};
|
||||
j.push(u as f32);
|
||||
p1 += BY;
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
let u = match &k {
|
||||
EventChunkerItem::Events(events) => match self.process_event_data(events) {
|
||||
Ok(k) => {
|
||||
let ret = Dim1F32StreamItem::Values(k);
|
||||
Ready(Some(Ok(ret)))
|
||||
}
|
||||
F64 => {
|
||||
const BY: usize = 8;
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
unsafe {
|
||||
j.set_len(ele_count);
|
||||
}
|
||||
let mut p1 = 0;
|
||||
for i1 in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
f64::from_be_bytes(r)
|
||||
//f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
|
||||
};
|
||||
j[i1] = u as f32;
|
||||
p1 += BY;
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
}
|
||||
_ => {
|
||||
let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
return Ready(Some(Err(e)));
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
EventChunkerItem::RangeComplete => err::todoval(),
|
||||
EventChunkerItem::EventDataReadStats(_stats) => {
|
||||
// TODO ret.event_data_read_stats.trans(&mut k.event_data_read_stats);
|
||||
// TODO ret.values_extract_stats.dur += inst2.duration_since(inst1);
|
||||
err::todoval()
|
||||
}
|
||||
}
|
||||
};
|
||||
let inst2 = Instant::now();
|
||||
let mut k = k;
|
||||
ret.event_data_read_stats.trans(&mut k.event_data_read_stats);
|
||||
ret.values_extract_stats.dur += inst2.duration_since(inst1);
|
||||
Ready(Some(Ok(ret)))
|
||||
// TODO do something with the measured time.
|
||||
let _ = inst2.duration_since(inst1);
|
||||
u
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
@@ -455,26 +476,124 @@ where
|
||||
pub trait IntoDim1F32Stream {
|
||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
|
||||
where
|
||||
Self: Stream<Item = Result<EventFull, Error>> + Sized;
|
||||
Self: Stream<Item = Result<EventChunkerItem, Error>> + Sized;
|
||||
}
|
||||
|
||||
impl<T> IntoDim1F32Stream for T
|
||||
where
|
||||
T: Stream<Item = Result<EventFull, Error>>,
|
||||
T: Stream<Item = Result<EventChunkerItem, Error>>,
|
||||
{
|
||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
|
||||
Dim1F32Stream::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_test_node(id: u32) -> Node {
|
||||
Node {
|
||||
host: "localhost".into(),
|
||||
listen: "0.0.0.0".into(),
|
||||
port: 8800 + id as u16,
|
||||
port_raw: 8800 + id as u16 + 100,
|
||||
data_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||
split: id,
|
||||
ksprefix: "ks".into(),
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum MinMaxAvgScalarEventBatchStreamItem {
|
||||
Values(MinMaxAvgScalarEventBatch),
|
||||
RangeComplete,
|
||||
EventDataReadStats(EventDataReadStats),
|
||||
}
|
||||
|
||||
impl AggregatableXdim1Bin for Dim1F32StreamItem {
|
||||
type Output = MinMaxAvgScalarEventBatchStreamItem;
|
||||
|
||||
fn into_agg(self) -> Self::Output {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MinMaxAvgScalarBinBatchStreamItem {
|
||||
Values(MinMaxAvgScalarBinBatch),
|
||||
RangeComplete,
|
||||
EventDataReadStats(EventDataReadStats),
|
||||
}
|
||||
|
||||
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {}
|
||||
|
||||
impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
|
||||
type InputValue = MinMaxAvgScalarEventBatchStreamItem;
|
||||
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
|
||||
|
||||
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ingest(&mut self, _inp: &mut Self::InputValue) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn result(self) -> Self::OutputValue {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
|
||||
type Output = MinMaxAvgScalarBinBatchStreamItem;
|
||||
type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator;
|
||||
|
||||
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
|
||||
type Output = MinMaxAvgScalarEventBatchStreamItem;
|
||||
|
||||
fn into_agg(self) -> Self::Output {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {}
|
||||
|
||||
impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
|
||||
type InputValue = MinMaxAvgScalarBinBatchStreamItem;
|
||||
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
|
||||
|
||||
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ingest(&mut self, _inp: &mut Self::InputValue) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn result(self) -> Self::OutputValue {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
|
||||
type Output = MinMaxAvgScalarBinBatchStreamItem;
|
||||
type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;
|
||||
|
||||
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
|
||||
type Output = MinMaxAvgScalarBinBatchStreamItem;
|
||||
|
||||
fn into_agg(self) -> Self::Output {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user