diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs index 5db322c..1010776 100644 --- a/items_0/src/collect_c.rs +++ b/items_0/src/collect_c.rs @@ -5,7 +5,7 @@ use crate::AsAnyRef; use crate::Events; use crate::WithLen; use err::Error; -use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use std::fmt; @@ -14,7 +14,11 @@ pub trait Collector: fmt::Debug + Send { fn ingest(&mut self, item: &mut dyn Collectable); fn set_range_complete(&mut self); fn set_timed_out(&mut self); - fn result(&mut self, range: Option, binrange: Option) -> Result, Error>; + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error>; } pub trait Collectable: fmt::Debug + AsAnyMut + crate::WithLen { @@ -46,7 +50,11 @@ pub trait CollectorDyn: fmt::Debug + Send { fn set_timed_out(&mut self); - fn result(&mut self, range: Option, binrange: Option) -> Result, Error>; + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error>; } pub trait CollectableWithDefault: AsAnyMut { @@ -88,7 +96,7 @@ impl Collector for TimeBinnedCollector { fn result( &mut self, _range: Option, - _binrange: Option, + _binrange: Option, ) -> Result, Error> { todo!() } diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index fec4518..16b92d2 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -3,7 +3,7 @@ use crate::AsAnyMut; use crate::AsAnyRef; use crate::WithLen; use err::Error; -use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use serde::Serialize; use std::any::Any; @@ -19,17 +19,18 @@ pub trait CollectorType: Send + Unpin + WithLen { fn set_timed_out(&mut self); // TODO use this crate's Error instead: - fn result(&mut self, range: Option, binrange: Option) -> Result; + fn result(&mut self, range: Option, binrange: Option) -> Result; } pub trait Collector: Send + Unpin + WithLen { fn ingest(&mut self, src: &mut dyn Collectable); fn set_range_complete(&mut self); fn set_timed_out(&mut self); + // TODO factor the required parameters into new struct? Generic over events or binned? fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, Error>; } @@ -60,7 +61,7 @@ impl Collector for T { fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, Error> { let ret = T::result(self, range, binrange)?; Ok(Box::new(ret) as _) diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index dcc6768..a2b01c0 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -5,6 +5,7 @@ pub mod isodate; pub mod scalar_ops; pub mod streamitem; pub mod subfr; +pub mod transform; pub mod bincode { pub use bincode::*; @@ -13,8 +14,10 @@ pub mod bincode { use collect_c::CollectableWithDefault; use collect_s::Collectable; use collect_s::ToJsonResult; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use netpod::ScalarType; +use netpod::SeriesRange; use netpod::Shape; use std::any::Any; use std::collections::VecDeque; @@ -42,9 +45,9 @@ pub enum Fits { } pub trait RangeOverlapInfo { - fn ends_before(&self, range: NanoRange) -> bool; - fn ends_after(&self, range: NanoRange) -> bool; - fn starts_after(&self, range: NanoRange) -> bool; + fn ends_before(&self, range: &SeriesRange) -> bool; + fn ends_after(&self, range: &SeriesRange) -> bool; + fn starts_after(&self, range: &SeriesRange) -> bool; } pub trait EmptyForScalarTypeShape { @@ -134,7 +137,7 @@ pub trait TimeBinner: Send { /// In contrast to `TimeBinnableType` this is meant for trait objects. 
pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef + AsAnyMut + Send { // TODO implementors may fail if edges contain not at least 2 entries. - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box; // TODO just a helper for the empty result. fn to_box_to_json_result(&self) -> Box; } @@ -202,9 +205,28 @@ impl PartialEq for Box { } pub struct TransformProperties { + pub needs_one_before_range: bool, pub needs_value: bool, } -pub trait TransformStage { +pub trait Transformer { fn query_transform_properties(&self) -> TransformProperties; } + +impl Transformer for Box +where + T: Transformer, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} + +impl Transformer for std::pin::Pin> +where + T: Transformer, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs new file mode 100644 index 0000000..e69de29 diff --git a/items_2/src/binnedcollected.rs b/items_2/src/binnedcollected.rs new file mode 100644 index 0000000..791d656 --- /dev/null +++ b/items_2/src/binnedcollected.rs @@ -0,0 +1,252 @@ +use crate::channelevents::ChannelEvents; +use crate::empty::empty_events_dyn_ev; +use crate::ChannelEventsInput; +use crate::Error; +use futures_util::Future; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::collect_s::Collector; +use items_0::collect_s::ToJsonResult; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::TimeBinnable; +use items_0::TimeBinner; +use items_0::Transformer; +use netpod::log::*; +use netpod::transform::Transform; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; +use netpod::ScalarType; +use netpod::Shape; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; + +fn flush_binned( + binner: &mut Box, + coll: &mut Option>, + force: bool, +) -> Result<(), Error> { + trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); + if force { + if binner.bins_ready_count() == 0 { + debug!("cycle the binner forced"); + binner.cycle(); + } else { + debug!("bins ready, do not force"); + } + } + if binner.bins_ready_count() > 0 { + let ready = binner.bins_ready(); + match ready { + Some(mut ready) => { + trace!("binned_collected ready {ready:?}"); + if coll.is_none() { + *coll = Some(ready.as_collectable_mut().new_collector()); + } + let cl = coll.as_mut().unwrap(); + cl.ingest(ready.as_collectable_mut()); + Ok(()) + } + None => Err(format!("bins_ready_count but no result").into()), + } + } else { + Ok(()) + } +} + +#[derive(Debug)] +pub struct BinnedCollectedResult { + pub range_final: bool, + pub did_timeout: bool, + pub result: Box, +} + +fn _old_binned_collected( + scalar_type: ScalarType, + shape: Shape, + binrange: BinnedRangeEnum, + transformer: &dyn Transformer, + deadline: Instant, + inp: Pin> + Send>>, +) -> Result { + event!(Level::TRACE, "binned_collected"); + let transprops = transformer.query_transform_properties(); + // TODO use a trait to allow check of unfinished data [hcn2956jxhwsf] + // TODO implement continue-at [hcn2956jxhwsf] + // TODO maybe TimeBinner should take all ChannelEvents and handle this? 
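+ // Build one empty events container for the requested scalar type and shape and chain it in front of `inp`, presumably so that the downstream consumer always sees at least one typed (possibly empty) data item.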
+ let empty_item = empty_events_dyn_ev(&scalar_type, &shape)?; + let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + empty_item, + )))); + let empty_stream = futures_util::stream::once(futures_util::future::ready(tmp_item)); + let mut stream = empty_stream.chain(inp); + todo!() +} + +enum BinnedCollectedState { + Init, + Run, + Done, +} + +pub struct BinnedCollected { + state: BinnedCollectedState, + binrange: BinnedRangeEnum, + scalar_type: ScalarType, + shape: Shape, + do_time_weight: bool, + did_timeout: bool, + range_final: bool, + coll: Option>, + binner: Option>, + inp: Pin>, +} + +impl BinnedCollected { + fn self_name() -> &'static str { + "BinnedCollected" + } + + pub fn new( + binrange: BinnedRangeEnum, + scalar_type: ScalarType, + shape: Shape, + do_time_weight: bool, + //transformer: &dyn Transformer, + deadline: Instant, + inp: Pin>, + ) -> Self { + Self { + state: BinnedCollectedState::Init, + binrange, + scalar_type, + shape, + do_time_weight, + did_timeout: false, + range_final: false, + coll: None, + binner: None, + inp, + } + } + + fn handle_item(&mut self, item: StreamItem>) -> Result<(), Error> { + match item { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + self.range_final = true; + } + RangeCompletableItem::Data(k) => match k { + ChannelEvents::Events(events) => { + if self.binner.is_none() { + let bb = events + .as_time_binnable() + .time_binner_new(self.binrange.clone(), self.do_time_weight); + self.binner = Some(bb); + } + let binner = self.binner.as_mut().unwrap(); + binner.ingest(events.as_time_binnable()); + flush_binned(binner, &mut self.coll, false)?; + } + ChannelEvents::Status(item) => { + trace!("{:?}", item); + } + }, + }, + StreamItem::Log(item) => { + // TODO collect also errors here? 
+ trace!("{:?}", item); + } + StreamItem::Stats(item) => { + // TODO do something with the stats + trace!("{:?}", item); + } + } + Ok(()) + } + + fn result(&mut self) -> Result { + if let Some(mut binner) = self.binner.take() { + if self.range_final { + trace!("range_final"); + binner.set_range_complete(); + } else { + debug!("not range_final"); + } + if self.did_timeout { + warn!("did_timeout"); + } else { + trace!("not did_timeout"); + binner.cycle(); + } + flush_binned(&mut binner, &mut self.coll, false)?; + if self.coll.is_none() { + debug!("force a bin"); + flush_binned(&mut binner, &mut self.coll, true)?; + } else { + trace!("coll is already some"); + } + } else { + error!("no binner, should always have one"); + } + let result = match self.coll.take() { + Some(mut coll) => { + let res = coll + .result(None, Some(self.binrange.clone())) + .map_err(|e| format!("{e}"))?; + res + } + None => { + error!("binned_collected nothing collected"); + return Err(Error::from(format!("binned_collected nothing collected"))); + } + }; + let ret = BinnedCollectedResult { + range_final: self.range_final, + did_timeout: self.did_timeout, + result, + }; + Ok(ret) + } +} + +impl Future for BinnedCollected { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let span = span!(Level::INFO, BinnedCollected::self_name()); + let _spg = span.enter(); + use Poll::*; + loop { + break match &self.state { + BinnedCollectedState::Init => { + self.state = BinnedCollectedState::Run; + continue; + } + BinnedCollectedState::Run => match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => match self.handle_item(item) { + Ok(()) => continue, + Err(e) => { + self.state = BinnedCollectedState::Done; + Ready(Err(e)) + } + }, + Ready(Some(Err(e))) => { + self.state = BinnedCollectedState::Done; + Ready(Err(e.into())) + } + Ready(None) => { + self.state = BinnedCollectedState::Done; + Ready(self.result()) + } + Pending => Pending, + }, + BinnedCollectedState::Done => Ready(Err(Error::from(format!("already done")))), + }; + } + } +} diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 5a2620a..50563e8 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,23 +1,39 @@ -use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor}; -use crate::{IsoDateTime, RangeOverlapInfo}; -use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; -use chrono::{TimeZone, Utc}; +use crate::ts_offs_from_abs; +use crate::ts_offs_from_abs_with_anchor; +use crate::IsoDateTime; +use crate::RangeOverlapInfo; +use crate::TimeBinnableType; +use crate::TimeBinnableTypeAggregator; +use chrono::TimeZone; +use chrono::Utc; use err::Error; -use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult}; +use items_0::collect_s::Collectable; +use items_0::collect_s::CollectableType; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; +use items_0::AppendEmptyBin; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::Empty; +use items_0::TimeBinnable; use items_0::TimeBinned; +use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; -use items_0::{AppendEmptyBin, AsAnyRef}; -use items_0::{AsAnyMut, Empty}; +use netpod::SeriesRange; +use netpod::log::*; use netpod::timeunits::SEC; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; -use netpod::{log::*, BinnedRange}; use num_traits::Zero; -use serde::{Deserialize, Serialize}; +use 
serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::collections::VecDeque; -use std::{fmt, mem}; +use std::fmt; +use std::mem; #[allow(unused)] macro_rules! trace4 { @@ -166,26 +182,59 @@ impl WithLen for BinsDim0 { } impl RangeOverlapInfo for BinsDim0 { - fn ends_before(&self, range: NanoRange) -> bool { - if let Some(&max) = self.ts2s.back() { - max <= range.beg + fn ends_before(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.ts2s.back() { + max <= range.beg_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&max) = self.pulses.back() { + max <= range.beg_u64() + } else { + true + } } else { + error!("unexpected"); true } } - fn ends_after(&self, range: NanoRange) -> bool { - if let Some(&max) = self.ts2s.back() { - max > range.end + fn ends_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.ts2s.back() { + max > range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&max) = self.pulses.back() { + max > range.end_u64() + } else { + true + } } else { - true + error!("unexpected"); + false } } - fn starts_after(&self, range: NanoRange) -> bool { - if let Some(&min) = self.ts1s.front() { - min >= range.end + fn starts_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&min) = self.ts1s.front() { + min >= range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&min) = self.pulses.front() { + min >= range.end_u64() + } else { + true + } } else { + error!("unexpected"); true } } @@ -386,7 +435,7 @@ impl CollectorType for BinsDim0Collector { self.timed_out = true; } - fn result(&mut self, _range: Option, binrange: Option) -> Result { + fn result(&mut self, _range: Option, binrange: Option) -> Result { let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { @@ -517,7 +566,7 @@ where fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, Error> { match CollectorType::result(self, range, binrange) { Ok(res) => Ok(Box::new(res)), @@ -604,8 +653,8 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { } impl TimeBinnable for BinsDim0 { - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - let ret = BinsDim0TimeBinner::::new(edges.into(), do_time_weight); + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + let ret = BinsDim0TimeBinner::::new(binrange, do_time_weight); Box::new(ret) } @@ -623,7 +672,7 @@ pub struct BinsDim0TimeBinner { } impl BinsDim0TimeBinner { - fn new(edges: VecDeque, do_time_weight: bool) -> Self { + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Self { Self { edges, do_time_weight, diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index f6990d4..f15d8a4 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -1,23 +1,37 @@ -use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor}; -use crate::{IsoDateTime, RangeOverlapInfo}; -use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; +use crate::ts_offs_from_abs; +use crate::ts_offs_from_abs_with_anchor; +use crate::IsoDateTime; +use crate::RangeOverlapInfo; +use crate::TimeBinnableType; +use crate::TimeBinnableTypeAggregator; use chrono::{TimeZone, Utc}; use err::Error; -use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult}; +use items_0::collect_s::Collectable; +use 
items_0::collect_s::CollectableType; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; +use items_0::AppendEmptyBin; +use items_0::AsAnyMut; +use items_0::AsAnyRef; use items_0::Empty; +use items_0::TimeBinnable; use items_0::TimeBinned; +use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; -use items_0::{AppendEmptyBin, AsAnyMut, AsAnyRef}; +use netpod::log::*; use netpod::timeunits::SEC; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; -use netpod::{log::*, BinnedRange}; use num_traits::Zero; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::collections::VecDeque; -use std::{fmt, mem}; +use std::fmt; +use std::mem; #[allow(unused)] macro_rules! trace4 { @@ -387,7 +401,7 @@ impl CollectorType for BinsXbinDim0Collector { self.timed_out = true; } - fn result(&mut self, _range: Option, binrange: Option) -> Result { + fn result(&mut self, _range: Option, binrange: Option) -> Result { let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { @@ -517,7 +531,7 @@ where fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, Error> { match CollectorType::result(self, range, binrange) { Ok(res) => Ok(Box::new(res)), @@ -604,8 +618,8 @@ impl TimeBinnableTypeAggregator for BinsXbinDim0Aggregator } impl TimeBinnable for BinsXbinDim0 { - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - let ret = BinsXbinDim0TimeBinner::::new(edges.into(), do_time_weight); + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + let ret = BinsXbinDim0TimeBinner::::new(binrange, do_time_weight); Box::new(ret) } @@ -623,7 +637,7 @@ pub struct BinsXbinDim0TimeBinner { } impl BinsXbinDim0TimeBinner { - fn new(edges: VecDeque, do_time_weight: bool) -> Self { + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Self { Self { edges, do_time_weight, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index bf4b339..95406c3 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -9,6 +9,7 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use netpod::log::*; use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use serde::Deserialize; use serde::Serialize; @@ -607,7 +608,7 @@ pub struct ChannelEventsTimeBinner { edges: Vec, do_time_weight: bool, conn_state: ConnStatus, - binner: Option>, + binner: Option>, } impl fmt::Debug for ChannelEventsTimeBinner { @@ -719,7 +720,7 @@ impl AsAnyMut for ChannelEventsCollectorOutput { } } -impl crate::ToJsonResult for ChannelEventsCollectorOutput { +impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput { fn to_json_result(&self) -> Result, err::Error> { todo!() } @@ -783,7 +784,7 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, err::Error> { match self.coll.as_mut() { Some(coll) => { diff --git a/items_2/src/empty.rs b/items_2/src/empty.rs new file mode 100644 index 0000000..ae7b2e0 --- /dev/null +++ b/items_2/src/empty.rs @@ -0,0 +1,54 @@ +use crate::eventsdim0::EventsDim0; +use crate::eventsdim1::EventsDim1; +use crate::Error; +use items_0::Empty; +use items_0::Events; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; + +pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result, 
Error> { + let ret: Box = match shape { + Shape::Scalar => { + use ScalarType::*; + type K = EventsDim0; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), + STRING => Box::new(K::::empty()), + } + } + Shape::Wave(..) => { + use ScalarType::*; + type K = EventsDim1; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + BOOL => Box::new(K::::empty()), + STRING => Box::new(K::::empty()), + } + } + Shape::Image(..) => { + error!("TODO empty_events_dyn_ev {scalar_type:?} {shape:?}"); + err::todoval() + } + }; + Ok(ret) +} diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 9ed69f7..08ec53d 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,25 +1,27 @@ use crate::binsdim0::BinsDim0; use crate::IsoDateTime; use crate::RangeOverlapInfo; -use crate::TimeBinnable; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; -use crate::TimeBinner; use err::Error; -use items_0::Appendable; use items_0::scalar_ops::ScalarOps; +use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; +use items_0::TimeBinnable; +use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; +use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; +use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; @@ -124,26 +126,59 @@ impl WithLen for EventsDim0 { } impl RangeOverlapInfo for EventsDim0 { - fn ends_before(&self, range: NanoRange) -> bool { - if let Some(&max) = self.tss.back() { - max < range.beg + fn ends_before(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.tss.back() { + max < range.beg_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&max) = self.pulses.back() { + max < range.beg_u64() + } else { + true + } } else { + error!("unexpected"); true } } - fn ends_after(&self, range: NanoRange) -> bool { - if let Some(&max) = self.tss.back() { - max >= range.end + fn ends_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.tss.back() { + max >= range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&max) = self.pulses.back() { + max >= range.end_u64() + } else { + true + } } else { - true + error!("unexpected"); + false } } - fn starts_after(&self, range: NanoRange) -> bool { - if let Some(&min) = self.tss.front() { - min >= range.end + fn starts_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&min) = self.tss.front() { + min >= range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&min) = self.pulses.front() { + min >= range.end_u64() + } else { + true + } } else { + error!("unexpected"); true } } @@ -156,8 +191,8 @@ where 
type Output = BinsDim0; type Aggregator = EventsDim0Aggregator; - fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { - let self_name = std::any::type_name::(); + fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let self_name = any::type_name::(); debug!( "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", range, x_bin_count, do_time_weight @@ -334,7 +369,7 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector, _binrange: Option) -> Result { + fn result(&mut self, range: Option, _binrange: Option) -> Result { // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. @@ -418,7 +453,7 @@ impl items_0::collect_c::Collector for EventsDim0Collector fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, err::Error> { match items_0::collect_s::CollectorType::result(self, range, binrange) { Ok(x) => Ok(Box::new(x)), @@ -428,7 +463,7 @@ impl items_0::collect_c::Collector for EventsDim0Collector } pub struct EventsDim0Aggregator { - range: NanoRange, + range: SeriesRange, count: u64, min: NTY, max: NTY, @@ -455,10 +490,10 @@ impl Drop for EventsDim0Aggregator { } impl EventsDim0Aggregator { - pub fn new(range: NanoRange, do_time_weight: bool) -> Self { - let int_ts = range.beg; + pub fn new(binrange: SeriesRange, do_time_weight: bool) -> Self { + let int_ts = binrange.beg_u64(); Self { - range, + range: binrange, count: 0, min: NTY::zero_b(), max: NTY::zero_b(), @@ -511,15 +546,15 @@ impl EventsDim0Aggregator { } } - fn apply_event_time_weight(&mut self, ts: u64) { + fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) { if let Some(v) = &self.last_seen_val { let vf = v.as_prim_f32_b(); let v2 = v.clone(); - if ts > self.range.beg { + if px > pxbeg { self.apply_min_max(v2); } let w = if self.do_time_weight { - (ts - self.int_ts) as f32 * 1e-9 + (px - self.int_ts) as f32 * 1e-9 } else { 1. 
}; @@ -528,73 +563,80 @@ impl EventsDim0Aggregator { self.sum += vf * w; self.sumc += 1; } - self.int_ts = ts; + self.int_ts = px; } else { - debug!( - "apply_event_time_weight NO VALUE {}", - ts as i64 - self.range.beg as i64 - ); + debug!("apply_event_time_weight NO VALUE"); } } fn ingest_unweight(&mut self, item: &::Input) { trace!("TODO check again result_reset_unweight"); err::todo(); - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - let val = item.values[i1].clone(); - if ts < self.range.beg { - self.events_ignored_count += 1; - } else if ts >= self.range.end { - self.events_ignored_count += 1; - return; - } else { - self.apply_event_unweight(val); - self.count += 1; - self.events_taken_count += 1; + if self.range.is_time() { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + if ts < self.range.beg_u64() { + self.events_ignored_count += 1; + } else if ts >= self.range.end_u64() { + self.events_ignored_count += 1; + return; + } else { + self.apply_event_unweight(val); + self.count += 1; + self.events_taken_count += 1; + } } + } else { + error!("TODO ingest_unweight"); + err::todo(); } } fn ingest_time_weight(&mut self, item: &::Input) { - let self_name = std::any::type_name::(); + let self_name = any::type_name::(); trace!("{self_name}::ingest_time_weight item len {}", item.len()); - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - let val = item.values[i1].clone(); - trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val); - if ts < self.int_ts { - if self.last_seen_val.is_none() { - info!( - "ingest_time_weight event before range, only set last ts {} val {:?}", - ts, val - ); + if self.range.is_time() { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val); + if ts < self.int_ts { + if self.last_seen_val.is_none() { + info!( + "ingest_time_weight event before range, only set last ts {} val {:?}", + ts, val + ); + } + self.events_ignored_count += 1; + self.last_seen_ts = ts; + self.last_seen_val = Some(val); + } else if ts >= self.range.end_u64() { + self.events_ignored_count += 1; + return; + } else { + if false && self.last_seen_val.is_none() { + // TODO no longer needed or? + info!( + "call apply_min_max without last val, use current instead {} {:?}", + ts, val + ); + self.apply_min_max(val.clone()); + } + self.apply_event_time_weight(ts, self.range.beg_u64()); + self.count += 1; + self.last_seen_ts = ts; + self.last_seen_val = Some(val); + self.events_taken_count += 1; } - self.events_ignored_count += 1; - self.last_seen_ts = ts; - self.last_seen_val = Some(val); - } else if ts >= self.range.end { - self.events_ignored_count += 1; - return; - } else { - if false && self.last_seen_val.is_none() { - // TODO no longer needed or? 
- info!( - "call apply_min_max without last val, use current instead {} {:?}", - ts, val - ); - self.apply_min_max(val.clone()); - } - self.apply_event_time_weight(ts); - self.count += 1; - self.last_seen_ts = ts; - self.last_seen_val = Some(val); - self.events_taken_count += 1; } + } else { + error!("TODO ingest_unweight"); + err::todo(); } } - fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsDim0 { + fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0 { trace!("TODO check again result_reset_unweight"); err::todo(); let (min, max, avg) = if self.sumc > 0 { @@ -607,15 +649,20 @@ impl EventsDim0Aggregator { }; (g.clone(), g.clone(), g.as_prim_f32_b()) }; - let ret = BinsDim0 { - ts1s: [self.range.beg].into(), - ts2s: [self.range.end].into(), - counts: [self.count].into(), - mins: [min].into(), - maxs: [max].into(), - avgs: [avg].into(), + let ret = if self.range.is_time() { + BinsDim0 { + ts1s: [self.range.beg_u64()].into(), + ts2s: [self.range.end_u64()].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + } + } else { + error!("TODO result_reset_unweight"); + err::todoval() }; - self.int_ts = range.beg; + self.int_ts = range.beg_u64(); self.range = range; self.count = 0; self.sum = 0f32; @@ -624,16 +671,17 @@ impl EventsDim0Aggregator { ret } - fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsDim0 { + fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { // TODO check callsite for correct expand status. - if expand { - debug!("result_reset_time_weight calls apply_event_time_weight"); - self.apply_event_time_weight(self.range.end); + debug!("result_reset_time_weight calls apply_event_time_weight"); + if self.range.is_time() { + self.apply_event_time_weight(self.range.end_u64(), self.range.beg_u64()); } else { - debug!("result_reset_time_weight NO EXPAND"); + error!("TODO result_reset_time_weight"); + err::todoval() } let (min, max, avg) = if self.sumc > 0 { - let avg = self.sum / (self.range.delta() as f32 * 1e-9); + let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9); (self.min.clone(), self.max.clone(), avg) } else { let g = match &self.last_seen_val { @@ -642,15 +690,20 @@ impl EventsDim0Aggregator { }; (g.clone(), g.clone(), g.as_prim_f32_b()) }; - let ret = BinsDim0 { - ts1s: [self.range.beg].into(), - ts2s: [self.range.end].into(), - counts: [self.count].into(), - mins: [min].into(), - maxs: [max].into(), - avgs: [avg].into(), + let ret = if self.range.is_time() { + BinsDim0 { + ts1s: [self.range.beg_u64()].into(), + ts2s: [self.range.end_u64()].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + } + } else { + error!("TODO result_reset_time_weight"); + err::todoval() }; - self.int_ts = range.beg; + self.int_ts = range.beg_u64(); self.range = range; self.count = 0; self.sum = 0.; @@ -666,7 +719,7 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { type Input = EventsDim0; type Output = BinsDim0; - fn range(&self) -> &NanoRange { + fn range(&self) -> &SeriesRange { &self.range } @@ -686,19 +739,19 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } } - fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { - trace!("result_reset {} {}", range.beg, range.end); + fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output { + trace!("result_reset {:?}", range); if self.do_time_weight { - 
self.result_reset_time_weight(range, expand) + self.result_reset_time_weight(range) } else { - self.result_reset_unweight(range, expand) + self.result_reset_unweight(range) } } } impl TimeBinnable for EventsDim0 { - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - let ret = EventsDim0TimeBinner::::new(edges.into(), do_time_weight).unwrap(); + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -873,28 +926,29 @@ impl Events for EventsDim0 { } pub struct EventsDim0TimeBinner { - edges: VecDeque, + binrange: BinnedRangeEnum, + rix: usize, + rng: Option, agg: EventsDim0Aggregator, ready: Option< as TimeBinnableTypeAggregator>::Output>, range_complete: bool, } impl EventsDim0TimeBinner { - fn new(edges: VecDeque, do_time_weight: bool) -> Result { - if edges.len() < 2 { - return Err(Error::with_msg_no_trace(format!("need at least 2 edges"))); - } - let self_name = std::any::type_name::(); - trace!("{self_name}::new edges {edges:?}"); - let agg = EventsDim0Aggregator::new( - NanoRange { - beg: edges[0], - end: edges[1], - }, - do_time_weight, - ); + fn self_name() -> &'static str { + any::type_name::() + } + + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { + trace!("{}::new binrange {binrange:?}", Self::self_name()); + let rng = binrange + .range_at(0) + .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; + let agg = EventsDim0Aggregator::new(rng, do_time_weight); let ret = Self { - edges, + binrange, + rix: 0, + rng: Some(agg.range.clone()), agg, ready: None, range_complete: false, @@ -902,18 +956,13 @@ impl EventsDim0TimeBinner { Ok(ret) } - fn next_bin_range(&mut self) -> Option { - let self_name = std::any::type_name::(); - if self.edges.len() >= 3 { - self.edges.pop_front(); - let ret = NanoRange { - beg: self.edges[0], - end: self.edges[1], - }; - trace!("{self_name} next_bin_range {} {}", ret.beg, ret.end); - Some(ret) + fn next_bin_range(&mut self) -> Option { + let self_name = any::type_name::(); + self.rix += 1; + if let Some(rng) = self.binrange.range_at(self.rix) { + trace!("{self_name} next_bin_range {:?}", rng); + Some(rng) } else { - self.edges.clear(); trace!("{self_name} next_bin_range None"); None } @@ -936,37 +985,29 @@ impl TimeBinner for EventsDim0TimeBinner { } fn ingest(&mut self, item: &dyn TimeBinnable) { - let self_name = std::any::type_name::(); - trace2!( - "TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------", - self.edges.iter().take(2).collect::>(), - item - ); + let self_name = any::type_name::(); + trace2!("TimeBinner for {self_name} {:?}", item); if item.len() == 0 { // Return already here, RangeOverlapInfo would not give much sense. return; } - if self.edges.len() < 2 { - warn!("{self_name} no more bin in edges A"); - return; - } // TODO optimize by remembering at which event array index we have arrived. // That needs modified interfaces which can take and yield the start and latest index. 
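// Loop below: while the item starts after the current bin, cycle to the next bin range (stopping once the ranges are exhausted); an item that ends before the current bin is ignored; otherwise the item is downcast to the concrete events type, fed into the aggregator, and the binner cycles again whenever the item extends past the end of the current bin.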
loop { - while item.starts_after(self.agg.range().clone()) { + while item.starts_after(self.agg.range()) { trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); self.cycle(); - if self.edges.len() < 2 { + if self.rng.is_none() { warn!("{self_name} no more bin in edges B"); return; } } - if item.ends_before(self.agg.range().clone()) { - trace!("{self_name} IGNORE ITEM BECAUSE ends_before\n------------- -----------"); + if item.ends_before(self.agg.range()) { + trace!("{self_name} IGNORE ITEM BECAUSE ends_before"); return; } else { - if self.edges.len() < 2 { - trace!("{self_name} edge list exhausted"); + if self.rng.is_none() { + trace!("{self_name} no more bin in edges D"); return; } else { if let Some(item) = item @@ -977,10 +1018,10 @@ impl TimeBinner for EventsDim0TimeBinner { // TODO collect statistics associated with this request: trace!("{self_name} FEED THE ITEM..."); self.agg.ingest(item); - if item.ends_after(self.agg.range().clone()) { + if item.ends_after(self.agg.range()) { trace!("{self_name} FED ITEM, ENDS AFTER."); self.cycle(); - if self.edges.len() < 2 { + if self.rng.is_none() { warn!("{self_name} no more bin in edges C"); return; } else { @@ -991,7 +1032,7 @@ impl TimeBinner for EventsDim0TimeBinner { break; } } else { - panic!("{self_name} not correct item type"); + error!("{self_name}::ingest unexpected item type"); }; } } @@ -999,35 +1040,38 @@ impl TimeBinner for EventsDim0TimeBinner { } fn push_in_progress(&mut self, push_empty: bool) { - let self_name = std::any::type_name::(); + let self_name = any::type_name::(); trace!("{self_name}::push_in_progress"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. - if self.edges.len() >= 2 { + if self.rng.is_none() { + } else { let expand = true; - let range_next = if let Some(x) = self.next_bin_range() { - Some(x) - } else { - None - }; + let range_next = self.next_bin_range(); + self.rng = range_next.clone(); let mut bins = if let Some(range_next) = range_next { self.agg.result_reset(range_next, expand) } else { + // Acts as placeholder let range_next = NanoRange { beg: u64::MAX - 1, end: u64::MAX, }; - self.agg.result_reset(range_next, expand) + self.agg.result_reset(range_next.into(), expand) }; - assert_eq!(bins.len(), 1); - if push_empty || bins.counts[0] != 0 { - match self.ready.as_mut() { - Some(ready) => { - ready.append_all_from(&mut bins); - } - None => { - self.ready = Some(bins); + if bins.len() != 1 { + error!("{self_name}::push_in_progress bins.len() {}", bins.len()); + return; + } else { + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut bins); + } + None => { + self.ready = Some(bins); + } } } } @@ -1035,15 +1079,21 @@ impl TimeBinner for EventsDim0TimeBinner { } fn cycle(&mut self) { - let self_name = std::any::type_name::(); + let self_name = any::type_name::(); trace!("{self_name}::cycle"); // TODO refactor this logic. 
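// Push the bin currently in progress (allowing an empty bin); if that did not increase the ready count, advance to the next bin range and append an explicit zero bin, presumably to keep the emitted bin sequence contiguous.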
let n = self.bins_ready_count(); self.push_in_progress(true); if self.bins_ready_count() == n { - if let Some(range) = self.next_bin_range() { + let range_next = self.next_bin_range(); + self.rng = range_next.clone(); + if let Some(range) = range_next { let mut bins = BinsDim0::::empty(); - bins.append_zero(range.beg, range.end); + if range.is_time() { + bins.append_zero(range.beg_u64(), range.end_u64()); + } else { + error!("TODO {self_name}::cycle is_pulse"); + } match self.ready.as_mut() { Some(ready) => { ready.append_all_from(&mut bins); @@ -1101,7 +1151,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn { fn result( &mut self, _range: Option, - _binrange: Option, + _binrange: Option, ) -> Result, err::Error> { todo!() } @@ -1133,7 +1183,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim0Collector, - binrange: Option, + binrange: Option, ) -> Result, err::Error> { items_0::collect_s::CollectorType::result(self, range, binrange) .map(|x| Box::new(x) as _) diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index b29c289..640ad48 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -1,10 +1,8 @@ use crate::binsdim0::BinsDim0; use crate::IsoDateTime; use crate::RangeOverlapInfo; -use crate::TimeBinnable; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; -use crate::TimeBinner; use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; @@ -13,10 +11,13 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; +use items_0::TimeBinnable; +use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use serde::Deserialize; use serde::Serialize; @@ -295,7 +296,7 @@ impl items_0::collect_s::CollectorType for EventsDim1Collector, _binrange: Option) -> Result { + fn result(&mut self, range: Option, _binrange: Option) -> Result { // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. 
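The comment in the hunk above describes the continue-at hint for timed-out requests: since the client cannot ask for a left-exclusive range, the collector reports the timestamp of the last delivered event plus a small delta so a follow-up request does not re-fetch that event. A minimal sketch of that idea, assuming nanosecond timestamps; the helper name `continue_at_hint` and the 1 µs delta are illustrative and not part of this diff:

    /// Illustrative only: compute a resume position after a timeout, following the
    /// "last event timestamp plus a small delta" rule described in the comment above.
    fn continue_at_hint(last_event_ts_ns: Option<u64>, range_beg_ns: u64) -> u64 {
        match last_event_ts_ns {
            // Continue just past the last event that was already delivered.
            Some(ts) => ts + 1_000,
            // Nothing was delivered yet: continue from the start of the requested range.
            None => range_beg_ns,
        }
    }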
@@ -366,7 +367,7 @@ impl items_0::collect_c::Collector for EventsDim1Collector fn result( &mut self, range: Option, - binrange: Option, + binrange: Option, ) -> Result, err::Error> { match items_0::collect_s::CollectorType::result(self, range, binrange) { Ok(x) => Ok(Box::new(x)), @@ -651,8 +652,8 @@ impl TimeBinnableTypeAggregator for EventsDim1Aggregator { } impl TimeBinnable for EventsDim1 { - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - let ret = EventsDim1TimeBinner::::new(edges.into(), do_time_weight).unwrap(); + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + let ret = EventsDim1TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -834,7 +835,7 @@ pub struct EventsDim1TimeBinner { } impl EventsDim1TimeBinner { - fn new(edges: VecDeque, do_time_weight: bool) -> Result { + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { if edges.len() < 2 { return Err(Error::with_msg_no_trace(format!("need at least 2 edges"))); } @@ -1055,7 +1056,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim1CollectorDyn { fn result( &mut self, _range: Option, - _binrange: Option, + _binrange: Option, ) -> Result, err::Error> { todo!() } @@ -1087,7 +1088,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim1Collector, - binrange: Option, + binrange: Option, ) -> Result, err::Error> { items_0::collect_s::CollectorType::result(self, range, binrange) .map(|x| Box::new(x) as _) diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index cb95027..a907e23 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -11,6 +11,7 @@ use items_0::Empty; use items_0::WithLen; use netpod::log::*; use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use serde::Deserialize; use serde::Serialize; @@ -447,7 +448,7 @@ where self.timed_out = true; } - fn result(&mut self, range: Option, _binrange: Option) -> Result { + fn result(&mut self, range: Option, _binrange: Option) -> Result { use std::mem::replace; let continue_at = if self.timed_out { if let Some(ts) = self.vals.tss.back() { @@ -521,7 +522,7 @@ where fn result( &mut self, _range: Option, - _binrange: Option, + _binrange: Option, ) -> Result, Error> { todo!() } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 1d31049..39120f9 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,6 +1,8 @@ +pub mod binnedcollected; pub mod binsdim0; pub mod binsxbindim0; pub mod channelevents; +pub mod empty; pub mod eventfull; pub mod eventsdim0; pub mod eventsdim1; @@ -23,32 +25,21 @@ use chrono::Utc; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use items_0::collect_s::Collector; -use items_0::collect_s::ToJsonResult; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; use items_0::Empty; use items_0::Events; use items_0::RangeOverlapInfo; -use items_0::TimeBinnable; -use items_0::TimeBinner; use netpod::log::*; use netpod::timeunits::*; -use netpod::transform::Transform; -use netpod::AggKind; -use netpod::BinnedRange; use netpod::NanoRange; use netpod::ScalarType; +use netpod::SeriesRange; use netpod::Shape; use serde::Deserialize; use serde::Serialize; use serde::Serializer; use std::collections::VecDeque; use std::fmt; -use std::pin::Pin; -use std::time::Duration; -use std::time::Instant; pub fn bool_is_false(x: &bool) -> bool { *x == false @@ -212,241 +203,20 @@ impl 
crate::merger::Mergeable for Box { pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo + Empty { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; - fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; + fn aggregator(range: SeriesRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; } pub trait TimeBinnableTypeAggregator: Send { type Input: TimeBinnableType; type Output: TimeBinnableType; - fn range(&self) -> &NanoRange; + fn range(&self) -> &SeriesRange; fn ingest(&mut self, item: &Self::Input); - fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; + fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output; } -pub fn empty_events_dyn_ev( - scalar_type: &ScalarType, - shape: &Shape, - transform: &Transform, -) -> Result, Error> { - let ret: Box = match shape { - Shape::Scalar => match transform { - _ if true => { - use ScalarType::*; - type K = eventsdim0::EventsDim0; - match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - BOOL => Box::new(K::::empty()), - STRING => Box::new(K::::empty()), - } - } - _ if true => Box::new(eventsdim0::EventsDim0::::empty()), - _ => { - error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}"); - err::todoval() - } - }, - Shape::Wave(..) => match transform { - _ if true => { - use ScalarType::*; - type K = eventsdim1::EventsDim1; - match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - BOOL => Box::new(K::::empty()), - STRING => Box::new(K::::empty()), - } - } - _ if true => Box::new(eventsdim0::EventsDim0::::empty()), - _ => { - error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}"); - err::todoval() - } - }, - Shape::Image(..) => { - error!("TODO empty_events_dyn_ev {transform:?} {scalar_type:?} {shape:?}"); - err::todoval() - } - }; - Ok(ret) -} +pub trait ChannelEventsInput: Stream> + items_0::Transformer + Send {} -pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, transform: &Transform) -> Box { - error!("TODO empty_binned_dyn_tb"); - todo!() -} - -fn flush_binned( - binner: &mut Box, - coll: &mut Option>, - force: bool, -) -> Result<(), Error> { - trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); - if force { - if binner.bins_ready_count() == 0 { - debug!("cycle the binner forced"); - binner.cycle(); - } else { - debug!("bins ready, do not force"); - } - } - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - trace!("binned_collected ready {ready:?}"); - if coll.is_none() { - *coll = Some(ready.as_collectable_mut().new_collector()); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); - Ok(()) - } - None => Err(format!("bins_ready_count but no result").into()), - } - } else { - Ok(()) - } -} - -// TODO remove. 
-// Compare with items_2::test::bin01 -pub async fn binned_collected( - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, - binrange: BinnedRange, - timeout: Duration, - inp: Pin> + Send>>, -) -> Result, Error> { - event!(Level::TRACE, "binned_collected"); - let transform = Transform::default_time_binned(); - let edges = binrange.edges(); - if edges.len() < 2 { - return Err(format!("binned_collected but edges.len() {}", edges.len()).into()); - } - let ts_edges_max = *edges.last().unwrap(); - let deadline = Instant::now() + timeout; - let mut did_timeout = false; - // TODO use a trait to allow check of unfinished data [hcn2956jxhwsf] - #[allow(unused)] - let bin_count_exp = edges.len().max(2) as u32 - 1; - let do_time_weight = agg_kind.do_time_weighted(); - // TODO maybe TimeBinner should take all ChannelEvents and handle this? - let mut did_range_complete = false; - let mut coll = None; - let mut binner = None; - let empty_item = empty_events_dyn_ev(&scalar_type, &shape, &transform)?; - let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( - empty_item, - )))); - let empty_stream = futures_util::stream::once(futures_util::future::ready(tmp_item)); - let mut stream = empty_stream.chain(inp); - loop { - let item = futures_util::select! { - k = stream.next().fuse() => { - if let Some(k) = k { - k? - }else { - break; - } - }, - _ = tokio::time::sleep_until(deadline.into()).fuse() => { - did_timeout = true; - break; - } - }; - match item { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => { - did_range_complete = true; - } - RangeCompletableItem::Data(k) => match k { - ChannelEvents::Events(events) => { - if events.starts_after(NanoRange { - beg: 0, - end: ts_edges_max, - }) { - } else { - if binner.is_none() { - let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); - binner = Some(bb); - } - let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable()); - flush_binned(binner, &mut coll, false)?; - } - } - ChannelEvents::Status(item) => { - trace!("{:?}", item); - } - }, - }, - StreamItem::Log(item) => { - // TODO collect also errors here? 
- trace!("{:?}", item); } StreamItem::Stats(item) => { // TODO do something with the stats trace!("{:?}", item); } } } if let Some(mut binner) = binner { if did_range_complete { trace!("did_range_complete"); binner.set_range_complete(); } else { debug!("range not complete"); } if did_timeout { warn!("timeout"); } else { trace!("cycle the binner"); binner.cycle(); } trace!("flush binned"); flush_binned(&mut binner, &mut coll, false)?; if coll.is_none() { debug!("force a bin"); flush_binned(&mut binner, &mut coll, true)?; } else { trace!("coll is already some"); } } else { error!("no binner, should always have one"); } match coll { Some(mut coll) => { let res = coll.result(None, Some(binrange)).map_err(|e| format!("{e}"))?; tokio::time::sleep(Duration::from_millis(2000)).await; Ok(res) } None => { error!("binned_collected nothing collected"); let item = empty_binned_dyn_tb(&scalar_type, &shape, &transform); let ret = item.to_box_to_json_result(); tokio::time::sleep(Duration::from_millis(2000)).await; Ok(ret) } } -} +impl ChannelEventsInput for T where T: Stream> + items_0::Transformer + Send {} pub fn runfut(fut: F) -> Result where diff --git a/items_2/src/merger.rs index f283d6e..c1dfe98 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -409,3 +409,9 @@ where } } } + +impl items_0::Transformer for Merger { + fn query_transform_properties(&self) -> items_0::TransformProperties { + todo!() + } +} diff --git a/items_2/src/streams.rs index 8b13789..d2a3c79 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -1 +1,167 @@ +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::TransformProperties; +use items_0::Transformer; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; pub struct Enumerate2 { inp: T, cnt: usize, } + +impl Enumerate2 { + pub fn new(inp: T) -> Self + where + T: Transformer, + { + Self { inp, cnt: 0 } + } +} + +impl Stream for Enumerate2 +where + T: Stream + Unpin, +{ + type Item = (usize, ::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => { + let i = self.cnt; + self.cnt += 1; + Ready(Some((i, item))) + } + Ready(None) => Ready(None), + Pending => Pending, + } + } +} + +impl Transformer for Enumerate2 { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +pub struct Then2 { + inp: T, + f: F, + fut: Option>>, +} + +impl Then2 +where + T: Stream, + F: FnMut(::Item) -> Fut, + Fut: Future, +{ + pub fn new(inp: T, f: F) -> Self { + Self { inp, f, fut: None } + } + + fn prepare_fut(&mut self, item: ::Item) { + self.fut = Some(Box::pin((self.f)(item))); + } +} + +impl Stream for Then2 +where + T: Stream + Unpin, + F: FnMut(::Item) -> Fut, + Fut: Future, +{ + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if let Some(fut) = &mut self.fut { + match fut.poll_unpin(cx) { + Ready(item) => { + self.fut = None; + Ready(Some(item)) + } + Pending => Pending, + } + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => { + self.prepare_fut(item); + continue; + } + Ready(None) => Ready(None), + Pending => Pending, + } + }; + } + } +} + +impl Transformer for Then2 { + fn query_transform_properties(&self) -> TransformProperties
{ + todo!() + } +} + +pub trait TransformerExt { + fn enumerate2(self) -> Enumerate2 + where + Self: Transformer + Sized; + + fn then2(self, f: F) -> Then2 + where + Self: Transformer + Stream + Sized, + F: FnMut(::Item) -> Fut, + Fut: Future; +} + +impl TransformerExt for T { + fn enumerate2(self) -> Enumerate2 + where + Self: Transformer + Sized, + { + Enumerate2::new(self) + } + + fn then2(self, f: F) -> Then2 + where + Self: Transformer + Stream + Sized, + F: FnMut(::Item) -> Fut, + Fut: Future, + { + Then2::new(self, f) + } +} + +pub struct VecStream { + inp: VecDeque, +} + +impl VecStream { + pub fn new(inp: VecDeque) -> Self { + Self { inp } + } +} + +impl Stream for VecStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if let Some(item) = self.inp.pop_front() { + Ready(Some(item)) + } else { + Ready(None) + } + } +} + +impl Transformer for VecStream { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} diff --git a/items_2/src/test.rs b/items_2/src/test.rs index f36dfaa..20a040e 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -1,4 +1,4 @@ -use crate::binned_collected; +use crate::binnedcollected::BinnedCollected; use crate::binsdim0::BinsDim0CollectedResult; use crate::channelevents::ConnStatus; use crate::channelevents::ConnStatusEvent; @@ -6,6 +6,8 @@ use crate::eventsdim0::EventsDim0; use crate::merger::Mergeable; use crate::merger::Merger; use crate::runfut; +use crate::streams::TransformerExt; +use crate::streams::VecStream; use crate::testgen::make_some_boxed_d0_f32; use crate::ChannelEvents; use crate::Error; @@ -24,10 +26,12 @@ use netpod::log::*; use netpod::timeunits::*; use netpod::AggKind; use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::NanoRange; use netpod::ScalarType; use netpod::Shape; use std::time::Duration; +use std::time::Instant; #[test] fn items_move_events() { @@ -326,94 +330,24 @@ fn bin01() { let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; - let mut stream = crate::merger::Merger::new(vec![inp1, inp2], 32); - let mut coll = None; - let mut binner = None; + let stream = crate::merger::Merger::new(vec![inp1, inp2], 32); let range = NanoRange { beg: SEC * 0, end: SEC * 100, }; - let binrange = BinnedRange::covering_range(range, 10).unwrap(); - let edges = binrange.edges(); - // TODO implement continue-at [hcn2956jxhwsf] - #[allow(unused)] - let bin_count_exp = (edges.len() - 1) as u32; + let binrange = BinnedRangeEnum::covering_range(range.into(), 10).unwrap(); + let deadline = Instant::now() + Duration::from_millis(4000); let do_time_weight = true; - while let Some(item) = stream.next().await { - let item = item?; - match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => todo!(), - RangeCompletableItem::Data(item) => match item { - ChannelEvents::Events(events) => { - if binner.is_none() { - let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); - binner = Some(bb); - } - let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable()); - eprintln!("bins_ready_count: {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - eprintln!("ready {ready:?}"); - if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); - } - let cl = coll.as_mut().unwrap(); - 
cl.ingest(ready.as_collectable_mut()); - } - None => { - return Err(format!("bins_ready_count but no result").into()); - } - } - } - } - ChannelEvents::Status(_) => { - eprintln!("TODO Status"); - } - }, - }, - StreamItem::Log(_) => { - eprintln!("TODO Log"); - } - StreamItem::Stats(_) => { - eprintln!("TODO Stats"); - } - } - } - if let Some(mut binner) = binner { - binner.cycle(); - // TODO merge with the same logic above in the loop. - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - eprintln!("ready {ready:?}"); - if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); - } - None => { - return Err(format!("bins_ready_count but no result").into()); - } - } - } - } - match coll { - Some(mut coll) => { - let res = coll.result(None, Some(binrange.clone())).map_err(|e| format!("{e}"))?; - //let res = res.to_json_result().map_err(|e| format!("{e}"))?; - //let res = res.to_json_bytes().map_err(|e| format!("{e}"))?; - eprintln!("res {res:?}"); - } - None => { - panic!(); - } - } + let res = BinnedCollected::new( + binrange, + ScalarType::F32, + Shape::Scalar, + do_time_weight, + deadline, + Box::pin(stream), + ) + .await?; + // TODO assert Ok::<_, Error>(()) }; runfut(fut).unwrap(); @@ -448,19 +382,20 @@ fn bin02() { beg: TSBASE + SEC * 1, end: TSBASE + SEC * 10, }; - let binrange = BinnedRange::covering_range(range, 9).map_err(|e| format!("{e}"))?; - assert_eq!(binrange.edges().len(), 10); + let binrange = BinnedRangeEnum::covering_range(range.into(), 9).map_err(|e| format!("{e}"))?; let stream = Box::pin(stream); - let collected = binned_collected( + let deadline = Instant::now() + Duration::from_millis(4000); + let do_time_weight = true; + let res = BinnedCollected::new( + binrange, ScalarType::F32, Shape::Scalar, - AggKind::TimeWeightedScalar, - binrange, - Duration::from_millis(2000), - stream, + do_time_weight, + deadline, + Box::pin(stream), ) .await?; - eprintln!("collected {:?}", collected); + eprintln!("res {:?}", res); Ok::<_, Error>(()) }; runfut(fut).unwrap(); @@ -488,8 +423,8 @@ fn binned_timeout_01() { events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); - let inp1 = events_vec1; - let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move { + let inp1 = VecStream::new(events_vec1.into_iter().collect()); + let inp1 = inp1.enumerate2().then2(|(i, k)| async move { if i == 5 { let _ = tokio::time::sleep(Duration::from_millis(10000)).await; } @@ -500,21 +435,16 @@ fn binned_timeout_01() { beg: TSBASE + SEC * 1, end: TSBASE + SEC * 10, }; - let binrange = BinnedRange::covering_range(range, 9)?; + let binrange = BinnedRangeEnum::covering_range(range.into(), 9)?; eprintln!("edges1: {:?}", edges); - eprintln!("edges2: {:?}", binrange.edges()); - let inp1 = Box::pin(inp1) as _; + //eprintln!("edges2: {:?}", binrange.edges()); + let inp1 = Box::pin(inp1); let timeout = Duration::from_millis(400); - let res = binned_collected( - ScalarType::F32, - Shape::Scalar, - AggKind::TimeWeightedScalar, - binrange, - timeout, - inp1, - ) - .await?; - let r2: &BinsDim0CollectedResult = res.as_any_ref().downcast_ref().expect("res seems wrong type"); + let deadline = Instant::now() + Duration::from_millis(4000); + let do_time_weight = true; + let res = + BinnedCollected::new(binrange, ScalarType::F32, 
Shape::Scalar, do_time_weight, deadline, inp1).await?; + let r2: &BinsDim0CollectedResult = res.result.as_any_ref().downcast_ref().expect("res seems wrong type"); eprintln!("rs: {r2:?}"); assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC); assert_eq!(r2.counts(), &[10, 10, 10]); diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index e9764e9..b15a2ad 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -769,6 +769,43 @@ pub enum SeriesRange { PulseRange(PulseRange), } +impl SeriesRange { + pub fn is_time(&self) -> bool { + match self { + SeriesRange::TimeRange(_) => true, + SeriesRange::PulseRange(_) => false, + } + } + + pub fn is_pulse(&self) -> bool { + match self { + SeriesRange::TimeRange(_) => false, + SeriesRange::PulseRange(_) => true, + } + } + + pub fn beg_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.beg, + SeriesRange::PulseRange(x) => x.beg, + } + } + + pub fn end_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.beg, + SeriesRange::PulseRange(x) => x.beg, + } + } + + pub fn delta_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.end - x.beg, + SeriesRange::PulseRange(x) => x.end - x.beg, + } + } +} + impl From for SeriesRange { fn from(k: NanoRange) -> Self { Self::TimeRange(k) @@ -1120,16 +1157,16 @@ pub trait Dim0Index: Clone + fmt::Debug + PartialOrd { fn div_v(&self, v: &Self) -> u64; fn as_u64(&self) -> u64; fn series_range(a: Self, b: Self) -> SeriesRange; - fn prebin_bin_len_opts() -> &'static [Self]; + fn prebin_bin_len_opts() -> Vec; fn prebin_patch_len_for(i: usize) -> Self; fn to_pre_binned_patch_range_enum( - bin_len: &Self, + &self, bin_count: u64, patch_offset: u64, patch_count: u64, ) -> PreBinnedPatchRangeEnum; - fn binned_bin_len_opts() -> &'static [Self]; - fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum; + fn binned_bin_len_opts() -> Vec; + fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum; } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] @@ -1140,39 +1177,39 @@ pub struct PulseId(u64); impl Dim0Index for TsNano { fn add(&self, v: &Self) -> Self { - todo!() + Self(self.0 + v.0) } fn sub(&self, v: &Self) -> Self { - todo!() + Self(self.0 - v.0) } fn sub_n(&self, v: u64) -> Self { - todo!() + Self(self.0 - v) } fn times(&self, x: u64) -> Self { - todo!() + Self(self.0 * x) } fn div_n(&self, n: u64) -> Self { - todo!() + Self(self.0 / n) } fn div_v(&self, v: &Self) -> u64 { - todo!() + self.0 / v.0 } fn as_u64(&self) -> u64 { - todo!() + self.0 } fn series_range(a: Self, b: Self) -> SeriesRange { todo!() } - fn prebin_bin_len_opts() -> &'static [Self] { - todo!() + fn prebin_bin_len_opts() -> Vec { + PREBIN_TIME_BIN_LEN_VAR0.iter().map(|&x| Self(x)).collect() } fn prebin_patch_len_for(i: usize) -> Self { @@ -1180,58 +1217,69 @@ impl Dim0Index for TsNano { } fn to_pre_binned_patch_range_enum( - bin_len: &Self, + &self, bin_count: u64, patch_offset: u64, patch_count: u64, ) -> PreBinnedPatchRangeEnum { - todo!() + PreBinnedPatchRangeEnum::Time(PreBinnedPatchRange { + first: PreBinnedPatchCoord { + bin_len: self.clone(), + bin_count, + patch_offset, + }, + patch_count, + }) } - fn binned_bin_len_opts() -> &'static [Self] { - todo!() + fn binned_bin_len_opts() -> Vec { + TIME_BIN_THRESHOLDS.iter().map(|&x| Self(x)).collect() } - fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum { - todo!() + fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> 
BinnedRangeEnum { + BinnedRangeEnum::Time(BinnedRange { + bin_len: self.clone(), + bin_off, + bin_cnt, + }) } } impl Dim0Index for PulseId { fn add(&self, v: &Self) -> Self { - todo!() + Self(self.0 + v.0) } fn sub(&self, v: &Self) -> Self { - todo!() + Self(self.0 - v.0) } fn sub_n(&self, v: u64) -> Self { - todo!() + Self(self.0 - v) } fn times(&self, x: u64) -> Self { - todo!() + Self(self.0 * x) } fn div_n(&self, n: u64) -> Self { - todo!() + Self(self.0 / n) } fn div_v(&self, v: &Self) -> u64 { - todo!() + self.0 / v.0 } fn as_u64(&self) -> u64 { - todo!() + self.0 } fn series_range(a: Self, b: Self) -> SeriesRange { todo!() } - fn prebin_bin_len_opts() -> &'static [Self] { - todo!() + fn prebin_bin_len_opts() -> Vec { + PREBIN_PULSE_BIN_LEN_VAR0.iter().map(|&x| Self(x)).collect() } fn prebin_patch_len_for(i: usize) -> Self { @@ -1239,24 +1287,37 @@ impl Dim0Index for PulseId { } fn to_pre_binned_patch_range_enum( - bin_len: &Self, + &self, bin_count: u64, patch_offset: u64, patch_count: u64, ) -> PreBinnedPatchRangeEnum { - todo!() + PreBinnedPatchRangeEnum::Pulse(PreBinnedPatchRange { + first: PreBinnedPatchCoord { + bin_len: self.clone(), + bin_count, + patch_offset, + }, + patch_count, + }) } - fn binned_bin_len_opts() -> &'static [Self] { - todo!() + fn binned_bin_len_opts() -> Vec { + PULSE_BIN_THRESHOLDS.iter().map(|&x| Self(x)).collect() } - fn to_binned_range_enum(bin_len: &Self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum { - todo!() + fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum { + BinnedRangeEnum::Pulse(BinnedRange { + bin_len: self.clone(), + bin_off, + bin_cnt, + }) } } -const PREBIN_TIME_BIN_LEN_VAR0: [TsNano; 3] = [TsNano(MIN * 1), TsNano(HOUR * 1), TsNano(DAY)]; +const PREBIN_TIME_BIN_LEN_VAR0: [u64; 3] = [MIN * 1, HOUR * 1, DAY]; + +const PREBIN_PULSE_BIN_LEN_VAR0: [u64; 4] = [100, 10000, 1000000, 100000000]; const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [ // @@ -1485,11 +1546,11 @@ impl PreBinnedPatchRangeEnum { let du = b.sub(&a); let max_bin_len = du.div_n(min_bin_count as u64); for (i1, bl) in opts.iter().enumerate().rev() { - if bl <= &du { + if bl <= &max_bin_len { let patch_len = ::prebin_patch_len_for(i1); let bin_count = patch_len.div_v(bl); let patch_off_1 = a.div_v(&patch_len); - let patch_off_2 = b.div_v(&patch_len.add(&patch_len).sub_n(1)); + let patch_off_2 = (b.add(&patch_len).sub_n(1)).div_v(&patch_len); let patch_count = patch_off_2 - patch_off_1; let ret = T::to_pre_binned_patch_range_enum(&bl, bin_count, patch_off_1, patch_count); return Ok(ret); @@ -1554,23 +1615,22 @@ where err::todoval() } - pub fn edges(&self) -> Vec { - /* + pub fn edges_u64(&self) -> Vec { let mut ret = Vec::new(); - let mut t = self.offset * self.grid_spec.bin_t_len; - let end = (self.offset + self.bin_count) * self.grid_spec.bin_t_len; + let mut t = self.bin_len.times(self.bin_off); + let end = self.bin_len.times(self.bin_off + self.bin_cnt); while t <= end { - ret.push(t); - t += self.grid_spec.bin_t_len; + ret.push(t.as_u64()); + t = t.add(&self.bin_len); } - */ - err::todoval() + ret } } +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum BinnedRangeEnum { - Time(PreBinnedPatchRange), - Pulse(PreBinnedPatchRange), + Time(BinnedRange), + Pulse(BinnedRange), } impl BinnedRangeEnum { @@ -1587,10 +1647,11 @@ impl BinnedRangeEnum { } let du = b.sub(&a); let max_bin_len = du.div_n(min_bin_count as u64); - for (i1, bl) in opts.iter().enumerate().rev() { - if bl <= &du { + for (_, bl) in opts.iter().enumerate().rev() { + if bl <= 
&max_bin_len { let off_1 = a.div_v(&bl); - let off_2 = b.div_v(&bl.add(&bl).sub_n(1)); + let off_2 = (b.add(&bl).sub_n(1)).div_v(&bl); + eprintln!("off_1 {off_1:?} off_2 {off_2:?}"); let bin_cnt = off_2 - off_1; let ret = T::to_binned_range_enum(bl, off_1, bin_cnt); return Ok(ret); @@ -1606,6 +1667,17 @@ impl BinnedRangeEnum { SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count), } } + + pub fn bin_count(&self) -> u64 { + match self { + BinnedRangeEnum::Time(k) => k.bin_count(), + BinnedRangeEnum::Pulse(k) => k.bin_count(), + } + } + + pub fn range_at(&self, i: usize) -> Option { + err::todoval() + } } #[cfg(test)] @@ -1619,11 +1691,11 @@ mod test_binned_range { end: HOUR * 73, }; let range = BinnedRangeEnum::covering_range(range.into(), 10).unwrap(); + assert_eq!(range.bin_count(), 12); match range { BinnedRangeEnum::Time(range) => { - assert_eq!(range.bin_count(), 12); - assert_eq!(range.edges()[0], HOUR * 72); - assert_eq!(range.edges()[2], HOUR * 72 + MIN * 5 * 2); + assert_eq!(range.edges_u64()[0], HOUR * 72); + assert_eq!(range.edges_u64()[2], HOUR * 72 + MIN * 5 * 2); } BinnedRangeEnum::Pulse(_) => panic!(), } @@ -1636,12 +1708,12 @@ mod test_binned_range { end: HOUR * 10 + MIN * 20 + SEC * 30, }; let range = BinnedRangeEnum::covering_range(range.into(), 10).unwrap(); + assert_eq!(range.bin_count(), 11); match range { BinnedRangeEnum::Time(range) => { - assert_eq!(range.bin_count(), 11); - assert_eq!(range.edges()[0], HOUR * 0); - assert_eq!(range.edges()[1], HOUR * 1); - assert_eq!(range.edges()[11], HOUR * 11); + assert_eq!(range.edges_u64()[0], HOUR * 0); + assert_eq!(range.edges_u64()[1], HOUR * 1); + assert_eq!(range.edges_u64()[11], HOUR * 11); } BinnedRangeEnum::Pulse(_) => panic!(), }
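
The offset arithmetic behind the corrected covering_range_ty and the new edges_u64 can be checked in isolation with plain u64 values. The sketch below is a minimal standalone model, not the crate's API: the bin-length table is illustrative rather than the real TIME_BIN_THRESHOLDS, but the ceiling division for the upper offset and the edge iteration follow the hunks above, and the numbers reproduce the assertions in test_binned_range.

// Standalone sketch of the covering-range arithmetic, in plain u64 nanoseconds.
// The candidate bin lengths below are illustrative, not the crate's thresholds.
const SEC: u64 = 1_000_000_000;
const MIN: u64 = 60 * SEC;
const HOUR: u64 = 60 * MIN;

/// Pick the largest candidate bin length that still yields at least
/// `min_bin_count` bins, then widen the range outward to whole-bin boundaries.
fn covering_bins(beg: u64, end: u64, min_bin_count: u64, opts: &[u64]) -> Option<(u64, u64, u64)> {
    let max_bin_len = (end - beg) / min_bin_count;
    for &bin_len in opts.iter().rev() {
        if bin_len <= max_bin_len {
            let bin_off = beg / bin_len;
            // Ceiling division, matching the corrected off_2 computation.
            let off_end = (end + bin_len - 1) / bin_len;
            return Some((bin_len, bin_off, off_end - bin_off));
        }
    }
    None
}

/// Bin edges as in edges_u64: bin_cnt + 1 multiples of bin_len starting at bin_off.
fn edges_u64(bin_len: u64, bin_off: u64, bin_cnt: u64) -> Vec<u64> {
    (0..=bin_cnt).map(|i| bin_len * (bin_off + i)).collect()
}

fn main() {
    // Mirrors the first test_binned_range case: HOUR*72 .. HOUR*73 with a
    // minimum of 10 bins selects 5-minute bins and therefore 12 bins.
    let opts = [SEC, MIN, MIN * 5, MIN * 10, HOUR];
    let (bin_len, bin_off, bin_cnt) = covering_bins(HOUR * 72, HOUR * 73, 10, &opts).unwrap();
    assert_eq!(bin_len, MIN * 5);
    assert_eq!(bin_cnt, 12);
    let edges = edges_u64(bin_len, bin_off, bin_cnt);
    assert_eq!(edges.len() as u64, bin_cnt + 1);
    assert_eq!(edges[0], HOUR * 72);
    assert_eq!(edges[2], HOUR * 72 + MIN * 5 * 2);
}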
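
For the SeriesRange accessors added in the netpod hunk, note that end_u64 as written returns x.beg for both variants, which looks like a copy-and-paste slip given that delta_u64 computes end - beg. A minimal sketch of the presumably intended behaviour, with the range structs re-declared locally so it stands alone:

// Local re-declarations so the sketch compiles on its own; the real types live in netpod.
pub struct NanoRange { pub beg: u64, pub end: u64 }
pub struct PulseRange { pub beg: u64, pub end: u64 }

pub enum SeriesRange {
    TimeRange(NanoRange),
    PulseRange(PulseRange),
}

impl SeriesRange {
    pub fn beg_u64(&self) -> u64 {
        match self {
            SeriesRange::TimeRange(x) => x.beg,
            SeriesRange::PulseRange(x) => x.beg,
        }
    }

    // The hunk above returns x.beg here as well; x.end is presumably the intent.
    pub fn end_u64(&self) -> u64 {
        match self {
            SeriesRange::TimeRange(x) => x.end,
            SeriesRange::PulseRange(x) => x.end,
        }
    }

    pub fn delta_u64(&self) -> u64 {
        self.end_u64() - self.beg_u64()
    }
}

fn main() {
    let r = SeriesRange::TimeRange(NanoRange { beg: 10, end: 25 });
    assert_eq!(r.beg_u64(), 10);
    assert_eq!(r.end_u64(), 25);
    assert_eq!(r.delta_u64(), 15);
}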
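
The rewritten binned_timeout_01 feeds BinnedCollected from a VecStream wrapped with enumerate2 and then2, presumably because the plain StreamExt adapters would not preserve the Transformer bound. The underlying pattern, stalling one item far beyond the collection deadline so the timeout branch is exercised, can be sketched with stock futures-util and tokio combinators; the item type, index and delays here are placeholders, and BinnedCollected plays the consumer role in the real test.

// Sketch of the delay-injection pattern: item index 5 is held back for ten
// seconds while the consumer enforces a much shorter deadline, so the
// consumer stops early and works with whatever it has ingested so far.
use futures_util::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let inp = futures_util::stream::iter(0u32..10).enumerate().then(|(i, k)| async move {
        if i == 5 {
            tokio::time::sleep(Duration::from_millis(10_000)).await;
        }
        k
    });
    futures_util::pin_mut!(inp);
    let deadline = tokio::time::sleep(Duration::from_millis(400));
    tokio::pin!(deadline);
    loop {
        tokio::select! {
            _ = &mut deadline => {
                eprintln!("deadline hit, finish with the items ingested so far");
                break;
            }
            item = inp.next() => {
                match item {
                    Some(k) => eprintln!("ingest {k}"),
                    None => break,
                }
            }
        }
    }
}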