From 9fe63706cf83e3a225241f39bb8d7ea723318661 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 29 Nov 2022 15:50:16 +0100 Subject: [PATCH] factor out into common crate --- httpret/Cargo.toml | 1 + httpret/src/events.rs | 2 +- items/Cargo.toml | 1 + items_0/Cargo.toml | 15 ++ items_0/src/collect_c.rs | 118 ++++++++++++++++ items_0/src/collect_s.rs | 106 ++++++++++++++ items_0/src/items_0.rs | 139 ++++++++++++++++++ items_2/Cargo.toml | 1 + items_2/src/binsdim0.rs | 23 +-- items_2/src/channelevents.rs | 36 ++--- items_2/src/collect.rs | 67 --------- items_2/src/eventsdim0.rs | 66 ++++----- items_2/src/items_2.rs | 225 +----------------------------- items_2/src/streams.rs | 93 ------------ items_2/src/test.rs | 3 +- items_2/src/testgen.rs | 3 +- nodenet/Cargo.toml | 1 + nodenet/src/conn.rs | 2 +- scyllaconn/Cargo.toml | 1 + scyllaconn/src/bincache.rs | 3 +- scyllaconn/src/events.rs | 2 +- streams/Cargo.toml | 1 + streams/src/collect.rs | 2 +- streams/src/merge/mergedstream.rs | 2 +- streams/src/test.rs | 2 +- 25 files changed, 466 insertions(+), 449 deletions(-) create mode 100644 items_0/Cargo.toml create mode 100644 items_0/src/collect_c.rs create mode 100644 items_0/src/collect_s.rs create mode 100644 items_0/src/items_0.rs diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 2ed4431..1e6a3db 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -29,6 +29,7 @@ dbconn = { path = "../dbconn" } tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } disk = { path = "../disk" } items = { path = "../items" } +items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } streams = { path = "../streams" } diff --git a/httpret/src/events.rs b/httpret/src/events.rs index cd72f27..79a129c 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -273,7 +273,7 @@ impl EventsHandlerScylla { Ok(k) => match k { ChannelEvents::Events(mut item) => { if coll.is_none() { - coll = Some(items_2::streams::Collectable::new_collector(item.as_ref())); + coll = Some(items_0::collect_s::Collectable::new_collector(item.as_ref())); } let cl = coll.as_mut().unwrap(); cl.ingest(item.as_collectable_mut()); diff --git a/items/Cargo.toml b/items/Cargo.toml index b58c420..e3be5f9 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -23,5 +23,6 @@ chrono = { version = "0.4.22", features = ["serde"] } crc32fast = "1.3.2" err = { path = "../err" } items_proc = { path = "../items_proc" } +items_0 = { path = "../items_0" } netpod = { path = "../netpod" } parse = { path = "../parse" } diff --git a/items_0/Cargo.toml b/items_0/Cargo.toml new file mode 100644 index 0000000..13c4c5a --- /dev/null +++ b/items_0/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "items_0" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/items_0.rs" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +erased-serde = "0.3" +serde_json = "1.0" +netpod = { path = "../netpod" } +err = { path = "../err" } diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs new file mode 100644 index 0000000..31e0acf --- /dev/null +++ b/items_0/src/collect_c.rs @@ -0,0 +1,118 @@ +use crate::collect_s::ToJsonBytes; +use crate::collect_s::ToJsonResult; +use crate::AsAnyRef; +use crate::Events; +use err::Error; +use std::any::Any; +use std::fmt; + +pub trait Collector: fmt::Debug + Send { + // TODO should require here Collectable? + type Input; + type Output: Collected; + + fn len(&self) -> usize; + + fn ingest(&mut self, item: &mut Self::Input); + + fn set_range_complete(&mut self); + + fn set_timed_out(&mut self); + + fn result(&mut self) -> Result; +} + +pub trait Collectable: fmt::Debug { + type Collector: Collector; + + fn new_collector(&self) -> Self::Collector; +} + +pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} + +erased_serde::serialize_trait_object!(Collected); + +impl AsAnyRef for Box { + fn as_any_ref(&self) -> &dyn Any { + self.as_ref().as_any_ref() + } +} + +impl ToJsonResult for Box { + fn to_json_result(&self) -> Result, Error> { + self.as_ref().to_json_result() + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl Collected for Box {} + +#[derive(Debug)] +pub struct CollectorDynDefault {} + +pub trait CollectorDyn: fmt::Debug + Send { + fn len(&self) -> usize; + + fn ingest(&mut self, item: &mut dyn CollectableWithDefault); + + fn set_range_complete(&mut self); + + fn set_timed_out(&mut self); + + fn result(&mut self) -> Result, Error>; +} + +pub trait CollectableWithDefault { + fn new_collector(&self) -> Box; + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +#[derive(Debug)] +pub struct EventsCollector { + coll: Box, +} + +impl EventsCollector { + pub fn new(coll: Box) -> Self { + Self { coll } + } +} + +impl Collector for EventsCollector { + type Input = Box; + + // TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc... + type Output = Box; + + fn len(&self) -> usize { + self.coll.len() + } + + fn ingest(&mut self, item: &mut Self::Input) { + self.coll.ingest(item.as_collectable_with_default_mut()); + } + + fn set_range_complete(&mut self) { + self.coll.set_range_complete() + } + + fn set_timed_out(&mut self) { + self.coll.set_timed_out() + } + + fn result(&mut self) -> Result { + self.coll.result() + } +} + +impl Collectable for Box { + type Collector = EventsCollector; + + fn new_collector(&self) -> Self::Collector { + let coll = CollectableWithDefault::new_collector(self.as_ref()); + EventsCollector::new(coll) + } +} diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs new file mode 100644 index 0000000..90dbefb --- /dev/null +++ b/items_0/src/collect_s.rs @@ -0,0 +1,106 @@ +use super::collect_c::Collected; +use crate::WithLen; +use err::Error; +use serde::Serialize; +use std::any::Any; +use std::fmt; + +pub trait CollectorType: Send + Unpin + WithLen { + type Input: Collectable; + type Output: Collected + ToJsonResult + Serialize; + + fn ingest(&mut self, src: &mut Self::Input); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + + // TODO use this crate's Error instead: + fn result(&mut self) -> Result; +} + +pub trait Collector: Send + Unpin + WithLen { + fn ingest(&mut self, src: &mut dyn Collectable); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn result(&mut self) -> Result, Error>; +} + +pub trait CollectableType { + type Collector: CollectorType; + fn new_collector() -> Self::Collector; +} + +pub trait Collectable: Any { + fn new_collector(&self) -> Box; + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +impl Collector for T { + fn ingest(&mut self, src: &mut dyn Collectable) { + let src: &mut ::Input = src.as_any_mut().downcast_mut().expect("can not downcast"); + T::ingest(self, src) + } + + fn set_range_complete(&mut self) { + T::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + T::set_timed_out(self) + } + + fn result(&mut self) -> Result, Error> { + let ret = T::result(self)?; + Ok(Box::new(ret) as _) + } +} + +impl Collectable for T { + fn new_collector(&self) -> Box { + Box::new(T::new_collector()) as _ + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + // TODO interesting: why exactly does returning `&mut self` not work here? + self + } +} + +// TODO check usage of this trait +pub trait ToJsonBytes { + fn to_json_bytes(&self) -> Result, Error>; +} + +// TODO check usage of this trait +pub trait ToJsonResult: erased_serde::Serialize + fmt::Debug + Send { + fn to_json_result(&self) -> Result, Error>; + fn as_any(&self) -> &dyn Any; +} + +erased_serde::serialize_trait_object!(ToJsonResult); + +impl ToJsonResult for serde_json::Value { + fn to_json_result(&self) -> Result, Error> { + Ok(Box::new(self.clone())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ToJsonBytes for serde_json::Value { + fn to_json_bytes(&self) -> Result, Error> { + Ok(serde_json::to_vec(self)?) + } +} + +// TODO do this with some blanket impl: +impl Collectable for Box { + fn new_collector(&self) -> Box { + Collectable::new_collector(self.as_ref()) + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + Collectable::as_any_mut(self.as_mut()) + } +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs new file mode 100644 index 0000000..967205a --- /dev/null +++ b/items_0/src/items_0.rs @@ -0,0 +1,139 @@ +pub mod collect_c; +pub mod collect_s; + +use collect_c::CollectableWithDefault; +use collect_s::Collectable; +use collect_s::ToJsonResult; +use netpod::{NanoRange, ScalarType, Shape}; +use std::any::Any; +use std::fmt; + +pub trait WithLen { + fn len(&self) -> usize; +} + +// TODO can probably be removed. +pub trait TimeBins { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn ts_min_max(&self) -> Option<(u64, u64)>; +} + +pub enum Fits { + Empty, + Lower, + Greater, + Inside, + PartlyLower, + PartlyGreater, + PartlyLowerAndGreater, +} + +pub trait RangeOverlapInfo { + fn ends_before(&self, range: NanoRange) -> bool; + fn ends_after(&self, range: NanoRange) -> bool; + fn starts_after(&self, range: NanoRange) -> bool; +} + +pub trait EmptyForScalarTypeShape { + fn empty(scalar_type: ScalarType, shape: Shape) -> Self; +} + +pub trait EmptyForShape { + fn empty(shape: Shape) -> Self; +} + +pub trait Empty { + fn empty() -> Self; +} + +pub trait AppendEmptyBin { + fn append_empty_bin(&mut self, ts1: u64, ts2: u64); +} + +pub trait AsAnyRef { + fn as_any_ref(&self) -> &dyn Any; +} + +pub trait AsAnyMut { + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +/*impl AsAnyRef for Box { + fn as_any_ref(&self) -> &dyn Any { + self.as_ref().as_any_ref() + } +}*/ + +/// Data in time-binned form. +pub trait TimeBinned: Any + TimeBinnable { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; + fn as_collectable_mut(&mut self) -> &mut dyn Collectable; + fn edges_slice(&self) -> (&[u64], &[u64]); + fn counts(&self) -> &[u64]; + fn mins(&self) -> Vec; + fn maxs(&self) -> Vec; + fn avgs(&self) -> Vec; + fn validate(&self) -> Result<(), String>; +} + +pub trait TimeBinner: Send { + fn ingest(&mut self, item: &dyn TimeBinnable); + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option>; + + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); + + fn set_range_complete(&mut self); +} + +// TODO remove the Any bound. Factor out into custom AsAny trait. + +/// Provides a time-binned representation of the implementing type. +/// In contrast to `TimeBinnableType` this is meant for trait objects. +pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { + // TODO implementors may fail if edges contain not at least 2 entries. + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; + fn as_any(&self) -> &dyn Any; + + // TODO just a helper for the empty result. + fn to_box_to_json_result(&self) -> Box; +} + +// TODO can I remove the Any bound? + +/// Container of some form of events, for use as trait object. +pub trait Events: + fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + Send + erased_serde::Serialize +{ + fn as_time_binnable(&self) -> &dyn TimeBinnable; + fn verify(&self) -> bool; + fn output_info(&self); + fn as_collectable_mut(&mut self) -> &mut dyn Collectable; + fn as_collectable_with_default_ref(&self) -> &dyn CollectableWithDefault; + fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableWithDefault; + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; + fn move_into_fresh(&mut self, ts_end: u64) -> Box; + fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()>; + fn clone_dyn(&self) -> Box; + fn partial_eq_dyn(&self, other: &dyn Events) -> bool; + fn serde_id(&self) -> &'static str; + fn nty_id(&self) -> u32; +} + +erased_serde::serialize_trait_object!(Events); + +impl PartialEq for Box { + fn eq(&self, other: &Self) -> bool { + Events::partial_eq_dyn(self.as_ref(), other.as_ref()) + } +} diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index e778a75..53915d2 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -21,6 +21,7 @@ futures-util = "0.3.24" tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } err = { path = "../err" } items = { path = "../items" } +items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } taskrun = { path = "../taskrun" } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index c467068..1e8c026 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,11 +1,14 @@ -use crate::streams::{Collectable, CollectableType, CollectorType, ToJsonResult}; -use crate::{ - ts_offs_from_abs, ts_offs_from_abs_with_anchor, AppendEmptyBin, Empty, IsoDateTime, RangeOverlapInfo, ScalarOps, - TimeBins, WithLen, -}; -use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBinner}; +use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor}; +use crate::{IsoDateTime, RangeOverlapInfo, ScalarOps}; +use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use chrono::{TimeZone, Utc}; use err::Error; +use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult}; +use items_0::AppendEmptyBin; +use items_0::Empty; +use items_0::TimeBinned; +use items_0::TimeBins; +use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; @@ -229,13 +232,13 @@ pub struct BinsDim0CollectedResult { finished_at: Option, } -impl crate::AsAnyRef for BinsDim0CollectedResult { +impl items_0::AsAnyRef for BinsDim0CollectedResult { fn as_any_ref(&self) -> &dyn Any { self } } -impl crate::collect::Collected for BinsDim0CollectedResult {} +impl items_0::collect_c::Collected for BinsDim0CollectedResult {} impl BinsDim0CollectedResult { pub fn ts_anchor_sec(&self) -> u64 { @@ -264,7 +267,7 @@ impl BinsDim0CollectedResult { } impl ToJsonResult for BinsDim0CollectedResult { - fn to_json_result(&self) -> Result, Error> { + fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } @@ -574,7 +577,7 @@ impl TimeBinner for BinsDim0TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 3ff2fef..4660b2a 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,14 +1,14 @@ -use std::any::Any; -use std::fmt; - +use crate::merger; use crate::merger_cev::MergeableCev; -use crate::streams::Collectable; -use crate::streams::Collector; -use crate::{merger, Events}; +use crate::Events; use items::FrameType; use items::FrameTypeInnerStatic; +use items_0::collect_s::Collectable; +use items_0::collect_s::Collector; use netpod::log::*; use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::fmt; // TODO maybe rename to ChannelStatus? #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -170,7 +170,7 @@ mod serde_channel_events { mod test_channel_events_serde { use super::ChannelEvents; use crate::eventsdim0::EventsDim0; - use crate::Empty; + use items_0::Empty; #[test] fn channel_events() { @@ -298,8 +298,8 @@ pub struct ChannelEventsTimeBinner { } impl fmt::Debug for ChannelEventsTimeBinner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ChannelEventsTimeBinner") + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ChannelEventsTimeBinner") .field("conn_state", &self.conn_state) .finish() } @@ -307,7 +307,7 @@ impl fmt::Debug for ChannelEventsTimeBinner { impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { type Input = ChannelEvents; - type Output = Box; + type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { match item { @@ -386,14 +386,14 @@ impl crate::timebin::TimeBinnable for ChannelEvents { #[derive(Debug, Serialize, Deserialize)] pub struct ChannelEventsCollectorOutput {} -impl crate::AsAnyRef for ChannelEventsCollectorOutput { +impl items_0::AsAnyRef for ChannelEventsCollectorOutput { fn as_any_ref(&self) -> &dyn Any { self } } impl crate::ToJsonResult for ChannelEventsCollectorOutput { - fn to_json_result(&self) -> Result, err::Error> { + fn to_json_result(&self) -> Result, err::Error> { todo!() } @@ -402,11 +402,11 @@ impl crate::ToJsonResult for ChannelEventsCollectorOutput { } } -impl crate::collect::Collected for ChannelEventsCollectorOutput {} +impl items_0::collect_c::Collected for ChannelEventsCollectorOutput {} #[derive(Debug)] pub struct ChannelEventsCollector { - coll: Option>, + coll: Option>, range_complete: bool, timed_out: bool, } @@ -421,9 +421,9 @@ impl ChannelEventsCollector { } } -impl crate::collect::Collector for ChannelEventsCollector { +impl items_0::collect_c::Collector for ChannelEventsCollector { type Input = ChannelEvents; - type Output = Box; + type Output = Box; fn len(&self) -> usize { match &self.coll { @@ -456,7 +456,7 @@ impl crate::collect::Collector for ChannelEventsCollector { self.timed_out = true; } - fn result(&mut self) -> Result { + fn result(&mut self) -> Result { match self.coll.as_mut() { Some(coll) => { if self.range_complete { @@ -479,7 +479,7 @@ impl crate::collect::Collector for ChannelEventsCollector { } } -impl crate::collect::Collectable for ChannelEvents { +impl items_0::collect_c::Collectable for ChannelEvents { type Collector = ChannelEventsCollector; fn new_collector(&self) -> Self::Collector { diff --git a/items_2/src/collect.rs b/items_2/src/collect.rs index eb08751..8b13789 100644 --- a/items_2/src/collect.rs +++ b/items_2/src/collect.rs @@ -1,68 +1 @@ -use crate::AsAnyRef; -use crate::Error; -use std::any::Any; -use std::fmt; -pub trait Collector: fmt::Debug + Send { - // TODO should require here Collectable? - type Input; - type Output: Collected; - - fn len(&self) -> usize; - - fn ingest(&mut self, item: &mut Self::Input); - - fn set_range_complete(&mut self); - - fn set_timed_out(&mut self); - - fn result(&mut self) -> Result; -} - -pub trait Collectable: fmt::Debug { - type Collector: Collector; - - fn new_collector(&self) -> Self::Collector; -} - -pub trait Collected: fmt::Debug + crate::streams::ToJsonResult + AsAnyRef + Send {} - -erased_serde::serialize_trait_object!(Collected); - -impl AsAnyRef for Box { - fn as_any_ref(&self) -> &dyn Any { - self.as_ref().as_any_ref() - } -} - -impl crate::streams::ToJsonResult for Box { - fn to_json_result(&self) -> Result, err::Error> { - self.as_ref().to_json_result() - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl Collected for Box {} - -#[derive(Debug)] -pub struct CollectorDynDefault {} - -pub trait CollectorDyn: fmt::Debug + Send { - fn len(&self) -> usize; - - fn ingest(&mut self, item: &mut dyn CollectableWithDefault); - - fn set_range_complete(&mut self); - - fn set_timed_out(&mut self); - - fn result(&mut self) -> Result, Error>; -} - -pub trait CollectableWithDefault { - fn new_collector(&self) -> Box; - fn as_any_mut(&mut self) -> &mut dyn Any; -} diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 06cffc1..367902c 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,9 +1,9 @@ use crate::binsdim0::BinsDim0; -use crate::streams::{CollectableType, CollectorType, ToJsonResult}; +use crate::ScalarOps; use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo}; -use crate::{Empty, Events, ScalarOps, WithLen}; use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use err::Error; +use items_0::{Empty, Events, WithLen}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; @@ -170,16 +170,16 @@ impl EventsDim0CollectorOutput { } } -impl crate::AsAnyRef for EventsDim0CollectorOutput { +impl items_0::AsAnyRef for EventsDim0CollectorOutput { fn as_any_ref(&self) -> &dyn Any { self } } -impl crate::collect::Collected for EventsDim0CollectorOutput {} +impl items_0::collect_c::Collected for EventsDim0CollectorOutput {} -impl ToJsonResult for EventsDim0CollectorOutput { - fn to_json_result(&self) -> Result, Error> { +impl items_0::collect_s::ToJsonResult for EventsDim0CollectorOutput { + fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } @@ -189,7 +189,7 @@ impl ToJsonResult for EventsDim0CollectorOutput { } } -impl CollectorType for EventsDim0Collector { +impl items_0::collect_s::CollectorType for EventsDim0Collector { type Input = EventsDim0; type Output = EventsDim0CollectorOutput; @@ -226,7 +226,7 @@ impl CollectorType for EventsDim0Collector { } } -impl CollectableType for EventsDim0 { +impl items_0::collect_s::CollectableType for EventsDim0 { type Collector = EventsDim0Collector; fn new_collector() -> Self::Collector { @@ -234,7 +234,7 @@ impl CollectableType for EventsDim0 { } } -impl crate::collect::Collector for EventsDim0Collector { +impl items_0::collect_c::Collector for EventsDim0Collector { type Input = EventsDim0; type Output = EventsDim0CollectorOutput; @@ -243,19 +243,19 @@ impl crate::collect::Collector for EventsDim0Collector { } fn ingest(&mut self, item: &mut Self::Input) { - CollectorType::ingest(self, item) + items_0::collect_s::CollectorType::ingest(self, item) } fn set_range_complete(&mut self) { - CollectorType::set_range_complete(self) + items_0::collect_s::CollectorType::set_range_complete(self) } fn set_timed_out(&mut self) { - CollectorType::set_timed_out(self) + items_0::collect_s::CollectorType::set_timed_out(self) } - fn result(&mut self) -> Result { - CollectorType::result(self).map_err(Into::into) + fn result(&mut self) -> Result { + items_0::collect_s::CollectorType::result(self).map_err(Into::into) } } @@ -538,7 +538,7 @@ impl TimeBinnable for EventsDim0 { self as &dyn Any } - fn to_box_to_json_result(&self) -> Box { + fn to_box_to_json_result(&self) -> Box { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } @@ -585,15 +585,15 @@ impl Events for EventsDim0 { } } - fn as_collectable_mut(&mut self) -> &mut dyn crate::streams::Collectable { + fn as_collectable_mut(&mut self) -> &mut dyn items_0::collect_s::Collectable { self } - fn as_collectable_with_default_ref(&self) -> &dyn crate::collect::CollectableWithDefault { + fn as_collectable_with_default_ref(&self) -> &dyn items_0::collect_c::CollectableWithDefault { self } - fn as_collectable_with_default_mut(&mut self) -> &mut dyn crate::collect::CollectableWithDefault { + fn as_collectable_with_default_mut(&mut self) -> &mut dyn items_0::collect_c::CollectableWithDefault { self } @@ -619,7 +619,7 @@ impl Events for EventsDim0 { fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()> { // TODO as_any and as_any_mut are declared on unrealted traits. Simplify. - if let Some(tgt) = crate::streams::Collectable::as_any_mut(tgt.as_mut()).downcast_mut::() { + if let Some(tgt) = items_0::collect_s::Collectable::as_any_mut(tgt.as_mut()).downcast_mut::() { // TODO improve the search let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); // TODO make it harder to forget new members when the struct may get modified in the future @@ -718,7 +718,7 @@ impl TimeBinner for EventsDim0TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, @@ -867,12 +867,12 @@ impl EventsDim0CollectorDyn { } } -impl crate::collect::CollectorDyn for EventsDim0CollectorDyn { +impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn { fn len(&self) -> usize { todo!() } - fn ingest(&mut self, _item: &mut dyn crate::collect::CollectableWithDefault) { + fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) { // TODO remove this struct? todo!() } @@ -885,20 +885,20 @@ impl crate::collect::CollectorDyn for EventsDim0CollectorDyn { todo!() } - fn result(&mut self) -> Result, crate::Error> { + fn result(&mut self) -> Result, err::Error> { todo!() } } -impl crate::collect::CollectorDyn for EventsDim0Collector { +impl items_0::collect_c::CollectorDyn for EventsDim0Collector { fn len(&self) -> usize { WithLen::len(self) } - fn ingest(&mut self, item: &mut dyn crate::collect::CollectableWithDefault) { + fn ingest(&mut self, item: &mut dyn items_0::collect_c::CollectableWithDefault) { let x = item.as_any_mut(); if let Some(item) = x.downcast_mut::>() { - CollectorType::ingest(self, item) + items_0::collect_s::CollectorType::ingest(self, item) } else { // TODO need possibility to return error () @@ -906,22 +906,22 @@ impl crate::collect::CollectorDyn for EventsDim0Collector { } fn set_range_complete(&mut self) { - CollectorType::set_range_complete(self); + items_0::collect_s::CollectorType::set_range_complete(self); } fn set_timed_out(&mut self) { - CollectorType::set_timed_out(self); + items_0::collect_s::CollectorType::set_timed_out(self); } - fn result(&mut self) -> Result, crate::Error> { - CollectorType::result(self) + fn result(&mut self) -> Result, err::Error> { + items_0::collect_s::CollectorType::result(self) .map(|x| Box::new(x) as _) .map_err(|e| e.into()) } } -impl crate::collect::CollectableWithDefault for EventsDim0 { - fn new_collector(&self) -> Box { +impl items_0::collect_c::CollectableWithDefault for EventsDim0 { + fn new_collector(&self) -> Box { let coll = EventsDim0Collector::::new(); Box::new(coll) } @@ -931,7 +931,7 @@ impl crate::collect::CollectableWithDefault for EventsDim0 } } -impl crate::collect::Collectable for EventsDim0 { +impl items_0::collect_c::Collectable for EventsDim0 { type Collector = EventsDim0Collector; fn new_collector(&self) -> Self::Collector { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 05db162..fec3093 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -10,10 +10,6 @@ pub mod test; pub mod testgen; pub mod timebin; -use crate as items_2; -use crate::streams::Collectable; -use crate::streams::Collector; -use crate::streams::ToJsonResult; use channelevents::ChannelEvents; use chrono::{DateTime, TimeZone, Utc}; use futures_util::FutureExt; @@ -23,11 +19,16 @@ use items::RangeCompletableItem; use items::Sitemty; use items::StreamItem; use items::SubFrId; +use items_0::collect_s::Collector; +use items_0::collect_s::ToJsonResult; +use items_0::Empty; +use items_0::Events; +use items_0::RangeOverlapInfo; +use items_0::{TimeBinnable, TimeBinner}; use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, NanoRange, ScalarType, Shape}; use serde::{Deserialize, Serialize, Serializer}; -use std::any::Any; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -220,63 +221,6 @@ impl serde::de::Error for Error { } } -pub trait WithLen { - fn len(&self) -> usize; -} - -// TODO can probably be removed. -pub trait TimeBins { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - fn ts_min_max(&self) -> Option<(u64, u64)>; -} - -pub enum Fits { - Empty, - Lower, - Greater, - Inside, - PartlyLower, - PartlyGreater, - PartlyLowerAndGreater, -} - -pub trait RangeOverlapInfo { - fn ends_before(&self, range: NanoRange) -> bool; - fn ends_after(&self, range: NanoRange) -> bool; - fn starts_after(&self, range: NanoRange) -> bool; -} - -pub trait EmptyForScalarTypeShape { - fn empty(scalar_type: ScalarType, shape: Shape) -> Self; -} - -pub trait EmptyForShape { - fn empty(shape: Shape) -> Self; -} - -pub trait Empty { - fn empty() -> Self; -} - -pub trait AppendEmptyBin { - fn append_empty_bin(&mut self, ts1: u64, ts2: u64); -} - -pub trait AsAnyRef { - fn as_any_ref(&self) -> &dyn Any; -} - -pub trait AsAnyMut { - fn as_any_mut(&mut self) -> &mut dyn Any; -} - -/*impl AsAnyRef for Box { - fn as_any_ref(&self) -> &dyn Any { - self.as_ref().as_any_ref() - } -}*/ - #[derive(Clone, Debug, PartialEq, Deserialize)] pub struct IsoDateTime(DateTime); @@ -295,87 +239,6 @@ pub fn make_iso_ts(tss: &[u64]) -> Vec { .collect() } -pub trait TimeBinner: Send { - fn ingest(&mut self, item: &dyn TimeBinnable); - fn bins_ready_count(&self) -> usize; - fn bins_ready(&mut self) -> Option>; - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); - - fn set_range_complete(&mut self); -} - -// TODO remove the Any bound. Factor out into custom AsAny trait. - -/// Provides a time-binned representation of the implementing type. -/// In contrast to `TimeBinnableType` this is meant for trait objects. -pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { - // TODO implementors may fail if edges contain not at least 2 entries. - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; - fn as_any(&self) -> &dyn Any; - - // TODO just a helper for the empty result. - fn to_box_to_json_result(&self) -> Box; -} - -// Helper trait to bridge between impls of event containers during refactoring. -// TODO get rid when no longer needed. -pub trait IntoEvents { - fn into_events(self) -> Box; -} - -impl IntoEvents for items::scalarevents::ScalarEvents -where - NTY: ScalarOps, -{ - fn into_events(self) -> Box { - let ret = items_2::eventsdim0::EventsDim0:: { - tss: self.tss.into(), - pulses: self.pulses.into(), - values: self.values.into(), - }; - Box::new(ret) - } -} - -// TODO can I remove the Any bound? - -/// Container of some form of events, for use as trait object. -pub trait Events: - fmt::Debug - + Any - + Collectable - + items_2::collect::CollectableWithDefault - + TimeBinnable - + Send - + erased_serde::Serialize -{ - fn as_time_binnable(&self) -> &dyn TimeBinnable; - fn verify(&self) -> bool; - fn output_info(&self); - fn as_collectable_mut(&mut self) -> &mut dyn Collectable; - fn as_collectable_with_default_ref(&self) -> &dyn crate::collect::CollectableWithDefault; - fn as_collectable_with_default_mut(&mut self) -> &mut dyn crate::collect::CollectableWithDefault; - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; - fn move_into_fresh(&mut self, ts_end: u64) -> Box; - fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()>; - fn clone_dyn(&self) -> Box; - fn partial_eq_dyn(&self, other: &dyn Events) -> bool; - fn serde_id(&self) -> &'static str; - fn nty_id(&self) -> u32; -} - -erased_serde::serialize_trait_object!(Events); - impl crate::merger::Mergeable for Box { fn len(&self) -> usize { self.as_ref().len() @@ -405,24 +268,6 @@ impl crate::merger::Mergeable for Box { } } -impl PartialEq for Box { - fn eq(&self, other: &Self) -> bool { - Events::partial_eq_dyn(self.as_ref(), other.as_ref()) - } -} - -/// Data in time-binned form. -pub trait TimeBinned: Any + TimeBinnable { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; - fn as_collectable_mut(&mut self) -> &mut dyn Collectable; - fn edges_slice(&self) -> (&[u64], &[u64]); - fn counts(&self) -> &[u64]; - fn mins(&self) -> Vec; - fn maxs(&self) -> Vec; - fn avgs(&self) -> Vec; - fn validate(&self) -> Result<(), String>; -} - pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; @@ -570,64 +415,6 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK } } -#[derive(Debug)] -pub struct EventsCollector { - coll: Box, -} - -impl EventsCollector { - pub fn new(coll: Box) -> Self { - Self { coll } - } -} - -impl items_2::collect::Collector for EventsCollector { - type Input = Box; - - // TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc... - type Output = Box; - - fn len(&self) -> usize { - self.coll.len() - } - - fn ingest(&mut self, item: &mut Self::Input) { - self.coll.ingest(item.as_collectable_with_default_mut()); - } - - fn set_range_complete(&mut self) { - self.coll.set_range_complete() - } - - fn set_timed_out(&mut self) { - self.coll.set_timed_out() - } - - fn result(&mut self) -> Result { - self.coll.result() - } -} - -impl items_2::collect::Collectable for Box { - type Collector = EventsCollector; - - fn new_collector(&self) -> Self::Collector { - let coll = items_2::collect::CollectableWithDefault::new_collector(self.as_ref()); - EventsCollector::new(coll) - } -} - -// TODO do this with some blanket impl: -impl Collectable for Box { - fn new_collector(&self) -> Box { - Collectable::new_collector(self.as_ref()) - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - Collectable::as_any_mut(self.as_mut()) - } -} - fn flush_binned( binner: &mut Box, coll: &mut Option>, diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index bb595f6..8b13789 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -1,94 +1 @@ -use crate::WithLen; -use err::Error; -use serde::Serialize; -use std::any::Any; -use std::fmt; -pub trait CollectorType: Send + Unpin + WithLen { - type Input: Collectable; - type Output: crate::collect::Collected + ToJsonResult + Serialize; - - fn ingest(&mut self, src: &mut Self::Input); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - - // TODO use this crate's Error instead: - fn result(&mut self) -> Result; -} - -pub trait Collector: Send + Unpin + WithLen { - fn ingest(&mut self, src: &mut dyn Collectable); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - fn result(&mut self) -> Result, Error>; -} - -pub trait CollectableType { - type Collector: CollectorType; - fn new_collector() -> Self::Collector; -} - -pub trait Collectable: Any { - fn new_collector(&self) -> Box; - fn as_any_mut(&mut self) -> &mut dyn Any; -} - -impl Collector for T { - fn ingest(&mut self, src: &mut dyn Collectable) { - let src: &mut ::Input = src.as_any_mut().downcast_mut().expect("can not downcast"); - T::ingest(self, src) - } - - fn set_range_complete(&mut self) { - T::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - T::set_timed_out(self) - } - - fn result(&mut self) -> Result, Error> { - let ret = T::result(self)?; - Ok(Box::new(ret) as _) - } -} - -impl Collectable for T { - fn new_collector(&self) -> Box { - Box::new(T::new_collector()) as _ - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - // TODO interesting: why exactly does returning `&mut self` not work here? - self - } -} - -// TODO check usage of this trait -pub trait ToJsonBytes { - fn to_json_bytes(&self) -> Result, Error>; -} - -// TODO check usage of this trait -pub trait ToJsonResult: erased_serde::Serialize + fmt::Debug + Send { - fn to_json_result(&self) -> Result, Error>; - fn as_any(&self) -> &dyn Any; -} - -erased_serde::serialize_trait_object!(ToJsonResult); - -impl ToJsonResult for serde_json::Value { - fn to_json_result(&self) -> Result, Error> { - Ok(Box::new(self.clone())) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl ToJsonBytes for serde_json::Value { - fn to_json_bytes(&self) -> Result, Error> { - Ok(serde_json::to_vec(self)?) - } -} diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 6914380..ddc5833 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -5,10 +5,11 @@ use crate::merger::{Mergeable, Merger}; use crate::merger_cev::ChannelEventsMerger; use crate::testgen::make_some_boxed_d0_f32; use crate::Error; -use crate::{binned_collected, runfut, ChannelEvents, Empty, Events, IsoDateTime}; +use crate::{binned_collected, runfut, ChannelEvents, Events, IsoDateTime}; use chrono::{TimeZone, Utc}; use futures_util::{stream, StreamExt}; use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; +use items_0::Empty; use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape}; diff --git a/items_2/src/testgen.rs b/items_2/src/testgen.rs index 3389f9b..8f72352 100644 --- a/items_2/src/testgen.rs +++ b/items_2/src/testgen.rs @@ -1,5 +1,6 @@ use crate::eventsdim0::EventsDim0; -use crate::{Empty, Events}; +use crate::Events; +use items_0::Empty; #[allow(unused)] fn xorshift32(state: u32) -> u32 { diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 70669f1..56b7ba0 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -30,6 +30,7 @@ netpod = { path = "../netpod" } disk = { path = "../disk" } #parse = { path = "../parse" } items = { path = "../items" } +items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } dbconn = { path = "../dbconn" } scyllaconn = { path = "../scyllaconn" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 6d0a55d..0aba3e0 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -110,7 +110,7 @@ async fn events_conn_handler_inner_try( let mut p1: Pin> + Send>> = if evq.channel().backend() == "test-inmem" { warn!("TEST BACKEND DATA"); - use items_2::Empty; + use items_0::Empty; use netpod::timeunits::MS; let node_count = node_config.node_config.cluster.nodes.len(); let node_ix = node_config.ix; diff --git a/scyllaconn/Cargo.toml b/scyllaconn/Cargo.toml index d393286..1b3cdcc 100644 --- a/scyllaconn/Cargo.toml +++ b/scyllaconn/Cargo.toml @@ -24,4 +24,5 @@ scylla = "0.5" tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } err = { path = "../err" } netpod = { path = "../netpod" } +items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index aa51c32..8fa40af 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -2,9 +2,10 @@ use crate::errconv::ErrConv; use crate::events::EventsStreamScylla; use err::Error; use futures_util::{Future, Stream, StreamExt}; +use items_0::TimeBinned; use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; -use items_2::{empty_binned_dyn, empty_events_dyn, TimeBinned}; +use items_2::{empty_binned_dyn, empty_events_dyn}; use netpod::log::*; use netpod::query::{CacheUsage, PlainEventsQuery, RawEventsQuery}; use netpod::timeunits::*; diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 819f2f1..c28e8fb 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -1,9 +1,9 @@ use crate::errconv::ErrConv; use err::Error; use futures_util::{Future, FutureExt, Stream, StreamExt}; +use items_0::{Empty, Events, WithLen}; use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent}; use items_2::eventsdim0::EventsDim0; -use items_2::{Empty, Events, WithLen}; use netpod::log::*; use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; use netpod::timeunits::*; diff --git a/streams/Cargo.toml b/streams/Cargo.toml index bcb81df..5a6979f 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -20,6 +20,7 @@ chrono = { version = "0.4.19", features = ["serde"] } err = { path = "../err" } netpod = { path = "../netpod" } items = { path = "../items" } +items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 2cdce7b..10ca585 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -1,7 +1,7 @@ use err::Error; use futures_util::{Stream, StreamExt}; use items::{RangeCompletableItem, Sitemty, StreamItem}; -use items_2::collect::{Collectable, Collector}; +use items_0::collect_c::{Collectable, Collector}; use netpod::log::*; use std::fmt; use std::time::{Duration, Instant}; diff --git a/streams/src/merge/mergedstream.rs b/streams/src/merge/mergedstream.rs index a6105d4..f9cb39b 100644 --- a/streams/src/merge/mergedstream.rs +++ b/streams/src/merge/mergedstream.rs @@ -296,8 +296,8 @@ where #[cfg(test)] mod test { + use items_0::Empty; use items_2::channelevents::ChannelEvents; - use items_2::Empty; #[test] fn merge_channel_events() { diff --git a/streams/src/test.rs b/streams/src/test.rs index 3115a4e..60ced5d 100644 --- a/streams/src/test.rs +++ b/streams/src/test.rs @@ -6,9 +6,9 @@ mod timebin; use err::Error; use futures_util::{stream, Stream}; use items::{sitem_data, Sitemty}; +use items_0::Empty; use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; -use items_2::Empty; use netpod::timeunits::SEC; use std::pin::Pin;