This commit is contained in:
Dominik Werder
2023-04-28 16:36:10 +02:00
parent 524d89b7f9
commit 479cec75e7
12 changed files with 278 additions and 90 deletions

View File

@@ -90,13 +90,77 @@ fn binned_d0_json_00() -> Result<(), Error> {
let a2: Vec<_> = (0..8).into_iter().map(|x| 2409 + 10 * x).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..8).into_iter().map(|x| 2404.5 + 10. * x as f32).collect();
assert_eq!(f32_iter_cmp_near(a1, a2, 0.01, 0.01), true);
}
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_01() -> Result<(), Error> {
fn binned_d0_json_01a() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: TEST_BACKEND.into(),
name: "test-gen-i32-dim0-v01".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:40:30.000Z",
10,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.range_final(), true);
assert_eq!(res.len(), 11);
assert_eq!(res.ts_anchor_sec(), 1200);
let nb = res.len();
{
let a1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 120 * 1000 * x).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 120 * 1000 * (1 + x)).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.counts().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb as _).into_iter().map(|_| 240).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.mins().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2400 + 240 * x).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.maxs().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2639 + 240 * x).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb).into_iter().map(|x| 2520. + 240. * x as f32).collect();
assert_eq!(f32_iter_cmp_near(a1, a2, 0.001, 0.001), true);
}
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_01b() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
@@ -144,6 +208,11 @@ fn binned_d0_json_01() -> Result<(), Error> {
let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2999 + 600 * x).collect();
assert_eq!(a1, a2);
}
{
let a1: Vec<_> = res.avgs().iter().map(|x| *x).collect();
let a2: Vec<_> = (0..nb).into_iter().map(|x| 2700. + 600. * x as f32).collect();
assert_eq!(f32_iter_cmp_near(a1, a2, 0.001, 0.001), true);
}
Ok(())
};
taskrun::run(fut)
@@ -422,7 +491,8 @@ async fn get_binned_json(
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date).into();
let query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar();
let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar();
query.merger_out_len_max = Some(240);
let hp = HostPort::from_node(node0);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);

View File

