From 896a723a3ee61b4734bde2d2426e308cadfe3fe8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 30 Nov 2024 11:44:09 +0100 Subject: [PATCH] WIP change container type --- src/apitypes.rs | 134 ++++++++++++++ src/binning/aggregator.rs | 8 +- src/binning/container_bins.rs | 43 +++-- src/binning/container_events.rs | 311 ++++++++++++++++++++++++++++---- src/binning/valuetype.rs | 1 + src/channelevents.rs | 42 ++--- src/eventfull.rs | 7 +- src/eventsdim0.rs | 38 ++-- src/eventsdim0enum.rs | 23 +-- src/eventsdim1.rs | 29 ++- src/lib.rs | 3 +- src/merger.rs | 72 ++++---- src/merger/test.rs | 32 ++++ src/offsets.rs | 31 ++-- 14 files changed, 584 insertions(+), 190 deletions(-) create mode 100644 src/apitypes.rs create mode 100644 src/merger/test.rs diff --git a/src/apitypes.rs b/src/apitypes.rs new file mode 100644 index 0000000..ecd316c --- /dev/null +++ b/src/apitypes.rs @@ -0,0 +1,134 @@ +use crate::binning::container::bins::BinAggedType; +use crate::binning::container_events::EventValueType; +use crate::offsets::ts_offs_from_abs; +use crate::offsets::ts_offs_from_abs_with_anchor; +use items_0::apitypes::UserApiType; +use items_0::collect_s::ToCborValue; +use items_0::collect_s::ToJsonValue; +use netpod::TsNano; +use serde::Serialize; +use std::collections::VecDeque; +use std::fmt; + +#[derive(Serialize)] +pub struct ContainerEventsApi +where + EVT: EventValueType, +{ + pub tss: VecDeque, + pub values: EVT::Container, + #[serde(skip_serializing_if = "netpod::is_false")] + pub range_final: bool, + #[serde(skip_serializing_if = "netpod::is_false")] + pub timed_out: bool, +} + +impl fmt::Debug for ContainerEventsApi +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ContainerEventsApi") + // .field("tss", &self.tss) + // .field("values", &self.values) + .finish() + } +} + +impl ToCborValue for ContainerEventsApi +where + EVT: EventValueType, +{ + fn to_cbor_value(&self) -> Result { + let val = ciborium::value::Value::serialized(self).unwrap(); + Ok(val) + } +} + +impl ToJsonValue for ContainerEventsApi +where + EVT: EventValueType, +{ + fn to_json_value(&self) -> Result { + let ret = serde_json::to_value(self); + ret + } +} + +impl UserApiType for ContainerEventsApi where EVT: EventValueType {} + +#[derive(Serialize)] +pub struct ContainerBinsApi +where + EVT: EventValueType, + BVT: BinAggedType, +{ + pub ts1s: VecDeque, + pub ts2s: VecDeque, + pub cnts: VecDeque, + pub mins: VecDeque, + pub maxs: VecDeque, + pub aggs: VecDeque, + pub fnls: VecDeque, +} + +impl fmt::Debug for ContainerBinsApi +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ContainerBinsApi") + // .field("tss", &self.tss) + // .field("values", &self.values) + .finish() + } +} + +impl ToCborValue for ContainerBinsApi +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn to_cbor_value(&self) -> Result { + // let val = ciborium::value::Value::serialized(self).unwrap(); + // Ok(val) + let e = ciborium::value::Error::Custom("binned data as cbor is not yet available".into()); + Err(e) + } +} + +impl ToJsonValue for ContainerBinsApi +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn to_json_value(&self) -> Result { + use serde_json::json; + use serde_json::Value; + // let ret = serde_json::to_value(self); + // ret + let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&self.ts1s); + let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s); + + let ret = json!({ + "tsAnchor": ts_anch, + "ts1Ms": ts1ms, + "ts2Ms": ts2ms, + "ts1Ns": ts1ns, + "ts2Ns": ts2ns, + "counts": self.cnts, + "mins": self.mins, + "maxs": self.maxs, + "avgs": self.aggs, + }); + Ok(ret) + } +} + +impl UserApiType for ContainerBinsApi +where + EVT: EventValueType, + BVT: BinAggedType, +{ +} diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index caf8357..c619654 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -49,7 +49,7 @@ pub struct AggregatorNumeric { sum: f64, } -trait AggWithF64: EventValueType { +pub trait AggWithF64: EventValueType { fn as_f64(&self) -> f64; } @@ -129,6 +129,10 @@ macro_rules! impl_agg_tw_for_agg_num { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: $evt) { let f = dt.ns() as f64 / bl.ns() as f64; trace_event!("INGEST {} {}", f, val); + if true { + panic!(); + } + let val = 42; self.sum += f * val as f64; } @@ -158,6 +162,8 @@ impl_agg_tw_for_agg_num!(i16); impl_agg_tw_for_agg_num!(i32); impl_agg_tw_for_agg_num!(i64); +impl_agg_tw_for_agg_num!(super::container_events::PulsedVal); + impl AggregatorTimeWeight for AggregatorNumeric { fn new() -> Self { Self { sum: 0. } diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index 5a59860..6fcc9c9 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -1,11 +1,13 @@ use super::container::bins::BinAggedType; use super::container_events::EventValueType; +use crate::apitypes::ContainerBinsApi; use crate::offsets::ts_offs_from_abs; use crate::offsets::ts_offs_from_abs_with_anchor; use core::fmt; use daqbuf_err as err; use err::thiserror; use err::ThisError; +use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::ToJsonValue; @@ -16,9 +18,7 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::TypeName; use items_0::WithLen; -use netpod::log::*; use netpod::TsNano; -use serde::Deserialize; use serde::Serialize; use std::any; use std::collections::VecDeque; @@ -494,10 +494,8 @@ where { fn to_json_value(&self) -> Result { let bins = &self.bins; - let ts1sns: Vec<_> = bins.ts1s.iter().map(|x| x.ns()).collect(); - let ts2sns: Vec<_> = bins.ts2s.iter().map(|x| x.ns()).collect(); - let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&ts1sns); - let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &ts2sns); + let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&bins.ts1s); + let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &bins.ts2s); let counts = bins.cnts.clone(); let mins = bins.mins.clone(); let maxs = bins.maxs.clone(); @@ -517,6 +515,29 @@ where } } +impl ToUserFacingApiType for ContainerBinsCollectorOutput +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn to_user_facing_api_type(self) -> Box { + let ret = ContainerBinsApi:: { + ts1s: self.bins.ts1s, + ts2s: self.bins.ts2s, + cnts: self.bins.cnts, + mins: self.bins.mins, + maxs: self.bins.maxs, + aggs: self.bins.aggs, + fnls: self.bins.fnls, + }; + Box::new(ret) + } + + fn to_user_facing_api_type_box(self: Box) -> Box { + (*self).to_user_facing_api_type() + } +} + impl CollectedDyn for ContainerBinsCollectorOutput where EVT: EventValueType, @@ -585,15 +606,7 @@ where self.timed_out = true; } - fn set_continue_at_here(&mut self) { - debug!("TODO remember the continue at"); - } - - fn result( - &mut self, - _range: Option, - _binrange: Option, - ) -> Result, err::Error> { + fn result(&mut self) -> Result, err::Error> { // TODO do we need to set timeout, continueAt or anything? let bins = mem::replace(&mut self.bins, ContainerBins::new()); let ret = ContainerBinsCollectorOutput { bins }; diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 1f82dec..ba6b1b9 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -3,15 +3,19 @@ use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; use super::aggregator::AggregatorVecNumeric; use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox; +use crate::apitypes::ContainerEventsApi; use crate::log::*; use core::fmt; use core::ops::Range; use daqbuf_err as err; use err::thiserror; use err::ThisError; -use items_0::apitypes::ContainerEventsApi; use items_0::apitypes::ToUserFacingApiType; use items_0::apitypes::UserApiType; +use items_0::collect_s::CollectableDyn; +use items_0::collect_s::CollectedDyn; +use items_0::collect_s::CollectorDyn; +use items_0::collect_s::CollectorTy; use items_0::collect_s::ToCborValue; use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; @@ -28,10 +32,10 @@ use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; +use items_0::TypeName; use items_0::WithLen; use netpod::BinnedRange; use netpod::EnumVariant; -use netpod::TsMs; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -69,6 +73,7 @@ pub trait EventValueType: type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; const SERDE_ID: u32; + const BYTE_ESTIMATE_V00: u32; } impl Container for VecDeque @@ -135,6 +140,7 @@ macro_rules! impl_event_value_type { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = $evt; const SERDE_ID: u32 = <$evt as SubFrId>::SUB; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; } impl PartialOrdEvtA<$evt> for $evt { @@ -186,6 +192,7 @@ impl EventValueType for f32 { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = f32; const SERDE_ID: u32 = ::SUB; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } impl EventValueType for f64 { @@ -194,6 +201,7 @@ impl EventValueType for f64 { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = f64; const SERDE_ID: u32 = ::SUB; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } impl EventValueType for bool { @@ -202,6 +210,7 @@ impl EventValueType for bool { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = bool; const SERDE_ID: u32 = ::SUB; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } impl EventValueType for String { @@ -210,6 +219,7 @@ impl EventValueType for String { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = &'a str; const SERDE_ID: u32 = ::SUB; + const BYTE_ESTIMATE_V00: u32 = 400; } macro_rules! impl_event_value_type_vec { @@ -220,6 +230,8 @@ macro_rules! impl_event_value_type_vec { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec<$evt>; const SERDE_ID: u32 = as SubFrId>::SUB; + // TODO must use a more precise number dependent on actual elements + const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; } impl PartialOrdEvtA> for Vec<$evt> { @@ -244,6 +256,113 @@ impl_event_value_type_vec!(bool); impl_event_value_type_vec!(String); impl_event_value_type_vec!(EnumVariant); +#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize)] +pub struct PulsedVal(EVT) +where + EVT: EventValueType; + +impl fmt::Display for PulsedVal +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self) + } +} + +mod serde_pulsed_val { + use super::EventValueType; + use super::PulsedVal; + use serde::Deserialize; + use serde::Deserializer; + + impl<'de, EVT> Deserialize<'de> for PulsedVal + where + EVT: EventValueType, + { + fn deserialize(de: D) -> Result + where + D: Deserializer<'de>, + { + todo!("TODO mod serde_pulsed_val") + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VecDequePulsed +where + EVT: EventValueType, +{ + pulses: VecDeque, + vals: EVT::Container, +} + +impl PreviewRange for VecDequePulsed +where + EVT: EventValueType, +{ + fn preview<'a>(&'a self) -> Box { + todo!() + } +} + +impl Container> for VecDequePulsed +where + EVT: EventValueType, + PulsedVal: EventValueType, +{ + fn new() -> Self { + Self { + pulses: VecDeque::new(), + vals: <::Container as Container>::new(), + } + } + + fn push_back(&mut self, val: PulsedVal) { + todo!() + } + + fn pop_front(&mut self) -> Option> { + todo!() + } + + fn get_iter_ty_1(&self, pos: usize) -> Option< as EventValueType>::IterTy1<'_>> { + todo!() + } + + fn iter_ty_1(&self) -> impl Iterator as EventValueType>::IterTy1<'_>> { + todo!(); + self.vals.iter_ty_1().map(|x| todo!()) + } + + fn drain_into(&mut self, dst: &mut Self, range: Range) { + dst.pulses.extend(self.pulses.drain(range.clone())); + self.vals.drain_into(&mut dst.vals, range.clone()); + } +} + +macro_rules! impl_pulse_evt { + ($evt:ty) => { + impl EventValueType for PulsedVal<$evt> { + type Container = VecDequePulsed<$evt>; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; + type IterTy1<'a> = PulsedVal<$evt>; + const SERDE_ID: u32 = <$evt as SubFrId>::SUB; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; + } + + impl PartialOrdEvtA> for PulsedVal<$evt> { + fn cmp_a(&self, other: &PulsedVal<$evt>) -> Option { + self.partial_cmp(other) + } + } + }; +} + +impl_pulse_evt!(u8); + #[derive(Debug, Clone)] pub struct EventSingleRef<'a, EVT> where @@ -543,7 +662,6 @@ where EVT: EventValueType, { evs: &'a ContainerEvents, - beg: usize, end: usize, pos: usize, } @@ -555,7 +673,6 @@ where pub fn new(evs: &'a ContainerEvents) -> Self { Self { evs, - beg: 0, end: evs.len(), pos: 0, } @@ -636,15 +753,15 @@ where fn find_highest_index_lt(&self, ts: TsNano) -> Option { let x = self.tss.partition_point(|&x| x < ts); - if x == 0 || x >= self.tss.len() { + if x == 0 { None } else { Some(x - 1) } } - fn tss_for_testing(&self) -> Vec { - self.tss.iter().map(|&x| x.to_ts_ms()).collect() + fn tss_for_testing(&self) -> VecDeque { + self.tss.clone() } fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { @@ -697,7 +814,7 @@ where MergeableTy::find_highest_index_lt(self, ts) } - fn tss_for_testing(&self) -> Vec { + fn tss_for_testing(&self) -> VecDeque { MergeableTy::tss_for_testing(self) } @@ -726,6 +843,146 @@ where } } +impl TypeName for ContainerEvents +where + EVT: EventValueType, +{ + fn type_name(&self) -> String { + std::any::type_name::().into() + } +} + +#[derive(Debug, Serialize)] +pub struct ContainerEventsCollected +where + EVT: EventValueType, +{ + evs: ContainerEvents, + range_final: bool, + timed_out: bool, +} + +impl WithLen for ContainerEventsCollected +where + EVT: EventValueType, +{ + fn len(&self) -> usize { + self.evs.len() + } +} + +impl TypeName for ContainerEventsCollected +where + EVT: EventValueType, +{ + fn type_name(&self) -> String { + std::any::type_name::().into() + } +} + +impl ToUserFacingApiType for ContainerEventsCollected +where + EVT: EventValueType, +{ + fn to_user_facing_api_type(self: Self) -> Box { + let evs = ContainerEventsApi:: { + tss: self.evs.tss.into_iter().map(|x| x.ns()).collect(), + values: self.evs.vals, + range_final: self.range_final, + timed_out: self.timed_out, + }; + Box::new(evs) + } + + fn to_user_facing_api_type_box(self: Box) -> Box { + (*self).to_user_facing_api_type() + } +} + +impl CollectedDyn for ContainerEventsCollected where EVT: EventValueType {} + +impl ByteEstimate for ContainerEventsCollector +where + EVT: EventValueType, +{ + fn byte_estimate(&self) -> u64 { + EVT::BYTE_ESTIMATE_V00 as _ + } +} + +#[derive(Debug)] +pub struct ContainerEventsCollector +where + EVT: EventValueType, +{ + evs: ContainerEvents, + range_final: bool, + timed_out: bool, +} + +impl ContainerEventsCollector +where + EVT: EventValueType, +{ + pub fn new() -> Self { + Self { + evs: ContainerEvents::new(), + range_final: false, + timed_out: false, + } + } +} + +impl WithLen for ContainerEventsCollector +where + EVT: EventValueType, +{ + fn len(&self) -> usize { + self.evs.len() + } +} + +impl CollectorTy for ContainerEventsCollector +where + EVT: EventValueType, +{ + type Input = ContainerEvents; + type Output = ContainerEventsCollected; + + fn ingest(&mut self, src: &mut Self::Input) { + let n = self.len(); + info!("CollectorTy for ContainerEventsCollector n {}", n); + MergeableTy::drain_into(src, &mut self.evs, 0..n); + } + + fn set_range_complete(&mut self) { + self.range_final = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self) -> Result { + let ret = Self::Output { + evs: self.evs.clone(), + range_final: self.range_final, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl CollectableDyn for ContainerEvents +where + EVT: EventValueType, +{ + fn new_collector(&self) -> Box { + // crate::eventsdim0::EventsDim0; + Box::new(ContainerEventsCollector::::new()) + } +} + impl ToCborValue for ContainerEvents where EVT: EventValueType, @@ -749,31 +1006,11 @@ where EVT: EventValueType, { fn to_user_facing_api_type(self: Self) -> Box { - let this = self; - let tss: VecDeque<_> = this.tss.into_iter().map(|x| x.ms()).collect(); - let ret = ContainerEventsApi { - tss: tss.clone(), - values: tss.clone(), - }; - Box::new(ret) - } - - fn to_user_facing_api_type_box(self: Box) -> Box { - let this = *self; - this.to_user_facing_api_type() - } -} - -impl ToUserFacingApiType for Box> -where - EVT: EventValueType, -{ - fn to_user_facing_api_type(self: Self) -> Box { - let this = *self; - let tss: VecDeque<_> = this.tss.into_iter().map(|x| x.ms()).collect(); - let ret = ContainerEventsApi { - tss: tss.clone(), - values: tss.clone(), + let ret = ContainerEventsApi:: { + tss: self.tss.into_iter().map(|x| x.ns()).collect(), + values: self.vals, + range_final: false, + timed_out: false, }; Box::new(ret) } @@ -788,10 +1025,6 @@ impl BinningggContainerEventsDyn for ContainerEvents where EVT: EventValueType, { - fn type_name(&self) -> &'static str { - std::any::type_name::() - } - fn binned_events_timeweight_traitobj( &self, range: BinnedRange, @@ -827,6 +1060,10 @@ where fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn { self } + + fn as_collectable_dyn_mut(&mut self) -> &mut dyn CollectableDyn { + self + } } #[cfg(test)] diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index e7fae65..4a4e63b 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -131,4 +131,5 @@ impl EventValueType for EnumVariant { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = EnumVariantRef<'a>; const SERDE_ID: u32 = Self::SUB; + const BYTE_ESTIMATE_V00: u32 = 40; } diff --git a/src/channelevents.rs b/src/channelevents.rs index 332cd2d..3c0ae44 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -29,7 +29,6 @@ use items_0::TypeName; use items_0::WithLen; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; -use netpod::TsMs; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -765,12 +764,12 @@ impl MergeableTy for ChannelEvents { } } - fn tss_for_testing(&self) -> Vec { + fn tss_for_testing(&self) -> VecDeque { match self { ChannelEvents::Events(x) => x.tss_for_testing(), ChannelEvents::Status(x) => match x { - Some(x) => vec![x.ts.to_ts_ms()], - None => Vec::new(), + Some(x) => [x.ts].into_iter().collect(), + None => VecDeque::new(), }, } } @@ -824,9 +823,13 @@ impl WithLen for ChannelEventsCollectorOutput { } } -impl items_0::collect_s::ToJsonValue for ChannelEventsCollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self) +impl ToUserFacingApiType for ChannelEventsCollectorOutput { + fn to_user_facing_api_type(self: Self) -> Box { + todo!() + } + + fn to_user_facing_api_type_box(self: Box) -> Box { + todo!() } } @@ -876,13 +879,8 @@ impl CollectorDyn for ChannelEventsCollector { if let Some(item) = item.as_any_mut().downcast_mut::() { match item { ChannelEvents::Events(item) => { - // let coll = self.coll.get_or_insert_with(|| { - // item.as_ref() - // .as_collectable_with_default_ref() - // .new_collector() - // }); - // coll.ingest(item.as_collectable_with_default_mut()); - todo!() + let coll = self.coll.get_or_insert_with(|| item.new_collector()); + coll.ingest(item.as_collectable_dyn_mut()); } ChannelEvents::Status(_) => { // TODO decide on output format to collect also the connection status events @@ -908,21 +906,9 @@ impl CollectorDyn for ChannelEventsCollector { self.timed_out = true; } - fn set_continue_at_here(&mut self) { - self.needs_continue_at = true; - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, err::Error> { + fn result(&mut self) -> Result, err::Error> { match self.coll.as_mut() { Some(coll) => { - if self.needs_continue_at { - debug!("ChannelEventsCollector set_continue_at_here"); - coll.set_continue_at_here(); - } if self.range_complete { coll.set_range_complete(); } @@ -930,7 +916,7 @@ impl CollectorDyn for ChannelEventsCollector { debug!("ChannelEventsCollector set_timed_out"); coll.set_timed_out(); } - let res = coll.result(range, binrange)?; + let res = coll.result()?; Ok(res) } None => { diff --git a/src/eventfull.rs b/src/eventfull.rs index d169dfc..0de82f3 100644 --- a/src/eventfull.rs +++ b/src/eventfull.rs @@ -251,11 +251,8 @@ impl MergeableTy for EventFull { None } - fn tss_for_testing(&self) -> Vec { - self.tss - .iter() - .map(|x| netpod::TsMs::from_ns_u64(*x)) - .collect() + fn tss_for_testing(&self) -> VecDeque { + self.tss.iter().map(|&x| TsNano::from_ns(x)).collect() } fn is_consistent(&self) -> bool { diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index 765a0c2..baa77b4 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -1,6 +1,7 @@ use crate::IsoDateTime; use daqbuf_err as err; use err::Error; +use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorTy; @@ -380,6 +381,16 @@ impl ToJsonValue for EventsDim0CollectorOutput { } } +impl ToUserFacingApiType for EventsDim0CollectorOutput { + fn to_user_facing_api_type(self: Self) -> Box { + todo!() + } + + fn to_user_facing_api_type_box(self: Box) -> Box { + todo!() + } +} + impl CollectedDyn for EventsDim0CollectorOutput {} impl CollectorTy for EventsDim0Collector { @@ -401,15 +412,8 @@ impl CollectorTy for EventsDim0Collector { self.needs_continue_at = true; } - fn set_continue_at_here(&mut self) { - self.needs_continue_at = true; - } - - fn result( - &mut self, - range: Option, - _binrange: Option, - ) -> Result { + fn result(&mut self) -> Result { + let range: Option = None; debug!( "{} result() needs_continue_at {}", Self::self_name(), @@ -441,10 +445,8 @@ impl CollectorTy for EventsDim0Collector { } else { None }; - let tss_sl = vals.tss.make_contiguous(); - let pulses_sl = vals.pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.vals.pulses); let values = mem::replace(&mut vals.values, VecDeque::new()); if ts_off_ms.len() != ts_off_ns.len() { return Err(Error::with_msg_no_trace("collected len mismatch")); @@ -677,14 +679,10 @@ impl Events for EventsDim0 { fn to_json_string(&self) -> String { // TODO redesign with mut access, rename to `into_` and take the values out. - let mut tss = self.tss.clone(); - let mut pulses = self.pulses.clone(); let mut values = self.values.clone(); - let tss_sl = tss.make_contiguous(); - let pulses_sl = pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); - let values = mem::replace(&mut values, VecDeque::new()); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.pulses); + // let values = mem::replace(&mut values, VecDeque::new()); let ret = EventsDim0CollectorOutput { ts_anchor_sec, ts_off_ms, diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs index 9abf329..3ef5953 100644 --- a/src/eventsdim0enum.rs +++ b/src/eventsdim0enum.rs @@ -1,5 +1,6 @@ use daqbuf_err as err; use err::Error; +use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; @@ -127,8 +128,12 @@ impl TypeName for EventsDim0EnumCollectorOutput { } } -impl ToJsonValue for EventsDim0EnumCollectorOutput { - fn to_json_value(&self) -> Result { +impl ToUserFacingApiType for EventsDim0EnumCollectorOutput { + fn to_user_facing_api_type(self: Self) -> Box { + todo!() + } + + fn to_user_facing_api_type_box(self: Box) -> Box { todo!() } } @@ -154,20 +159,13 @@ impl CollectorTy for EventsDim0EnumCollector { self.needs_continue_at = true; } - fn set_continue_at_here(&mut self) { - self.needs_continue_at = true; - } - - fn result( - &mut self, - range: Option, - _binrange: Option, - ) -> Result { + fn result(&mut self) -> Result { trace_collect_result!( "{} result() needs_continue_at {}", self.type_name(), self.needs_continue_at ); + let range: Option = None; // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. @@ -194,8 +192,7 @@ impl CollectorTy for EventsDim0EnumCollector { } else { None }; - let tss_sl = vals.tss.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); let valixs = mem::replace(&mut vals.values, VecDeque::new()); let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); let vals = valixs; diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 9fa8b02..8fb27e3 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -1,6 +1,7 @@ use crate::IsoDateTime; use daqbuf_err as err; use err::Error; +use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; use items_0::collect_s::CollectedDyn; @@ -335,9 +336,13 @@ impl WithLen for EventsDim1CollectorOutput { } } -impl ToJsonValue for EventsDim1CollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self) +impl ToUserFacingApiType for EventsDim1CollectorOutput { + fn to_user_facing_api_type(self: Self) -> Box { + todo!() + } + + fn to_user_facing_api_type_box(self: Box) -> Box { + todo!() } } @@ -361,22 +366,14 @@ impl CollectorTy for EventsDim1Collector { self.timed_out = true; } - fn set_continue_at_here(&mut self) { - debug!("{}::set_continue_at_here", Self::self_name()); - self.needs_continue_at = true; - } - // TODO unify with dim0 case - fn result( - &mut self, - range: Option, - _binrange: Option, - ) -> Result { + fn result(&mut self) -> Result { // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. // The amount of the delta must take into account what kind of timestamp precision the client // can parse and handle. + let range: Option = None; let vals = &mut self.vals; let continue_at = if self.timed_out { if let Some(ts) = vals.tss.back() { @@ -398,10 +395,8 @@ impl CollectorTy for EventsDim1Collector { } else { None }; - let tss_sl = vals.tss.make_contiguous(); - let pulses_sl = vals.pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&vals.pulses); let values = mem::replace(&mut vals.values, VecDeque::new()); if ts_off_ms.len() != ts_off_ns.len() { return Err(Error::with_msg_no_trace("collected len mismatch")); diff --git a/src/lib.rs b/src/lib.rs index 8d44ee3..b82735c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod accounting; +pub mod apitypes; pub mod binning; pub mod channelevents; pub mod empty; @@ -27,7 +28,7 @@ mod log { #[cfg(not(test))] pub use netpod::log::*; #[cfg(test)] - pub use netpod::log::*; + pub use netpod::log_direct::*; } #[derive(Debug, PartialEq)] diff --git a/src/merger.rs b/src/merger.rs index 7655b34..581a0c9 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod test; + use crate::log::*; use core::ops::Range; use futures_util::Stream; @@ -34,6 +37,8 @@ macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + #[derive(Debug, thiserror::Error)] #[cstm(name = "MergerError")] pub enum Error { @@ -56,8 +61,7 @@ pub struct Merger { out_of_band_queue: VecDeque>, log_queue: VecDeque, dim0ix_max: TsNano, - done_data: bool, - done_buffered: bool, + done_inp: bool, done_range_complete: bool, complete: bool, poll_count: usize, @@ -75,8 +79,7 @@ where .field("out_max_len", &self.out_max_len) .field("range_complete", &self.range_complete) .field("out_of_band_queue", &self.out_of_band_queue.len()) - .field("done_data", &self.done_data) - .field("done_buffered", &self.done_buffered) + .field("done_data", &self.done_inp) .field("done_range_complete", &self.done_range_complete) .finish() } @@ -98,8 +101,7 @@ where out_of_band_queue: VecDeque::new(), log_queue: VecDeque::new(), dim0ix_max: TsNano::from_ns(0), - done_data: false, - done_buffered: false, + done_inp: false, done_range_complete: false, complete: false, poll_count: 0, @@ -380,16 +382,22 @@ where || self.do_clear_out || last_emit { - if o.len() > self.out_max_len { + if o.len() > 2 * self.out_max_len { debug!( - "MERGER OVERWEIGHT ITEM {} vs {}", + "MERGER OVERLENGTH ITEM {} vs {}", o.len(), self.out_max_len ); } + if o.byte_estimate() > 2 * OUT_MAX_BYTES { + debug!( + "MERGER OVERWEIGHT ITEM {} vs {}", + o.byte_estimate(), + OUT_MAX_BYTES + ); + } trace3!("decide to output"); self.do_clear_out = false; - //Break(Ready(Some(Ok(self.out.take().unwrap())))) let item = sitem_data(self.out.take().unwrap()); self.out_of_band_queue.push_back(item); Continue(()) @@ -431,9 +439,7 @@ where let _spg = span1.enter(); loop { trace3!("poll"); - break if let Some(item) = self.log_queue.pop_front() { - Ready(Some(Ok(StreamItem::Log(item)))) - } else if self.poll_count == usize::MAX { + break if self.poll_count == usize::MAX { self.done_range_complete = true; continue; } else if self.complete { @@ -441,46 +447,42 @@ where } else if self.done_range_complete { self.complete = true; Ready(None) - } else if self.done_buffered { - self.done_range_complete = true; - if self.range_complete.iter().all(|x| *x) { - trace!("emit RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::RangeComplete, - )))) - } else { - continue; - } - } else if self.done_data { - trace!("done_data"); - self.done_buffered = true; - if let Some(out) = self.out.take() { - trace!("done_data emit buffered len {}", out.len()); - Ready(Some(sitem_data(out))) - } else { - continue; - } + } else if let Some(item) = self.log_queue.pop_front() { + Ready(Some(Ok(StreamItem::Log(item)))) } else if let Some(item) = self.out_of_band_queue.pop_front() { + trace_emit!("emit item"); let item = on_sitemty_data!(item, |k: T| { - trace3!("emit out-of-band data len {}", k.len()); + trace_emit!("emit item len {}", k.len()); sitem_data(k) }); Ready(Some(item)) - } else { + } else if self.done_inp == false { match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { Ready(Some(e)) => { - self.done_data = true; + self.done_inp = true; Ready(Some(Err(sitem_err2_from_string(e)))) } Ready(None) => { - self.done_data = true; + self.done_inp = true; + if let Some(out) = self.out.take() { + trace!("done_data emit buffered len {}", out.len()); + self.out_of_band_queue.push_back(sitem_data(out)); + } continue; } Pending => Pending, }, } + } else { + self.done_range_complete = true; + if self.range_complete.iter().all(|x| *x) { + trace!("emit RangeComplete"); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)); + self.out_of_band_queue.push_back(item); + } + continue; }; } } diff --git a/src/merger/test.rs b/src/merger/test.rs new file mode 100644 index 0000000..4b10ece --- /dev/null +++ b/src/merger/test.rs @@ -0,0 +1,32 @@ +use super::MergeInp; +use super::Merger; +use crate::binning::container_events::ContainerEvents; +use crate::log::*; +use futures_util::StreamExt; +use items_0::streamitem::sitem_data; +use netpod::TsNano; + +async fn merger_00_inner() { + let mut evs0 = ContainerEvents::::new(); + evs0.push_back(TsNano::from_ns(9), 9.0); + let mut evs1 = ContainerEvents::::new(); + evs1.push_back(TsNano::from_ns(11), 11.0); + let inp0: MergeInp<_> = Box::pin(futures_util::stream::iter([ + sitem_data(evs0), + sitem_data(evs1), + ])); + let inps = vec![inp0]; + let mut merger = Merger::new(inps, None); + while let Some(x) = merger.next().await { + trace!("{:?}", x); + } + trace!("DONE"); +} + +#[test] +fn merger_00() { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(merger_00_inner()); +} diff --git a/src/offsets.rs b/src/offsets.rs index 9d91046..e99f3d9 100644 --- a/src/offsets.rs +++ b/src/offsets.rs @@ -1,35 +1,30 @@ use netpod::timeunits::MS; use netpod::timeunits::SEC; +use netpod::TsNano; use std::collections::VecDeque; -pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { - let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; - let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); - let ts_off_ns = tss - .iter() - .zip(ts_off_ms.iter().map(|&k| k * MS)) - .map(|(&j, k)| (j - ts_anchor_ns - k)) - .collect(); - (ts_anchor_sec, ts_off_ms, ts_off_ns) -} - pub fn ts_offs_from_abs_with_anchor( ts_anchor_sec: u64, - tss: &[u64], + tss: &VecDeque, ) -> (VecDeque, VecDeque) { let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); + let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k.ns() - ts_anchor_ns) / MS).collect(); let ts_off_ns = tss .iter() .zip(ts_off_ms.iter().map(|&k| k * MS)) - .map(|(&j, k)| (j - ts_anchor_ns - k)) + .map(|(&j, k)| (j.ns() - ts_anchor_ns - k)) .collect(); (ts_off_ms, ts_off_ns) } -pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { - let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000; - let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect(); +pub fn ts_offs_from_abs(tss: &VecDeque) -> (u64, VecDeque, VecDeque) { + let ts_anchor_sec = tss.front().map_or(TsNano::from_ns(0), |&k| k).ns() / SEC; + let (ts_off_ms, ts_off_ns) = ts_offs_from_abs_with_anchor(ts_anchor_sec, tss); + (ts_anchor_sec, ts_off_ms, ts_off_ns) +} + +pub fn pulse_offs_from_abs(pulses: &VecDeque) -> (u64, VecDeque) { + let pulse_anchor = pulses.front().map_or(0, |&k| k) / 10000 * 10000; + let pulse_off = pulses.iter().map(|&k| k - pulse_anchor).collect(); (pulse_anchor, pulse_off) }