Remove Dim0F32Stream and Dim1F32Stream
This commit is contained in:
-304
@@ -99,125 +99,6 @@ pub trait FitsInside {
|
|||||||
fn fits_inside(&self, range: NanoRange) -> Fits;
|
fn fits_inside(&self, range: NanoRange) -> Fits;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Dim0F32Stream<S>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<EventFull, Error>>,
|
|
||||||
{
|
|
||||||
inp: S,
|
|
||||||
errored: bool,
|
|
||||||
completed: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Dim0F32Stream<S>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<EventFull, Error>>,
|
|
||||||
{
|
|
||||||
pub fn new(inp: S) -> Self {
|
|
||||||
Self {
|
|
||||||
inp,
|
|
||||||
errored: false,
|
|
||||||
completed: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Stream for Dim0F32Stream<S>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<EventFull, Error>> + Unpin,
|
|
||||||
{
|
|
||||||
type Item = Result<ValuesDim0, Error>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
||||||
use Poll::*;
|
|
||||||
if self.completed {
|
|
||||||
panic!("Dim0F32Stream poll_next on completed");
|
|
||||||
}
|
|
||||||
if self.errored {
|
|
||||||
self.completed = true;
|
|
||||||
return Ready(None);
|
|
||||||
}
|
|
||||||
match self.inp.poll_next_unpin(cx) {
|
|
||||||
Ready(Some(Ok(k))) => {
|
|
||||||
// TODO implement here for dim-0
|
|
||||||
let mut ret = ValuesDim0 {
|
|
||||||
tss: vec![],
|
|
||||||
values: vec![],
|
|
||||||
};
|
|
||||||
use ScalarType::*;
|
|
||||||
for i1 in 0..k.tss.len() {
|
|
||||||
// TODO iterate sibling arrays after single bounds check
|
|
||||||
let ty = &k.scalar_types[i1];
|
|
||||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
|
||||||
match ty {
|
|
||||||
F64 => {
|
|
||||||
const BY: usize = 8;
|
|
||||||
// do the conversion
|
|
||||||
|
|
||||||
// TODO only a scalar!
|
|
||||||
err::todoval::<u32>();
|
|
||||||
|
|
||||||
let n1 = decomp.len();
|
|
||||||
assert!(n1 % ty.bytes() as usize == 0);
|
|
||||||
let ele_count = n1 / ty.bytes() as usize;
|
|
||||||
let mut j = Vec::with_capacity(ele_count);
|
|
||||||
// this is safe for ints and floats
|
|
||||||
unsafe {
|
|
||||||
j.set_len(ele_count);
|
|
||||||
}
|
|
||||||
let mut p1 = 0;
|
|
||||||
for i1 in 0..ele_count {
|
|
||||||
let u = unsafe {
|
|
||||||
let mut r = [0u8; BY];
|
|
||||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
|
||||||
f64::from_be_bytes(r)
|
|
||||||
//f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
|
|
||||||
};
|
|
||||||
j[i1] = u as f32;
|
|
||||||
p1 += BY;
|
|
||||||
}
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(j);
|
|
||||||
}
|
|
||||||
_ => err::todoval(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(Error::with_msg(format!("TODO not yet implemented")))))
|
|
||||||
}
|
|
||||||
Ready(Some(Err(e))) => {
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e)))
|
|
||||||
}
|
|
||||||
Ready(None) => {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
}
|
|
||||||
Pending => Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait IntoDim0F32Stream {
|
|
||||||
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self>
|
|
||||||
where
|
|
||||||
Self: Stream<Item = Result<EventFull, Error>> + Sized;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> IntoDim0F32Stream for T
|
|
||||||
where
|
|
||||||
T: Stream<Item = Result<EventFull, Error>>,
|
|
||||||
{
|
|
||||||
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
|
|
||||||
Dim0F32Stream::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Dim1F32Stream<S> {
|
|
||||||
inp: S,
|
|
||||||
errored: bool,
|
|
||||||
completed: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
trait NumEx {
|
trait NumEx {
|
||||||
const BY: usize;
|
const BY: usize;
|
||||||
}
|
}
|
||||||
@@ -287,188 +168,3 @@ make_get_values!(get_values_i32_be, i32, from_be_bytes, 4);
|
|||||||
make_get_values!(get_values_i64_be, i64, from_be_bytes, 8);
|
make_get_values!(get_values_i64_be, i64, from_be_bytes, 8);
|
||||||
make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
|
make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
|
||||||
make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
|
make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
|
||||||
|
|
||||||
impl<S> Dim1F32Stream<S> {
|
|
||||||
pub fn new(inp: S) -> Self {
|
|
||||||
Self {
|
|
||||||
inp,
|
|
||||||
errored: false,
|
|
||||||
completed: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_event_data(&mut self, k: &EventFull) -> Result<ValuesDim1, Error> {
|
|
||||||
let mut ret = ValuesDim1::empty();
|
|
||||||
use ScalarType::*;
|
|
||||||
for i1 in 0..k.tss.len() {
|
|
||||||
// TODO iterate sibling arrays after single bounds check
|
|
||||||
let ty = &k.scalar_types[i1];
|
|
||||||
let be = k.be[i1];
|
|
||||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
|
||||||
match ty {
|
|
||||||
U8 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_u8_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_u8_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
U16 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_u16_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_u16_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
U32 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_u32_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_u32_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
U64 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_u64_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_u64_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
I8 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_i8_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_i8_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
I16 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_i16_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_i16_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
I32 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_i32_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_i32_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
I64 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_i64_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_i64_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
F32 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_f32_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_f32_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
F64 => {
|
|
||||||
let value = if be {
|
|
||||||
get_values_f64_be(decomp, ty)?
|
|
||||||
} else {
|
|
||||||
get_values_f64_le(decomp, ty)?
|
|
||||||
};
|
|
||||||
ret.tss.push(k.tss[i1]);
|
|
||||||
ret.values.push(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(ret)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Stream for Dim1F32Stream<S>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Unpin,
|
|
||||||
{
|
|
||||||
type Item = Result<StreamItem<RangeCompletableItem<ValuesDim1>>, Error>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
||||||
use Poll::*;
|
|
||||||
if self.completed {
|
|
||||||
panic!("Dim1F32Stream poll_next on completed");
|
|
||||||
}
|
|
||||||
if self.errored {
|
|
||||||
self.completed = true;
|
|
||||||
return Ready(None);
|
|
||||||
}
|
|
||||||
match self.inp.poll_next_unpin(cx) {
|
|
||||||
Ready(Some(Ok(k))) => {
|
|
||||||
let inst1 = Instant::now();
|
|
||||||
let u = match k {
|
|
||||||
StreamItem::DataItem(item) => match item {
|
|
||||||
RangeCompletableItem::RangeComplete => {
|
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
|
||||||
}
|
|
||||||
RangeCompletableItem::Data(item) => match self.process_event_data(&item) {
|
|
||||||
Ok(item) => {
|
|
||||||
let ret = RangeCompletableItem::Data(item);
|
|
||||||
let ret = StreamItem::DataItem(ret);
|
|
||||||
Ready(Some(Ok(ret)))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e)))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
|
|
||||||
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
|
|
||||||
};
|
|
||||||
let inst2 = Instant::now();
|
|
||||||
// TODO do something with the measured time.
|
|
||||||
let _ = inst2.duration_since(inst1);
|
|
||||||
u
|
|
||||||
}
|
|
||||||
Ready(Some(Err(e))) => {
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e)))
|
|
||||||
}
|
|
||||||
Ready(None) => {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
}
|
|
||||||
Pending => Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait IntoDim1F32Stream {
|
|
||||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
|
|
||||||
where
|
|
||||||
Self: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Sized;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> IntoDim1F32Stream for T
|
|
||||||
where
|
|
||||||
T: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>>,
|
|
||||||
{
|
|
||||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
|
|
||||||
Dim1F32Stream::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
+1
-12
@@ -1,4 +1,3 @@
|
|||||||
use super::agg::IntoDim1F32Stream;
|
|
||||||
use crate::binned::BinnedStreamKindScalar;
|
use crate::binned::BinnedStreamKindScalar;
|
||||||
use crate::eventblobs::EventBlobsComplete;
|
use crate::eventblobs::EventBlobsComplete;
|
||||||
use crate::eventchunker::EventChunkerConf;
|
use crate::eventchunker::EventChunkerConf;
|
||||||
@@ -111,16 +110,6 @@ async fn agg_x_dim_1_inner() {
|
|||||||
0,
|
0,
|
||||||
query.buffer_size as usize,
|
query.buffer_size as usize,
|
||||||
event_chunker_conf,
|
event_chunker_conf,
|
||||||
)
|
);
|
||||||
.into_dim_1_f32_stream()
|
|
||||||
//.take(1000)
|
|
||||||
.map(|q| {
|
|
||||||
if false {
|
|
||||||
if let Ok(ref k) = q {
|
|
||||||
info!("vals: {:?}", k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
q
|
|
||||||
});
|
|
||||||
// TODO add the binning and expectation and await the result.
|
// TODO add the binning and expectation and await the result.
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use crate::agg::enp::{Identity, WaveXBinner};
|
use crate::agg::enp::{Identity, WaveXBinner};
|
||||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||||
use crate::agg::streams::StreamItem;
|
use crate::agg::streams::StreamItem;
|
||||||
use crate::agg::IntoDim1F32Stream;
|
|
||||||
use crate::binned::{BinnedStreamKindScalar, EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind};
|
use crate::binned::{BinnedStreamKindScalar, EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind};
|
||||||
use crate::decode::{
|
use crate::decode::{
|
||||||
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
||||||
|
|||||||
Reference in New Issue
Block a user