Add drain impls

This commit is contained in:
Dominik Werder
2024-11-25 11:18:57 +01:00
parent 46b3d28db2
commit f803e07296
6 changed files with 130 additions and 36 deletions

View File

@@ -11,6 +11,7 @@ use err::thiserror;
use err::ThisError;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewDynResult;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableDyn;
use items_0::merge::MergeableTy;
@@ -18,17 +19,19 @@ use items_0::subfr::SubFrId;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::vecpreview::PreviewRange;
use items_0::vecpreview::VecPreview;
use items_0::Appendable;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::WithLen;
use netpod::BinnedRange;
use netpod::EnumVariant;
use netpod::TsMs;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::collections::VecDeque;
use std::iter;
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
@@ -46,6 +49,7 @@ where
fn pop_front(&mut self) -> Option<EVT>;
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>>;
fn iter_ty_1(&self) -> impl Iterator<Item = EVT::IterTy1<'_>>;
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>);
}
pub trait PartialOrdEvtA<EVT> {
@@ -86,6 +90,10 @@ where
fn iter_ty_1(&self) -> impl Iterator<Item = <EVT as EventValueType>::IterTy1<'_>> {
self.iter().map(|x| x.clone())
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.extend(self.drain(range));
}
}
impl Container<String> for VecDeque<String> {
@@ -108,6 +116,10 @@ impl Container<String> for VecDeque<String> {
fn iter_ty_1(&self) -> impl Iterator<Item = <String as EventValueType>::IterTy1<'_>> {
self.iter().map(|x| x.as_str())
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.extend(self.drain(range))
}
}
macro_rules! impl_event_value_type {
@@ -475,6 +487,24 @@ where
}
}
impl<EVT> Empty for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn empty() -> Self {
ContainerEvents::new()
}
}
impl<EVT> Appendable<EVT> for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn push(&mut self, ts: TsNano, value: EVT) {
self.push_back(ts, value);
}
}
pub struct ContainerEventsTakeUpTo<'a, EVT>
where
EVT: EventValueType,
@@ -546,35 +576,54 @@ where
EVT: EventValueType,
{
fn ts_min(&self) -> Option<TsNano> {
todo!()
self.tss.front().copied()
}
fn ts_max(&self) -> Option<TsNano> {
todo!()
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult {
todo!()
}
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewResult<Self> {
todo!()
self.tss.back().copied()
}
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {
todo!()
let x = self.tss.partition_point(|&x| x <= ts);
if x >= self.tss.len() {
None
} else {
Some(x)
}
}
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize> {
todo!()
let x = self.tss.partition_point(|&x| x < ts);
if x >= self.tss.len() {
None
} else {
Some(x)
}
}
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
todo!()
let x = self.tss.partition_point(|&x| x < ts);
if x == 0 || x >= self.tss.len() {
None
} else {
Some(x - 1)
}
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
todo!()
fn tss_for_testing(&self) -> Vec<TsMs> {
self.tss.iter().map(|&x| x.to_ts_ms()).collect()
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult {
dst.tss.extend(self.tss.drain(range.clone()));
self.vals.drain_into(&mut dst.vals, range);
DrainIntoDstResult::Done
}
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewResult<Self> {
let mut dst = Self::new();
MergeableTy::drain_into(self, &mut dst, range);
DrainIntoNewResult::Done(dst)
}
}
@@ -583,27 +632,27 @@ where
EVT: EventValueType,
{
fn ts_min(&self) -> Option<TsNano> {
todo!()
MergeableTy::ts_min(self)
}
fn ts_max(&self) -> Option<TsNano> {
todo!()
MergeableTy::ts_max(self)
}
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {
todo!()
MergeableTy::find_lowest_index_gt(self, ts)
}
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize> {
todo!()
MergeableTy::find_lowest_index_ge(self, ts)
}
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
todo!()
MergeableTy::find_highest_index_lt(self, ts)
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
todo!()
MergeableTy::tss_for_testing(self)
}
fn drain_into(
@@ -611,7 +660,19 @@ where
dst: &mut dyn MergeableDyn,
range: Range<usize>,
) -> DrainIntoDstResult {
todo!()
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
MergeableTy::drain_into(self, dst, range)
} else {
DrainIntoDstResult::NotCompatible
}
}
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewDynResult {
match MergeableTy::drain_into_new(self, range) {
DrainIntoNewResult::Done(x) => DrainIntoNewDynResult::Done(Box::new(x)),
DrainIntoNewResult::Partial(x) => DrainIntoNewDynResult::Partial(Box::new(x)),
DrainIntoNewResult::NotCompatible => DrainIntoNewDynResult::NotCompatible,
}
}
}
@@ -654,6 +715,23 @@ where
false
}
}
fn verify(&self) -> bool {
let mut good = true;
let n = self.tss.len();
for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) {
if ts1 > ts2 {
good = false;
error!("unordered event data ts1 {} ts2 {}", ts1, ts2);
break;
}
}
good
}
fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn {
self
}
}
#[cfg(test)]

