WIP on collector

This commit is contained in:
Dominik Werder
2022-11-18 16:01:35 +01:00
parent d57aa5474e
commit 7cdf5975b9
7 changed files with 228 additions and 101 deletions

23
items_2/src/collect.rs Normal file
View File

@@ -0,0 +1,23 @@
use crate::Error;
use std::fmt;
pub trait Collector: fmt::Debug {
type Input;
type Output;
fn len(&self) -> usize;
fn ingest(&mut self, item: &mut Self::Input);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Self::Output, Error>;
}
pub trait Collectable: fmt::Debug {
type Collector: Collector<Input = Self>;
fn new_collector(&self) -> Self::Collector;
}

View File

@@ -122,6 +122,7 @@ impl<NTY: ScalarOps> TimeBinnableType for EventsDim0<NTY> {
}
}
#[derive(Debug)]
pub struct EventsDim0Collector<NTY> {
vals: EventsDim0<NTY>,
range_complete: bool,
@@ -219,6 +220,32 @@ impl<NTY: ScalarOps> CollectableType for EventsDim0<NTY> {
}
}
impl<NTY: ScalarOps> crate::collect::Collector for EventsDim0Collector<NTY> {
type Input = EventsDim0<NTY>;
// TODO the output probably needs to be different to accommodate also range-complete, continue-at, etc
type Output = EventsDim0CollectorOutput<NTY>;
fn len(&self) -> usize {
self.vals.len()
}
fn ingest(&mut self, item: &mut Self::Input) {
CollectorType::ingest(self, item)
}
fn set_range_complete(&mut self) {
CollectorType::set_range_complete(self)
}
fn set_timed_out(&mut self) {
CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Self::Output, crate::Error> {
CollectorType::result(self).map_err(Into::into)
}
}
pub struct EventsDim0Aggregator<NTY> {
range: NanoRange,
count: u64,
@@ -809,3 +836,11 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
self.range_complete = true;
}
}
impl<NTY: ScalarOps> crate::collect::Collectable for EventsDim0<NTY> {
type Collector;
fn new_collector(&self) -> Self::Collector {
todo!()
}
}

View File

@@ -1,4 +1,5 @@
pub mod binsdim0;
pub mod collect;
pub mod eventsdim0;
pub mod merger;
pub mod merger_cev;
@@ -365,15 +366,17 @@ impl PartialEq for Box<dyn Events> {
}
}
struct EventsCollector {}
// TODO remove
struct EventsCollector2 {}
impl WithLen for EventsCollector {
impl WithLen for EventsCollector2 {
fn len(&self) -> usize {
todo!()
}
}
impl Collector for EventsCollector {
// TODO remove
impl Collector for EventsCollector2 {
fn ingest(&mut self, _src: &mut dyn Collectable) {
todo!()
}
@@ -391,9 +394,10 @@ impl Collector for EventsCollector {
}
}
// TODO remove
impl Collectable for Box<dyn Events> {
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(EventsCollector {})
Box::new(EventsCollector2 {})
}
fn as_any_mut(&mut self) -> &mut dyn Any {
@@ -737,7 +741,8 @@ mod serde_channel_events {
#[cfg(test)]
mod test_channel_events_serde {
use super::ChannelEvents;
use crate::{eventsdim0::EventsDim0, Empty};
use crate::eventsdim0::EventsDim0;
use crate::Empty;
#[test]
fn channel_events() {
@@ -950,6 +955,50 @@ impl crate::timebin::TimeBinnable for ChannelEvents {
}
}
#[derive(Debug)]
pub struct EventsCollector {
coll: Option<Box<()>>,
}
impl EventsCollector {
pub fn new() -> Self {
Self { coll: Box::new(()) }
}
}
impl crate::collect::Collector for EventsCollector {
type Input = Box<dyn Events>;
type Output = Box<dyn Events>;
fn len(&self) -> usize {
todo!()
}
fn ingest(&mut self, item: &mut Self::Input) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn set_timed_out(&mut self) {
todo!()
}
fn result(&mut self) -> Result<Self::Output, Error> {
todo!()
}
}
impl crate::collect::Collectable for Box<dyn Events> {
type Collector = EventsCollector;
fn new_collector(&self) -> Self::Collector {
Collectable::new_collector(self)
}
}
// TODO do this with some blanket impl:
impl Collectable for Box<dyn Collectable> {
fn new_collector(&self) -> Box<dyn streams::Collector> {
@@ -994,7 +1043,7 @@ fn flush_binned(
}
}
// TODO handle status information.
// TODO remove
pub async fn binned_collected(
scalar_type: ScalarType,
shape: Shape,

View File

@@ -11,6 +11,8 @@ pub trait CollectorType: Send + Unpin + WithLen {
fn ingest(&mut self, src: &mut Self::Input);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
// TODO use this crate's Error instead:
fn result(&mut self) -> Result<Self::Output, Error>;
}