diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index ad209b6..def28c8 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -228,7 +228,7 @@ pub async fn update_db_with_channel_names( async move { crate::delay_io_short().await; dbc.query( - "insert into channels (facility, name) values ($1, $2) on conflict do nothing", + "insert into channels (facility, name) select facility, name from (values ($1, $2)) v1 (facility, name) where not exists (select 1 from channels t1 where t1.facility = v1.facility and t1.name = v1.name) on conflict do nothing", &[&fac, &ch], ) .await @@ -408,10 +408,10 @@ pub async fn update_db_with_all_channel_configs( Ok(rx) } -pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> { +pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result { let dbc = create_connection(&node_config.node_config.cluster.database).await?; dbc.query("select update_cache()", &[]).await.err_conv()?; - Ok(()) + Ok(true) } pub enum UpdateChannelConfigResult { diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index 6ed2d6e..e2008f7 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -45,6 +45,8 @@ pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { fn cycle(&mut self); fn empty(&self) -> Option; + + fn append_empty_until_end(&mut self); } pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { @@ -91,7 +93,9 @@ impl TimeBinnable for Box { pub trait TimeBinner: fmt::Debug + Send { fn ingest(&mut self, item: &mut dyn TimeBinnable); + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option>; /// If there is a bin in progress with non-zero count, push it to the result set. @@ -106,6 +110,8 @@ pub trait TimeBinner: fmt::Debug + Send { fn set_range_complete(&mut self); fn empty(&self) -> Box; + + fn append_empty_until_end(&mut self); } // TODO remove the Any bound. Factor out into custom AsAny trait. @@ -266,6 +272,10 @@ impl TimeBinnerTy for TimeBinnerDynStruct { None } } + + fn append_empty_until_end(&mut self) { + todo!() + } } impl TimeBinner for TimeBinnerDynStruct { @@ -296,6 +306,10 @@ impl TimeBinner for TimeBinnerDynStruct { fn empty(&self) -> Box { todo!() } + + fn append_empty_until_end(&mut self) { + todo!() + } } impl TimeBinnableTy for Box { diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index fcd08f9..68e5397 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,3 +1,5 @@ +use crate::timebin::TimeBinnerCommonV0Func; +use crate::timebin::TimeBinnerCommonV0Trait; use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; @@ -30,9 +32,11 @@ use netpod::is_false; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; +use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::CmpZero; use netpod::Dim0Kind; +use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; @@ -505,10 +509,7 @@ impl CollectableType for BinsDim0 { pub struct BinsDim0Aggregator { range: SeriesRange, count: u64, - min: NTY, - max: NTY, - // Carry over to next bin: - avg: f32, + minmax: Option<(NTY, NTY)>, sumc: u64, sum: f32, } @@ -518,9 +519,7 @@ impl BinsDim0Aggregator { Self { range, count: 0, - min: NTY::zero_b(), - max: NTY::zero_b(), - avg: 0., + minmax: None, sumc: 0, sum: 0f32, } @@ -536,54 +535,72 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { } fn ingest(&mut self, item: &Self::Input) { - /*for i1 in 0..item.ts1s.len() { - if item.counts[i1] == 0 { - } else if item.ts2s[i1] <= self.range.beg { - } else if item.ts1s[i1] >= self.range.end { + let beg = self.range.beg_u64(); + let end = self.range.end_u64(); + for (((((&ts1, &ts2), &count), min), max), &avg) in item + .ts1s + .iter() + .zip(item.ts2s.iter()) + .zip(item.counts.iter()) + .zip(item.mins.iter()) + .zip(item.maxs.iter()) + .zip(item.avgs.iter()) + { + if count == 0 { + } else if ts2 <= beg { + } else if ts1 >= end { } else { - if self.count == 0 { - self.min = item.mins[i1].clone(); - self.max = item.maxs[i1].clone(); + if let Some((cmin, cmax)) = self.minmax.as_mut() { + if min < cmin { + *cmin = min.clone(); + } + if max > cmax { + *cmax = max.clone(); + } } else { - if self.min > item.mins[i1] { - self.min = item.mins[i1].clone(); - } - if self.max < item.maxs[i1] { - self.max = item.maxs[i1].clone(); - } + self.minmax = Some((min.clone(), max.clone())); } - self.count += item.counts[i1]; - self.sum += item.avgs[i1]; + self.count += count; self.sumc += 1; + self.sum += avg; } - }*/ - todo!() + } } fn result_reset(&mut self, range: SeriesRange) -> Self::Output { - /*if self.sumc > 0 { - self.avg = self.sum / self.sumc as f32; - } + let (min, max) = if let Some((min, max)) = self.minmax.take() { + (min, max) + } else { + (NTY::zero_b(), NTY::zero_b()) + }; + let avg = if self.sumc > 0 { + self.sum / self.sumc as f32 + } else { + NTY::zero_b().as_prim_f32_b() + }; let ret = Self::Output { - ts1s: [self.range.beg].into(), - ts2s: [self.range.end].into(), + ts1s: [self.range.beg_u64()].into(), + ts2s: [self.range.end_u64()].into(), counts: [self.count].into(), - mins: [self.min.clone()].into(), - maxs: [self.max.clone()].into(), - avgs: [self.avg].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), + // TODO + dim0kind: None, }; self.range = range; self.count = 0; - self.sum = 0f32; + self.minmax = None; self.sumc = 0; - ret*/ - todo!() + self.sum = 0.; + ret } } impl TimeBinnable for BinsDim0 { fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - let ret = BinsDim0TimeBinner::::new(binrange, do_time_weight); + // TODO get rid of unwrap + let ret = BinsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -596,39 +613,115 @@ impl TimeBinnable for BinsDim0 { #[derive(Debug)] pub struct BinsDim0TimeBinner { binrange: BinnedRangeEnum, - do_time_weight: bool, - agg: Option>, + rix: usize, + rng: Option, + agg: BinsDim0Aggregator, ready: Option< as TimeBinnableTypeAggregator>::Output>, range_final: bool, } impl BinsDim0TimeBinner { - fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Self { - Self { + fn type_name() -> &'static str { + any::type_name::() + } + + fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { + let rng = binrange + .range_at(0) + .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; + let agg = BinsDim0Aggregator::new(rng, do_time_weight); + let ret = Self { binrange, - do_time_weight, - agg: None, + rix: 0, + rng: Some(agg.range().clone()), + agg, ready: None, range_final: false, - } + }; + Ok(ret) } fn next_bin_range(&mut self) -> Option { - /*if self.edges.len() >= 2 { - let ret = NanoRange { - beg: self.edges[0], - end: self.edges[1], - }; - self.edges.pop_front(); - Some(ret) + self.rix += 1; + if let Some(rng) = self.binrange.range_at(self.rix) { + trace!("{} next_bin_range {:?}", Self::type_name(), rng); + Some(rng) } else { + trace!("{} next_bin_range None", Self::type_name()); None - }*/ - todo!() + } + } +} + +impl TimeBinnerCommonV0Trait for BinsDim0TimeBinner { + type Input = as TimeBinnableTypeAggregator>::Input; + type Output = as TimeBinnableTypeAggregator>::Output; + + fn type_name() -> &'static str { + Self::type_name() + } + + fn common_bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn common_range_current(&self) -> &SeriesRange { + self.agg.range() + } + + fn common_has_more_range(&self) -> bool { + self.rng.is_some() + } + + fn common_next_bin_range(&mut self) -> Option { + self.next_bin_range() + } + + fn common_set_current_range(&mut self, range: Option) { + self.rng = range; + } + fn common_take_or_append_all_from(&mut self, item: Self::Output) { + let mut item = item; + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut item); + } + None => { + self.ready = Some(item); + } + } + } + + fn common_result_reset(&mut self, range: Option) -> Self::Output { + // TODO maybe better to wrap the aggregator in Option and remove the whole thing when no more bins? + self.agg.result_reset(range.unwrap_or_else(|| { + SeriesRange::TimeRange(netpod::range::evrange::NanoRange { + beg: u64::MAX, + end: u64::MAX, + }) + })) + } + + fn common_agg_ingest(&mut self, item: &mut Self::Input) { + self.agg.ingest(item) } } impl TimeBinner for BinsDim0TimeBinner { + fn bins_ready_count(&self) -> usize { + TimeBinnerCommonV0Trait::common_bins_ready_count(self) + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } + fn ingest(&mut self, item: &mut dyn TimeBinnable) { /*let self_name = any::type_name::(); if item.len() == 0 { @@ -695,22 +788,7 @@ impl TimeBinner for BinsDim0TimeBinner { } } }*/ - error!("!!!!!!!!!!!!!!!! TODO actually do something"); - todo!() - } - - fn bins_ready_count(&self) -> usize { - match &self.ready { - Some(k) => k.len(), - None => 0, - } - } - - fn bins_ready(&mut self) -> Option> { - match self.ready.take() { - Some(k) => Some(Box::new(k)), - None => None, - } + TimeBinnerCommonV0Func::ingest(self, item) } // TODO there is too much common code between implementors: @@ -733,7 +811,7 @@ impl TimeBinner for BinsDim0TimeBinner { } } }*/ - todo!() + TimeBinnerCommonV0Func::push_in_progress(self, push_empty) } // TODO there is too much common code between implementors: @@ -759,7 +837,7 @@ impl TimeBinner for BinsDim0TimeBinner { warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); } }*/ - todo!() + TimeBinnerCommonV0Func::cycle(self) } fn set_range_complete(&mut self) { @@ -770,6 +848,12 @@ impl TimeBinner for BinsDim0TimeBinner { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } + + fn append_empty_until_end(&mut self) { + while self.common_has_more_range() { + TimeBinnerCommonV0Func::push_in_progress(self, true); + } + } } impl TimeBinned for BinsDim0 { @@ -808,7 +892,7 @@ impl TimeBinned for BinsDim0 { } fn validate(&self) -> Result<(), String> { - use std::fmt::Write; + use fmt::Write; let mut msg = String::new(); if self.ts1s.len() != self.ts2s.len() { write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); @@ -829,3 +913,141 @@ impl TimeBinned for BinsDim0 { self } } + +#[test] +fn bins_timebin_fill_empty_00() { + let mut bins = BinsDim0::::empty(); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.append_empty_until_end(); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + for i in 0..5 { + exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); + } + assert_eq!(got, &exp); +} + +#[test] +fn bins_timebin_fill_empty_01() { + let mut bins = BinsDim0::::empty(); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.push_in_progress(true); + binner.append_empty_until_end(); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + for i in 0..5 { + exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); + } + assert_eq!(got, &exp); +} + +#[test] +fn bins_timebin_push_empty_00() { + let mut bins = BinsDim0::::empty(); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.push_in_progress(true); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + for i in 0..1 { + exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); + } + assert_eq!(got, &exp); +} + +#[test] +fn bins_timebin_push_empty_01() { + let mut bins = BinsDim0::::empty(); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.push_in_progress(true); + binner.push_in_progress(true); + binner.push_in_progress(true); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + for i in 0..3 { + exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0.); + } + assert_eq!(got, &exp); +} + +#[test] +fn bins_timebin_ingest_only_before() { + let mut bins = BinsDim0::::empty(); + bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1); + bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.push_in_progress(true); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); + assert_eq!(got, &exp); +} + +#[test] +fn bins_timebin_ingest_00() { + let mut bins = BinsDim0::::empty(); + bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.); + bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); + bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); + let binrange = BinnedRangeEnum::Time(BinnedRange { + bin_len: TsNano(SEC * 2), + bin_off: 9, + bin_cnt: 5, + }); + let do_time_weight = true; + let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight); + binner.ingest(&mut bins); + binner.push_in_progress(true); + let ready = binner.bins_ready(); + let got = ready.unwrap(); + let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); + let mut exp = BinsDim0::empty(); + exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.); + exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84.); + exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81.); + assert_eq!(got, &exp); +} diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index 99b8f6d..13faa7b 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -742,6 +742,14 @@ impl TimeBinner for BinsXbinDim0TimeBinner { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } + + fn append_empty_until_end(&mut self) { + // TODO + todo!(); + /*while self.rng.is_some() { + TimeBinnerCommonV0Func::push_in_progress(self, true); + }*/ + } } impl TimeBinned for BinsXbinDim0 { diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index a3d0552..8086ef4 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -891,6 +891,13 @@ impl TimeBinnerTy for ChannelEventsTimeBinner { None => None, } } + + fn append_empty_until_end(&mut self) { + match self.binner.as_mut() { + Some(binner) => binner.append_empty_until_end(), + None => panic!(), + } + } } impl TimeBinner for ChannelEventsTimeBinner { @@ -928,6 +935,13 @@ impl TimeBinner for ChannelEventsTimeBinner { None => panic!("TODO TimeBinner::empty for ChannelEventsTimeBinner"), } } + + fn append_empty_until_end(&mut self) { + match self.binner.as_mut() { + Some(binner) => binner.append_empty_until_end(), + None => panic!(), + } + } } impl TimeBinnableTy for ChannelEvents { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 3d1ceba..5a28579 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -726,6 +726,7 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { impl TimeBinnable for EventsDim0 { fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + // TODO get rid of unwrap let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -940,7 +941,7 @@ impl EventsDim0TimeBinner { let ret = Self { binrange, rix: 0, - rng: Some(agg.range.clone()), + rng: Some(agg.range().clone()), agg, ready: None, range_final: false, @@ -968,6 +969,13 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { Self::type_name() } + fn common_bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + fn common_range_current(&self) -> &SeriesRange { self.agg.range() } @@ -976,18 +984,16 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { self.rng.is_some() } - fn common_bins_ready_count(&self) -> usize { - match &self.ready { - Some(k) => k.len(), - None => 0, - } - } - fn common_next_bin_range(&mut self) -> Option { self.next_bin_range() } - fn common_take_or_append_all_from(&mut self, mut item: Self::Output) { + fn common_set_current_range(&mut self, range: Option) { + self.rng = range; + } + + fn common_take_or_append_all_from(&mut self, item: Self::Output) { + let mut item = item; match self.ready.as_mut() { Some(ready) => { ready.append_all_from(&mut item); @@ -998,8 +1004,13 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { } } - fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output { - self.agg.result_reset(range) + fn common_result_reset(&mut self, range: Option) -> Self::Output { + self.agg.result_reset(range.unwrap_or_else(|| { + SeriesRange::TimeRange(netpod::range::evrange::NanoRange { + beg: u64::MAX, + end: u64::MAX, + }) + })) } fn common_agg_ingest(&mut self, item: &mut Self::Input) { @@ -1039,6 +1050,10 @@ impl TimeBinner for EventsDim0TimeBinner { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } + + fn append_empty_until_end(&mut self) { + // nothing to do for events + } } impl Appendable for EventsDim0 diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index cbd16b0..09d18a1 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -1062,6 +1062,10 @@ impl TimeBinner for EventsDim1TimeBinner { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } + + fn append_empty_until_end(&mut self) { + // nothing to do for events + } } impl Appendable> for EventsDim1 diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 58ac82c..e269a37 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -574,6 +574,10 @@ impl TimeBinner for EventsXbinDim0TimeBinner { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } + + fn append_empty_until_end(&mut self) { + // nothing to do for events + } } impl TimeBinnableType for EventsXbinDim0 diff --git a/items_2/src/timebin.rs b/items_2/src/timebin.rs index 9cb76a9..63bf142 100644 --- a/items_2/src/timebin.rs +++ b/items_2/src/timebin.rs @@ -1,16 +1,10 @@ -use crate::eventsdim0::EventsDim0TimeBinner; -use items_0::overlap::HasTimestampDeque; use items_0::overlap::RangeOverlapInfo; -use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::AppendEmptyBin; -use items_0::Appendable; use items_0::Empty; -use items_0::Events; use items_0::HasNonemptyFirstBin; use items_0::WithLen; use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use std::any; use std::collections::VecDeque; @@ -40,10 +34,11 @@ pub trait TimeBinnerCommonV0Trait { fn type_name() -> &'static str; fn common_bins_ready_count(&self) -> usize; fn common_range_current(&self) -> &SeriesRange; - fn common_next_bin_range(&mut self) -> Option; fn common_has_more_range(&self) -> bool; + fn common_next_bin_range(&mut self) -> Option; + fn common_set_current_range(&mut self, range: Option); fn common_take_or_append_all_from(&mut self, item: Self::Output); - fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output; + fn common_result_reset(&mut self, range: Option) -> Self::Output; fn common_agg_ingest(&mut self, item: &mut Self::Input); } @@ -136,19 +131,8 @@ impl TimeBinnerCommonV0Func { // the rest of the time range. if B::common_has_more_range(binner) { let range_next = TimeBinnerCommonV0Trait::common_next_bin_range(binner); - let bins = if let Some(range_next) = range_next { - TimeBinnerCommonV0Trait::common_result_reset(binner, range_next) - //self.agg.result_reset(range_next, expand) - } else { - // Acts as placeholder - // TODO clean up - let range_next = NanoRange { - beg: u64::MAX - 1, - end: u64::MAX, - }; - TimeBinnerCommonV0Trait::common_result_reset(binner, range_next.into()) - //self.agg.result_reset(range_next.into(), expand) - }; + B::common_set_current_range(binner, range_next.clone()); + let bins = TimeBinnerCommonV0Trait::common_result_reset(binner, range_next); if bins.len() != 1 { error!("{self_name}::push_in_progress bins.len() {}", bins.len()); return; @@ -171,6 +155,7 @@ impl TimeBinnerCommonV0Func { TimeBinnerCommonV0Func::push_in_progress(binner, true); if TimeBinnerCommonV0Trait::common_bins_ready_count(binner) == n { let range_next = TimeBinnerCommonV0Trait::common_next_bin_range(binner); + B::common_set_current_range(binner, range_next.clone()); if let Some(range) = range_next { let mut bins = ::Output::empty(); if range.is_time() { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 357520c..8ea0be1 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1526,9 +1526,10 @@ pub struct BinnedRange where T: Dim0Index, { - bin_len: T, - bin_off: u64, - bin_cnt: u64, + // TODO remove pub, which is currently used in tests + pub bin_len: T, + pub bin_off: u64, + pub bin_cnt: u64, } impl fmt::Debug for BinnedRange diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index 9a0413e..59ac179 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -18,20 +18,20 @@ use std::task::Poll; #[allow(unused)] macro_rules! trace2 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } #[allow(unused)] macro_rules! trace3 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } #[allow(unused)] macro_rules! trace4 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } type MergeInp = Pin> + Send>>;