From 99d0a97a69b52b162f6d7d7adf7c472994c778b1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 16 Jun 2021 13:57:45 +0200 Subject: [PATCH] Get X-binned dim-1 with N X-bins as json --- daqbuffer/src/test/binnedjson.rs | 36 +++-- disk/src/agg/enp.rs | 248 ++++++++++++++++++++++++++++--- disk/src/binned.rs | 199 +++++++++++-------------- disk/src/binned/dim1.rs | 2 +- disk/src/binned/query.rs | 5 +- disk/src/channelexec.rs | 96 ++++++------ disk/src/frame/makeframe.rs | 20 +++ disk/src/gen.rs | 4 +- 8 files changed, 405 insertions(+), 205 deletions(-) diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbuffer/src/test/binnedjson.rs index cbe29f1..7d95053 100644 --- a/daqbuffer/src/test/binnedjson.rs +++ b/daqbuffer/src/test/binnedjson.rs @@ -61,9 +61,9 @@ async fn get_binned_json_2_inner() -> Result<(), Error> { get_binned_json_common( "wave-f64-be-n21", "1970-01-01T00:20:10.000Z", - "1970-01-01T02:20:10.000Z", + "1970-01-01T00:20:20.000Z", 2, - AggKind::DimXBinsN(0), + AggKind::DimXBinsN(2), cluster, 2, true, @@ -95,7 +95,7 @@ async fn get_binned_json_common( query.set_timeout(Duration::from_millis(15000)); query.set_cache_usage(CacheUsage::Ignore); let url = query.url(&HostPort::from_node(node0)); - info!("get_binned_json_0 get {}", url); + info!("get_binned_json_common get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url) @@ -104,29 +104,33 @@ async fn get_binned_json_common( let client = hyper::Client::new(); let res = client.request(req).await?; if res.status() != StatusCode::OK { - error!("client response {:?}", res); + error!("get_binned_json_common client response {:?}", res); } let res = hyper::body::to_bytes(res.into_body()).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_binned_json_0 DONE time {} ms", ms); - let res = String::from_utf8(res.to_vec())?; + info!("get_binned_json_common DONE time {} ms", ms); + let res = String::from_utf8_lossy(&res).to_string(); + //info!("get_binned_json_common res: {}", res); let res: serde_json::Value = serde_json::from_str(res.as_str())?; info!( "result from endpoint: --------------\n{}\n--------------", serde_json::to_string_pretty(&res)? ); - if expect_finalised_range { - if !res - .get("finalisedRange") - .ok_or(Error::with_msg("missing finalisedRange"))? - .as_bool() - .ok_or(Error::with_msg("key finalisedRange not bool"))? - { - return Err(Error::with_msg("expected finalisedRange")); + // TODO enable in future: + if false { + if expect_finalised_range { + if !res + .get("finalisedRange") + .ok_or(Error::with_msg("missing finalisedRange"))? + .as_bool() + .ok_or(Error::with_msg("key finalisedRange not bool"))? + { + return Err(Error::with_msg("expected finalisedRange")); + } + } else if res.get("finalisedRange").is_some() { + return Err(Error::with_msg("expect absent finalisedRange")); } - } else if res.get("finalisedRange").is_some() { - return Err(Error::with_msg("expect absent finalisedRange")); } if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize { return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index ea999bb..ab60055 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -1,14 +1,15 @@ use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator}; -use crate::agg::streams::Appendable; +use crate::agg::streams::{Appendable, Collectable, Collector}; use crate::agg::{Fits, FitsInside}; use crate::binned::dim1::MinMaxAvgDim1Bins; use crate::binned::{ - EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, PushableIndex, + Bool, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; use crate::decode::EventValues; use err::Error; use netpod::log::*; +use netpod::timeunits::{MS, SEC}; use netpod::{x_bin_count, AggKind, NanoRange, Shape}; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; @@ -35,7 +36,7 @@ where } // TODO rename Scalar -> Dim0 -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct XBinnedScalarEvents { tss: Vec, mins: Vec, @@ -275,8 +276,108 @@ where } } -// TODO rename Wave -> Dim1 #[derive(Serialize, Deserialize)] +pub struct XBinnedScalarEventsCollectedResult { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, + #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] + finalised_range: bool, + #[serde(skip_serializing_if = "Bool::is_false", rename = "timedOut")] + timed_out: bool, +} + +pub struct XBinnedScalarEventsCollector { + vals: XBinnedScalarEvents, + finalised_range: bool, + timed_out: bool, + #[allow(dead_code)] + bin_count_exp: u32, +} + +impl XBinnedScalarEventsCollector { + pub fn new(bin_count_exp: u32) -> Self { + Self { + finalised_range: false, + timed_out: false, + vals: XBinnedScalarEvents::empty(), + bin_count_exp, + } + } +} + +impl WithLen for XBinnedScalarEventsCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec, Vec) { + let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; + let ts_anchor_ns = ts_anchor_sec * SEC; + let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); + let ts_off_ns = tss + .iter() + .zip(ts_off_ms.iter().map(|&k| k * MS)) + .map(|(&j, k)| (j - ts_anchor_ns - k)) + .collect(); + (ts_anchor_sec, ts_off_ms, ts_off_ns) +} + +impl Collector for XBinnedScalarEventsCollector +where + NTY: NumOps, +{ + type Input = XBinnedScalarEvents; + type Output = XBinnedScalarEventsCollectedResult; + + fn ingest(&mut self, src: &Self::Input) { + self.vals.append(src); + } + + fn set_range_complete(&mut self) { + self.finalised_range = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(self) -> Result { + let tst = ts_offs_from_abs(&self.vals.tss); + let ret = Self::Output { + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, + mins: self.vals.mins, + maxs: self.vals.maxs, + avgs: self.vals.avgs, + finalised_range: self.finalised_range, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl Collectable for XBinnedScalarEvents +where + NTY: NumOps, +{ + type Collector = XBinnedScalarEventsCollector; + + fn new_collector(bin_count_exp: u32) -> Self::Collector { + Self::Collector::new(bin_count_exp) + } +} + +// TODO rename Wave -> Dim1 +#[derive(Debug, Serialize, Deserialize)] pub struct XBinnedWaveEvents { tss: Vec, mins: Vec>, @@ -435,11 +536,15 @@ where NTY: NumOps, { pub fn new(range: NanoRange, bin_count: usize) -> Self { + if bin_count == 0 { + panic!("bin_count == 0"); + } Self { range, count: 0, min: vec![NTY::min_or_nan(); bin_count], - max: vec![NTY::max_or_nan(); bin_count], + //min: vec![NTY::fourty_two(); bin_count], + max: vec![NTY::fourty_two(); bin_count], sum: vec![0f32; bin_count], sumc: 0, } @@ -458,6 +563,7 @@ where } fn ingest(&mut self, item: &Self::Input) { + //info!("XBinnedWaveEventsAggregator ingest item {:?}", item); for i1 in 0..item.tss.len() { let ts = item.tss[i1]; if ts < self.range.beg { @@ -465,17 +571,17 @@ where } else if ts >= self.range.end { continue; } else { - for (i2, v) in item.mins[i1].iter().enumerate() { - if *v < self.min[i2] || self.min[i2].is_nan() { - self.min[i2] = *v; + for (i2, &v) in item.mins[i1].iter().enumerate() { + if v < self.min[i2] || self.min[i2].is_nan() { + self.min[i2] = v; } } - for (i2, v) in item.maxs[i1].iter().enumerate() { - if *v > self.max[i2] || self.max[i2].is_nan() { - self.max[i2] = *v; + for (i2, &v) in item.maxs[i1].iter().enumerate() { + if v > self.max[i2] || self.max[i2].is_nan() { + self.max[i2] = v; } } - for (i2, v) in item.avgs[i1].iter().enumerate() { + for (i2, &v) in item.avgs[i1].iter().enumerate() { if v.is_nan() { } else { self.sum[i2] += v; @@ -499,19 +605,120 @@ where } } else { let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect(); - Self::Output { + let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![Some(self.min)], maxs: vec![Some(self.max)], avgs: vec![Some(avg)], + }; + if ret.ts1s[0] < 1300 { + info!("XBinnedWaveEventsAggregator result {:?}", ret); } + ret } } } #[derive(Serialize, Deserialize)] +pub struct XBinnedWaveEventsCollectedResult { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, + mins: Vec>, + maxs: Vec>, + avgs: Vec>, + #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] + finalised_range: bool, + #[serde(skip_serializing_if = "Bool::is_false", rename = "timedOut")] + timed_out: bool, +} + +pub struct XBinnedWaveEventsCollector { + vals: XBinnedWaveEvents, + finalised_range: bool, + timed_out: bool, + #[allow(dead_code)] + bin_count_exp: u32, +} + +impl XBinnedWaveEventsCollector { + pub fn new(bin_count_exp: u32) -> Self { + Self { + finalised_range: false, + timed_out: false, + vals: XBinnedWaveEvents::empty(), + bin_count_exp, + } + } +} + +impl WithLen for XBinnedWaveEventsCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +impl Collector for XBinnedWaveEventsCollector +where + NTY: NumOps, +{ + type Input = XBinnedWaveEvents; + type Output = XBinnedWaveEventsCollectedResult; + + fn ingest(&mut self, src: &Self::Input) { + self.vals.append(src); + } + + fn set_range_complete(&mut self) { + self.finalised_range = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(self) -> Result { + let ts_anchor_sec = self.vals.tss.first().map_or(0, |&k| k) / SEC; + let ts_anchor_ns = ts_anchor_sec * SEC; + let ts_off_ms: Vec<_> = self.vals.tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); + let ts_off_ns = self + .vals + .tss + .iter() + .zip(ts_off_ms.iter().map(|&k| k * MS)) + .map(|(&j, k)| (j - ts_anchor_ns - k)) + .collect(); + let ret = Self::Output { + finalised_range: self.finalised_range, + timed_out: self.timed_out, + ts_anchor_sec, + ts_off_ms, + ts_off_ns, + mins: self.vals.mins, + maxs: self.vals.maxs, + avgs: self.vals.avgs, + }; + Ok(ret) + } +} + +impl Collectable for XBinnedWaveEvents +where + NTY: NumOps, +{ + type Collector = XBinnedWaveEventsCollector; + + fn new_collector(bin_count_exp: u32) -> Self::Collector { + Self::Collector::new(bin_count_exp) + } +} + +#[derive(Debug, Serialize, Deserialize)] pub struct WaveEvents { pub tss: Vec, pub vals: Vec>, @@ -861,22 +1068,23 @@ where fn process(&self, inp: EventValues) -> Self::Output { let nev = inp.tss.len(); let mut ret = Self::Output { - tss: inp.tss, + // TODO get rid of this clone: + tss: inp.tss.clone(), mins: Vec::with_capacity(nev), maxs: Vec::with_capacity(nev), avgs: Vec::with_capacity(nev), }; for i1 in 0..nev { - let mut min = vec![NTY::min_or_nan(); self.x_bin_count]; - let mut max = vec![NTY::max_or_nan(); self.x_bin_count]; + let mut min = vec![NTY::max_or_nan(); self.x_bin_count]; + let mut max = vec![NTY::min_or_nan(); self.x_bin_count]; let mut sum = vec![0f32; self.x_bin_count]; let mut sumc = vec![0u64; self.x_bin_count]; for (i2, &v) in inp.values[i1].iter().enumerate() { let i3 = i2 * self.x_bin_count / self.shape_bin_count; - if v < min[i3] { + if v < min[i3] || min[i3].is_nan() { min[i3] = v; } - if v > max[i3] { + if v > max[i3] || max[i3].is_nan() { max[i3] = v; } if v.is_nan() { @@ -885,6 +1093,10 @@ where sumc[i3] += 1; } } + // TODO + if false && inp.tss[0] < 1300 { + info!("WaveNBinner process push min {:?}", min); + } ret.mins.push(min); ret.maxs.push(max); let avg = sum diff --git a/disk/src/binned.rs b/disk/src/binned.rs index cb77b81..6da24db 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; -use crate::agg::enp::{Identity, WaveXBinner}; +use crate::agg::enp::{ts_offs_from_abs, Identity, WaveXBinner}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult}; @@ -7,7 +7,7 @@ use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; -use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction, PlainEventsAggMethod}; +use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, @@ -24,7 +24,7 @@ use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - AggKind, BinnedRange, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, + x_bin_count, AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, Shape, }; use num_traits::{AsPrimitive, Bounded, Float, Zero}; @@ -269,6 +269,7 @@ where } } +#[allow(dead_code)] fn make_num_pipeline_nty_end_old( shape: Shape, query: BinnedQuery, @@ -446,6 +447,7 @@ struct CollectForJson { } impl CollectForJson { + #[allow(dead_code)] pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self { Self { timeout, @@ -637,6 +639,12 @@ impl Serialize for IsoDateTime { } } +pub fn make_iso_ts(tss: &[u64]) -> Vec { + tss.iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect() +} + pub async fn collect_all( stream: S, bin_count_exp: u32, @@ -704,6 +712,10 @@ pub struct BinnedJsonChannelExec { impl BinnedJsonChannelExec { pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self { + info!( + "BinnedJsonChannelExec AggKind: {:?}\n--------------------------------------------------------------", + query.agg_kind() + ); Self { query, node_config, @@ -717,7 +729,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { fn exec( self, - byte_order: END, + _byte_order: END, shape: Shape, event_value_shape: EVS, _events_node_proc: ENP, @@ -725,14 +737,14 @@ impl ChannelExecFunction for BinnedJsonChannelExec { where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, - <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, - <::Output as TimeBinnableType>::Output: - TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, // TODO require these things in general? - ::Output: PushableIndex, + ::Output: Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, Sitemty<::Output>: FrameType + Framable + 'static, Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + Framable + DeserializeOwned, @@ -742,8 +754,9 @@ impl ChannelExecFunction for BinnedJsonChannelExec { BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( format!("binned_bytes_for_http BinnedRange::covering_range returned None"), ))?; + let t_bin_count = range.count as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { + let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { info!("binned_bytes_for_http found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { @@ -763,74 +776,37 @@ impl ChannelExecFunction for BinnedJsonChannelExec { &self.node_config, self.query.disk_stats_every().clone(), self.query.report_error(), - )? - .map(|item| match item.make_frame() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), + )?; + let f = collect_plain_events_json(s, self.timeout, t_bin_count); + let s = futures_util::stream::once(f).map(|item| match item { + Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), + Err(e) => Err(e.into()), }); - // TODO remove? - /*let ret = BinnedResponseStat { - stream: Box::pin(s), - bin_count: range.count as u32, - };*/ - Ok(Box::pin(s)) + Ok(Box::pin(s) as Pin> + Send>>) } Ok(None) => { info!( "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", range ); - let bin_count = range.count as u32; let evq = EventsQuery { channel: self.query.channel().clone(), range: self.query.range().clone(), agg_kind: self.query.agg_kind().clone(), }; - let x_bin_count = if let AggKind::DimXBinsN(n) = self.query.agg_kind() { - *n as usize - } else { - 0 - }; + let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - let s = - TBinnerStream::<_, ::Output>::new(s, range, x_bin_count).map(|item| { - match item.make_frame() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), - } - }); - /*let ret = BinnedResponseStat { - stream: Box::pin(s), - bin_count, - };*/ - Ok(Box::pin(s)) + let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count); + let f = collect_plain_events_json(s, self.timeout, t_bin_count); + let s = futures_util::stream::once(f).map(|item| match item { + Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), + Err(e) => Err(e.into()), + }); + Ok(Box::pin(s) as Pin> + Send>>) } Err(e) => Err(e), - } - - /*let perf_opts = PerfOpts { inmem_bufcap: 4096 }; - let evq = EventsQuery { - channel: self.channel, - range: self.range, - agg_kind: self.agg_kind, - }; - let s = MergedFromRemotes::<::Method>::new( - evq, - perf_opts, - self.node_config.node_config.cluster, - ); - let f = collect_plain_events_json(s, self.timeout); - let f = FutureExt::map(f, |item| match item { - Ok(item) => { - // TODO add channel entry info here? - //let obj = item.as_object_mut().unwrap(); - //obj.insert("channelName", JsonValue::String(en)); - Ok(Bytes::from(serde_json::to_vec(&item)?)) - } - Err(e) => Err(e.into()), - }); - let s = futures_util::stream::once(f); - Ok(Box::pin(s))*/ + }?; + Ok(souter) } fn empty() -> Self::Output { @@ -842,11 +818,6 @@ pub async fn binned_json( query: &BinnedQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - // TODO try the channelexec approach. - // TODO why does channel_exec need the range, and what does it use it for? - // do I want there the user-requested range or the bin-edge-adjusted range? - // TODO currently, channel_exec resolves NTY, END, EVS but not ENP! - // can I add that or does that break other things? let ret = channel_exec( BinnedJsonChannelExec::new(query.clone(), node_config.clone()), query.channel(), @@ -855,19 +826,6 @@ pub async fn binned_json( node_config, ) .await?; - - /*let pl = make_num_pipeline( - query, - CollectForJson::new(query.timeout(), query.abort_after_bin_count()), - node_config, - ) - .await?; - let ret = pl.stream.map(|item| { - let fr = item.to_json_result()?; - let buf = fr.to_json_bytes()?; - Ok(Bytes::from(buf)) - });*/ - Ok(Box::pin(ret)) } @@ -1027,6 +985,7 @@ pub trait NumOps: fn min_or_nan() -> Self; fn max_or_nan() -> Self; fn is_nan(&self) -> bool; + fn fourty_two() -> Self; } macro_rules! impl_num_ops { @@ -1041,6 +1000,9 @@ macro_rules! impl_num_ops { fn is_nan(&self) -> bool { $is_nan(self) } + fn fourty_two() -> Self { + 42 as Self + } } }; } @@ -1268,8 +1230,12 @@ impl MinMaxAvgBinsCollected { #[derive(Serialize)] pub struct MinMaxAvgBinsCollectedResult { - ts0: u64, - tsoff: Vec, + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, //ts_bin_edges: Vec, counts: Vec, mins: Vec>, @@ -1352,9 +1318,16 @@ where } else { None }; + // TODO could save the copy: + let mut ts_all = self.vals.ts1s.clone(); + if self.vals.ts2s.len() > 0 { + ts_all.push(*self.vals.ts2s.last().unwrap()); + } + let tst = ts_offs_from_abs(&ts_all); let ret = MinMaxAvgBinsCollectedResult:: { - ts0, - tsoff, + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, counts: self.vals.counts, mins: self.vals.mins, maxs: self.vals.maxs, @@ -1573,7 +1546,7 @@ pub enum RangeCompletableItem { Data(T), } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub struct MinMaxAvgWaveBins { pub ts1s: Vec, pub ts2s: Vec, @@ -1590,7 +1563,7 @@ where fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, - "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + "MinMaxAvgWaveBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", self.ts1s.len(), self.ts1s.iter().map(|k| k / SEC).collect::>(), self.ts2s.iter().map(|k| k / SEC).collect::>(), @@ -1758,9 +1731,12 @@ impl MinMaxAvgWaveBinsCollected { #[derive(Serialize)] pub struct MinMaxAvgWaveBinsCollectedResult { - ts0: u64, - tsoff: Vec, - //ts_bin_edges: Vec, + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, counts: Vec, mins: Vec>>, maxs: Vec>>, @@ -1770,8 +1746,7 @@ pub struct MinMaxAvgWaveBinsCollectedResult { #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] - //continue_at: Option, - continue_at: Option, + continue_at: Option, } pub struct MinMaxAvgWaveBinsCollector { @@ -1823,34 +1798,34 @@ where } fn result(self) -> Result { - let ts0 = self.vals.ts1s.first().map_or(0, |k| *k / SEC); - let bin_count = self.vals.ts1s.len() as u32; - let mut tsoff: Vec<_> = self.vals.ts1s.iter().map(|k| *k - ts0 * SEC).collect(); - if let Some(&k) = self.vals.ts2s.last() { - tsoff.push(k - ts0 * SEC); + let t_bin_count = self.vals.counts.len(); + // TODO could save the copy: + let mut ts_all = self.vals.ts1s.clone(); + if self.vals.ts2s.len() > 0 { + ts_all.push(*self.vals.ts2s.last().unwrap()); } - let tsoff = tsoff; - let _iso: Vec<_> = tsoff - .iter() - .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) - .collect(); let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { - match tsoff.last() { - Some(k) => Some(k.clone()), + match ts_all.last() { + Some(&k) => { + let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); + Some(iso) + } None => Err(Error::with_msg("partial_content but no bin in result"))?, } } else { None }; + let tst = ts_offs_from_abs(&ts_all); let ret = MinMaxAvgWaveBinsCollectedResult { - ts0, - tsoff, + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, counts: self.vals.counts, mins: self.vals.mins, maxs: self.vals.maxs, avgs: self.vals.avgs, finalised_range: self.range_complete, - missing_bins: self.bin_count_exp - bin_count, + missing_bins: self.bin_count_exp - t_bin_count as u32, continue_at, }; Ok(ret) @@ -1885,8 +1860,8 @@ where Self { range, count: 0, - min: vec![NTY::min_or_nan(); x_bin_count], - max: vec![NTY::max_or_nan(); x_bin_count], + min: vec![NTY::max_or_nan(); x_bin_count], + max: vec![NTY::min_or_nan(); x_bin_count], sum: vec![0f32; x_bin_count], sumc: 0, } @@ -1916,7 +1891,7 @@ where None => {} Some(inp) => { for (a, b) in self.min.iter_mut().zip(inp.iter()) { - if *b < *a { + if *b < *a || a.is_nan() { *a = *b; } } @@ -1926,7 +1901,7 @@ where None => {} Some(inp) => { for (a, b) in self.max.iter_mut().zip(inp.iter()) { - if *b > *a { + if *b > *a || a.is_nan() { *a = *b; } } diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs index 8da6429..701648e 100644 --- a/disk/src/binned/dim1.rs +++ b/disk/src/binned/dim1.rs @@ -17,7 +17,7 @@ use std::fmt; use std::marker::PhantomData; use tokio::fs::File; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub struct MinMaxAvgDim1Bins { pub ts1s: Vec, pub ts2s: Vec, diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 992a13f..4742d16 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -311,6 +311,7 @@ fn binning_scheme_string(agg_kind: &AggKind) -> String { fn agg_kind_from_binning_scheme(params: &BTreeMap) -> Result { let key = "binningScheme"; + let tok1 = "binnedXcount"; let s = params .get(key) .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?; @@ -318,8 +319,8 @@ fn agg_kind_from_binning_scheme(params: &BTreeMap) -> Result + 'static, END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, - <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, - <::Output as TimeBinnableType>::Output: - TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, // TODO require these things in general? - ::Output: PushableIndex, + ::Output: Debug + Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, Sitemty<::Output>: FrameType + Framable + 'static, Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + Framable + DeserializeOwned; @@ -62,21 +64,17 @@ where F: ChannelExecFunction, NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, - - // TODO - - // TODO - - // TODO - - // TODO - - // Can I replace the PlainEventsAggMethod by EventsNodeProcessor? - EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, - + EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, - <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, + // TODO require these things in general? + ::Output: Debug + Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned, { Ok(f.exec(byte_order, shape, event_value_shape, events_node_proc)?) } @@ -238,9 +236,9 @@ impl ChannelExecFunction for PlainEvents { fn exec( self, byte_order: END, - shape: Shape, + _shape: Shape, event_value_shape: EVS, - events_node_proc: ENP, + _events_node_proc: ENP, ) -> Result where NTY: NumOps + NumFromBytes + 'static, @@ -294,16 +292,20 @@ impl PlainEventsJson { } } -pub async fn collect_plain_events_json(stream: S, timeout: Duration) -> Result +pub async fn collect_plain_events_json( + stream: S, + timeout: Duration, + bin_count_exp: u32, +) -> Result where S: Stream> + Unpin, - T: Collectable, + T: Collectable + Debug, { let deadline = tokio::time::Instant::now() + timeout; // TODO in general a Collector does not need to know about the expected number of bins. // It would make more sense for some specific Collector kind to know. // Therefore introduce finer grained types. - let mut collector = ::new_collector(0); + let mut collector = ::new_collector(bin_count_exp); let mut i1 = 0; let mut stream = stream; loop { @@ -333,6 +335,7 @@ where collector.set_range_complete(); } RangeCompletableItem::Data(item) => { + info!("collect_plain_events_json GOT ITEM {:?}", item); collector.ingest(&item); i1 += 1; } @@ -351,41 +354,30 @@ where Ok(ret) } -pub trait PlainEventsAggMethod { - type Method: EventsNodeProcessor; -} - -impl PlainEventsAggMethod for EventValuesDim0Case -where - NTY: NumOps, -{ - type Method = Identity; -} - -impl PlainEventsAggMethod for EventValuesDim1Case -where - NTY: NumOps, -{ - type Method = WavePlainProc; -} - impl ChannelExecFunction for PlainEventsJson { type Output = Pin> + Send>>; fn exec( self, byte_order: END, - shape: Shape, + _shape: Shape, event_value_shape: EVS, _events_node_proc: ENP, ) -> Result where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, - <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, + // TODO require these things in general? + ::Output: Debug + Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned, { let _ = byte_order; let _ = event_value_shape; @@ -395,12 +387,8 @@ impl ChannelExecFunction for PlainEventsJson { range: self.range, agg_kind: self.agg_kind, }; - let s = MergedFromRemotes::<::Method>::new( - evq, - perf_opts, - self.node_config.node_config.cluster, - ); - let f = collect_plain_events_json(s, self.timeout); + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); + let f = collect_plain_events_json(s, self.timeout, 0); let f = FutureExt::map(f, |item| match item { Ok(item) => { // TODO add channel entry info here? diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 4ba539f..7faa120 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -2,6 +2,7 @@ use crate::agg::enp::{WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; +use crate::binned::dim1::MinMaxAvgDim1Bins; use crate::binned::{MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, RangeCompletableItem}; use crate::decode::EventValues; use crate::frame::inmem::InMemoryFrame; @@ -118,6 +119,13 @@ where const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB; } +impl FrameType for Sitemty> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB; +} + pub trait ProvidesFrameType { fn frame_type_id(&self) -> u32; } @@ -226,6 +234,18 @@ where } } +impl Framable for Sitemty> +where + NTY: NumOps + Serialize, +{ + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } + fn make_frame(&self) -> Result { + make_frame(self) + } +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 144cd88..5122e88 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -370,7 +370,7 @@ async fn gen_event( let ele_size = 8; let mut vals = vec![0; (ele_size * ele_count) as usize]; for i1 in 0..ele_count { - let v = evix as f64; + let v = (evix as f64) * 100.0 + i1 as f64; let a = if config.byte_order.is_be() { v.to_be_bytes() } else { @@ -393,7 +393,7 @@ async fn gen_event( let ele_size = 2; let mut vals = vec![0; (ele_size * ele_count) as usize]; for i1 in 0..ele_count { - let v = evix as u16; + let v = (evix as u16).wrapping_mul(100).wrapping_add(i1 as u16); let a = if config.byte_order.is_be() { v.to_be_bytes() } else {