WIP
This commit is contained in:
@@ -28,6 +28,7 @@ netpod = { path = "../netpod" }
|
||||
httpret = { path = "../httpret" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
disk = { path = "../disk" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items = { path = "../items" }
|
||||
streams = { path = "../streams" }
|
||||
|
||||
|
||||
@@ -8,7 +8,8 @@ use http::StatusCode;
|
||||
use httpclient::HttpBodyAsAsyncRead;
|
||||
use hyper::Body;
|
||||
use items::binsdim0::MinMaxAvgDim0Bins;
|
||||
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen};
|
||||
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen};
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::query::{BinnedQuery, CacheUsage};
|
||||
use netpod::AppendToUrl;
|
||||
|
||||
@@ -41,6 +41,7 @@ bitshuffle = { path = "../bitshuffle" }
|
||||
dbconn = { path = "../dbconn" }
|
||||
parse = { path = "../parse" }
|
||||
items = { path = "../items" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
streams = { path = "../streams" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
|
||||
@@ -11,6 +11,7 @@ use netpod::log::*;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape};
|
||||
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use streams::eventchunker::EventChunkerConf;
|
||||
|
||||
@@ -33,7 +34,16 @@ where
|
||||
StreamItem::DataItem(item) => match item {
|
||||
RangeCompletableItem::Data(item) => {
|
||||
let item = events_node_proc.process(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
use items::EventsNodeProcessorOutput;
|
||||
let parts = item.into_parts::<NTY>();
|
||||
let item = items_2::eventsdim0::EventsDim0 {
|
||||
tss: parts.1,
|
||||
pulses: VecDeque::new(),
|
||||
values: parts.0,
|
||||
};
|
||||
let item = Box::new(item) as Box<dyn items_0::Events>;
|
||||
//Ok(StreamItem::DataItem(RangeCompletableItem::Data(todo!())))
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
|
||||
},
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::IsoDateTime;
|
||||
use crate::ReadPbv;
|
||||
use crate::ReadableFromFile;
|
||||
use crate::Sitemty;
|
||||
use crate::SubFrId;
|
||||
use crate::TimeBinnableDyn;
|
||||
use crate::{ts_offs_from_abs, FrameType};
|
||||
use crate::{NewEmpty, RangeOverlapInfo, WithLen};
|
||||
@@ -18,6 +17,7 @@ use crate::{TimeBinnableType, TimeBinnableTypeAggregator};
|
||||
use crate::{TimeBinned, TimeBinnerDyn, TimeBins};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{NanoRange, Shape};
|
||||
|
||||
@@ -14,9 +14,10 @@ use crate::TimeBinnableType;
|
||||
use crate::TimeBinnableTypeAggregator;
|
||||
use crate::TimeBins;
|
||||
use crate::{pulse_offs_from_abs, FrameType};
|
||||
use crate::{Fits, FitsInside, NewEmpty, ReadPbv, Sitemty, SubFrId, TimeBinned, WithLen};
|
||||
use crate::{Fits, FitsInside, NewEmpty, ReadPbv, Sitemty, TimeBinned, WithLen};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{NanoRange, Shape};
|
||||
|
||||
@@ -15,7 +15,6 @@ pub mod xbinnedscalarevents;
|
||||
pub mod xbinnedwaveevents;
|
||||
|
||||
use crate::frame::make_frame_2;
|
||||
use crate::numops::BoolNum;
|
||||
use bytes::BytesMut;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
@@ -25,10 +24,10 @@ use netpod::log::*;
|
||||
use netpod::timeunits::{MS, SEC};
|
||||
use netpod::{log::Level, AggKind, EventDataReadStats, NanoRange, Shape};
|
||||
use netpod::{DiskStats, RangeFilterStats, ScalarType};
|
||||
use numops::StringNum;
|
||||
use serde::de::{self, DeserializeOwned, Visitor};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
@@ -190,58 +189,6 @@ pub const INMEM_FRAME_HEAD: usize = 20;
|
||||
pub const INMEM_FRAME_FOOT: usize = 4;
|
||||
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
|
||||
|
||||
pub trait SubFrId {
|
||||
const SUB: u32;
|
||||
}
|
||||
|
||||
impl SubFrId for u8 {
|
||||
const SUB: u32 = 0x03;
|
||||
}
|
||||
|
||||
impl SubFrId for u16 {
|
||||
const SUB: u32 = 0x05;
|
||||
}
|
||||
|
||||
impl SubFrId for u32 {
|
||||
const SUB: u32 = 0x08;
|
||||
}
|
||||
|
||||
impl SubFrId for u64 {
|
||||
const SUB: u32 = 0x0a;
|
||||
}
|
||||
|
||||
impl SubFrId for i8 {
|
||||
const SUB: u32 = 0x02;
|
||||
}
|
||||
|
||||
impl SubFrId for i16 {
|
||||
const SUB: u32 = 0x04;
|
||||
}
|
||||
|
||||
impl SubFrId for i32 {
|
||||
const SUB: u32 = 0x07;
|
||||
}
|
||||
|
||||
impl SubFrId for i64 {
|
||||
const SUB: u32 = 0x09;
|
||||
}
|
||||
|
||||
impl SubFrId for f32 {
|
||||
const SUB: u32 = 0x0b;
|
||||
}
|
||||
|
||||
impl SubFrId for f64 {
|
||||
const SUB: u32 = 0x0c;
|
||||
}
|
||||
|
||||
impl SubFrId for StringNum {
|
||||
const SUB: u32 = 0x0d;
|
||||
}
|
||||
|
||||
impl SubFrId for BoolNum {
|
||||
const SUB: u32 = 0x0e;
|
||||
}
|
||||
|
||||
// Required for any inner type of Sitemty.
|
||||
pub trait FrameTypeInnerStatic {
|
||||
const FRAME_TYPE_ID: u32;
|
||||
@@ -421,6 +368,7 @@ impl FrameType for EventQueryJsonStringFrame {
|
||||
pub trait EventsNodeProcessorOutput:
|
||||
Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
|
||||
{
|
||||
fn into_parts<NTY>(self) -> (VecDeque<NTY>, VecDeque<u64>);
|
||||
}
|
||||
|
||||
pub trait EventsNodeProcessor: Send + Unpin {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::SubFrId;
|
||||
use items_0::subfr::SubFrId;
|
||||
use num_traits::{Bounded, Float, Zero};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -123,6 +123,7 @@ pub trait NumOps:
|
||||
+ SubFrId
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ items_0::scalar_ops::ScalarOps
|
||||
{
|
||||
fn min_or_nan() -> Self;
|
||||
fn max_or_nan() -> Self;
|
||||
@@ -203,3 +204,43 @@ impl_num_ops!(f32, NAN, NAN, is_nan_float);
|
||||
impl_num_ops!(f64, NAN, NAN, is_nan_float);
|
||||
impl_num_ops!(BoolNum, MIN, MAX, is_nan_int);
|
||||
impl_num_ops!(StringNum, MIN, MAX, is_nan_int);
|
||||
|
||||
impl SubFrId for StringNum {
|
||||
const SUB: u32 = 0x0d;
|
||||
}
|
||||
|
||||
impl SubFrId for BoolNum {
|
||||
const SUB: u32 = 0x0e;
|
||||
}
|
||||
|
||||
impl items_0::scalar_ops::AsPrimF32 for BoolNum {
|
||||
fn as_prim_f32_b(&self) -> f32 {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl items_0::scalar_ops::AsPrimF32 for StringNum {
|
||||
fn as_prim_f32_b(&self) -> f32 {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl items_0::scalar_ops::ScalarOps for BoolNum {
|
||||
fn zero_b() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn equal_slack(&self, _rhs: &Self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl items_0::scalar_ops::ScalarOps for StringNum {
|
||||
fn zero_b() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn equal_slack(&self, _rhs: &Self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -823,4 +823,11 @@ impl<NTY: NumOps + 'static> TimeBinnerDyn for ScalarEventsTimeBinner<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> EventsNodeProcessorOutput for ScalarEvents<NTY> where NTY: NumOps {}
|
||||
impl<NTY> EventsNodeProcessorOutput for ScalarEvents<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn into_parts<NTY2>(self) -> (VecDeque<NTY2>, VecDeque<u64>) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::{NanoRange, Shape};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use tokio::fs::File;
|
||||
|
||||
@@ -421,4 +422,8 @@ impl EventAppendable for StatsEvents {
|
||||
}
|
||||
}
|
||||
|
||||
impl EventsNodeProcessorOutput for StatsEvents {}
|
||||
impl EventsNodeProcessorOutput for StatsEvents {
|
||||
fn into_parts<NTY2>(self) -> (VecDeque<NTY2>, VecDeque<u64>) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,13 +5,14 @@ use crate::xbinnedwaveevents::XBinnedWaveEvents;
|
||||
use crate::{
|
||||
Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, EventsNodeProcessorOutput,
|
||||
FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo,
|
||||
ReadPbv, ReadableFromFile, SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen,
|
||||
WithTimestamps,
|
||||
ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
|
||||
};
|
||||
use err::Error;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::{x_bin_count, AggKind, NanoRange, Shape};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::marker::PhantomData;
|
||||
use tokio::fs::File;
|
||||
|
||||
@@ -535,4 +536,11 @@ impl<NTY: NumOps> EventsDyn for WaveEvents<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> EventsNodeProcessorOutput for WaveEvents<NTY> where NTY: NumOps {}
|
||||
impl<NTY> EventsNodeProcessorOutput for WaveEvents<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn into_parts<NTY2>(self) -> (VecDeque<NTY2>, VecDeque<u64>) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::binsdim0::MinMaxAvgDim0Bins;
|
||||
use crate::numops::NumOps;
|
||||
use crate::streams::{Collectable, Collector};
|
||||
use crate::{
|
||||
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits,
|
||||
FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile,
|
||||
SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
|
||||
TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
|
||||
};
|
||||
use err::Error;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::{NanoRange, Shape};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -502,4 +505,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> EventsNodeProcessorOutput for XBinnedScalarEvents<NTY> where NTY: NumOps {}
|
||||
impl<NTY> EventsNodeProcessorOutput for XBinnedScalarEvents<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn into_parts<NTY2>(self) -> (VecDeque<NTY2>, VecDeque<u64>) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,14 +3,16 @@ use crate::numops::NumOps;
|
||||
use crate::streams::{Collectable, Collector};
|
||||
use crate::{
|
||||
Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType,
|
||||
FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId,
|
||||
TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
|
||||
FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBinnableType,
|
||||
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
|
||||
};
|
||||
use err::Error;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{NanoRange, Shape};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::mem;
|
||||
use tokio::fs::File;
|
||||
|
||||
@@ -533,4 +535,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> EventsNodeProcessorOutput for XBinnedWaveEvents<NTY> where NTY: NumOps {}
|
||||
impl<NTY> EventsNodeProcessorOutput for XBinnedWaveEvents<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn into_parts<NTY2>(self) -> (VecDeque<NTY2>, VecDeque<u64>) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
pub mod collect_c;
|
||||
pub mod collect_s;
|
||||
pub mod scalar_ops;
|
||||
pub mod subfr;
|
||||
|
||||
use collect_c::CollectableWithDefault;
|
||||
use collect_s::Collectable;
|
||||
|
||||
87
items_0/src/scalar_ops.rs
Normal file
87
items_0/src/scalar_ops.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use crate::subfr::SubFrId;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
|
||||
#[allow(unused)]
|
||||
const fn is_nan_int<T>(_x: &T) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn is_nan_f32(x: f32) -> bool {
|
||||
x.is_nan()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn is_nan_f64(x: f64) -> bool {
|
||||
x.is_nan()
|
||||
}
|
||||
|
||||
pub trait AsPrimF32 {
|
||||
fn as_prim_f32_b(&self) -> f32;
|
||||
}
|
||||
|
||||
macro_rules! impl_as_prim_f32 {
|
||||
($ty:ident) => {
|
||||
impl AsPrimF32 for $ty {
|
||||
fn as_prim_f32_b(&self) -> f32 {
|
||||
*self as f32
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_as_prim_f32!(u8);
|
||||
impl_as_prim_f32!(u16);
|
||||
impl_as_prim_f32!(u32);
|
||||
impl_as_prim_f32!(u64);
|
||||
impl_as_prim_f32!(i8);
|
||||
impl_as_prim_f32!(i16);
|
||||
impl_as_prim_f32!(i32);
|
||||
impl_as_prim_f32!(i64);
|
||||
impl_as_prim_f32!(f32);
|
||||
impl_as_prim_f32!(f64);
|
||||
|
||||
pub trait ScalarOps:
|
||||
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
|
||||
{
|
||||
fn zero_b() -> Self;
|
||||
fn equal_slack(&self, rhs: &Self) -> bool;
|
||||
}
|
||||
|
||||
macro_rules! impl_num_ops {
|
||||
($ty:ident, $zero:expr, $equal_slack:ident) => {
|
||||
impl ScalarOps for $ty {
|
||||
fn zero_b() -> Self {
|
||||
$zero
|
||||
}
|
||||
|
||||
fn equal_slack(&self, rhs: &Self) -> bool {
|
||||
$equal_slack(*self, *rhs)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn equal_int<T: PartialEq>(a: T, b: T) -> bool {
|
||||
a == b
|
||||
}
|
||||
|
||||
fn equal_f32(a: f32, b: f32) -> bool {
|
||||
(a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001)
|
||||
}
|
||||
|
||||
fn equal_f64(a: f64, b: f64) -> bool {
|
||||
(a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001)
|
||||
}
|
||||
|
||||
impl_num_ops!(u8, 0, equal_int);
|
||||
impl_num_ops!(u16, 0, equal_int);
|
||||
impl_num_ops!(u32, 0, equal_int);
|
||||
impl_num_ops!(u64, 0, equal_int);
|
||||
impl_num_ops!(i8, 0, equal_int);
|
||||
impl_num_ops!(i16, 0, equal_int);
|
||||
impl_num_ops!(i32, 0, equal_int);
|
||||
impl_num_ops!(i64, 0, equal_int);
|
||||
impl_num_ops!(f32, 0., equal_f32);
|
||||
impl_num_ops!(f64, 0., equal_f64);
|
||||
43
items_0/src/subfr.rs
Normal file
43
items_0/src/subfr.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
pub trait SubFrId {
|
||||
const SUB: u32;
|
||||
}
|
||||
|
||||
impl SubFrId for u8 {
|
||||
const SUB: u32 = 0x03;
|
||||
}
|
||||
|
||||
impl SubFrId for u16 {
|
||||
const SUB: u32 = 0x05;
|
||||
}
|
||||
|
||||
impl SubFrId for u32 {
|
||||
const SUB: u32 = 0x08;
|
||||
}
|
||||
|
||||
impl SubFrId for u64 {
|
||||
const SUB: u32 = 0x0a;
|
||||
}
|
||||
|
||||
impl SubFrId for i8 {
|
||||
const SUB: u32 = 0x02;
|
||||
}
|
||||
|
||||
impl SubFrId for i16 {
|
||||
const SUB: u32 = 0x04;
|
||||
}
|
||||
|
||||
impl SubFrId for i32 {
|
||||
const SUB: u32 = 0x07;
|
||||
}
|
||||
|
||||
impl SubFrId for i64 {
|
||||
const SUB: u32 = 0x09;
|
||||
}
|
||||
|
||||
impl SubFrId for f32 {
|
||||
const SUB: u32 = 0x0b;
|
||||
}
|
||||
|
||||
impl SubFrId for f64 {
|
||||
const SUB: u32 = 0x0c;
|
||||
}
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor};
|
||||
use crate::{IsoDateTime, RangeOverlapInfo, ScalarOps};
|
||||
use crate::{IsoDateTime, RangeOverlapInfo};
|
||||
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult};
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::AppendEmptyBin;
|
||||
use items_0::Empty;
|
||||
use items_0::TimeBinned;
|
||||
@@ -74,8 +75,8 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
|
||||
self.ts1s.push_back(beg);
|
||||
self.ts2s.push_back(end);
|
||||
self.counts.push_back(0);
|
||||
self.mins.push_back(NTY::zero());
|
||||
self.maxs.push_back(NTY::zero());
|
||||
self.mins.push_back(NTY::zero_b());
|
||||
self.maxs.push_back(NTY::zero_b());
|
||||
self.avgs.push_back(0.);
|
||||
}
|
||||
|
||||
@@ -168,8 +169,8 @@ impl<NTY: ScalarOps> AppendEmptyBin for BinsDim0<NTY> {
|
||||
self.ts1s.push_back(ts1);
|
||||
self.ts2s.push_back(ts2);
|
||||
self.counts.push_back(0);
|
||||
self.mins.push_back(NTY::zero());
|
||||
self.maxs.push_back(NTY::zero());
|
||||
self.mins.push_back(NTY::zero_b());
|
||||
self.maxs.push_back(NTY::zero_b());
|
||||
self.avgs.push_back(0.);
|
||||
}
|
||||
}
|
||||
@@ -393,8 +394,8 @@ impl<NTY: ScalarOps> BinsDim0Aggregator<NTY> {
|
||||
Self {
|
||||
range,
|
||||
count: 0,
|
||||
min: NTY::zero(),
|
||||
max: NTY::zero(),
|
||||
min: NTY::zero_b(),
|
||||
max: NTY::zero_b(),
|
||||
avg: 0.,
|
||||
sumc: 0,
|
||||
sum: 0f32,
|
||||
@@ -656,12 +657,12 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
|
||||
|
||||
// TODO is Vec needed?
|
||||
fn mins(&self) -> Vec<f32> {
|
||||
self.mins.iter().map(|x| x.clone().as_prim_f32()).collect()
|
||||
self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect()
|
||||
}
|
||||
|
||||
// TODO is Vec needed?
|
||||
fn maxs(&self) -> Vec<f32> {
|
||||
self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect()
|
||||
self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect()
|
||||
}
|
||||
|
||||
// TODO is Vec needed?
|
||||
@@ -676,7 +677,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
|
||||
write!(&mut msg, "ts1s ≠ ts2s\n").unwrap();
|
||||
}
|
||||
for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() {
|
||||
if min.as_prim_f32() < 1. && *count != 0 {
|
||||
if min.as_prim_f32_b() < 1. && *count != 0 {
|
||||
write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ mod serde_channel_events {
|
||||
where
|
||||
A: de::SeqAccess<'de>,
|
||||
{
|
||||
use items::SubFrId;
|
||||
use items_0::subfr::SubFrId;
|
||||
let e0: &str = seq.next_element()?.ok_or(de::Error::missing_field("ty .0"))?;
|
||||
let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("nty .1"))?;
|
||||
if e0 == EventsDim0::<u8>::serde_id() {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::binsdim0::BinsDim0;
|
||||
use crate::ScalarOps;
|
||||
use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo};
|
||||
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
|
||||
use err::Error;
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::{Empty, Events, WithLen};
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
@@ -292,8 +292,8 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
Self {
|
||||
range,
|
||||
count: 0,
|
||||
min: NTY::zero(),
|
||||
max: NTY::zero(),
|
||||
min: NTY::zero_b(),
|
||||
max: NTY::zero_b(),
|
||||
sum: 0.,
|
||||
sumc: 0,
|
||||
int_ts,
|
||||
@@ -334,7 +334,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
fn apply_event_unweight(&mut self, val: NTY) {
|
||||
trace!("TODO check again result_reset_unweight");
|
||||
err::todo();
|
||||
let vf = val.as_prim_f32();
|
||||
let vf = val.as_prim_f32_b();
|
||||
self.apply_min_max(val);
|
||||
if vf.is_nan() {
|
||||
} else {
|
||||
@@ -345,7 +345,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
|
||||
fn apply_event_time_weight(&mut self, ts: u64) {
|
||||
if let Some(v) = &self.last_seen_val {
|
||||
let vf = v.as_prim_f32();
|
||||
let vf = v.as_prim_f32_b();
|
||||
let v2 = v.clone();
|
||||
if ts > self.range.beg {
|
||||
self.apply_min_max(v2);
|
||||
@@ -435,9 +435,9 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
} else {
|
||||
let g = match &self.last_seen_val {
|
||||
Some(x) => x.clone(),
|
||||
None => NTY::zero(),
|
||||
None => NTY::zero_b(),
|
||||
};
|
||||
(g.clone(), g.clone(), g.as_prim_f32())
|
||||
(g.clone(), g.clone(), g.as_prim_f32_b())
|
||||
};
|
||||
let ret = BinsDim0 {
|
||||
ts1s: [self.range.beg].into(),
|
||||
@@ -470,9 +470,9 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
} else {
|
||||
let g = match &self.last_seen_val {
|
||||
Some(x) => x.clone(),
|
||||
None => NTY::zero(),
|
||||
None => NTY::zero_b(),
|
||||
};
|
||||
(g.clone(), g.clone(), g.as_prim_f32())
|
||||
(g.clone(), g.clone(), g.as_prim_f32_b())
|
||||
};
|
||||
let ret = BinsDim0 {
|
||||
ts1s: [self.range.beg].into(),
|
||||
@@ -488,8 +488,8 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
|
||||
self.sum = 0.;
|
||||
self.sumc = 0;
|
||||
self.did_min_max = false;
|
||||
self.min = NTY::zero();
|
||||
self.max = NTY::zero();
|
||||
self.min = NTY::zero_b();
|
||||
self.max = NTY::zero_b();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ use futures_util::StreamExt;
|
||||
use items::RangeCompletableItem;
|
||||
use items::Sitemty;
|
||||
use items::StreamItem;
|
||||
use items::SubFrId;
|
||||
use items_0::collect_s::Collector;
|
||||
use items_0::collect_s::ToJsonResult;
|
||||
use items_0::Empty;
|
||||
@@ -70,90 +69,6 @@ pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
|
||||
(pulse_anchor, pulse_off)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
const fn is_nan_int<T>(_x: &T) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn is_nan_f32(x: f32) -> bool {
|
||||
x.is_nan()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn is_nan_f64(x: f64) -> bool {
|
||||
x.is_nan()
|
||||
}
|
||||
|
||||
pub trait AsPrimF32 {
|
||||
fn as_prim_f32(&self) -> f32;
|
||||
}
|
||||
|
||||
macro_rules! impl_as_prim_f32 {
|
||||
($ty:ident) => {
|
||||
impl AsPrimF32 for $ty {
|
||||
fn as_prim_f32(&self) -> f32 {
|
||||
*self as f32
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_as_prim_f32!(u8);
|
||||
impl_as_prim_f32!(u16);
|
||||
impl_as_prim_f32!(u32);
|
||||
impl_as_prim_f32!(u64);
|
||||
impl_as_prim_f32!(i8);
|
||||
impl_as_prim_f32!(i16);
|
||||
impl_as_prim_f32!(i32);
|
||||
impl_as_prim_f32!(i64);
|
||||
impl_as_prim_f32!(f32);
|
||||
impl_as_prim_f32!(f64);
|
||||
|
||||
pub trait ScalarOps:
|
||||
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
|
||||
{
|
||||
fn zero() -> Self;
|
||||
fn equal_slack(&self, rhs: &Self) -> bool;
|
||||
}
|
||||
|
||||
macro_rules! impl_num_ops {
|
||||
($ty:ident, $zero:expr, $equal_slack:ident) => {
|
||||
impl ScalarOps for $ty {
|
||||
fn zero() -> Self {
|
||||
$zero
|
||||
}
|
||||
|
||||
fn equal_slack(&self, rhs: &Self) -> bool {
|
||||
$equal_slack(*self, *rhs)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn equal_int<T: PartialEq>(a: T, b: T) -> bool {
|
||||
a == b
|
||||
}
|
||||
|
||||
fn equal_f32(a: f32, b: f32) -> bool {
|
||||
(a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001)
|
||||
}
|
||||
|
||||
fn equal_f64(a: f64, b: f64) -> bool {
|
||||
(a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001)
|
||||
}
|
||||
|
||||
impl_num_ops!(u8, 0, equal_int);
|
||||
impl_num_ops!(u16, 0, equal_int);
|
||||
impl_num_ops!(u32, 0, equal_int);
|
||||
impl_num_ops!(u64, 0, equal_int);
|
||||
impl_num_ops!(i8, 0, equal_int);
|
||||
impl_num_ops!(i16, 0, equal_int);
|
||||
impl_num_ops!(i32, 0, equal_int);
|
||||
impl_num_ops!(i64, 0, equal_int);
|
||||
impl_num_ops!(f32, 0., equal_f32);
|
||||
impl_num_ops!(f64, 0., equal_f64);
|
||||
|
||||
#[allow(unused)]
|
||||
struct Ts(u64);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user