WIP
This commit is contained in:
@@ -13,7 +13,9 @@ use crate::WithLen;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::TsNano;
|
||||
use std::any::Any;
|
||||
use std::fmt;
|
||||
use std::ops::Range;
|
||||
@@ -62,6 +64,55 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized {
|
||||
) -> Self::TimeBinner;
|
||||
}
|
||||
|
||||
pub enum BinningggError {
|
||||
Dyn(Box<dyn std::error::Error>),
|
||||
}
|
||||
|
||||
impl<E> From<E> for BinningggError
|
||||
where
|
||||
E: std::error::Error + 'static,
|
||||
{
|
||||
fn from(value: E) -> Self {
|
||||
Self::Dyn(Box::new(value))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait BinningggBinnerTy: fmt::Debug + Send {
|
||||
type Input: fmt::Debug;
|
||||
type Output: fmt::Debug;
|
||||
|
||||
fn ingest(&mut self, item: &mut Self::Input);
|
||||
fn range_final(&mut self);
|
||||
fn bins_ready_count(&self) -> usize;
|
||||
fn bins_ready(&mut self) -> Option<Self::Output>;
|
||||
|
||||
/// If there is a bin in progress with non-zero count, push it to the result set.
|
||||
/// With push_empty == true, a bin in progress is pushed even if it contains no counts.
|
||||
fn push_in_progress(&mut self, push_empty: bool);
|
||||
|
||||
/// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
|
||||
/// to `push_in_progress` did not change the result count, as long as edges are left.
|
||||
/// The next call to `Self::bins_ready_count` must return one higher count than before.
|
||||
fn cycle(&mut self);
|
||||
|
||||
fn empty(&self) -> Option<Self::Output>;
|
||||
|
||||
fn append_empty_until_end(&mut self);
|
||||
}
|
||||
|
||||
pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send {
|
||||
type Binner: BinningggBinnerTy<Input = Self>;
|
||||
|
||||
fn binner_new(range: BinnedRange<TsNano>) -> Self::Binner;
|
||||
}
|
||||
|
||||
pub trait BinningggBinnerDyn: fmt::Debug + Send {
|
||||
fn input_done_range_final(&mut self) -> Result<(), BinningggError>;
|
||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
|
||||
}
|
||||
|
||||
pub trait BinningBinnableDyn: fmt::Debug + Send {}
|
||||
|
||||
/// Data in time-binned form.
|
||||
pub trait TimeBinned: Any + TypeName + TimeBinnable + Resettable + Collectable + erased_serde::Serialize {
|
||||
fn clone_box_time_binned(&self) -> Box<dyn TimeBinned>;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod timeweight_bins;
|
||||
pub mod timeweight_events;
|
||||
pub mod timeweight_events_dyn;
|
||||
|
||||
use super::___;
|
||||
use netpod::log::*;
|
||||
|
||||
@@ -5,9 +5,13 @@ use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::ContainerEventsTakeUpTo;
|
||||
use crate::binning::container_events::EventSingle;
|
||||
use crate::channelevents::ChannelEvents;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinningggBinnerDyn;
|
||||
use items_0::timebin::BinningggBinnerTy;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
@@ -576,13 +580,3 @@ where
|
||||
::core::mem::replace(&mut self.out, ContainerBins::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BinnedEventsTimeweightStream {}
|
||||
|
||||
impl Stream for BinnedEventsTimeweightStream {
|
||||
type Item = ();
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
use crate::channelevents::ChannelEvents;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinningggBinnerDyn;
|
||||
use items_0::timebin::BinningggError;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "BinnedEventsTimeweightDyn")]
|
||||
pub enum Error {
|
||||
InnerDynMissing,
|
||||
}
|
||||
|
||||
pub struct BinnedEventsTimeweightDyn {
|
||||
range: BinnedRange<TsNano>,
|
||||
binner: Option<Box<dyn BinningggBinnerDyn>>,
|
||||
}
|
||||
|
||||
impl BinnedEventsTimeweightDyn {
|
||||
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
||||
Self { range, binner: None }
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, mut evs_all: ContainerEventsDyn) -> Result<(), BinningggError> {
|
||||
TODO;
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||
self.binner
|
||||
.as_mut()
|
||||
.ok_or(Error::InnerDynMissing)?
|
||||
.input_done_range_final()
|
||||
}
|
||||
|
||||
pub fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||
self.binner
|
||||
.as_mut()
|
||||
.ok_or(Error::InnerDynMissing)?
|
||||
.input_done_range_open()
|
||||
}
|
||||
|
||||
pub fn output(&mut self) -> ContainerBinsDyn {
|
||||
TODO;
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BinnedEventsTimeweightStream {
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
|
||||
}
|
||||
|
||||
impl Stream for BinnedEventsTimeweightStream {
|
||||
type Item = ();
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user