diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 356b01d..ffd2c8d 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -90,13 +90,77 @@ fn binned_d0_json_00() -> Result<(), Error> { let a2: Vec<_> = (0..8).into_iter().map(|x| 2409 + 10 * x).collect(); assert_eq!(a1, a2); } + { + let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|x| 2404.5 + 10. * x as f32).collect(); + assert_eq!(f32_iter_cmp_near(a1, a2, 0.01, 0.01), true); + } Ok(()) }; taskrun::run(fut) } #[test] -fn binned_d0_json_01() -> Result<(), Error> { +fn binned_d0_json_01a() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: TEST_BACKEND.into(), + name: "test-gen-i32-dim0-v01".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:40:30.000Z", + 10, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.range_final(), true); + assert_eq!(res.len(), 11); + assert_eq!(res.ts_anchor_sec(), 1200); + let nb = res.len(); + { + let a1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 120 * 1000 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 120 * 1000 * (1 + x)).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.counts().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|_| 240).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.mins().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2400 + 240 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.maxs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2639 + 240 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb).into_iter().map(|x| 2520. + 240. * x as f32).collect(); + assert_eq!(f32_iter_cmp_near(a1, a2, 0.001, 0.001), true); + } + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_01b() -> Result<(), Error> { let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; @@ -144,6 +208,11 @@ fn binned_d0_json_01() -> Result<(), Error> { let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2999 + 600 * x).collect(); assert_eq!(a1, a2); } + { + let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb).into_iter().map(|x| 2700. + 600. * x as f32).collect(); + assert_eq!(f32_iter_cmp_near(a1, a2, 0.001, 0.001), true); + } Ok(()) }; taskrun::run(fut) @@ -422,7 +491,8 @@ async fn get_binned_json( let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date).into(); - let query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); + let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); + query.merger_out_len_max = Some(240); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/items_0/src/overlap.rs b/items_0/src/overlap.rs index 6bde8da..5d62ce4 100644 --- a/items_0/src/overlap.rs +++ b/items_0/src/overlap.rs @@ -49,13 +49,13 @@ macro_rules! impl_range_overlap_info_events { fn ends_after(&self, range: &SeriesRange) -> bool { if range.is_time() { if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max >= range.beg_u64() + max >= range.end_u64() } else { true } } else if range.is_pulse() { if let Some(max) = HasTimestampDeque::pulse_max(self) { - max >= range.beg_u64() + max >= range.end_u64() } else { true } diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index f76e35c..b6a321e 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -195,6 +195,10 @@ pub struct TimeBinnerDynStruct { } impl TimeBinnerDynStruct { + pub fn type_name() -> &'static str { + std::any::type_name::() + } + pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box) -> Self { Self { binrange, @@ -209,6 +213,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct { type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { + info!("{} INGEST", Self::type_name()); if self.binner.is_none() { self.binner = Some(Box::new(TimeBinnableTy::time_binner_new( item, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index bb207ae..4219502 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -807,9 +807,15 @@ pub struct ChannelEventsTimeBinner { binner: Option>, } +impl ChannelEventsTimeBinner { + pub fn type_name() -> &'static str { + std::any::type_name::() + } +} + impl fmt::Debug for ChannelEventsTimeBinner { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("ChannelEventsTimeBinner") + fmt.debug_struct(Self::type_name()) .field("binrange", &self.binrange) .field("do_time_weight", &self.do_time_weight) .field("conn_state", &self.conn_state) @@ -824,6 +830,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner { type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { + info!("{} INGEST", Self::type_name()); match item { ChannelEvents::Events(item) => { if self.binner.is_none() { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 479e2b6..333b3c9 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -36,6 +36,7 @@ use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; +use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; @@ -46,18 +47,20 @@ use std::mem; #[allow(unused)] macro_rules! trace_ingest { - ($($arg:tt)*) => { - //let _ = ($($arg)*); - //trace!($($arg)*); - }; + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; +} + +#[allow(unused)] +macro_rules! trace_ingest_item { + (e$($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; } #[allow(unused)] macro_rules! trace2 { - ($($arg:tt)*) => { - //let _ = ($($arg)*,); - //trace!($($arg)*); - }; + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; } #[derive(Clone, PartialEq, Serialize, Deserialize)] @@ -428,16 +431,15 @@ impl items_0::collect_s::CollectableType for EventsDim0 { pub struct EventsDim0Aggregator { range: SeriesRange, count: u64, - min: STY, - max: STY, + minmax: Option<(STY, STY)>, sumc: u64, sum: f32, int_ts: u64, last_ts: u64, last_val: Option, - did_min_max: bool, do_time_weight: bool, events_ignored_count: u64, + items_seen: usize, } impl Drop for EventsDim0Aggregator { @@ -448,8 +450,8 @@ impl Drop for EventsDim0Aggregator { } impl EventsDim0Aggregator { - fn self_name() -> String { - any::type_name::().to_string() + fn type_name() -> &'static str { + any::type_name::() } pub fn new(range: SeriesRange, do_time_weight: bool) -> Self { @@ -457,41 +459,37 @@ impl EventsDim0Aggregator { Self { range, count: 0, - min: STY::zero_b(), - max: STY::zero_b(), + minmax: None, sumc: 0, sum: 0., int_ts, last_ts: 0, last_val: None, - did_min_max: false, do_time_weight, events_ignored_count: 0, + items_seen: 0, } } // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max(&mut self, val: STY) { trace_ingest!( - "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", + "apply_min_max val {:?} last_val {:?} count {} sumc {:?} minmax {:?}", val, - self.last_seen_val, + self.last_val, self.count, self.sumc, - self.min, - self.max + self.minmax, ); - if self.did_min_max == false { - self.did_min_max = true; - self.min = val.clone(); - self.max = val.clone(); + if let Some((min, max)) = self.minmax.as_mut() { + if *min > val { + *min = val.clone(); + } + if *max < val { + *max = val.clone(); + } } else { - if self.min > val { - self.min = val.clone(); - } - if self.max < val { - self.max = val.clone(); - } + self.minmax = Some((val.clone(), val.clone())); } } @@ -511,12 +509,20 @@ impl EventsDim0Aggregator { if let Some(v) = &self.last_val { trace_ingest!("apply_event_time_weight with v {v:?}"); let vf = v.as_prim_f32_b(); - if px > self.range.beg_u64() { - let v2 = v.clone(); - self.apply_min_max(v2); - } + let v2 = v.clone(); + self.apply_min_max(v2); self.sumc += 1; let w = (px - self.int_ts) as f32 * 1e-9; + if false { + trace!( + "int_ts {:10} px {:8} w {:8.1} vf {:8.1} sum {:8.1}", + self.int_ts / MS, + px / MS, + w, + vf, + self.sum + ); + } if vf.is_nan() { } else { self.sum += vf * w; @@ -552,11 +558,16 @@ impl EventsDim0Aggregator { fn ingest_time_weight(&mut self, item: &::Input) { let self_name = any::type_name::(); - trace_ingest!("{self_name}::ingest_time_weight item len {}", item.len()); + trace_ingest!( + "{self_name}::ingest_time_weight item len {} items_seen {}", + item.len(), + self.items_seen + ); + self.items_seen += 1; if self.range.is_time() { let range_beg = self.range.beg_u64(); let range_end = self.range.end_u64(); - for (&ts, val) in item.tss.iter().zip(item.values.iter()) { + for (i1, (&ts, val)) in item.tss.iter().zip(item.values.iter()).enumerate() { if ts >= range_end { trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val); self.events_ignored_count += 1; @@ -564,7 +575,9 @@ impl EventsDim0Aggregator { break; } else if ts >= range_beg { trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val); - self.apply_event_time_weight(ts); + if ts > range_beg { + self.apply_event_time_weight(ts); + } self.count += 1; self.last_ts = ts; self.last_val = Some(val.clone()); @@ -584,15 +597,15 @@ impl EventsDim0Aggregator { fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0 { trace!("TODO check again result_reset_unweight"); err::todo(); - let (min, max, avg) = if self.sumc > 0 { - let avg = self.sum / self.sumc as f32; - (self.min.clone(), self.max.clone(), avg) + let (min, max) = if let Some((min, max)) = self.minmax.take() { + (min, max) } else { - let g = match &self.last_val { - Some(x) => x.clone(), - None => STY::zero_b(), - }; - (g.clone(), g.clone(), g.as_prim_f32_b()) + (STY::zero_b(), STY::zero_b()) + }; + let avg = if self.sumc > 0 { + self.sum / self.sumc as f32 + } else { + STY::zero_b().as_prim_f32_b() }; let ret = if self.range.is_time() { BinsDim0 { @@ -609,19 +622,22 @@ impl EventsDim0Aggregator { err::todoval() }; self.int_ts = range.beg_u64(); + trace!("ON RESET SET int_ts {:10}", self.int_ts); self.range = range; self.count = 0; self.sum = 0.; self.sumc = 0; - self.min = STY::zero_b(); - self.max = STY::zero_b(); - self.did_min_max = false; + self.minmax = None; + self.items_seen = 0; ret } fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { // TODO check callsite for correct expand status. - debug!("result_reset_time_weight calls apply_event_time_weight"); + debug!( + "result_reset_time_weight calls apply_event_time_weight range {:?} items_seen {} count {}", + self.range, self.items_seen, self.count + ); let range_beg = self.range.beg_u64(); let range_end = self.range.end_u64(); if self.range.is_time() { @@ -630,15 +646,19 @@ impl EventsDim0Aggregator { error!("TODO result_reset_time_weight"); err::todoval() } - let (min, max, avg) = if self.sumc > 0 { - let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9); - (self.min.clone(), self.max.clone(), avg) + let (min, max) = if let Some((min, max)) = self.minmax.take() { + (min, max) } else { - let g = match &self.last_val { - Some(x) => x.clone(), - None => STY::zero_b(), - }; - (g.clone(), g.clone(), g.as_prim_f32_b()) + (STY::zero_b(), STY::zero_b()) + }; + let avg = if self.sumc > 0 { + self.sum / (self.range.delta_u64() as f32 * 1e-9) + } else { + if let Some(v) = self.last_val.as_ref() { + v.as_prim_f32_b() + } else { + STY::zero_b().as_prim_f32_b() + } }; let ret = if self.range.is_time() { BinsDim0 { @@ -654,14 +674,13 @@ impl EventsDim0Aggregator { error!("TODO result_reset_time_weight"); err::todoval() }; - self.int_ts = range_beg; + self.int_ts = range.beg_u64(); self.range = range; self.count = 0; self.sumc = 0; self.sum = 0.; - self.min = STY::zero_b(); - self.max = STY::zero_b(); - self.did_min_max = false; + self.minmax = None; + self.items_seen = 0; ret } } @@ -676,11 +695,11 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { fn ingest(&mut self, item: &Self::Input) { if true { - trace_ingest!("{} ingest {} events", std::any::type_name::(), item.len()); + trace_ingest!("{} ingest {} events", Self::type_name(), item.len()); } if false { for (i, &ts) in item.tss.iter().enumerate() { - trace_ingest!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); + trace_ingest!("{} ingest {:6} {:20}", Self::type_name(), i, ts); } } if self.do_time_weight { @@ -902,12 +921,12 @@ pub struct EventsDim0TimeBinner { } impl EventsDim0TimeBinner { - fn self_name() -> &'static str { + fn type_name() -> &'static str { any::type_name::() } fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { - trace!("{}::new binrange {binrange:?}", Self::self_name()); + trace!("{}::new binrange {binrange:?}", Self::type_name()); let rng = binrange .range_at(0) .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; @@ -924,13 +943,12 @@ impl EventsDim0TimeBinner { } fn next_bin_range(&mut self) -> Option { - let self_name = any::type_name::(); self.rix += 1; if let Some(rng) = self.binrange.range_at(self.rix) { - trace!("{self_name} next_bin_range {:?}", rng); + trace!("{} next_bin_range {:?}", Self::type_name(), rng); Some(rng) } else { - trace!("{self_name} next_bin_range None"); + trace!("{} next_bin_range None", Self::type_name()); None } } @@ -952,9 +970,10 @@ impl TimeBinner for EventsDim0TimeBinner { } fn ingest(&mut self, item: &mut dyn TimeBinnable) { - let self_name = any::type_name::(); - trace2!( - "TimeBinner for {self_name} ingest agg.range {:?} item {:?}", + let self_name = Self::type_name(); + trace_ingest_item!( + "TimeBinner for {} ingest agg.range {:?} item {:?}", + Self::type_name(), self.agg.range(), item ); @@ -966,7 +985,7 @@ impl TimeBinner for EventsDim0TimeBinner { // That needs modified interfaces which can take and yield the start and latest index. loop { while item.starts_after(self.agg.range()) { - trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); + trace_ingest_item!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); self.cycle(); if self.rng.is_none() { warn!("{self_name} no more bin in edges B"); @@ -974,11 +993,11 @@ impl TimeBinner for EventsDim0TimeBinner { } } if item.ends_before(self.agg.range()) { - trace!("{self_name} IGNORE ITEM BECAUSE ends_before"); + trace_ingest_item!("{self_name} IGNORE ITEM BECAUSE ends_before"); return; } else { if self.rng.is_none() { - trace!("{self_name} no more bin in edges D"); + trace_ingest_item!("{self_name} no more bin in edges D"); return; } else { if let Some(item) = item @@ -987,19 +1006,19 @@ impl TimeBinner for EventsDim0TimeBinner { .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { // TODO collect statistics associated with this request: - trace_ingest!("{self_name} FEED THE ITEM..."); + trace_ingest_item!("{self_name} FEED THE ITEM..."); self.agg.ingest(item); if item.ends_after(self.agg.range()) { - trace_ingest!("{self_name} FED ITEM, ENDS AFTER."); + trace_ingest_item!("{self_name} FED ITEM, ENDS AFTER agg-range {:?}", self.agg.range()); self.cycle(); if self.rng.is_none() { warn!("{self_name} no more bin in edges C"); return; } else { - trace_ingest!("{self_name} FED ITEM, CYCLED, CONTINUE."); + trace_ingest_item!("{self_name} FED ITEM, CYCLED, CONTINUE."); } } else { - trace_ingest!("{self_name} FED ITEM."); + trace_ingest_item!("{self_name} FED ITEM."); break; } } else { @@ -1012,7 +1031,7 @@ impl TimeBinner for EventsDim0TimeBinner { fn push_in_progress(&mut self, push_empty: bool) { let self_name = any::type_name::(); - trace!("{self_name}::push_in_progress push_empty {push_empty}"); + trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. @@ -1051,7 +1070,7 @@ impl TimeBinner for EventsDim0TimeBinner { fn cycle(&mut self) { let self_name = any::type_name::(); - trace!("{self_name}::cycle"); + trace_ingest_item!("{self_name}::cycle"); // TODO refactor this logic. let n = self.bins_ready_count(); self.push_in_progress(true); @@ -1212,6 +1231,58 @@ mod test_serde_opt { } } +#[test] +fn overlap_info_00() { + let mut ev1 = EventsDim0::empty(); + ev1.push(MS * 1200, 3, 1.2f32); + ev1.push(MS * 3200, 3, 3.2f32); + let range = SeriesRange::TimeRange(NanoRange { + beg: MS * 1000, + end: MS * 2000, + }); + assert_eq!(ev1.ends_after(&range), true); +} + +#[test] +fn overlap_info_01() { + let mut ev1 = EventsDim0::empty(); + ev1.push(MS * 1200, 3, 1.2f32); + ev1.push(MS * 1400, 3, 3.2f32); + let range = SeriesRange::TimeRange(NanoRange { + beg: MS * 1000, + end: MS * 2000, + }); + assert_eq!(ev1.ends_after(&range), false); +} + +#[test] +fn binner_00() { + let mut ev1 = EventsDim0::empty(); + ev1.push(MS * 1200, 3, 1.2f32); + ev1.push(MS * 3200, 3, 3.2f32); + let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10); + let mut binner = ev1.time_binner_new(binrange, true); + binner.ingest(ev1.as_time_binnable_mut()); + eprintln!("{:?}", binner); + panic!(); + // TODO add actual asserts +} + +#[test] +fn binner_01() { + let mut ev1 = EventsDim0::empty(); + ev1.push(MS * 1200, 3, 1.2f32); + ev1.push(MS * 1300, 3, 1.3); + ev1.push(MS * 2100, 3, 2.1); + ev1.push(MS * 2300, 3, 2.3); + let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10); + let mut binner = ev1.time_binner_new(binrange, true); + binner.ingest(ev1.as_time_binnable_mut()); + eprintln!("{:?}", binner); + panic!(); + // TODO add actual asserts +} + /* TODO adapt and enable #[test] diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 9fb380e..cd32bae 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -823,7 +823,7 @@ where [max.clone()].into(), [avg].into(), ); - self.int_ts = range_beg; + self.int_ts = range.beg_u64(); self.range = range; self.count = 0; self.sumc = 0; diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 6517b51..565dd20 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -361,6 +361,9 @@ where } if let Some(o) = self.out.as_ref() { if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit { + if o.len() > self.out_max_len { + info!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len); + } trace3!("decide to output"); self.do_clear_out = false; //Break(Ready(Some(Ok(self.out.take().unwrap())))) diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 1e8397f..357520c 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1690,6 +1690,16 @@ impl BinnedRangeEnum { BinnedRangeEnum::Pulse(_) => panic!(), } } + + // Only a helper for unit tests. + pub fn from_custom(len: TsNano, off: u64, cnt: u64) -> BinnedRangeEnum { + let rng = BinnedRange { + bin_len: len, + bin_off: off, + bin_cnt: cnt, + }; + BinnedRangeEnum::Time(rng) + } } #[cfg(test)] diff --git a/query/src/api4/binned.rs b/query/src/api4/binned.rs index 46ecf7a..96d98fb 100644 --- a/query/src/api4/binned.rs +++ b/query/src/api4/binned.rs @@ -38,6 +38,8 @@ pub struct BinnedQuery { buf_len_disk_io: Option, #[serde(default, skip_serializing_if = "Option::is_none")] disk_stats_every: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub merger_out_len_max: Option, } impl BinnedQuery { @@ -52,6 +54,7 @@ impl BinnedQuery { buf_len_disk_io: None, disk_stats_every: None, timeout: None, + merger_out_len_max: None, } } @@ -104,6 +107,10 @@ impl BinnedQuery { self.bins_max.unwrap_or(2000) } + pub fn merger_out_len_max(&self) -> usize { + self.merger_out_len_max.unwrap_or(1024) + } + pub fn set_series_id(&mut self, series: u64) { self.channel.series = Some(series); } @@ -189,6 +196,9 @@ impl FromUrl for BinnedQuery { .map(|x| x.parse::().map(Duration::from_millis).ok()) .unwrap_or(None), bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + merger_out_len_max: pairs + .get("mergerOutLenMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, }; debug!("BinnedQuery::from_url {:?}", ret); Ok(ret) @@ -223,5 +233,8 @@ impl AppendToUrl for BinnedQuery { if let Some(x) = &self.disk_stats_every { g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); } + if let Some(x) = self.merger_out_len_max.as_ref() { + g.append_pair("mergerOutLenMax", &format!("{}", x)); + } } } diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs index 903df00..3abcf04 100644 --- a/query/src/api4/events.rs +++ b/query/src/api4/events.rs @@ -46,6 +46,8 @@ pub struct PlainEventsQuery { do_test_stream_error: bool, #[serde(default, skip_serializing_if = "is_false")] test_do_wasm: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + merger_out_len_max: Option, } impl PlainEventsQuery { @@ -66,6 +68,7 @@ impl PlainEventsQuery { do_test_main_error: false, do_test_stream_error: false, test_do_wasm: false, + merger_out_len_max: None, } } @@ -108,7 +111,7 @@ impl PlainEventsQuery { } pub fn merger_out_len_max(&self) -> usize { - 1024 + self.merger_out_len_max.unwrap_or(1024) } pub fn do_test_main_error(&self) -> bool { @@ -221,6 +224,9 @@ impl FromUrl for PlainEventsQuery { .map(|x| x.parse::().ok()) .unwrap_or(None) .unwrap_or(false), + merger_out_len_max: pairs + .get("mergerOutLenMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, }; Ok(ret) } @@ -265,5 +271,8 @@ impl AppendToUrl for PlainEventsQuery { if self.test_do_wasm { g.append_pair("testDoWasm", "true"); } + if let Some(x) = self.merger_out_len_max.as_ref() { + g.append_pair("mergerOutLenMax", &format!("{}", x)); + } } } diff --git a/streams/src/generators.rs b/streams/src/generators.rs index 12f8706..7082569 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -148,12 +148,12 @@ impl GenerateI32V01 { let mut item = EventsDim0::empty(); let mut ts = self.ts; loop { - if self.ts >= self.tsend || item.byte_estimate() > 400 { + if self.ts >= self.tsend || item.byte_estimate() > 40 { break; } let pulse = ts; let value = (ts / self.ivl) as T; - if true { + if false { info!( "v01 node {} made event ts {} pulse {} value {}", self.node_ix, ts, pulse, value diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 7052ea4..e9e0e32 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -46,7 +46,7 @@ async fn timebinnable_stream( let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. - let stream = Merger::new(inps, evq.merger_out_len_max()); + let stream = Merger::new(inps, query.merger_out_len_max()); let stream = RangeFilter2::new(stream, range, one_before_range);