From 047237e2505c35ec53574fbde5e13fcbd2bcaa73 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 29 Nov 2022 16:37:09 +0100 Subject: [PATCH] WIP --- daqbufp2/Cargo.toml | 1 + daqbufp2/src/test/binnedbinary.rs | 3 +- disk/Cargo.toml | 1 + disk/src/raw/conn.rs | 12 ++++- items/src/binsdim0.rs | 2 +- items/src/binsdim1.rs | 3 +- items/src/items.rs | 56 +------------------- items/src/numops.rs | 43 ++++++++++++++- items/src/scalarevents.rs | 9 +++- items/src/statsevents.rs | 7 ++- items/src/waveevents.rs | 14 +++-- items/src/xbinnedscalarevents.rs | 14 ++++- items/src/xbinnedwaveevents.rs | 15 ++++-- items_0/src/items_0.rs | 2 + items_0/src/scalar_ops.rs | 87 +++++++++++++++++++++++++++++++ items_0/src/subfr.rs | 43 +++++++++++++++ items_2/src/binsdim0.rs | 21 ++++---- items_2/src/channelevents.rs | 2 +- items_2/src/eventsdim0.rs | 22 ++++---- items_2/src/items_2.rs | 85 ------------------------------ 20 files changed, 266 insertions(+), 176 deletions(-) create mode 100644 items_0/src/scalar_ops.rs create mode 100644 items_0/src/subfr.rs diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index 450fb08..93d0bf8 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -28,6 +28,7 @@ netpod = { path = "../netpod" } httpret = { path = "../httpret" } httpclient = { path = "../httpclient" } disk = { path = "../disk" } +items_0 = { path = "../items_0" } items = { path = "../items" } streams = { path = "../streams" } diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 5cfc504..d99fb43 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -8,7 +8,8 @@ use http::StatusCode; use httpclient::HttpBodyAsAsyncRead; use hyper::Body; use items::binsdim0::MinMaxAvgDim0Bins; -use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen}; +use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::query::{BinnedQuery, CacheUsage}; use netpod::AppendToUrl; diff --git a/disk/Cargo.toml b/disk/Cargo.toml index deeea27..ca61db7 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -41,6 +41,7 @@ bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } items = { path = "../items" } +items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } httpclient = { path = "../httpclient" } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index ca21c73..947f754 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -11,6 +11,7 @@ use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; +use std::collections::VecDeque; use std::pin::Pin; use streams::eventchunker::EventChunkerConf; @@ -33,7 +34,16 @@ where StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => { let item = events_node_proc.process(item); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + use items::EventsNodeProcessorOutput; + let parts = item.into_parts::(); + let item = items_2::eventsdim0::EventsDim0 { + tss: parts.1, + pulses: VecDeque::new(), + values: parts.0, + }; + let item = Box::new(item) as Box; + //Ok(StreamItem::DataItem(RangeCompletableItem::Data(todo!()))) + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) } RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), }, diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index 440f002..6548a19 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -10,7 +10,6 @@ use crate::IsoDateTime; use crate::ReadPbv; use crate::ReadableFromFile; use crate::Sitemty; -use crate::SubFrId; use crate::TimeBinnableDyn; use crate::{ts_offs_from_abs, FrameType}; use crate::{NewEmpty, RangeOverlapInfo, WithLen}; @@ -18,6 +17,7 @@ use crate::{TimeBinnableType, TimeBinnableTypeAggregator}; use crate::{TimeBinned, TimeBinnerDyn, TimeBins}; use chrono::{TimeZone, Utc}; use err::Error; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{NanoRange, Shape}; diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index 3ceeff6..a2591f7 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -14,9 +14,10 @@ use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use crate::TimeBins; use crate::{pulse_offs_from_abs, FrameType}; -use crate::{Fits, FitsInside, NewEmpty, ReadPbv, Sitemty, SubFrId, TimeBinned, WithLen}; +use crate::{Fits, FitsInside, NewEmpty, ReadPbv, Sitemty, TimeBinned, WithLen}; use chrono::{TimeZone, Utc}; use err::Error; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{NanoRange, Shape}; diff --git a/items/src/items.rs b/items/src/items.rs index e93c1cd..06158a1 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -15,7 +15,6 @@ pub mod xbinnedscalarevents; pub mod xbinnedwaveevents; use crate::frame::make_frame_2; -use crate::numops::BoolNum; use bytes::BytesMut; use chrono::{TimeZone, Utc}; use err::Error; @@ -25,10 +24,10 @@ use netpod::log::*; use netpod::timeunits::{MS, SEC}; use netpod::{log::Level, AggKind, EventDataReadStats, NanoRange, Shape}; use netpod::{DiskStats, RangeFilterStats, ScalarType}; -use numops::StringNum; use serde::de::{self, DeserializeOwned, Visitor}; use serde::{Deserialize, Serialize, Serializer}; use std::any::Any; +use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::marker::PhantomData; @@ -190,58 +189,6 @@ pub const INMEM_FRAME_HEAD: usize = 20; pub const INMEM_FRAME_FOOT: usize = 4; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; -pub trait SubFrId { - const SUB: u32; -} - -impl SubFrId for u8 { - const SUB: u32 = 0x03; -} - -impl SubFrId for u16 { - const SUB: u32 = 0x05; -} - -impl SubFrId for u32 { - const SUB: u32 = 0x08; -} - -impl SubFrId for u64 { - const SUB: u32 = 0x0a; -} - -impl SubFrId for i8 { - const SUB: u32 = 0x02; -} - -impl SubFrId for i16 { - const SUB: u32 = 0x04; -} - -impl SubFrId for i32 { - const SUB: u32 = 0x07; -} - -impl SubFrId for i64 { - const SUB: u32 = 0x09; -} - -impl SubFrId for f32 { - const SUB: u32 = 0x0b; -} - -impl SubFrId for f64 { - const SUB: u32 = 0x0c; -} - -impl SubFrId for StringNum { - const SUB: u32 = 0x0d; -} - -impl SubFrId for BoolNum { - const SUB: u32 = 0x0e; -} - // Required for any inner type of Sitemty. pub trait FrameTypeInnerStatic { const FRAME_TYPE_ID: u32; @@ -421,6 +368,7 @@ impl FrameType for EventQueryJsonStringFrame { pub trait EventsNodeProcessorOutput: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate { + fn into_parts(self) -> (VecDeque, VecDeque); } pub trait EventsNodeProcessor: Send + Unpin { diff --git a/items/src/numops.rs b/items/src/numops.rs index d424199..56c4865 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -1,4 +1,4 @@ -use crate::SubFrId; +use items_0::subfr::SubFrId; use num_traits::{Bounded, Float, Zero}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -123,6 +123,7 @@ pub trait NumOps: + SubFrId + Serialize + DeserializeOwned + + items_0::scalar_ops::ScalarOps { fn min_or_nan() -> Self; fn max_or_nan() -> Self; @@ -203,3 +204,43 @@ impl_num_ops!(f32, NAN, NAN, is_nan_float); impl_num_ops!(f64, NAN, NAN, is_nan_float); impl_num_ops!(BoolNum, MIN, MAX, is_nan_int); impl_num_ops!(StringNum, MIN, MAX, is_nan_int); + +impl SubFrId for StringNum { + const SUB: u32 = 0x0d; +} + +impl SubFrId for BoolNum { + const SUB: u32 = 0x0e; +} + +impl items_0::scalar_ops::AsPrimF32 for BoolNum { + fn as_prim_f32_b(&self) -> f32 { + todo!() + } +} + +impl items_0::scalar_ops::AsPrimF32 for StringNum { + fn as_prim_f32_b(&self) -> f32 { + todo!() + } +} + +impl items_0::scalar_ops::ScalarOps for BoolNum { + fn zero_b() -> Self { + todo!() + } + + fn equal_slack(&self, _rhs: &Self) -> bool { + todo!() + } +} + +impl items_0::scalar_ops::ScalarOps for StringNum { + fn zero_b() -> Self { + todo!() + } + + fn equal_slack(&self, _rhs: &Self) -> bool { + todo!() + } +} diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index 499caee..6bb3c83 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -823,4 +823,11 @@ impl TimeBinnerDyn for ScalarEventsTimeBinner { } } -impl EventsNodeProcessorOutput for ScalarEvents where NTY: NumOps {} +impl EventsNodeProcessorOutput for ScalarEvents +where + NTY: NumOps, +{ + fn into_parts(self) -> (VecDeque, VecDeque) { + todo!() + } +} diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index 6431d10..a6844d9 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -8,6 +8,7 @@ use err::Error; use netpod::log::*; use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use std::fmt; use tokio::fs::File; @@ -421,4 +422,8 @@ impl EventAppendable for StatsEvents { } } -impl EventsNodeProcessorOutput for StatsEvents {} +impl EventsNodeProcessorOutput for StatsEvents { + fn into_parts(self) -> (VecDeque, VecDeque) { + todo!() + } +} diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index d22ca72..884910d 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -5,13 +5,14 @@ use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, - ReadPbv, ReadableFromFile, SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, - WithTimestamps, + ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::{x_bin_count, AggKind, NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use std::marker::PhantomData; use tokio::fs::File; @@ -535,4 +536,11 @@ impl EventsDyn for WaveEvents { } } -impl EventsNodeProcessorOutput for WaveEvents where NTY: NumOps {} +impl EventsNodeProcessorOutput for WaveEvents +where + NTY: NumOps, +{ + fn into_parts(self) -> (VecDeque, VecDeque) { + todo!() + } +} diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 0b9182c..ed9f131 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -1,12 +1,15 @@ +use std::collections::VecDeque; + use crate::binsdim0::MinMaxAvgDim0Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, - SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; @@ -502,4 +505,11 @@ where } } -impl EventsNodeProcessorOutput for XBinnedScalarEvents where NTY: NumOps {} +impl EventsNodeProcessorOutput for XBinnedScalarEvents +where + NTY: NumOps, +{ + fn into_parts(self) -> (VecDeque, VecDeque) { + todo!() + } +} diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 5020aec..c67f144 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -3,14 +3,16 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, - FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId, - TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::timeunits::*; use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use std::mem; use tokio::fs::File; @@ -533,4 +535,11 @@ where } } -impl EventsNodeProcessorOutput for XBinnedWaveEvents where NTY: NumOps {} +impl EventsNodeProcessorOutput for XBinnedWaveEvents +where + NTY: NumOps, +{ + fn into_parts(self) -> (VecDeque, VecDeque) { + todo!() + } +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 967205a..0444541 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -1,5 +1,7 @@ pub mod collect_c; pub mod collect_s; +pub mod scalar_ops; +pub mod subfr; use collect_c::CollectableWithDefault; use collect_s::Collectable; diff --git a/items_0/src/scalar_ops.rs b/items_0/src/scalar_ops.rs new file mode 100644 index 0000000..f5f77fa --- /dev/null +++ b/items_0/src/scalar_ops.rs @@ -0,0 +1,87 @@ +use crate::subfr::SubFrId; +use serde::Serialize; +use std::fmt; + +#[allow(unused)] +const fn is_nan_int(_x: &T) -> bool { + false +} + +#[allow(unused)] +fn is_nan_f32(x: f32) -> bool { + x.is_nan() +} + +#[allow(unused)] +fn is_nan_f64(x: f64) -> bool { + x.is_nan() +} + +pub trait AsPrimF32 { + fn as_prim_f32_b(&self) -> f32; +} + +macro_rules! impl_as_prim_f32 { + ($ty:ident) => { + impl AsPrimF32 for $ty { + fn as_prim_f32_b(&self) -> f32 { + *self as f32 + } + } + }; +} + +impl_as_prim_f32!(u8); +impl_as_prim_f32!(u16); +impl_as_prim_f32!(u32); +impl_as_prim_f32!(u64); +impl_as_prim_f32!(i8); +impl_as_prim_f32!(i16); +impl_as_prim_f32!(i32); +impl_as_prim_f32!(i64); +impl_as_prim_f32!(f32); +impl_as_prim_f32!(f64); + +pub trait ScalarOps: + fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static +{ + fn zero_b() -> Self; + fn equal_slack(&self, rhs: &Self) -> bool; +} + +macro_rules! impl_num_ops { + ($ty:ident, $zero:expr, $equal_slack:ident) => { + impl ScalarOps for $ty { + fn zero_b() -> Self { + $zero + } + + fn equal_slack(&self, rhs: &Self) -> bool { + $equal_slack(*self, *rhs) + } + } + }; +} + +fn equal_int(a: T, b: T) -> bool { + a == b +} + +fn equal_f32(a: f32, b: f32) -> bool { + (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) +} + +fn equal_f64(a: f64, b: f64) -> bool { + (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) +} + +impl_num_ops!(u8, 0, equal_int); +impl_num_ops!(u16, 0, equal_int); +impl_num_ops!(u32, 0, equal_int); +impl_num_ops!(u64, 0, equal_int); +impl_num_ops!(i8, 0, equal_int); +impl_num_ops!(i16, 0, equal_int); +impl_num_ops!(i32, 0, equal_int); +impl_num_ops!(i64, 0, equal_int); +impl_num_ops!(f32, 0., equal_f32); +impl_num_ops!(f64, 0., equal_f64); diff --git a/items_0/src/subfr.rs b/items_0/src/subfr.rs new file mode 100644 index 0000000..fcea23f --- /dev/null +++ b/items_0/src/subfr.rs @@ -0,0 +1,43 @@ +pub trait SubFrId { + const SUB: u32; +} + +impl SubFrId for u8 { + const SUB: u32 = 0x03; +} + +impl SubFrId for u16 { + const SUB: u32 = 0x05; +} + +impl SubFrId for u32 { + const SUB: u32 = 0x08; +} + +impl SubFrId for u64 { + const SUB: u32 = 0x0a; +} + +impl SubFrId for i8 { + const SUB: u32 = 0x02; +} + +impl SubFrId for i16 { + const SUB: u32 = 0x04; +} + +impl SubFrId for i32 { + const SUB: u32 = 0x07; +} + +impl SubFrId for i64 { + const SUB: u32 = 0x09; +} + +impl SubFrId for f32 { + const SUB: u32 = 0x0b; +} + +impl SubFrId for f64 { + const SUB: u32 = 0x0c; +} diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 1e8c026..07da626 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,9 +1,10 @@ use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor}; -use crate::{IsoDateTime, RangeOverlapInfo, ScalarOps}; +use crate::{IsoDateTime, RangeOverlapInfo}; use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use chrono::{TimeZone, Utc}; use err::Error; use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult}; +use items_0::scalar_ops::ScalarOps; use items_0::AppendEmptyBin; use items_0::Empty; use items_0::TimeBinned; @@ -74,8 +75,8 @@ impl BinsDim0 { self.ts1s.push_back(beg); self.ts2s.push_back(end); self.counts.push_back(0); - self.mins.push_back(NTY::zero()); - self.maxs.push_back(NTY::zero()); + self.mins.push_back(NTY::zero_b()); + self.maxs.push_back(NTY::zero_b()); self.avgs.push_back(0.); } @@ -168,8 +169,8 @@ impl AppendEmptyBin for BinsDim0 { self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.counts.push_back(0); - self.mins.push_back(NTY::zero()); - self.maxs.push_back(NTY::zero()); + self.mins.push_back(NTY::zero_b()); + self.maxs.push_back(NTY::zero_b()); self.avgs.push_back(0.); } } @@ -393,8 +394,8 @@ impl BinsDim0Aggregator { Self { range, count: 0, - min: NTY::zero(), - max: NTY::zero(), + min: NTY::zero_b(), + max: NTY::zero_b(), avg: 0., sumc: 0, sum: 0f32, @@ -656,12 +657,12 @@ impl TimeBinned for BinsDim0 { // TODO is Vec needed? fn mins(&self) -> Vec { - self.mins.iter().map(|x| x.clone().as_prim_f32()).collect() + self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect() } // TODO is Vec needed? fn maxs(&self) -> Vec { - self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect() + self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect() } // TODO is Vec needed? @@ -676,7 +677,7 @@ impl TimeBinned for BinsDim0 { write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); } for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { - if min.as_prim_f32() < 1. && *count != 0 { + if min.as_prim_f32_b() < 1. && *count != 0 { write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); } } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 4660b2a..e24eb29 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -97,7 +97,7 @@ mod serde_channel_events { where A: de::SeqAccess<'de>, { - use items::SubFrId; + use items_0::subfr::SubFrId; let e0: &str = seq.next_element()?.ok_or(de::Error::missing_field("ty .0"))?; let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("nty .1"))?; if e0 == EventsDim0::::serde_id() { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 367902c..06acb91 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,8 +1,8 @@ use crate::binsdim0::BinsDim0; -use crate::ScalarOps; use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo}; use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use err::Error; +use items_0::scalar_ops::ScalarOps; use items_0::{Empty, Events, WithLen}; use netpod::log::*; use netpod::timeunits::SEC; @@ -292,8 +292,8 @@ impl EventsDim0Aggregator { Self { range, count: 0, - min: NTY::zero(), - max: NTY::zero(), + min: NTY::zero_b(), + max: NTY::zero_b(), sum: 0., sumc: 0, int_ts, @@ -334,7 +334,7 @@ impl EventsDim0Aggregator { fn apply_event_unweight(&mut self, val: NTY) { trace!("TODO check again result_reset_unweight"); err::todo(); - let vf = val.as_prim_f32(); + let vf = val.as_prim_f32_b(); self.apply_min_max(val); if vf.is_nan() { } else { @@ -345,7 +345,7 @@ impl EventsDim0Aggregator { fn apply_event_time_weight(&mut self, ts: u64) { if let Some(v) = &self.last_seen_val { - let vf = v.as_prim_f32(); + let vf = v.as_prim_f32_b(); let v2 = v.clone(); if ts > self.range.beg { self.apply_min_max(v2); @@ -435,9 +435,9 @@ impl EventsDim0Aggregator { } else { let g = match &self.last_seen_val { Some(x) => x.clone(), - None => NTY::zero(), + None => NTY::zero_b(), }; - (g.clone(), g.clone(), g.as_prim_f32()) + (g.clone(), g.clone(), g.as_prim_f32_b()) }; let ret = BinsDim0 { ts1s: [self.range.beg].into(), @@ -470,9 +470,9 @@ impl EventsDim0Aggregator { } else { let g = match &self.last_seen_val { Some(x) => x.clone(), - None => NTY::zero(), + None => NTY::zero_b(), }; - (g.clone(), g.clone(), g.as_prim_f32()) + (g.clone(), g.clone(), g.as_prim_f32_b()) }; let ret = BinsDim0 { ts1s: [self.range.beg].into(), @@ -488,8 +488,8 @@ impl EventsDim0Aggregator { self.sum = 0.; self.sumc = 0; self.did_min_max = false; - self.min = NTY::zero(); - self.max = NTY::zero(); + self.min = NTY::zero_b(); + self.max = NTY::zero_b(); ret } } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index fec3093..2545a61 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -18,7 +18,6 @@ use futures_util::StreamExt; use items::RangeCompletableItem; use items::Sitemty; use items::StreamItem; -use items::SubFrId; use items_0::collect_s::Collector; use items_0::collect_s::ToJsonResult; use items_0::Empty; @@ -70,90 +69,6 @@ pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { (pulse_anchor, pulse_off) } -#[allow(unused)] -const fn is_nan_int(_x: &T) -> bool { - false -} - -#[allow(unused)] -fn is_nan_f32(x: f32) -> bool { - x.is_nan() -} - -#[allow(unused)] -fn is_nan_f64(x: f64) -> bool { - x.is_nan() -} - -pub trait AsPrimF32 { - fn as_prim_f32(&self) -> f32; -} - -macro_rules! impl_as_prim_f32 { - ($ty:ident) => { - impl AsPrimF32 for $ty { - fn as_prim_f32(&self) -> f32 { - *self as f32 - } - } - }; -} - -impl_as_prim_f32!(u8); -impl_as_prim_f32!(u16); -impl_as_prim_f32!(u32); -impl_as_prim_f32!(u64); -impl_as_prim_f32!(i8); -impl_as_prim_f32!(i16); -impl_as_prim_f32!(i32); -impl_as_prim_f32!(i64); -impl_as_prim_f32!(f32); -impl_as_prim_f32!(f64); - -pub trait ScalarOps: - fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static -{ - fn zero() -> Self; - fn equal_slack(&self, rhs: &Self) -> bool; -} - -macro_rules! impl_num_ops { - ($ty:ident, $zero:expr, $equal_slack:ident) => { - impl ScalarOps for $ty { - fn zero() -> Self { - $zero - } - - fn equal_slack(&self, rhs: &Self) -> bool { - $equal_slack(*self, *rhs) - } - } - }; -} - -fn equal_int(a: T, b: T) -> bool { - a == b -} - -fn equal_f32(a: f32, b: f32) -> bool { - (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) -} - -fn equal_f64(a: f64, b: f64) -> bool { - (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) -} - -impl_num_ops!(u8, 0, equal_int); -impl_num_ops!(u16, 0, equal_int); -impl_num_ops!(u32, 0, equal_int); -impl_num_ops!(u64, 0, equal_int); -impl_num_ops!(i8, 0, equal_int); -impl_num_ops!(i16, 0, equal_int); -impl_num_ops!(i32, 0, equal_int); -impl_num_ops!(i64, 0, equal_int); -impl_num_ops!(f32, 0., equal_f32); -impl_num_ops!(f64, 0., equal_f64); - #[allow(unused)] struct Ts(u64);