diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 9b6bf8a..24fb67c 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -11,6 +11,7 @@ use err::thiserror; use err::ThisError; use items_0::container::ByteEstimate; use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewDynResult; use items_0::merge::DrainIntoNewResult; use items_0::merge::MergeableDyn; use items_0::merge::MergeableTy; @@ -18,17 +19,19 @@ use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; use items_0::vecpreview::PreviewRange; use items_0::vecpreview::VecPreview; +use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; +use items_0::Empty; use items_0::WithLen; use netpod::BinnedRange; use netpod::EnumVariant; +use netpod::TsMs; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; use std::collections::VecDeque; -use std::iter; macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } @@ -46,6 +49,7 @@ where fn pop_front(&mut self) -> Option; fn get_iter_ty_1(&self, pos: usize) -> Option>; fn iter_ty_1(&self) -> impl Iterator>; + fn drain_into(&mut self, dst: &mut Self, range: Range); } pub trait PartialOrdEvtA { @@ -86,6 +90,10 @@ where fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { self.iter().map(|x| x.clone()) } + + fn drain_into(&mut self, dst: &mut Self, range: Range) { + dst.extend(self.drain(range)); + } } impl Container for VecDeque { @@ -108,6 +116,10 @@ impl Container for VecDeque { fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { self.iter().map(|x| x.as_str()) } + + fn drain_into(&mut self, dst: &mut Self, range: Range) { + dst.extend(self.drain(range)) + } } macro_rules! impl_event_value_type { @@ -475,6 +487,24 @@ where } } +impl Empty for ContainerEvents +where + EVT: EventValueType, +{ + fn empty() -> Self { + ContainerEvents::new() + } +} + +impl Appendable for ContainerEvents +where + EVT: EventValueType, +{ + fn push(&mut self, ts: TsNano, value: EVT) { + self.push_back(ts, value); + } +} + pub struct ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, @@ -546,35 +576,54 @@ where EVT: EventValueType, { fn ts_min(&self) -> Option { - todo!() + self.tss.front().copied() } fn ts_max(&self) -> Option { - todo!() - } - - fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { - todo!() - } - - fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { - todo!() + self.tss.back().copied() } fn find_lowest_index_gt(&self, ts: TsNano) -> Option { - todo!() + let x = self.tss.partition_point(|&x| x <= ts); + if x >= self.tss.len() { + None + } else { + Some(x) + } } fn find_lowest_index_ge(&self, ts: TsNano) -> Option { - todo!() + let x = self.tss.partition_point(|&x| x < ts); + if x >= self.tss.len() { + None + } else { + Some(x) + } } fn find_highest_index_lt(&self, ts: TsNano) -> Option { - todo!() + let x = self.tss.partition_point(|&x| x < ts); + if x == 0 || x >= self.tss.len() { + None + } else { + Some(x - 1) + } } - fn tss_for_testing(&self) -> Vec { - todo!() + fn tss_for_testing(&self) -> Vec { + self.tss.iter().map(|&x| x.to_ts_ms()).collect() + } + + fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { + dst.tss.extend(self.tss.drain(range.clone())); + self.vals.drain_into(&mut dst.vals, range); + DrainIntoDstResult::Done + } + + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { + let mut dst = Self::new(); + MergeableTy::drain_into(self, &mut dst, range); + DrainIntoNewResult::Done(dst) } } @@ -583,27 +632,27 @@ where EVT: EventValueType, { fn ts_min(&self) -> Option { - todo!() + MergeableTy::ts_min(self) } fn ts_max(&self) -> Option { - todo!() + MergeableTy::ts_max(self) } fn find_lowest_index_gt(&self, ts: TsNano) -> Option { - todo!() + MergeableTy::find_lowest_index_gt(self, ts) } fn find_lowest_index_ge(&self, ts: TsNano) -> Option { - todo!() + MergeableTy::find_lowest_index_ge(self, ts) } fn find_highest_index_lt(&self, ts: TsNano) -> Option { - todo!() + MergeableTy::find_highest_index_lt(self, ts) } fn tss_for_testing(&self) -> Vec { - todo!() + MergeableTy::tss_for_testing(self) } fn drain_into( @@ -611,7 +660,19 @@ where dst: &mut dyn MergeableDyn, range: Range, ) -> DrainIntoDstResult { - todo!() + if let Some(dst) = dst.as_any_mut().downcast_mut::() { + MergeableTy::drain_into(self, dst, range) + } else { + DrainIntoDstResult::NotCompatible + } + } + + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewDynResult { + match MergeableTy::drain_into_new(self, range) { + DrainIntoNewResult::Done(x) => DrainIntoNewDynResult::Done(Box::new(x)), + DrainIntoNewResult::Partial(x) => DrainIntoNewDynResult::Partial(Box::new(x)), + DrainIntoNewResult::NotCompatible => DrainIntoNewDynResult::NotCompatible, + } } } @@ -654,6 +715,23 @@ where false } } + + fn verify(&self) -> bool { + let mut good = true; + let n = self.tss.len(); + for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) { + if ts1 > ts2 { + good = false; + error!("unordered event data ts1 {} ts2 {}", ts1, ts2); + break; + } + } + good + } + + fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn { + self + } } #[cfg(test)] diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index a8de496..e7fae65 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -71,6 +71,11 @@ impl Container for EnumVariantContainer { name: x.1.as_str(), }) } + + fn drain_into(&mut self, dst: &mut Self, range: std::ops::Range) { + dst.ixs.extend(self.ixs.drain(range.clone())); + dst.names.extend(self.names.drain(range)); + } } #[derive(Debug)] diff --git a/src/channelevents.rs b/src/channelevents.rs index 80e779c..4bcdf3b 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -11,6 +11,7 @@ use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewDynResult; use items_0::merge::DrainIntoNewResult; use items_0::merge::MergeableDyn; use items_0::merge::MergeableTy; @@ -29,6 +30,7 @@ use netpod::BinnedRangeEnum; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; +use serde_json::map::Keys; use std::any; use std::any::Any; use std::collections::VecDeque; @@ -791,11 +793,7 @@ impl MergeableTy for ChannelEvents { fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { match self { ChannelEvents::Events(k) => match dst { - ChannelEvents::Events(j) => { - // - // k.drain_into(j, range) - todo!() - } + ChannelEvents::Events(j) => k.drain_into(j.as_mergeable_dyn_mut(), range), ChannelEvents::Status(_) => DrainIntoDstResult::NotCompatible, }, ChannelEvents::Status(k) => match dst { @@ -821,7 +819,18 @@ impl MergeableTy for ChannelEvents { } fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { - todo!() + match self { + ChannelEvents::Events(k) => match k.drain_into_new(range) { + DrainIntoNewDynResult::Done(x) => { + DrainIntoNewResult::Done(ChannelEvents::Events(x)) + } + DrainIntoNewDynResult::Partial(x) => { + DrainIntoNewResult::Partial(ChannelEvents::Events(x)) + } + DrainIntoNewDynResult::NotCompatible => DrainIntoNewResult::NotCompatible, + }, + ChannelEvents::Status(k) => DrainIntoNewResult::Done(ChannelEvents::Status(k.clone())), + } } fn find_lowest_index_gt(&self, ts: TsNano) -> Option { diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index f62135b..cf72bde 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -726,7 +726,7 @@ impl Events for EventsDim0 { fn to_dim0_f32_for_binning(&self) -> Box { let mut ret = EventsDim0::empty(); for (&ts, val) in self.tss.iter().zip(self.values.iter()) { - ret.push(ts, 0, val.as_prim_f32_b()); + ret.push(TsNano::from_ns(ts), val.as_prim_f32_b()); } Box::new(ret) } @@ -768,9 +768,9 @@ impl Appendable for EventsDim0 where STY: ScalarOps, { - fn push(&mut self, ts: u64, pulse: u64, value: STY) { - self.tss.push_back(ts); - self.pulses.push_back(pulse); + fn push(&mut self, ts: TsNano, value: STY) { + self.tss.push_back(ts.ns()); + self.pulses.push_back(0); self.values.push_back(value); } } diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 8d4a965..9d9ba66 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -23,6 +23,7 @@ use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; +use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; @@ -664,7 +665,7 @@ impl Appendable> for EventsDim1 where STY: ScalarOps, { - fn push(&mut self, ts: u64, pulse: u64, value: Vec) { - Self::push(self, ts, pulse, value) + fn push(&mut self, ts: TsNano, value: Vec) { + Self::push(self, ts.ns(), 0, value) } } diff --git a/src/testgen.rs b/src/testgen.rs index c253c3c..48a6d2e 100644 --- a/src/testgen.rs +++ b/src/testgen.rs @@ -4,6 +4,7 @@ use crate::eventsdim0::EventsDim0; use crate::Events; use items_0::Appendable; use items_0::Empty; +use netpod::TsNano; #[allow(unused)] fn xorshift32(state: u32) -> u32 { @@ -27,7 +28,7 @@ pub fn make_some_boxed_d0_f32( vstate = xorshift32(vstate); let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask); let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.; - events.push(ts, ts, value); + events.push(TsNano::from_ns(ts), value); } Box::new(events) }