View File

@@ -71,6 +71,11 @@ impl Container<EnumVariant> for EnumVariantContainer {
name: x.1.as_str(),
})
}
fn drain_into(&mut self, dst: &mut Self, range: std::ops::Range<usize>) {
dst.ixs.extend(self.ixs.drain(range.clone()));
dst.names.extend(self.names.drain(range));
}
}
#[derive(Debug)]

View File

@@ -11,6 +11,7 @@ use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::isodate::IsoDateTime;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewDynResult;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableDyn;
use items_0::merge::MergeableTy;
@@ -29,6 +30,7 @@ use netpod::BinnedRangeEnum;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use serde_json::map::Keys;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
@@ -791,11 +793,7 @@ impl MergeableTy for ChannelEvents {
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult {
match self {
ChannelEvents::Events(k) => match dst {
ChannelEvents::Events(j) => {
//
// k.drain_into(j, range)
todo!()
}
ChannelEvents::Events(j) => k.drain_into(j.as_mergeable_dyn_mut(), range),
ChannelEvents::Status(_) => DrainIntoDstResult::NotCompatible,
},
ChannelEvents::Status(k) => match dst {
@@ -821,7 +819,18 @@ impl MergeableTy for ChannelEvents {
}
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewResult<Self> {
todo!()
match self {
ChannelEvents::Events(k) => match k.drain_into_new(range) {
DrainIntoNewDynResult::Done(x) => {
DrainIntoNewResult::Done(ChannelEvents::Events(x))
}
DrainIntoNewDynResult::Partial(x) => {
DrainIntoNewResult::Partial(ChannelEvents::Events(x))
}
DrainIntoNewDynResult::NotCompatible => DrainIntoNewResult::NotCompatible,
},
ChannelEvents::Status(k) => DrainIntoNewResult::Done(ChannelEvents::Status(k.clone())),
}
}
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {

View File

@@ -726,7 +726,7 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
let mut ret = EventsDim0::empty();
for (&ts, val) in self.tss.iter().zip(self.values.iter()) {
ret.push(ts, 0, val.as_prim_f32_b());
ret.push(TsNano::from_ns(ts), val.as_prim_f32_b());
}
Box::new(ret)
}
@@ -768,9 +768,9 @@ impl<STY> Appendable<STY> for EventsDim0<STY>
where
STY: ScalarOps,
{
fn push(&mut self, ts: u64, pulse: u64, value: STY) {
self.tss.push_back(ts);
self.pulses.push_back(pulse);
fn push(&mut self, ts: TsNano, value: STY) {
self.tss.push_back(ts.ns());
self.pulses.push_back(0);
self.values.push_back(value);
}
}

View File

@@ -23,6 +23,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any;
@@ -664,7 +665,7 @@ impl<STY> Appendable<Vec<STY>> for EventsDim1<STY>
where
STY: ScalarOps,
{
fn push(&mut self, ts: u64, pulse: u64, value: Vec<STY>) {
Self::push(self, ts, pulse, value)
fn push(&mut self, ts: TsNano, value: Vec<STY>) {
Self::push(self, ts.ns(), 0, value)
}
}

View File

@@ -4,6 +4,7 @@ use crate::eventsdim0::EventsDim0;
use crate::Events;
use items_0::Appendable;
use items_0::Empty;
use netpod::TsNano;
#[allow(unused)]
fn xorshift32(state: u32) -> u32 {
@@ -27,7 +28,7 @@ pub fn make_some_boxed_d0_f32(
vstate = xorshift32(vstate);
let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask);
let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.;
events.push(ts, ts, value);
events.push(TsNano::from_ns(ts), value);
}
Box::new(events)
}