diff --git a/src/channelevents.rs b/src/channelevents.rs index ff1b3d4..e815646 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -9,8 +9,6 @@ use items_0::apitypes::UserApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; -use items_0::collect_s::ToCborValue; -use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; @@ -23,12 +21,9 @@ use items_0::timebin::BinningggContainerEventsDyn; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; -use items_0::EventsNonObj; use items_0::Extendable; use items_0::TypeName; use items_0::WithLen; -use netpod::range::evrange::SeriesRange; -use netpod::BinnedRangeEnum; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -819,12 +814,6 @@ impl MergeableTy for ChannelEvents { } } -impl EventsNonObj for ChannelEvents { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - todo!() - } -} - impl CollectableDyn for ChannelEvents { fn new_collector(&self) -> Box { Box::new(ChannelEventsCollector::new()) diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs deleted file mode 100644 index 4f7242a..0000000 --- a/src/eventsdim0.rs +++ /dev/null @@ -1,768 +0,0 @@ -use crate::IsoDateTime; -use daqbuf_err as err; -use err::Error; -use items_0::apitypes::ToUserFacingApiType; -use items_0::collect_s::CollectableDyn; -use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonValue; -use items_0::container::ByteEstimate; -use items_0::overlap::HasTimestampDeque; -use items_0::scalar_ops::ScalarOps; -use items_0::Appendable; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Empty; -use items_0::Events; -use items_0::EventsNonObj; -use items_0::Resettable; -use items_0::TypeName; -use items_0::WithLen; -use netpod::is_false; -use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use netpod::timeunits::MS; -use netpod::timeunits::SEC; -use netpod::BinnedRangeEnum; -use netpod::TsNano; -use serde::Deserialize; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::fmt; -use std::mem; - -#[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_binning { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! debug_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim0NoPulse { - pub tss: VecDeque, - pub values: VecDeque, -} - -impl From> for EventsDim0 { - fn from(value: EventsDim0NoPulse) -> Self { - let pulses = vec![0; value.tss.len()].into(); - Self { - tss: value.tss, - pulses, - values: value.values, - } - } -} - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim0 { - pub tss: VecDeque, - pub pulses: VecDeque, - pub values: VecDeque, -} - -impl EventsDim0 { - pub fn type_name() -> &'static str { - std::any::type_name::() - } - - pub fn push_back(&mut self, ts: u64, pulse: u64, value: STY) { - self.tss.push_back(ts); - self.pulses.push_back(pulse); - self.values.push_back(value); - } - - pub fn push_front(&mut self, ts: u64, pulse: u64, value: STY) { - self.tss.push_front(ts); - self.pulses.push_front(pulse); - self.values.push_front(value); - } - - pub fn serde_id() -> &'static str { - "EventsDim0" - } - - pub fn tss(&self) -> &VecDeque { - &self.tss - } - - // only for testing at the moment - pub fn private_values_ref(&self) -> &VecDeque { - &self.values - } - pub fn private_values_mut(&mut self) -> &mut VecDeque { - &mut self.values - } -} - -impl AsAnyRef for EventsDim0 -where - STY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim0 -where - STY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl Empty for EventsDim0 { - fn empty() -> Self { - Self { - tss: VecDeque::new(), - pulses: VecDeque::new(), - values: VecDeque::new(), - } - } -} - -impl fmt::Debug for EventsDim0 -where - STY: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if false { - write!( - fmt, - "{} {{ count {} ts {:?} vals {:?} }}", - self.type_name(), - self.tss.len(), - self.tss.iter().map(|x| x / SEC).collect::>(), - self.values, - ) - } else { - write!( - fmt, - "{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", - self.type_name(), - self.tss.len(), - self.tss.front().map(|&x| TsNano::from_ns(x)), - self.tss.back().map(|&x| TsNano::from_ns(x)), - self.values.front(), - self.values.back(), - ) - } - } -} - -impl WithLen for EventsDim0 { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl ByteEstimate for EventsDim0 { - fn byte_estimate(&self) -> u64 { - // TODO - // Should use a better estimate for waveform and string types, - // or keep some aggregated byte count on push. - let n = self.len(); - if n == 0 { - 0 - } else { - // TODO use the actual size of one/some of the elements. - let i = n * 2 / 3; - let sty_bytes = self.values[i].byte_estimate(); - (n as u64 * (8 + 8 + sty_bytes)) as u64 - } - } -} - -impl Resettable for EventsDim0 { - fn reset(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.values.clear(); - } -} - -impl HasTimestampDeque for EventsDim0 { - fn timestamp_min(&self) -> Option { - self.tss.front().map(|x| *x) - } - - fn timestamp_max(&self) -> Option { - self.tss.back().map(|x| *x) - } - - fn pulse_min(&self) -> Option { - self.pulses.front().map(|x| *x) - } - - fn pulse_max(&self) -> Option { - self.pulses.back().map(|x| *x) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0ChunkOutput { - tss: VecDeque, - pulses: VecDeque, - values: VecDeque, - scalar_type: String, -} - -impl EventsDim0ChunkOutput {} - -#[derive(Debug)] -pub struct EventsDim0Collector { - vals: EventsDim0, - range_final: bool, - timed_out: bool, - needs_continue_at: bool, -} - -impl EventsDim0Collector { - pub fn self_name() -> &'static str { - any::type_name::() - } - - pub fn new() -> Self { - debug!("EventsDim0Collector NEW"); - Self { - vals: EventsDim0::empty(), - range_final: false, - timed_out: false, - needs_continue_at: false, - } - } -} - -impl WithLen for EventsDim0Collector { - fn len(&self) -> usize { - WithLen::len(&self.vals) - } -} - -impl ByteEstimate for EventsDim0Collector { - fn byte_estimate(&self) -> u64 { - ByteEstimate::byte_estimate(&self.vals) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0CollectorOutput { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "tsMs")] - ts_off_ms: VecDeque, - #[serde(rename = "tsNs")] - ts_off_ns: VecDeque, - #[serde(rename = "pulseAnchor")] - pulse_anchor: u64, - #[serde(rename = "pulseOff")] - pulse_off: VecDeque, - #[serde(rename = "values")] - values: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - timed_out: bool, - #[serde( - rename = "continueAt", - default, - skip_serializing_if = "Option::is_none" - )] - continue_at: Option, -} - -impl EventsDim0CollectorOutput { - pub fn ts_anchor_sec(&self) -> u64 { - self.ts_anchor_sec - } - - pub fn ts_off_ms(&self) -> &VecDeque { - &self.ts_off_ms - } - - pub fn pulse_anchor(&self) -> u64 { - self.pulse_anchor - } - - pub fn pulse_off(&self) -> &VecDeque { - &self.pulse_off - } - - /// Note: only used for unit tests. - pub fn values_to_f32(&self) -> VecDeque { - self.values.iter().map(|x| x.as_prim_f32_b()).collect() - } - - pub fn range_final(&self) -> bool { - self.range_final - } - - pub fn timed_out(&self) -> bool { - self.timed_out - } - - pub fn is_valid(&self) -> bool { - if self.ts_off_ms.len() != self.ts_off_ns.len() { - false - } else if self.ts_off_ms.len() != self.pulse_off.len() { - false - } else if self.ts_off_ms.len() != self.values.len() { - false - } else { - true - } - } - - pub fn info_str(&self) -> String { - use fmt::Write; - let mut out = String::new(); - write!( - out, - "ts_off_ms {} ts_off_ns {} pulse_off {} values {}", - self.ts_off_ms.len(), - self.ts_off_ns.len(), - self.pulse_off.len(), - self.values.len(), - ) - .unwrap(); - out - } -} - -impl AsAnyRef for EventsDim0CollectorOutput -where - STY: 'static, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim0CollectorOutput -where - STY: 'static, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl TypeName for EventsDim0CollectorOutput { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl WithLen for EventsDim0CollectorOutput { - fn len(&self) -> usize { - self.values.len() - } -} - -impl ToUserFacingApiType for EventsDim0CollectorOutput { - fn into_user_facing_api_type(self: Self) -> Box { - todo!() - } - - fn into_user_facing_api_type_box(self: Box) -> Box { - todo!() - } -} - -impl CollectedDyn for EventsDim0CollectorOutput {} - -impl CollectorTy for EventsDim0Collector { - type Input = EventsDim0; - type Output = EventsDim0CollectorOutput; - - fn ingest(&mut self, src: &mut Self::Input) { - self.vals.tss.append(&mut src.tss); - self.vals.pulses.append(&mut src.pulses); - self.vals.values.append(&mut src.values); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - self.needs_continue_at = true; - } - - fn result(&mut self) -> Result { - let range: Option = None; - debug!( - "{} result() needs_continue_at {}", - Self::self_name(), - self.needs_continue_at - ); - // If we timed out, we want to hint the client from where to continue. - // This is tricky: currently, client can not request a left-exclusive range. - // We currently give the timestamp of the last event plus a small delta. - // The amount of the delta must take into account what kind of timestamp precision the client - // can parse and handle. - let vals = &mut self.vals; - let continue_at = if self.needs_continue_at { - if let Some(ts) = vals.tss.back() { - let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS)); - x - } else { - if let Some(range) = &range { - match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), - SeriesRange::PulseRange(_) => { - error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - None - }; - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.vals.pulses); - let values = mem::replace(&mut vals.values, VecDeque::new()); - if ts_off_ms.len() != ts_off_ns.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != pulse_off.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != values.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - let ret = Self::Output { - ts_anchor_sec, - ts_off_ms, - ts_off_ns, - pulse_anchor, - pulse_off, - values, - range_final: self.range_final, - timed_out: self.timed_out, - continue_at, - }; - if !ret.is_valid() { - error!("invalid:\n{}", ret.info_str()); - } - Ok(ret) - } -} - -impl items_0::collect_s::CollectableType for EventsDim0 { - type Collector = EventsDim0Collector; - - fn new_collector() -> Self::Collector { - Self::Collector::new() - } -} - -impl TypeName for EventsDim0 { - fn type_name(&self) -> String { - let self_name = any::type_name::(); - format!("{self_name}") - } -} - -impl EventsNonObj for EventsDim0 { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - trace!( - "{}::into_tss_pulses len {} len {}", - Self::type_name(), - self.tss.len(), - self.pulses.len() - ); - (self.tss, self.pulses) - } -} - -macro_rules! try_to_container_events { - ($sty:ty, $this:expr) => { - let this = $this; - if let Some(evs) = this.as_any_ref().downcast_ref::>() { - use crate::binning::container_events::ContainerEvents; - let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); - let vals = evs.values.clone(); - let ret = ContainerEvents::<$sty>::from_constituents(tss, vals); - return Box::new(ret); - } - }; -} - -impl Events for EventsDim0 { - fn verify(&self) -> bool { - let mut good = true; - let n = self.tss.len(); - for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) { - if ts1 > ts2 { - good = false; - error!("unordered event data ts1 {} ts2 {}", ts1, ts2); - break; - } - } - good - } - - fn output_info(&self) -> String { - let n2 = self.tss.len().max(1) - 1; - let min = if let Some(ts) = self.tss.get(0) { - TsNano::from_ns(*ts).fmt().to_string() - } else { - String::from("None") - }; - let max = if let Some(ts) = self.tss.get(n2) { - TsNano::from_ns(*ts).fmt().to_string() - } else { - String::from("None") - }; - format!( - "EventsDim0OutputInfo {{ len {}, ts_min {}, ts_max {} }}", - self.tss.len(), - min, - max, - ) - } - - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - self - } - - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - let tss = self.tss.drain(..n1).collect(); - let pulses = self.pulses.drain(..n1).collect(); - let values = self.values.drain(..n1).collect(); - let ret = Self { - tss, - pulses, - values, - }; - Box::new(ret) - } - - fn new_empty_evs(&self) -> Box { - Box::new(Self::empty()) - } - - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), Error> { - // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_any_mut().downcast_mut::() { - // TODO make it harder to forget new members when the struct may get modified in the future - let r = range.0..range.1; - dst.tss.extend(self.tss.drain(r.clone())); - dst.pulses.extend(self.pulses.drain(r.clone())); - dst.values.extend(self.values.drain(r.clone())); - Ok(()) - } else { - error!( - "downcast to EventsDim0 FAILED\n\n{}\n\n{}\n\n", - self.type_name(), - dst.type_name() - ); - panic!(); - // Err(MergeError::NotCompatible) - } - } - - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m > ts { - return Some(i); - } - } - None - } - - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m >= ts { - return Some(i); - } - } - None - } - - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate().rev() { - if m < ts { - return Some(i); - } - } - None - } - - fn ts_min(&self) -> Option { - self.tss.front().map(|&x| x) - } - - fn ts_max(&self) -> Option { - self.tss.back().map(|&x| x) - } - - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { - if let Some(other) = other.as_any_ref().downcast_ref::() { - self == other - } else { - false - } - } - - fn serde_id(&self) -> &'static str { - Self::serde_id() - } - - fn nty_id(&self) -> u32 { - STY::SUB as u32 - } - - fn clone_dyn(&self) -> Box { - Box::new(self.clone()) - } - - fn tss(&self) -> &VecDeque { - &self.tss - } - - fn pulses(&self) -> &VecDeque { - &self.pulses - } - - fn frame_type_id(&self) -> u32 { - error!("TODO frame_type_id should not be called"); - // TODO make more nice - panic!() - } - - fn to_min_max_avg(&mut self) -> Box { - let dst = Self { - tss: mem::replace(&mut self.tss, Default::default()), - pulses: mem::replace(&mut self.pulses, Default::default()), - values: mem::replace(&mut self.values, Default::default()), - }; - Box::new(dst) - } - - fn to_json_string(&self) -> String { - // TODO redesign with mut access, rename to `into_` and take the values out. - let mut values = self.values.clone(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.pulses); - // let values = mem::replace(&mut values, VecDeque::new()); - let ret = EventsDim0CollectorOutput { - ts_anchor_sec, - ts_off_ms, - ts_off_ns, - pulse_anchor, - pulse_off, - values, - range_final: false, - timed_out: false, - continue_at: None, - }; - serde_json::to_string(&ret).unwrap() - } - - fn to_json_vec_u8(&self) -> Vec { - self.to_json_string().into_bytes() - } - - fn to_cbor_vec_u8(&self) -> Vec { - // TODO redesign with mut access, rename to `into_` and take the values out. - let ret = EventsDim0ChunkOutput { - // TODO use &mut to swap the content - tss: self.tss.clone(), - pulses: self.pulses.clone(), - values: self.values.clone(), - scalar_type: STY::scalar_type_name().into(), - }; - let mut buf = Vec::new(); - ciborium::into_writer(&ret, &mut buf).unwrap(); - buf - } - - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.values.clear(); - } - - fn to_dim0_f32_for_binning(&self) -> Box { - let mut ret = EventsDim0::empty(); - for (&ts, val) in self.tss.iter().zip(self.values.iter()) { - ret.push(TsNano::from_ns(ts), val.as_prim_f32_b()); - } - Box::new(ret) - } - - fn to_container_events(&self) -> Box { - try_to_container_events!(u8, self); - try_to_container_events!(u16, self); - try_to_container_events!(u32, self); - try_to_container_events!(u64, self); - try_to_container_events!(i8, self); - try_to_container_events!(i16, self); - try_to_container_events!(i32, self); - try_to_container_events!(i64, self); - try_to_container_events!(f32, self); - try_to_container_events!(f64, self); - try_to_container_events!(bool, self); - try_to_container_events!(String, self); - let this = self; - if let Some(evs) = self - .as_any_ref() - .downcast_ref::>() - { - use crate::binning::container_events::ContainerEvents; - let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); - use crate::binning::container_events::Container; - let mut vals = crate::binning::valuetype::EnumVariantContainer::new(); - for x in evs.values.iter() { - vals.push_back(x.clone()); - } - let ret = ContainerEvents::::from_constituents(tss, vals); - return Box::new(ret); - } - let styn = any::type_name::(); - todo!("TODO to_container_events for {styn}") - } -} - -impl Appendable for EventsDim0 -where - STY: ScalarOps, -{ - fn push(&mut self, ts: TsNano, value: STY) { - self.tss.push_back(ts.ns()); - self.pulses.push_back(0); - self.values.push_back(value); - } -} diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs deleted file mode 100644 index ba72f69..0000000 --- a/src/eventsdim0enum.rs +++ /dev/null @@ -1,477 +0,0 @@ -use daqbuf_err as err; -use err::Error; -use items_0::apitypes::ToUserFacingApiType; -use items_0::collect_s::CollectableDyn; -use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorDyn; -use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonValue; -use items_0::container::ByteEstimate; -use items_0::isodate::IsoDateTime; -use items_0::scalar_ops::ScalarOps; -use items_0::timebin::TimeBinnableTy; -use items_0::timebin::TimeBinnerTy; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Events; -use items_0::EventsNonObj; -use items_0::TypeName; -use items_0::WithLen; -use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use netpod::timeunits::MS; -use netpod::timeunits::SEC; -use netpod::BinnedRangeEnum; -use serde::Deserialize; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::mem; - -#[allow(unused)] -macro_rules! trace_collect_result { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} - -#[derive(Debug)] -pub struct EventsDim0EnumCollector { - vals: EventsDim0Enum, - range_final: bool, - timed_out: bool, - needs_continue_at: bool, -} - -impl EventsDim0EnumCollector { - pub fn new() -> Self { - Self { - vals: EventsDim0Enum::new(), - range_final: false, - timed_out: false, - needs_continue_at: false, - } - } -} - -impl TypeName for EventsDim0EnumCollector { - fn type_name(&self) -> String { - "EventsDim0EnumCollector".into() - } -} - -impl WithLen for EventsDim0EnumCollector { - fn len(&self) -> usize { - self.vals.tss.len() - } -} - -impl ByteEstimate for EventsDim0EnumCollector { - fn byte_estimate(&self) -> u64 { - // TODO does it need to be more accurate? - 30 * self.len() as u64 - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0EnumCollectorOutput { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "tsMs")] - ts_off_ms: VecDeque, - #[serde(rename = "tsNs")] - ts_off_ns: VecDeque, - #[serde(rename = "values")] - vals: VecDeque, - #[serde(rename = "valuestrings")] - valstrs: VecDeque, - #[serde( - rename = "rangeFinal", - default, - skip_serializing_if = "netpod::is_false" - )] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "netpod::is_false")] - timed_out: bool, - #[serde( - rename = "continueAt", - default, - skip_serializing_if = "Option::is_none" - )] - continue_at: Option, -} - -impl WithLen for EventsDim0EnumCollectorOutput { - fn len(&self) -> usize { - todo!() - } -} - -impl AsAnyRef for EventsDim0EnumCollectorOutput { - fn as_any_ref(&self) -> &dyn Any { - todo!() - } -} - -impl AsAnyMut for EventsDim0EnumCollectorOutput { - fn as_any_mut(&mut self) -> &mut dyn Any { - todo!() - } -} - -impl TypeName for EventsDim0EnumCollectorOutput { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl ToUserFacingApiType for EventsDim0EnumCollectorOutput { - fn into_user_facing_api_type(self: Self) -> Box { - todo!() - } - - fn into_user_facing_api_type_box(self: Box) -> Box { - todo!() - } -} - -impl CollectedDyn for EventsDim0EnumCollectorOutput {} - -impl CollectorTy for EventsDim0EnumCollector { - type Input = EventsDim0Enum; - type Output = EventsDim0EnumCollectorOutput; - - fn ingest(&mut self, src: &mut EventsDim0Enum) { - self.vals.tss.append(&mut src.tss); - self.vals.values.append(&mut src.values); - self.vals.valuestrs.append(&mut src.valuestrs); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - self.needs_continue_at = true; - } - - fn result(&mut self) -> Result { - trace_collect_result!( - "{} result() needs_continue_at {}", - self.type_name(), - self.needs_continue_at - ); - let range: Option = None; - // If we timed out, we want to hint the client from where to continue. - // This is tricky: currently, client can not request a left-exclusive range. - // We currently give the timestamp of the last event plus a small delta. - // The amount of the delta must take into account what kind of timestamp precision the client - // can parse and handle. - let vals = &mut self.vals; - let continue_at = if self.needs_continue_at { - if let Some(ts) = vals.tss.back() { - let x = Some(IsoDateTime::from_ns_u64(*ts / MS * MS + MS)); - x - } else { - if let Some(range) = &range { - match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), - SeriesRange::PulseRange(_) => { - error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - None - }; - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); - let valixs = mem::replace(&mut vals.values, VecDeque::new()); - let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); - let vals = valixs; - if ts_off_ms.len() != ts_off_ns.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != vals.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != valstrs.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - let ret = Self::Output { - ts_anchor_sec, - ts_off_ms, - ts_off_ns, - vals, - valstrs, - range_final: self.range_final, - timed_out: self.timed_out, - continue_at, - }; - Ok(ret) - } -} - -// Experiment with having this special case for enums -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim0Enum { - pub tss: VecDeque, - pub values: VecDeque, - pub valuestrs: VecDeque, -} - -impl EventsDim0Enum { - pub fn new() -> Self { - Self { - tss: VecDeque::new(), - values: VecDeque::new(), - valuestrs: VecDeque::new(), - } - } - - pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) { - self.tss.push_back(ts); - self.values.push_back(value); - self.valuestrs.push_back(valuestr); - } -} - -impl TypeName for EventsDim0Enum { - fn type_name(&self) -> String { - "EventsDim0Enum".into() - } -} - -impl AsAnyRef for EventsDim0Enum { - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim0Enum { - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl WithLen for EventsDim0Enum { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl CollectableDyn for EventsDim0Enum { - fn new_collector(&self) -> Box { - Box::new(EventsDim0EnumCollector::new()) - } -} - -// impl Events - -impl ByteEstimate for EventsDim0Enum { - fn byte_estimate(&self) -> u64 { - todo!() - } -} - -impl EventsNonObj for EventsDim0Enum { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - todo!() - } -} - -// NOTE just a dummy because currently we don't use this for time binning -#[derive(Debug)] -pub struct EventsDim0EnumTimeBinner; - -impl TimeBinnerTy for EventsDim0EnumTimeBinner { - type Input = EventsDim0Enum; - type Output = (); - - fn ingest(&mut self, _item: &mut Self::Input) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn bins_ready_count(&self) -> usize { - todo!() - } - - fn bins_ready(&mut self) -> Option { - todo!() - } - - fn push_in_progress(&mut self, _push_empty: bool) { - todo!() - } - - fn cycle(&mut self) { - todo!() - } - - fn empty(&self) -> Option { - todo!() - } - - fn append_empty_until_end(&mut self) { - todo!() - } -} - -// NOTE just a dummy because currently we don't use this for time binning -impl TimeBinnableTy for EventsDim0Enum { - type TimeBinner = EventsDim0EnumTimeBinner; - - fn time_binner_new( - &self, - _binrange: BinnedRangeEnum, - _do_time_weight: bool, - _emit_empty_bins: bool, - ) -> Self::TimeBinner { - todo!() - } -} - -// NOTE just a dummy because currently we don't use this for time binning - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0EnumChunkOutput { - tss: VecDeque, - values: VecDeque, - valuestrings: VecDeque, - scalar_type: String, -} - -impl Events for EventsDim0Enum { - fn verify(&self) -> bool { - todo!() - } - - fn output_info(&self) -> String { - todo!() - } - - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - todo!() - } - - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - todo!() - } - - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - todo!() - } - - fn ts_min(&self) -> Option { - todo!() - } - - fn ts_max(&self) -> Option { - todo!() - } - - fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box { - todo!() - } - - fn new_empty_evs(&self) -> Box { - todo!() - } - - fn drain_into_evs( - &mut self, - _dst: &mut dyn Events, - _range: (usize, usize), - ) -> Result<(), err::Error> { - todo!() - } - - fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { - todo!() - } - - fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option { - todo!() - } - - fn find_highest_index_lt_evs(&self, _ts: u64) -> Option { - todo!() - } - - fn clone_dyn(&self) -> Box { - todo!() - } - - fn partial_eq_dyn(&self, _other: &dyn Events) -> bool { - todo!() - } - - fn serde_id(&self) -> &'static str { - todo!() - } - - fn nty_id(&self) -> u32 { - todo!() - } - - fn tss(&self) -> &VecDeque { - todo!() - } - - fn pulses(&self) -> &VecDeque { - todo!() - } - - fn frame_type_id(&self) -> u32 { - todo!() - } - - fn to_min_max_avg(&mut self) -> Box { - todo!() - } - - fn to_json_string(&self) -> String { - todo!() - } - - fn to_json_vec_u8(&self) -> Vec { - self.to_json_string().into_bytes() - } - - fn to_cbor_vec_u8(&self) -> Vec { - // TODO redesign with mut access, rename to `into_` and take the values out. - let ret = EventsDim0EnumChunkOutput { - // TODO use &mut to swap the content - tss: self.tss.clone(), - values: self.values.clone(), - valuestrings: self.valuestrs.clone(), - scalar_type: netpod::EnumVariant::scalar_type_name().into(), - }; - let mut buf = Vec::new(); - ciborium::into_writer(&ret, &mut buf).unwrap(); - buf - } - - fn clear(&mut self) { - todo!() - } - - fn to_dim0_f32_for_binning(&self) -> Box { - todo!("{}::to_dim0_f32_for_binning", self.type_name()) - } - - fn to_container_events(&self) -> Box { - todo!("{}::to_container_events", self.type_name()) - } -} diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs deleted file mode 100644 index 0ed8cfd..0000000 --- a/src/eventsdim1.rs +++ /dev/null @@ -1,666 +0,0 @@ -use crate::IsoDateTime; -use daqbuf_err as err; -use err::Error; -use items_0::apitypes::ToUserFacingApiType; -use items_0::collect_s::CollectableDyn; -use items_0::collect_s::CollectableType; -use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonValue; -use items_0::container::ByteEstimate; -use items_0::overlap::HasTimestampDeque; -use items_0::scalar_ops::ScalarOps; -use items_0::Appendable; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Empty; -use items_0::Events; -use items_0::EventsNonObj; -use items_0::TypeName; -use items_0::WithLen; -use netpod::is_false; -use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use netpod::timeunits::MS; -use netpod::timeunits::SEC; -use netpod::BinnedRangeEnum; -use netpod::TsNano; -use serde::Deserialize; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::fmt; -use std::mem; - -#[allow(unused)] -macro_rules! trace2 { - (EN$($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); -} - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim1NoPulse { - pub tss: VecDeque, - pub values: VecDeque>, -} - -impl From> for EventsDim1 { - fn from(value: EventsDim1NoPulse) -> Self { - let pulses = vec![0; value.tss.len()].into(); - Self { - tss: value.tss, - pulses, - values: value.values, - } - } -} - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim1 { - pub tss: VecDeque, - pub pulses: VecDeque, - pub values: VecDeque>, -} - -impl EventsDim1 { - #[inline(always)] - pub fn push(&mut self, ts: u64, pulse: u64, value: Vec) { - self.tss.push_back(ts); - self.pulses.push_back(pulse); - self.values.push_back(value); - } - - #[inline(always)] - pub fn push_front(&mut self, ts: u64, pulse: u64, value: Vec) { - self.tss.push_front(ts); - self.pulses.push_front(pulse); - self.values.push_front(value); - } - - pub fn serde_id() -> &'static str { - "EventsDim1" - } - - pub fn tss(&self) -> &VecDeque { - &self.tss - } -} - -impl AsAnyRef for EventsDim1 -where - STY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim1 -where - STY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl Empty for EventsDim1 { - fn empty() -> Self { - Self { - tss: VecDeque::new(), - pulses: VecDeque::new(), - values: VecDeque::new(), - } - } -} - -impl fmt::Debug for EventsDim1 -where - STY: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if false { - write!( - fmt, - "EventsDim1 {{ count {} ts {:?} vals {:?} }}", - self.tss.len(), - self.tss.iter().map(|x| x / SEC).collect::>(), - self.values, - ) - } else { - write!( - fmt, - "EventsDim1 {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", - self.tss.len(), - self.tss.front().map(|x| x / SEC), - self.tss.back().map(|x| x / SEC), - self.values.front(), - self.values.back(), - ) - } - } -} - -impl WithLen for EventsDim1 { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl ByteEstimate for EventsDim1 { - fn byte_estimate(&self) -> u64 { - let stylen = mem::size_of::(); - let n = self.values.front().map_or(0, Vec::len); - (self.len() * (8 + 8 + n * stylen)) as u64 - } -} - -impl HasTimestampDeque for EventsDim1 { - fn timestamp_min(&self) -> Option { - self.tss.front().map(|x| *x) - } - - fn timestamp_max(&self) -> Option { - self.tss.back().map(|x| *x) - } - - fn pulse_min(&self) -> Option { - self.pulses.front().map(|x| *x) - } - - fn pulse_max(&self) -> Option { - self.pulses.back().map(|x| *x) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim1ChunkOutput { - tss: VecDeque, - pulses: VecDeque, - values: VecDeque>, - scalar_type: String, -} - -impl EventsDim1ChunkOutput {} - -#[derive(Debug)] -pub struct EventsDim1Collector { - vals: EventsDim1, - range_final: bool, - timed_out: bool, - needs_continue_at: bool, -} - -impl EventsDim1Collector { - pub fn self_name() -> &'static str { - any::type_name::() - } - - pub fn new() -> Self { - Self { - vals: EventsDim1::empty(), - range_final: false, - timed_out: false, - needs_continue_at: false, - } - } -} - -impl WithLen for EventsDim1Collector { - fn len(&self) -> usize { - WithLen::len(&self.vals) - } -} - -impl ByteEstimate for EventsDim1Collector { - fn byte_estimate(&self) -> u64 { - ByteEstimate::byte_estimate(&self.vals) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim1CollectorOutput { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "tsMs")] - ts_off_ms: VecDeque, - #[serde(rename = "tsNs")] - ts_off_ns: VecDeque, - #[serde(rename = "pulseAnchor")] - pulse_anchor: u64, - #[serde(rename = "pulseOff")] - pulse_off: VecDeque, - #[serde(rename = "values")] - values: VecDeque>, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - timed_out: bool, - #[serde( - rename = "continueAt", - default, - skip_serializing_if = "Option::is_none" - )] - continue_at: Option, -} - -impl EventsDim1CollectorOutput { - pub fn ts_anchor_sec(&self) -> u64 { - self.ts_anchor_sec - } - - pub fn ts_off_ms(&self) -> &VecDeque { - &self.ts_off_ms - } - - pub fn pulse_anchor(&self) -> u64 { - self.pulse_anchor - } - - pub fn pulse_off(&self) -> &VecDeque { - &self.pulse_off - } - - /// Note: only used for unit tests. - pub fn values_to_f32(&self) -> VecDeque> { - self.values - .iter() - .map(|x| x.iter().map(|x| x.as_prim_f32_b()).collect()) - .collect() - } - - pub fn range_final(&self) -> bool { - self.range_final - } - - pub fn timed_out(&self) -> bool { - self.timed_out - } - - pub fn is_valid(&self) -> bool { - if self.ts_off_ms.len() != self.ts_off_ns.len() { - false - } else if self.ts_off_ms.len() != self.pulse_off.len() { - false - } else if self.ts_off_ms.len() != self.values.len() { - false - } else { - true - } - } - - pub fn info_str(&self) -> String { - use fmt::Write; - let mut out = String::new(); - write!( - out, - "ts_off_ms {} ts_off_ns {} pulse_off {} values {}", - self.ts_off_ms.len(), - self.ts_off_ns.len(), - self.pulse_off.len(), - self.values.len(), - ) - .unwrap(); - out - } -} - -impl AsAnyRef for EventsDim1CollectorOutput -where - STY: 'static, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsDim1CollectorOutput -where - STY: 'static, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl TypeName for EventsDim1CollectorOutput { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl WithLen for EventsDim1CollectorOutput { - fn len(&self) -> usize { - self.values.len() - } -} - -impl ToUserFacingApiType for EventsDim1CollectorOutput { - fn into_user_facing_api_type(self: Self) -> Box { - todo!() - } - - fn into_user_facing_api_type_box(self: Box) -> Box { - todo!() - } -} - -impl CollectedDyn for EventsDim1CollectorOutput {} - -impl CollectorTy for EventsDim1Collector { - type Input = EventsDim1; - type Output = EventsDim1CollectorOutput; - - fn ingest(&mut self, src: &mut Self::Input) { - self.vals.tss.append(&mut src.tss); - self.vals.pulses.append(&mut src.pulses); - self.vals.values.append(&mut src.values); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - } - - // TODO unify with dim0 case - fn result(&mut self) -> Result { - // If we timed out, we want to hint the client from where to continue. - // This is tricky: currently, client can not request a left-exclusive range. - // We currently give the timestamp of the last event plus a small delta. - // The amount of the delta must take into account what kind of timestamp precision the client - // can parse and handle. - let range: Option = None; - let vals = &mut self.vals; - let continue_at = if self.timed_out { - if let Some(ts) = vals.tss.back() { - Some(IsoDateTime::from_ns_u64(*ts + MS)) - } else { - if let Some(range) = &range { - match range { - SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)), - SeriesRange::PulseRange(_) => { - error!("TODO emit create continueAt for pulse range"); - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - warn!("can not determine continue-at parameters"); - Some(IsoDateTime::from_ns_u64(0)) - } - } - } else { - None - }; - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!()); - let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&vals.pulses); - let values = mem::replace(&mut vals.values, VecDeque::new()); - if ts_off_ms.len() != ts_off_ns.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != pulse_off.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - if ts_off_ms.len() != values.len() { - return Err(Error::with_msg_no_trace("collected len mismatch")); - } - let ret = Self::Output { - ts_anchor_sec, - ts_off_ms, - ts_off_ns, - pulse_anchor, - pulse_off, - values, - range_final: self.range_final, - timed_out: self.timed_out, - continue_at, - }; - if !ret.is_valid() { - error!("invalid:\n{}", ret.info_str()); - } - Ok(ret) - } -} - -impl CollectableType for EventsDim1 { - type Collector = EventsDim1Collector; - - fn new_collector() -> Self::Collector { - Self::Collector::new() - } -} - -#[derive(Debug)] -pub struct EventsDim1Aggregator { - _last_seen_val: Option, - events_taken_count: u64, - events_ignored_count: u64, -} - -impl Drop for EventsDim1Aggregator { - fn drop(&mut self) { - // TODO collect as stats for the request context: - trace!( - "taken {} ignored {}", - self.events_taken_count, - self.events_ignored_count - ); - } -} - -impl EventsDim1Aggregator { - pub fn new(_range: SeriesRange, _do_time_weight: bool) -> Self { - panic!("TODO remove") - } -} - -impl items_0::TypeName for EventsDim1 { - fn type_name(&self) -> String { - let sty = std::any::type_name::(); - format!("EventsDim1<{sty}>") - } -} - -impl EventsNonObj for EventsDim1 { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - panic!("TODO remove") - } -} - -impl Events for EventsDim1 { - fn verify(&self) -> bool { - let mut good = true; - let mut ts_max = 0; - for ts in &self.tss { - let ts = *ts; - if ts < ts_max { - good = false; - error!("unordered event data ts {} ts_max {}", ts, ts_max); - } - ts_max = ts_max.max(ts); - } - good - } - - fn output_info(&self) -> String { - let n2 = self.tss.len().max(1) - 1; - format!( - "EventsDim1OutputInfo {{ len {}, ts_min {}, ts_max {} }}", - self.tss.len(), - self.tss.get(0).map_or(-1i64, |&x| x as i64), - self.tss.get(n2).map_or(-1i64, |&x| x as i64), - ) - } - - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - self - } - - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - let tss = self.tss.drain(..n1).collect(); - let pulses = self.pulses.drain(..n1).collect(); - let values = self.values.drain(..n1).collect(); - let ret = Self { - tss, - pulses, - values, - }; - Box::new(ret) - } - - fn new_empty_evs(&self) -> Box { - Box::new(Self::empty()) - } - - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), Error> { - // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_any_mut().downcast_mut::() { - // TODO make it harder to forget new members when the struct may get modified in the future - let r = range.0..range.1; - dst.tss.extend(self.tss.drain(r.clone())); - dst.pulses.extend(self.pulses.drain(r.clone())); - dst.values.extend(self.values.drain(r.clone())); - Ok(()) - } else { - error!("downcast to EventsDim0 FAILED"); - // Err(Error::NotCompatible) - todo!() - } - } - - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m > ts { - return Some(i); - } - } - None - } - - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m >= ts { - return Some(i); - } - } - None - } - - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate().rev() { - if m < ts { - return Some(i); - } - } - None - } - - fn ts_min(&self) -> Option { - self.tss.front().map(|&x| x) - } - - fn ts_max(&self) -> Option { - self.tss.back().map(|&x| x) - } - - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { - if let Some(other) = other.as_any_ref().downcast_ref::() { - self == other - } else { - false - } - } - - fn serde_id(&self) -> &'static str { - Self::serde_id() - } - - fn nty_id(&self) -> u32 { - STY::SUB as u32 - } - - fn clone_dyn(&self) -> Box { - Box::new(self.clone()) - } - - fn tss(&self) -> &VecDeque { - &self.tss - } - - fn pulses(&self) -> &VecDeque { - &self.pulses - } - - fn frame_type_id(&self) -> u32 { - // TODO make more nice - panic!() - } - - fn to_min_max_avg(&mut self) -> Box { - panic!("discontinued support for EventsDim1") - } - - fn to_json_string(&self) -> String { - let ret = EventsDim1ChunkOutput { - // TODO use &mut to swap the content - tss: self.tss.clone(), - pulses: self.pulses.clone(), - values: self.values.clone(), - scalar_type: STY::scalar_type_name().into(), - }; - serde_json::to_string(&ret).unwrap() - } - - fn to_json_vec_u8(&self) -> Vec { - self.to_json_string().into_bytes() - } - - fn to_cbor_vec_u8(&self) -> Vec { - let ret = EventsDim1ChunkOutput { - // TODO use &mut to swap the content - tss: self.tss.clone(), - pulses: self.pulses.clone(), - values: self.values.clone(), - scalar_type: STY::scalar_type_name().into(), - }; - let mut buf = Vec::new(); - ciborium::into_writer(&ret, &mut buf).unwrap(); - buf - } - - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.values.clear(); - } - - fn to_dim0_f32_for_binning(&self) -> Box { - todo!("{}::to_dim0_f32_for_binning", self.type_name()) - } - - fn to_container_events(&self) -> Box { - todo!("{}::to_container_events", self.type_name()) - } -} - -impl Appendable> for EventsDim1 -where - STY: ScalarOps, -{ - fn push(&mut self, ts: TsNano, value: Vec) { - Self::push(self, ts.ns(), 0, value) - } -} diff --git a/src/framable.rs b/src/framable.rs index f049810..2ac07e9 100644 --- a/src/framable.rs +++ b/src/framable.rs @@ -16,7 +16,6 @@ use items_0::streamitem::StreamItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID; -use items_0::Events; use netpod::log::*; use serde::de::DeserializeOwned; use serde::Deserialize; @@ -74,12 +73,6 @@ where } } -impl FrameType for Box { - fn frame_type_id(&self) -> u32 { - self.as_ref().frame_type_id() - } -} - pub trait Framable { fn make_frame_dyn(&self) -> Result; } diff --git a/src/lib.rs b/src/lib.rs index b82735c..38be9c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,9 +4,6 @@ pub mod binning; pub mod channelevents; pub mod empty; pub mod eventfull; -pub mod eventsdim0; -pub mod eventsdim0enum; -pub mod eventsdim1; pub mod framable; pub mod frame; pub mod inmem; @@ -21,7 +18,6 @@ pub mod testgen; use daqbuf_err as err; use items_0::isodate::IsoDateTime; -use items_0::Events; use std::fmt; mod log { diff --git a/src/streams.rs b/src/streams.rs index dbbdc69..45b7d5b 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -2,13 +2,6 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; -use items_0::transform::EventStreamTrait; -use items_0::transform::TransformProperties; -use items_0::transform::WithTransformProperties; -use items_0::Events; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; @@ -45,15 +38,6 @@ where } } -impl WithTransformProperties for Enumerate2 -where - T: WithTransformProperties, -{ - fn query_transform_properties(&self) -> TransformProperties { - self.inp.query_transform_properties() - } -} - pub struct Then2 { inp: Pin>, f: Pin>, @@ -144,68 +128,3 @@ where } } } - -/// Wrap any event stream and provide transformation properties. -pub struct PlainEventStream -where - T: Events, - INP: Stream>, -{ - inp: Pin>, -} - -impl PlainEventStream -where - T: Events, - INP: Stream>, -{ - pub fn new(inp: INP) -> Self { - Self { inp: Box::pin(inp) } - } -} - -impl Stream for PlainEventStream -where - T: Events, - INP: Stream>, -{ - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => Ready(Some(match item { - Ok(item) => Ok(match item { - StreamItem::DataItem(item) => StreamItem::DataItem(match item { - RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, - RangeCompletableItem::Data(item) => { - RangeCompletableItem::Data(Box::new(item)) - } - }), - StreamItem::Log(item) => StreamItem::Log(item), - StreamItem::Stats(item) => StreamItem::Stats(item), - }), - Err(e) => Err(e), - })), - Ready(None) => Ready(None), - Pending => Pending, - } - } -} - -impl WithTransformProperties for PlainEventStream -where - T: Events, - INP: Stream>, -{ - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl EventStreamTrait for PlainEventStream -where - T: Events, - INP: Stream> + Send, -{ -}