@@ -49,13 +49,13 @@ macro_rules! impl_range_overlap_info_events {
fn ends_after(&self, range: &SeriesRange) -> bool {
if range.is_time() {
if let Some(max) = HasTimestampDeque::timestamp_max(self) {
max >= range.beg_u64()
max >= range.end_u64()
} else {
true
}
} else if range.is_pulse() {
if let Some(max) = HasTimestampDeque::pulse_max(self) {
max >= range.beg_u64()
max >= range.end_u64()
} else {
true
}

View File

@@ -195,6 +195,10 @@ pub struct TimeBinnerDynStruct {
}
impl TimeBinnerDynStruct {
pub fn type_name() -> &'static str {
std::any::type_name::<Self>()
}
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box<dyn TimeBinner>) -> Self {
Self {
binrange,
@@ -209,6 +213,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct {
type Output = Box<dyn TimeBinned>;
fn ingest(&mut self, item: &mut Self::Input) {
info!("{} INGEST", Self::type_name());
if self.binner.is_none() {
self.binner = Some(Box::new(TimeBinnableTy::time_binner_new(
item,

View File

@@ -807,9 +807,15 @@ pub struct ChannelEventsTimeBinner {
binner: Option<Box<dyn TimeBinner>>,
}
impl ChannelEventsTimeBinner {
pub fn type_name() -> &'static str {
std::any::type_name::<Self>()
}
}
impl fmt::Debug for ChannelEventsTimeBinner {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ChannelEventsTimeBinner")
fmt.debug_struct(Self::type_name())
.field("binrange", &self.binrange)
.field("do_time_weight", &self.do_time_weight)
.field("conn_state", &self.conn_state)
@@ -824,6 +830,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner {
type Output = Box<dyn TimeBinned>;
fn ingest(&mut self, item: &mut Self::Input) {
info!("{} INGEST", Self::type_name());
match item {
ChannelEvents::Events(item) => {
if self.binner.is_none() {

View File

@@ -36,6 +36,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any;
@@ -46,18 +47,20 @@ use std::mem;
#[allow(unused)]
macro_rules! trace_ingest {
($($arg:tt)*) => {
//let _ = ($($arg)*);
//trace!($($arg)*);
};
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
}
#[allow(unused)]
macro_rules! trace_ingest_item {
(e$($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
}
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
//let _ = ($($arg)*,);
//trace!($($arg)*);
};
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
@@ -428,16 +431,15 @@ impl<STY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<STY> {
pub struct EventsDim0Aggregator<STY> {
range: SeriesRange,
count: u64,
min: STY,
max: STY,
minmax: Option<(STY, STY)>,
sumc: u64,
sum: f32,
int_ts: u64,
last_ts: u64,
last_val: Option<STY>,
did_min_max: bool,
do_time_weight: bool,
events_ignored_count: u64,
items_seen: usize,
}
impl<STY> Drop for EventsDim0Aggregator<STY> {
@@ -448,8 +450,8 @@ impl<STY> Drop for EventsDim0Aggregator<STY> {
}
impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn self_name() -> String {
any::type_name::<Self>().to_string()
fn type_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
@@ -457,41 +459,37 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
Self {
range,
count: 0,
min: STY::zero_b(),
max: STY::zero_b(),
minmax: None,
sumc: 0,
sum: 0.,
int_ts,
last_ts: 0,
last_val: None,
did_min_max: false,
do_time_weight,
events_ignored_count: 0,
items_seen: 0,
}
}
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
fn apply_min_max(&mut self, val: STY) {
trace_ingest!(
"apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}",
"apply_min_max val {:?} last_val {:?} count {} sumc {:?} minmax {:?}",
val,
self.last_seen_val,
self.last_val,
self.count,
self.sumc,
self.min,
self.max
self.minmax,
);
if self.did_min_max == false {
self.did_min_max = true;
self.min = val.clone();
self.max = val.clone();
if let Some((min, max)) = self.minmax.as_mut() {
if *min > val {
*min = val.clone();
}
if *max < val {
*max = val.clone();
}
} else {
if self.min > val {
self.min = val.clone();
}
if self.max < val {
self.max = val.clone();
}
self.minmax = Some((val.clone(), val.clone()));
}
}
@@ -511,12 +509,20 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
if let Some(v) = &self.last_val {
trace_ingest!("apply_event_time_weight with v {v:?}");
let vf = v.as_prim_f32_b();
if px > self.range.beg_u64() {
let v2 = v.clone();
self.apply_min_max(v2);
}
let v2 = v.clone();
self.apply_min_max(v2);
self.sumc += 1;
let w = (px - self.int_ts) as f32 * 1e-9;
if false {
trace!(
"int_ts {:10} px {:8} w {:8.1} vf {:8.1} sum {:8.1}",
self.int_ts / MS,
px / MS,
w,
vf,
self.sum
);
}
if vf.is_nan() {
} else {
self.sum += vf * w;
@@ -552,11 +558,16 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
let self_name = any::type_name::<Self>();
trace_ingest!("{self_name}::ingest_time_weight item len {}", item.len());
trace_ingest!(
"{self_name}::ingest_time_weight item len {} items_seen {}",
item.len(),
self.items_seen
);
self.items_seen += 1;
if self.range.is_time() {
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
for (&ts, val) in item.tss.iter().zip(item.values.iter()) {
for (i1, (&ts, val)) in item.tss.iter().zip(item.values.iter()).enumerate() {
if ts >= range_end {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
self.events_ignored_count += 1;
@@ -564,7 +575,9 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
break;
} else if ts >= range_beg {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val);
self.apply_event_time_weight(ts);
if ts > range_beg {
self.apply_event_time_weight(ts);
}
self.count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
@@ -584,15 +597,15 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0<STY> {
trace!("TODO check again result_reset_unweight");
err::todo();
let (min, max, avg) = if self.sumc > 0 {
let avg = self.sum / self.sumc as f32;
(self.min.clone(), self.max.clone(), avg)
let (min, max) = if let Some((min, max)) = self.minmax.take() {
(min, max)
} else {
let g = match &self.last_val {
Some(x) => x.clone(),
None => STY::zero_b(),
};
(g.clone(), g.clone(), g.as_prim_f32_b())
(STY::zero_b(), STY::zero_b())
};
let avg = if self.sumc > 0 {
self.sum / self.sumc as f32
} else {
STY::zero_b().as_prim_f32_b()
};
let ret = if self.range.is_time() {
BinsDim0 {
@@ -609,19 +622,22 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
err::todoval()
};
self.int_ts = range.beg_u64();
trace!("ON RESET SET int_ts {:10}", self.int_ts);
self.range = range;
self.count = 0;
self.sum = 0.;
self.sumc = 0;
self.min = STY::zero_b();
self.max = STY::zero_b();
self.did_min_max = false;
self.minmax = None;
self.items_seen = 0;
ret
}
fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0<STY> {
// TODO check callsite for correct expand status.
debug!("result_reset_time_weight calls apply_event_time_weight");
debug!(
"result_reset_time_weight calls apply_event_time_weight range {:?} items_seen {} count {}",
self.range, self.items_seen, self.count
);
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
if self.range.is_time() {
@@ -630,15 +646,19 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
error!("TODO result_reset_time_weight");
err::todoval()
}
let (min, max, avg) = if self.sumc > 0 {
let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9);
(self.min.clone(), self.max.clone(), avg)
let (min, max) = if let Some((min, max)) = self.minmax.take() {
(min, max)
} else {
let g = match &self.last_val {
Some(x) => x.clone(),
None => STY::zero_b(),
};
(g.clone(), g.clone(), g.as_prim_f32_b())
(STY::zero_b(), STY::zero_b())
};
let avg = if self.sumc > 0 {
self.sum / (self.range.delta_u64() as f32 * 1e-9)
} else {
if let Some(v) = self.last_val.as_ref() {
v.as_prim_f32_b()
} else {
STY::zero_b().as_prim_f32_b()
}
};
let ret = if self.range.is_time() {
BinsDim0 {
@@ -654,14 +674,13 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
error!("TODO result_reset_time_weight");
err::todoval()
};
self.int_ts = range_beg;
self.int_ts = range.beg_u64();
self.range = range;
self.count = 0;
self.sumc = 0;
self.sum = 0.;
self.min = STY::zero_b();
self.max = STY::zero_b();
self.did_min_max = false;
self.minmax = None;
self.items_seen = 0;
ret
}
}
@@ -676,11 +695,11 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim0Aggregator<STY> {
fn ingest(&mut self, item: &Self::Input) {
if true {
trace_ingest!("{} ingest {} events", std::any::type_name::<Self>(), item.len());
trace_ingest!("{} ingest {} events", Self::type_name(), item.len());
}
if false {
for (i, &ts) in item.tss.iter().enumerate() {
trace_ingest!("{} ingest {:6} {:20}", std::any::type_name::<Self>(), i, ts);
trace_ingest!("{} ingest {:6} {:20}", Self::type_name(), i, ts);
}
}
if self.do_time_weight {
@@ -902,12 +921,12 @@ pub struct EventsDim0TimeBinner<STY: ScalarOps> {
}
impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
fn self_name() -> &'static str {
fn type_name() -> &'static str {
any::type_name::<Self>()
}
fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
trace!("{}::new binrange {binrange:?}", Self::self_name());
trace!("{}::new binrange {binrange:?}", Self::type_name());
let rng = binrange
.range_at(0)
.ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?;
@@ -924,13 +943,12 @@ impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
}
fn next_bin_range(&mut self) -> Option<SeriesRange> {
let self_name = any::type_name::<Self>();
self.rix += 1;
if let Some(rng) = self.binrange.range_at(self.rix) {
trace!("{self_name} next_bin_range {:?}", rng);
trace!("{} next_bin_range {:?}", Self::type_name(), rng);
Some(rng)
} else {
trace!("{self_name} next_bin_range None");
trace!("{} next_bin_range None", Self::type_name());
None
}
}
@@ -952,9 +970,10 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
}
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
let self_name = any::type_name::<Self>();
trace2!(
"TimeBinner for {self_name} ingest agg.range {:?} item {:?}",
let self_name = Self::type_name();
trace_ingest_item!(
"TimeBinner for {} ingest agg.range {:?} item {:?}",
Self::type_name(),
self.agg.range(),
item
);
@@ -966,7 +985,7 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
// That needs modified interfaces which can take and yield the start and latest index.
loop {
while item.starts_after(self.agg.range()) {
trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after");
trace_ingest_item!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after");
self.cycle();
if self.rng.is_none() {
warn!("{self_name} no more bin in edges B");
@@ -974,11 +993,11 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
}
}
if item.ends_before(self.agg.range()) {
trace!("{self_name} IGNORE ITEM BECAUSE ends_before");
trace_ingest_item!("{self_name} IGNORE ITEM BECAUSE ends_before");
return;
} else {
if self.rng.is_none() {
trace!("{self_name} no more bin in edges D");
trace_ingest_item!("{self_name} no more bin in edges D");
return;
} else {
if let Some(item) = item
@@ -987,19 +1006,19 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
.downcast_ref::<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Input>()
{
// TODO collect statistics associated with this request:
trace_ingest!("{self_name} FEED THE ITEM...");
trace_ingest_item!("{self_name} FEED THE ITEM...");
self.agg.ingest(item);
if item.ends_after(self.agg.range()) {
trace_ingest!("{self_name} FED ITEM, ENDS AFTER.");
trace_ingest_item!("{self_name} FED ITEM, ENDS AFTER agg-range {:?}", self.agg.range());
self.cycle();
if self.rng.is_none() {
warn!("{self_name} no more bin in edges C");
return;
} else {
trace_ingest!("{self_name} FED ITEM, CYCLED, CONTINUE.");
trace_ingest_item!("{self_name} FED ITEM, CYCLED, CONTINUE.");
}
} else {
trace_ingest!("{self_name} FED ITEM.");
trace_ingest_item!("{self_name} FED ITEM.");
break;
}
} else {
@@ -1012,7 +1031,7 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
fn push_in_progress(&mut self, push_empty: bool) {
let self_name = any::type_name::<Self>();
trace!("{self_name}::push_in_progress push_empty {push_empty}");
trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}");
// TODO expand should be derived from AggKind. Is it still required after all?
// TODO here, the expand means that agg will assume that the current value is kept constant during
// the rest of the time range.
@@ -1051,7 +1070,7 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
fn cycle(&mut self) {
let self_name = any::type_name::<Self>();
trace!("{self_name}::cycle");
trace_ingest_item!("{self_name}::cycle");
// TODO refactor this logic.
let n = self.bins_ready_count();
self.push_in_progress(true);
@@ -1212,6 +1231,58 @@ mod test_serde_opt {
}
}
#[test]
fn overlap_info_00() {
let mut ev1 = EventsDim0::empty();
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 3200, 3, 3.2f32);
let range = SeriesRange::TimeRange(NanoRange {
beg: MS * 1000,
end: MS * 2000,
});
assert_eq!(ev1.ends_after(&range), true);
}
#[test]
fn overlap_info_01() {
let mut ev1 = EventsDim0::empty();
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 1400, 3, 3.2f32);
let range = SeriesRange::TimeRange(NanoRange {
beg: MS * 1000,
end: MS * 2000,
});
assert_eq!(ev1.ends_after(&range), false);
}
#[test]
fn binner_00() {
let mut ev1 = EventsDim0::empty();
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 3200, 3, 3.2f32);
let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
panic!();
// TODO add actual asserts
}
#[test]
fn binner_01() {
let mut ev1 = EventsDim0::empty();
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 1300, 3, 1.3);
ev1.push(MS * 2100, 3, 2.1);
ev1.push(MS * 2300, 3, 2.3);
let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
panic!();
// TODO add actual asserts
}
/*
TODO adapt and enable
#[test]

View File

@@ -823,7 +823,7 @@ where
[max.clone()].into(),
[avg].into(),
);
self.int_ts = range_beg;
self.int_ts = range.beg_u64();
self.range = range;
self.count = 0;
self.sumc = 0;

View File

@@ -361,6 +361,9 @@ where
}
if let Some(o) = self.out.as_ref() {
if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit {
if o.len() > self.out_max_len {
info!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len);
}
trace3!("decide to output");
self.do_clear_out = false;
//Break(Ready(Some(Ok(self.out.take().unwrap()))))

View File

@@ -1690,6 +1690,16 @@ impl BinnedRangeEnum {
BinnedRangeEnum::Pulse(_) => panic!(),
}
}
// Only a helper for unit tests.
pub fn from_custom(len: TsNano, off: u64, cnt: u64) -> BinnedRangeEnum {
let rng = BinnedRange {
bin_len: len,
bin_off: off,
bin_cnt: cnt,
};
BinnedRangeEnum::Time(rng)
}
}
#[cfg(test)]

View File

@@ -38,6 +38,8 @@ pub struct BinnedQuery {
buf_len_disk_io: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
disk_stats_every: Option<ByteSize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub merger_out_len_max: Option<usize>,
}
impl BinnedQuery {
@@ -52,6 +54,7 @@ impl BinnedQuery {
buf_len_disk_io: None,
disk_stats_every: None,
timeout: None,
merger_out_len_max: None,
}
}
@@ -104,6 +107,10 @@ impl BinnedQuery {
self.bins_max.unwrap_or(2000)
}
pub fn merger_out_len_max(&self) -> usize {
self.merger_out_len_max.unwrap_or(1024)
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
}
@@ -189,6 +196,9 @@ impl FromUrl for BinnedQuery {
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok())
.unwrap_or(None),
bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
merger_out_len_max: pairs
.get("mergerOutLenMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
};
debug!("BinnedQuery::from_url {:?}", ret);
Ok(ret)
@@ -223,5 +233,8 @@ impl AppendToUrl for BinnedQuery {
if let Some(x) = &self.disk_stats_every {
g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024));
}
if let Some(x) = self.merger_out_len_max.as_ref() {
g.append_pair("mergerOutLenMax", &format!("{}", x));
}
}
}

View File

@@ -46,6 +46,8 @@ pub struct PlainEventsQuery {
do_test_stream_error: bool,
#[serde(default, skip_serializing_if = "is_false")]
test_do_wasm: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
merger_out_len_max: Option<usize>,
}
impl PlainEventsQuery {
@@ -66,6 +68,7 @@ impl PlainEventsQuery {
do_test_main_error: false,
do_test_stream_error: false,
test_do_wasm: false,
merger_out_len_max: None,
}
}
@@ -108,7 +111,7 @@ impl PlainEventsQuery {
}
pub fn merger_out_len_max(&self) -> usize {
1024
self.merger_out_len_max.unwrap_or(1024)
}
pub fn do_test_main_error(&self) -> bool {
@@ -221,6 +224,9 @@ impl FromUrl for PlainEventsQuery {
.map(|x| x.parse::<bool>().ok())
.unwrap_or(None)
.unwrap_or(false),
merger_out_len_max: pairs
.get("mergerOutLenMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
};
Ok(ret)
}
@@ -265,5 +271,8 @@ impl AppendToUrl for PlainEventsQuery {
if self.test_do_wasm {
g.append_pair("testDoWasm", "true");
}
if let Some(x) = self.merger_out_len_max.as_ref() {
g.append_pair("mergerOutLenMax", &format!("{}", x));
}
}
}

View File

@@ -148,12 +148,12 @@ impl GenerateI32V01 {
let mut item = EventsDim0::empty();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 400 {
if self.ts >= self.tsend || item.byte_estimate() > 40 {
break;
}
let pulse = ts;
let value = (ts / self.ivl) as T;
if true {
if false {
info!(
"v01 node {} made event ts {} pulse {} value {}",
self.node_ix, ts, pulse, value

View File

@@ -46,7 +46,7 @@ async fn timebinnable_stream(
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
let stream = Merger::new(inps, query.merger_out_len_max());
let stream = RangeFilter2::new(stream, range, one_before_range);