This commit is contained in:
Dominik Werder
2024-09-09 17:04:20 +02:00
parent 490c1ed0a0
commit 741c1380c7
25 changed files with 638 additions and 85 deletions

View File

@@ -150,6 +150,17 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
}
true
}
// TODO make this part of a new bins trait, similar like Events trait.
// TODO check for error?
pub fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> () {
dst.ts1s.extend(self.ts1s.drain(range.clone()));
dst.ts2s.extend(self.ts2s.drain(range.clone()));
dst.counts.extend(self.counts.drain(range.clone()));
dst.mins.extend(self.mins.drain(range.clone()));
dst.maxs.extend(self.maxs.drain(range.clone()));
dst.avgs.extend(self.avgs.drain(range.clone()));
}
}
impl<NTY> AsAnyRef for BinsDim0<NTY>
@@ -301,8 +312,39 @@ impl<NTY: ScalarOps> TimeBinnableType for BinsDim0<NTY> {
}
#[derive(Debug)]
pub struct BinsDim0TimeBinnerTy<STY> {
_t1: std::marker::PhantomData<STY>,
pub struct BinsDim0TimeBinnerTy<STY>
where
STY: ScalarOps,
{
ts1now: TsNano,
binrange: BinnedRange<TsNano>,
do_time_weight: bool,
emit_empty_bins: bool,
range_complete: bool,
buf: <Self as TimeBinnerTy>::Output,
out: <Self as TimeBinnerTy>::Output,
bins_ready_count: usize,
}
impl<STY> BinsDim0TimeBinnerTy<STY>
where
STY: ScalarOps,
{
pub fn new(binrange: BinnedRange<TsNano>, do_time_weight: bool, emit_empty_bins: bool) -> Self {
// let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns());
// let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano());
let buf = <Self as TimeBinnerTy>::Output::empty();
Self {
ts1now: TsNano::from_ns(binrange.full_range().beg()),
binrange,
do_time_weight,
emit_empty_bins,
range_complete: false,
buf,
out: <Self as TimeBinnerTy>::Output::empty(),
bins_ready_count: 0,
}
}
}
impl<STY> TimeBinnerTy for BinsDim0TimeBinnerTy<STY>
@@ -313,35 +355,36 @@ where
type Output = BinsDim0<STY>;
fn ingest(&mut self, item: &mut Self::Input) {
todo!()
// item.ts1s;
todo!("TimeBinnerTy::ingest")
}
fn set_range_complete(&mut self) {
todo!()
self.range_complete = true;
}
fn bins_ready_count(&self) -> usize {
todo!()
self.bins_ready_count
}
fn bins_ready(&mut self) -> Option<Self::Output> {
todo!()
todo!("TimeBinnerTy::bins_ready")
}
fn push_in_progress(&mut self, push_empty: bool) {
todo!()
todo!("TimeBinnerTy::push_in_progress")
}
fn cycle(&mut self) {
todo!()
todo!("TimeBinnerTy::cycle")
}
fn empty(&self) -> Option<Self::Output> {
todo!()
todo!("TimeBinnerTy::empty")
}
fn append_empty_until_end(&mut self) {
todo!()
todo!("TimeBinnerTy::append_empty_until_end")
}
}
@@ -354,7 +397,10 @@ impl<STY: ScalarOps> TimeBinnableTy for BinsDim0<STY> {
do_time_weight: bool,
emit_empty_bins: bool,
) -> Self::TimeBinner {
todo!()
match binrange {
BinnedRangeEnum::Time(binrange) => BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins),
BinnedRangeEnum::Pulse(_) => todo!("TimeBinnableTy for BinsDim0 Pulse"),
}
}
}

View File

@@ -101,14 +101,14 @@ impl<T> Merger<T>
where
T: Mergeable,
{
pub fn new(inps: Vec<MergeInp<T>>, out_max_len: usize) -> Self {
pub fn new(inps: Vec<MergeInp<T>>, out_max_len: Option<u32>) -> Self {
let n = inps.len();
Self {
inps: inps.into_iter().map(|x| Some(x)).collect(),
items: (0..n).into_iter().map(|_| None).collect(),
out: None,
do_clear_out: false,
out_max_len,
out_max_len: out_max_len.unwrap_or(1000) as usize,
range_complete: vec![false; n],
out_of_band_queue: VecDeque::new(),
log_queue: VecDeque::new(),

View File

@@ -86,7 +86,7 @@ fn items_merge_00() {
let v1 = ChannelEvents::Events(evs1);
let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)]));
let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)]));
let mut merger = Merger::new(vec![stream0, stream1], 8);
let mut merger = Merger::new(vec![stream0, stream1], Some(8));
while let Some(item) = merger.next().await {
eprintln!("{item:?}");
}
@@ -109,7 +109,7 @@ fn items_merge_01() {
let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)]));
let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)]));
let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)]));
let mut merger = Merger::new(vec![stream0, stream1, stream2], 8);
let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8));
let mut total_event_count = 0;
while let Some(item) = merger.next().await {
eprintln!("{item:?}");
@@ -144,7 +144,7 @@ fn items_merge_02() {
let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)]));
let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)]));
let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)]));
let mut merger = Merger::new(vec![stream0, stream1, stream2], 8);
let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8));
let mut total_event_count = 0;
while let Some(item) = merger.next().await {
eprintln!("{item:?}");
@@ -187,7 +187,7 @@ fn merge_00() {
let inp2: Vec<Sitemty<ChannelEvents>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 32);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(32));
// Expect an empty first item.
let item = merger.next().await;
@@ -230,7 +230,7 @@ fn merge_01() {
let inp2: Vec<Sitemty<ChannelEvents>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(10));
// Expect an empty first item.
let item = merger.next().await;
@@ -323,7 +323,7 @@ fn merge_02() {
let inp2: Vec<Sitemty<ChannelEvents>> = inp2_events_a;
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10);
let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(10));
// Expect an empty first item.
let item = merger.next().await;
@@ -365,7 +365,7 @@ fn bin_00() {
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
let stream = crate::merger::Merger::new(vec![inp1, inp2], 32);
let stream = crate::merger::Merger::new(vec![inp1, inp2], Some(32));
let range = NanoRange {
beg: SEC * 0,
end: SEC * 100,
@@ -413,7 +413,7 @@ fn bin_01() {
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
let stream = crate::merger::Merger::new(vec![inp1, inp2], 32);
let stream = crate::merger::Merger::new(vec![inp1, inp2], Some(32));
// covering_range result is subject to adjustments, instead, manually choose bin edges
let range = NanoRange {
beg: TSBASE + SEC * 1,