factor out into common crate

This commit is contained in:
Dominik Werder
2022-11-29 15:50:16 +01:00
parent 075bdd05bd
commit 9fe63706cf
25 changed files with 466 additions and 449 deletions

View File

@@ -10,10 +10,6 @@ pub mod test;
pub mod testgen;
pub mod timebin;
use crate as items_2;
use crate::streams::Collectable;
use crate::streams::Collector;
use crate::streams::ToJsonResult;
use channelevents::ChannelEvents;
use chrono::{DateTime, TimeZone, Utc};
use futures_util::FutureExt;
@@ -23,11 +19,16 @@ use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use items::SubFrId;
use items_0::collect_s::Collector;
use items_0::collect_s::ToJsonResult;
use items_0::Empty;
use items_0::Events;
use items_0::RangeOverlapInfo;
use items_0::{TimeBinnable, TimeBinner};
use netpod::log::*;
use netpod::timeunits::*;
use netpod::{AggKind, NanoRange, ScalarType, Shape};
use serde::{Deserialize, Serialize, Serializer};
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
@@ -220,63 +221,6 @@ impl serde::de::Error for Error {
}
}
pub trait WithLen {
fn len(&self) -> usize;
}
// TODO can probably be removed.
pub trait TimeBins {
fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>;
fn ts_min_max(&self) -> Option<(u64, u64)>;
}
pub enum Fits {
Empty,
Lower,
Greater,
Inside,
PartlyLower,
PartlyGreater,
PartlyLowerAndGreater,
}
pub trait RangeOverlapInfo {
fn ends_before(&self, range: NanoRange) -> bool;
fn ends_after(&self, range: NanoRange) -> bool;
fn starts_after(&self, range: NanoRange) -> bool;
}
pub trait EmptyForScalarTypeShape {
fn empty(scalar_type: ScalarType, shape: Shape) -> Self;
}
pub trait EmptyForShape {
fn empty(shape: Shape) -> Self;
}
pub trait Empty {
fn empty() -> Self;
}
pub trait AppendEmptyBin {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64);
}
pub trait AsAnyRef {
fn as_any_ref(&self) -> &dyn Any;
}
pub trait AsAnyMut {
fn as_any_mut(&mut self) -> &mut dyn Any;
}
/*impl AsAnyRef for Box<dyn AsAnyRef> {
fn as_any_ref(&self) -> &dyn Any {
self.as_ref().as_any_ref()
}
}*/
#[derive(Clone, Debug, PartialEq, Deserialize)]
pub struct IsoDateTime(DateTime<Utc>);
@@ -295,87 +239,6 @@ pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
.collect()
}
pub trait TimeBinner: Send {
fn ingest(&mut self, item: &dyn TimeBinnable);
fn bins_ready_count(&self) -> usize;
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>>;
/// If there is a bin in progress with non-zero count, push it to the result set.
/// With push_empty == true, a bin in progress is pushed even if it contains no counts.
fn push_in_progress(&mut self, push_empty: bool);
/// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
/// to `push_in_progress` did not change the result count, as long as edges are left.
/// The next call to `Self::bins_ready_count` must return one higher count than before.
fn cycle(&mut self);
fn set_range_complete(&mut self);
}
// TODO remove the Any bound. Factor out into custom AsAny trait.
/// Provides a time-binned representation of the implementing type.
/// In contrast to `TimeBinnableType` this is meant for trait objects.
pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send {
// TODO implementors may fail if edges contain not at least 2 entries.
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner>;
fn as_any(&self) -> &dyn Any;
// TODO just a helper for the empty result.
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult>;
}
// Helper trait to bridge between impls of event containers during refactoring.
// TODO get rid when no longer needed.
pub trait IntoEvents {
fn into_events(self) -> Box<dyn Events>;
}
impl<NTY> IntoEvents for items::scalarevents::ScalarEvents<NTY>
where
NTY: ScalarOps,
{
fn into_events(self) -> Box<dyn Events> {
let ret = items_2::eventsdim0::EventsDim0::<NTY> {
tss: self.tss.into(),
pulses: self.pulses.into(),
values: self.values.into(),
};
Box::new(ret)
}
}
// TODO can I remove the Any bound?
/// Container of some form of events, for use as trait object.
pub trait Events:
fmt::Debug
+ Any
+ Collectable
+ items_2::collect::CollectableWithDefault
+ TimeBinnable
+ Send
+ erased_serde::Serialize
{
fn as_time_binnable(&self) -> &dyn TimeBinnable;
fn verify(&self) -> bool;
fn output_info(&self);
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn as_collectable_with_default_ref(&self) -> &dyn crate::collect::CollectableWithDefault;
fn as_collectable_with_default_mut(&mut self) -> &mut dyn crate::collect::CollectableWithDefault;
fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>;
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events>;
fn move_into_fresh(&mut self, ts_end: u64) -> Box<dyn Events>;
fn move_into_existing(&mut self, tgt: &mut Box<dyn Events>, ts_end: u64) -> Result<(), ()>;
fn clone_dyn(&self) -> Box<dyn Events>;
fn partial_eq_dyn(&self, other: &dyn Events) -> bool;
fn serde_id(&self) -> &'static str;
fn nty_id(&self) -> u32;
}
erased_serde::serialize_trait_object!(Events);
impl crate::merger::Mergeable for Box<dyn Events> {
fn len(&self) -> usize {
self.as_ref().len()
@@ -405,24 +268,6 @@ impl crate::merger::Mergeable for Box<dyn Events> {
}
}
impl PartialEq for Box<dyn Events> {
fn eq(&self, other: &Self) -> bool {
Events::partial_eq_dyn(self.as_ref(), other.as_ref())
}
}
/// Data in time-binned form.
pub trait TimeBinned: Any + TimeBinnable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);
fn counts(&self) -> &[u64];
fn mins(&self) -> Vec<f32>;
fn maxs(&self) -> Vec<f32>;
fn avgs(&self) -> Vec<f32>;
fn validate(&self) -> Result<(), String>;
}
pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo {
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
@@ -570,64 +415,6 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
}
}
#[derive(Debug)]
pub struct EventsCollector {
coll: Box<dyn items_2::collect::CollectorDyn>,
}
impl EventsCollector {
pub fn new(coll: Box<dyn items_2::collect::CollectorDyn>) -> Self {
Self { coll }
}
}
impl items_2::collect::Collector for EventsCollector {
type Input = Box<dyn Events>;
// TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc...
type Output = Box<dyn items_2::collect::Collected>;
fn len(&self) -> usize {
self.coll.len()
}
fn ingest(&mut self, item: &mut Self::Input) {
self.coll.ingest(item.as_collectable_with_default_mut());
}
fn set_range_complete(&mut self) {
self.coll.set_range_complete()
}
fn set_timed_out(&mut self) {
self.coll.set_timed_out()
}
fn result(&mut self) -> Result<Self::Output, Error> {
self.coll.result()
}
}
impl items_2::collect::Collectable for Box<dyn Events> {
type Collector = EventsCollector;
fn new_collector(&self) -> Self::Collector {
let coll = items_2::collect::CollectableWithDefault::new_collector(self.as_ref());
EventsCollector::new(coll)
}
}
// TODO do this with some blanket impl:
impl Collectable for Box<dyn Collectable> {
fn new_collector(&self) -> Box<dyn streams::Collector> {
Collectable::new_collector(self.as_ref())
}
fn as_any_mut(&mut self) -> &mut dyn Any {
Collectable::as_any_mut(self.as_mut())
}
}
fn flush_binned(
binner: &mut Box<dyn TimeBinner>,
coll: &mut Option<Box<dyn Collector>>,