From edafc610c211b8193238350d0292e6628dc34491 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 15 Jun 2021 17:19:47 +0200 Subject: [PATCH] WIP --- daqbuffer/src/test/binnedjson.rs | 3 +- disk/src/agg/binnedt.rs | 10 +- disk/src/agg/enp.rs | 55 +++--- disk/src/binned.rs | 279 ++++++++++++++++++++++++++-- disk/src/binned/binnedfrompbv.rs | 8 +- disk/src/binned/dim1.rs | 8 +- disk/src/binned/pbv.rs | 16 +- disk/src/binned/prebinned.rs | 107 ++++++++--- disk/src/channelexec.rs | 133 ++++++++++--- disk/src/frame/makeframe.rs | 64 ++++++- disk/src/merge/mergedfromremotes.rs | 2 + disk/src/raw.rs | 12 +- disk/src/raw/conn.rs | 45 ++--- httpret/src/lib.rs | 8 +- netpod/src/lib.rs | 20 ++ 15 files changed, 613 insertions(+), 157 deletions(-) diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbuffer/src/test/binnedjson.rs index add377f..cbe29f1 100644 --- a/daqbuffer/src/test/binnedjson.rs +++ b/daqbuffer/src/test/binnedjson.rs @@ -1,6 +1,6 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::binned::query::BinnedQuery; +use disk::binned::query::{BinnedQuery, CacheUsage}; use err::Error; use http::StatusCode; use hyper::Body; @@ -93,6 +93,7 @@ async fn get_binned_json_common( let range = NanoRange::from_date_time(beg_date, end_date); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); query.set_timeout(Duration::from_millis(15000)); + query.set_cache_usage(CacheUsage::Ignore); let url = query.url(&HostPort::from_node(node0)); info!("get_binned_json_0 get {}", url); let req = hyper::Request::builder() diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 7add8a1..1210239 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -38,7 +38,7 @@ where { inp: Pin>, spec: BinnedRange, - bin_count: usize, + x_bin_count: usize, curbin: u32, left: Option>>>, aggtor: Option<::Aggregator>, @@ -56,15 +56,15 @@ where S: Stream> + Send + Unpin + 'static, TBT: TimeBinnableType, { - pub fn new(inp: S, spec: BinnedRange, bin_count: usize) -> Self { + pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize) -> Self { let range = spec.get_range(0); Self { inp: Box::pin(inp), spec, - bin_count, + x_bin_count, curbin: 0, left: None, - aggtor: Some(::aggregator(range, bin_count)), + aggtor: Some(::aggregator(range, x_bin_count)), tmp_agg_results: VecDeque::new(), inp_completed: false, all_bins_emitted: false, @@ -92,7 +92,7 @@ where let range = self.spec.get_range(self.curbin); let ret = self .aggtor - .replace(::aggregator(range, self.bin_count)) + .replace(::aggregator(range, self.x_bin_count)) .unwrap() .result(); // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive. diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index e89d1f1..ea999bb 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -8,7 +8,8 @@ use crate::binned::{ }; use crate::decode::EventValues; use err::Error; -use netpod::{NanoRange, Shape}; +use netpod::log::*; +use netpod::{x_bin_count, AggKind, NanoRange, Shape}; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use tokio::fs::File; @@ -24,7 +25,7 @@ where type Input = NTY; type Output = EventValues; - fn create(shape: Shape) -> Self { + fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } @@ -174,7 +175,7 @@ where type Output = MinMaxAvgBins; type Aggregator = XBinnedScalarEventsAggregator; - fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator { + fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator { Self::Aggregator::new(range) } } @@ -368,9 +369,10 @@ where { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); - self.mins.push(src.mins[ix]); - self.maxs.push(src.maxs[ix]); - self.avgs.push(src.avgs[ix]); + // TODO not nice. + self.mins.push(src.mins[ix].clone()); + self.maxs.push(src.maxs[ix].clone()); + self.avgs.push(src.avgs[ix].clone()); } } @@ -659,7 +661,7 @@ impl WaveEventsAggregator where NTY: NumOps, { - pub fn new(range: NanoRange, bin_count: usize) -> Self { + pub fn new(range: NanoRange, _x_bin_count: usize) -> Self { Self { range, count: 0, @@ -766,7 +768,7 @@ where type Input = Vec; type Output = XBinnedScalarEvents; - fn create(shape: Shape) -> Self { + fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } @@ -831,7 +833,8 @@ where } pub struct WaveNBinner { - bin_count: usize, + shape_bin_count: usize, + x_bin_count: usize, _m1: PhantomData, } @@ -842,11 +845,15 @@ where type Input = Vec; type Output = XBinnedWaveEvents; - fn create(shape: Shape) -> Self { + fn create(shape: Shape, agg_kind: AggKind) -> Self { + info!("WaveNBinner::create"); // TODO get rid of panic potential - let bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize; + let shape_bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize; + let x_bin_count = x_bin_count(&shape, &agg_kind); + info!("shape_bin_count {} x_bin_count {}", shape_bin_count, x_bin_count); Self { - bin_count, + shape_bin_count, + x_bin_count, _m1: PhantomData, } } @@ -860,12 +867,12 @@ where avgs: Vec::with_capacity(nev), }; for i1 in 0..nev { - let mut min = vec![NTY::min_or_nan(); self.bin_count]; - let mut max = vec![NTY::max_or_nan(); self.bin_count]; - let mut sum = vec![0f32; self.bin_count]; - let mut sumc = vec![0; self.bin_count]; + let mut min = vec![NTY::min_or_nan(); self.x_bin_count]; + let mut max = vec![NTY::max_or_nan(); self.x_bin_count]; + let mut sum = vec![0f32; self.x_bin_count]; + let mut sumc = vec![0u64; self.x_bin_count]; for (i2, &v) in inp.values[i1].iter().enumerate() { - let i3 = i2 * self.bin_count / inp.values[i1].len(); + let i3 = i2 * self.x_bin_count / self.shape_bin_count; if v < min[i3] { min[i3] = v; } @@ -881,15 +888,9 @@ where ret.mins.push(min); ret.maxs.push(max); let avg = sum - .iter() - .enumerate() - .map(|(i3, &k)| { - if sumc[i3] > 0 { - sum[i3] / sumc[i3] as f32 - } else { - f32::NAN - } - }) + .into_iter() + .zip(sumc.into_iter()) + .map(|(j, k)| if k > 0 { j / k as f32 } else { f32::NAN }) .collect(); ret.avgs.push(avg); } @@ -908,7 +909,7 @@ where type Input = Vec; type Output = WaveEvents; - fn create(shape: Shape) -> Self { + fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 47f6764..cb77b81 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -7,6 +7,7 @@ use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; +use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction, PlainEventsAggMethod}; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, @@ -15,7 +16,7 @@ use crate::frame::makeframe::{Framable, FrameType, SubFrId}; use crate::merge::mergedfromremotes::MergedFromRemotes; use crate::raw::EventsQuery; use crate::Sitemty; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; @@ -23,7 +24,7 @@ use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, + AggKind, BinnedRange, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, Shape, }; use num_traits::{AsPrimitive, Bounded, Float, Zero}; @@ -60,6 +61,7 @@ pub struct BinnedResponseStat { // They also must resolve to the same types, so would be good to unify. fn make_num_pipeline_nty_end_evs_enp_stat( + shape: Shape, event_value_shape: EVS, query: BinnedQuery, node_config: &NodeConfigCached, @@ -100,6 +102,7 @@ where PreBinnedPatchIterator::from_range(pre_range), query.channel().clone(), range.clone(), + shape, query.agg_kind().clone(), query.cache_usage().clone(), node_config, @@ -149,7 +152,10 @@ pub struct BinnedResponseDyn { } fn make_num_pipeline_nty_end_evs_enp( + shape: Shape, + _agg_kind: AggKind, event_value_shape: EVS, + _events_node_proc: ENP, query: BinnedQuery, ppp: PPP, node_config: &NodeConfigCached, @@ -175,7 +181,7 @@ where FrameType + Framable + DeserializeOwned, Sitemty<<::Output as TimeBinnableType>::Output>: ToJsonResult + Framable, { - let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?; + let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(shape, event_value_shape, query, node_config)?; let s = ppp.convert(res.stream, res.bin_count); let ret = BinnedResponseDyn { stream: Box::pin(s) }; Ok(ret) @@ -190,18 +196,108 @@ fn make_num_pipeline_nty_end( where PPP: PipelinePostProcessA, PPP: PipelinePostProcessB>, + NTY: NumOps + NumFromBytes + Serialize + 'static, + END: Endianness + 'static, +{ + let agg_kind = query.agg_kind().clone(); + match shape { + Shape::Scalar => { + let evs = EventValuesDim0Case::new(); + match agg_kind { + AggKind::DimXBins1 => { + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>( + shape, + agg_kind, + evs, + events_node_proc, + query, + ppp, + node_config, + ) + } + AggKind::DimXBinsN(_) => { + let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>( + shape, + agg_kind, + evs, + events_node_proc, + query, + ppp, + node_config, + ) + } + AggKind::Plain => { + panic!(); + } + } + } + Shape::Wave(n) => { + let evs = EventValuesDim1Case::new(n); + match agg_kind { + AggKind::DimXBins1 => { + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + ppp, + node_config, + ) + } + AggKind::DimXBinsN(_) => { + /*let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + let yo = make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + ppp, + node_config, + );*/ + err::todoval() + } + AggKind::Plain => { + panic!(); + } + } + } + } +} + +fn make_num_pipeline_nty_end_old( + shape: Shape, + query: BinnedQuery, + ppp: PPP, + node_config: &NodeConfigCached, +) -> Result +where + PPP: PipelinePostProcessA, + PPP: PipelinePostProcessB>, + PPP: PipelinePostProcessB>, NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, { + let agg_kind = query.agg_kind().clone(); match shape { Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>( + shape.clone(), + agg_kind.clone(), EventValuesDim0Case::new(), + Identity::create(shape.clone(), agg_kind.clone()), query, ppp, node_config, ), Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>( + shape.clone(), + agg_kind.clone(), EventValuesDim1Case::new(n), + WaveXBinner::create(shape.clone(), agg_kind.clone()), query, ppp, node_config, @@ -386,12 +482,6 @@ impl JsonCollector { } } -impl Framable for Sitemty { - fn make_frame(&self) -> Result { - panic!() - } -} - impl ToJsonBytes for serde_json::Value { fn to_json_bytes(&self) -> Result, Error> { Ok(serde_json::to_vec(self)?) @@ -606,11 +696,167 @@ where Ok(ret) } +pub struct BinnedJsonChannelExec { + query: BinnedQuery, + node_config: NodeConfigCached, + timeout: Duration, +} + +impl BinnedJsonChannelExec { + pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self { + Self { + query, + node_config, + timeout: Duration::from_millis(3000), + } + } +} + +impl ChannelExecFunction for BinnedJsonChannelExec { + type Output = Pin> + Send>>; + + fn exec( + self, + byte_order: END, + shape: Shape, + event_value_shape: EVS, + _events_node_proc: ENP, + ) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, + ENP: EventsNodeProcessor>::Output> + 'static, + Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, + <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: + TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, + // TODO require these things in general? + ::Output: PushableIndex, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned, + { + let _ = event_value_shape; + let range = + BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( + format!("binned_bytes_for_http BinnedRange::covering_range returned None"), + ))?; + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { + Ok(Some(pre_range)) => { + info!("binned_bytes_for_http found pre_range: {:?}", pre_range); + if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { + let msg = format!( + "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", + pre_range, range + ); + return Err(Error::with_msg(msg)); + } + let s = BinnedFromPreBinned::<<::Output as TimeBinnableType>::Output>::new( + PreBinnedPatchIterator::from_range(pre_range), + self.query.channel().clone(), + range.clone(), + shape, + self.query.agg_kind().clone(), + self.query.cache_usage().clone(), + &self.node_config, + self.query.disk_stats_every().clone(), + self.query.report_error(), + )? + .map(|item| match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + }); + // TODO remove? + /*let ret = BinnedResponseStat { + stream: Box::pin(s), + bin_count: range.count as u32, + };*/ + Ok(Box::pin(s)) + } + Ok(None) => { + info!( + "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", + range + ); + let bin_count = range.count as u32; + let evq = EventsQuery { + channel: self.query.channel().clone(), + range: self.query.range().clone(), + agg_kind: self.query.agg_kind().clone(), + }; + let x_bin_count = if let AggKind::DimXBinsN(n) = self.query.agg_kind() { + *n as usize + } else { + 0 + }; + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); + let s = + TBinnerStream::<_, ::Output>::new(s, range, x_bin_count).map(|item| { + match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + } + }); + /*let ret = BinnedResponseStat { + stream: Box::pin(s), + bin_count, + };*/ + Ok(Box::pin(s)) + } + Err(e) => Err(e), + } + + /*let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + let evq = EventsQuery { + channel: self.channel, + range: self.range, + agg_kind: self.agg_kind, + }; + let s = MergedFromRemotes::<::Method>::new( + evq, + perf_opts, + self.node_config.node_config.cluster, + ); + let f = collect_plain_events_json(s, self.timeout); + let f = FutureExt::map(f, |item| match item { + Ok(item) => { + // TODO add channel entry info here? + //let obj = item.as_object_mut().unwrap(); + //obj.insert("channelName", JsonValue::String(en)); + Ok(Bytes::from(serde_json::to_vec(&item)?)) + } + Err(e) => Err(e.into()), + }); + let s = futures_util::stream::once(f); + Ok(Box::pin(s))*/ + } + + fn empty() -> Self::Output { + Box::pin(futures_util::stream::empty()) + } +} + pub async fn binned_json( - node_config: &NodeConfigCached, query: &BinnedQuery, + node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let pl = make_num_pipeline( + // TODO try the channelexec approach. + // TODO why does channel_exec need the range, and what does it use it for? + // do I want there the user-requested range or the bin-edge-adjusted range? + // TODO currently, channel_exec resolves NTY, END, EVS but not ENP! + // can I add that or does that break other things? + let ret = channel_exec( + BinnedJsonChannelExec::new(query.clone(), node_config.clone()), + query.channel(), + query.range(), + query.agg_kind().clone(), + node_config, + ) + .await?; + + /*let pl = make_num_pipeline( query, CollectForJson::new(query.timeout(), query.abort_after_bin_count()), node_config, @@ -620,7 +866,8 @@ pub async fn binned_json( let fr = item.to_json_result()?; let buf = fr.to_json_bytes()?; Ok(Bytes::from(buf)) - }); + });*/ + Ok(Box::pin(ret)) } @@ -744,6 +991,7 @@ impl WithTimestamps for MinMaxAvgScalarEventBatch { } pub trait PushableIndex { + // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop? fn push_index(&mut self, src: &Self, ix: usize); } @@ -781,7 +1029,6 @@ pub trait NumOps: fn is_nan(&self) -> bool; } -fn tmp() {} macro_rules! impl_num_ops { ($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => { impl NumOps for $ty { @@ -798,7 +1045,7 @@ macro_rules! impl_num_ops { }; } -fn is_nan_int(x: &T) -> bool { +fn is_nan_int(_x: &T) -> bool { false } @@ -826,7 +1073,7 @@ pub trait EventsDecoder { pub trait EventsNodeProcessor: Send + Unpin { type Input; type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType; - fn create(shape: Shape) -> Self; + fn create(shape: Shape, agg_kind: AggKind) -> Self; fn process(&self, inp: EventValues) -> Self::Output; } @@ -993,7 +1240,7 @@ where type Output = MinMaxAvgBins; type Aggregator = MinMaxAvgBinsAggregator; - fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator { + fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator { Self::Aggregator::new(range) } } diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index fff0409..58db0a3 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -11,7 +11,9 @@ use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use http::{StatusCode, Uri}; use netpod::log::*; -use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator}; +use netpod::{ + x_bin_count, AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Shape, +}; use serde::de::DeserializeOwned; use std::future::ready; use std::marker::PhantomData; @@ -165,6 +167,7 @@ where patch_it: PreBinnedPatchIterator, channel: Channel, range: BinnedRange, + shape: Shape, agg_kind: AggKind, cache_usage: CacheUsage, node_config: &NodeConfigCached, @@ -184,6 +187,7 @@ where let pmax = patches.len(); let inp = futures_util::stream::iter(patches.into_iter().enumerate()) .map({ + let agg_kind = agg_kind.clone(); let node_config = node_config.clone(); move |(pix, patch)| { let query = PreBinnedQuery::new( @@ -235,7 +239,7 @@ where ready(g) } }); - let inp = TBinnerStream::<_, TBT>::new(inp, range); + let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind)); Ok(Self { inp: Box::pin(inp), _m1: PhantomData, diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs index 5fd0537..8da6429 100644 --- a/disk/src/binned/dim1.rs +++ b/disk/src/binned/dim1.rs @@ -8,6 +8,7 @@ use crate::binned::{ use crate::Sitemty; use chrono::{TimeZone, Utc}; use err::Error; +use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; use num_traits::Zero; @@ -173,8 +174,8 @@ where type Output = MinMaxAvgDim1Bins; type Aggregator = MinMaxAvgDim1BinsAggregator; - fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator { - Self::Aggregator::new(range) + fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator { + Self::Aggregator::new(range, x_bin_count) } } @@ -317,7 +318,7 @@ pub struct MinMaxAvgDim1BinsAggregator { } impl MinMaxAvgDim1BinsAggregator { - pub fn new(range: NanoRange, bin_count: usize) -> Self { + pub fn new(range: NanoRange, _x_bin_count: usize) -> Self { Self { range, count: 0, @@ -439,6 +440,7 @@ pub struct WaveEventsCollector { impl WaveEventsCollector { pub fn new(_bin_count_exp: u32) -> Self { + info!("\n\nWaveEventsCollector\n\n"); Self { vals: WaveEvents::empty(), range_complete: false, diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 8599e79..c67b2a0 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -14,7 +14,9 @@ use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; -use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use netpod::{ + x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape, +}; use serde::de::DeserializeOwned; use serde::Serialize; use std::future::Future; @@ -33,6 +35,8 @@ where ENP: EventsNodeProcessor>::Output>, { query: PreBinnedQuery, + shape: Shape, + agg_kind: AggKind, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, fut2: Option< @@ -73,9 +77,11 @@ where // TODO who exactly needs this DeserializeOwned? Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned, { - pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self { + pub fn new(query: PreBinnedQuery, shape: Shape, agg_kind: AggKind, node_config: &NodeConfigCached) -> Self { Self { query, + shape, + agg_kind, node_config: node_config.clone(), open_check_local_file: None, fut2: None, @@ -124,7 +130,11 @@ where .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - let ret = TBinnerStream::<_, ::Output>::new(s, range); + let ret = TBinnerStream::<_, ::Output>::new( + s, + range, + x_bin_count(&self.shape, &self.agg_kind), + ); Ok(Box::pin(ret)) } diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 02975b1..a239e21 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,5 +1,4 @@ use crate::agg::binnedt::TimeBinnableType; -use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::streams::Appendable; use crate::binned::pbv::PreBinnedValueStream; use crate::binned::query::PreBinnedQuery; @@ -15,14 +14,17 @@ use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape}; +use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::Serialize; use std::pin::Pin; fn make_num_pipeline_nty_end_evs_enp( + shape: Shape, + agg_kind: AggKind, _event_value_shape: EVS, + _events_node_proc: ENP, query: PreBinnedQuery, node_config: &NodeConfigCached, ) -> Pin> + Send>> @@ -36,13 +38,14 @@ where Sitemty<<::Output as TimeBinnableType>::Output>: Framable + FrameType + DeserializeOwned, { - let ret = PreBinnedValueStream::::new(query, node_config); + let ret = PreBinnedValueStream::::new(query, shape, agg_kind, node_config); let ret = StreamExt::map(ret, |item| Box::new(item) as Box); Box::pin(ret) } fn make_num_pipeline_nty_end( shape: Shape, + agg_kind: AggKind, query: PreBinnedQuery, node_config: &NodeConfigCached, ) -> Pin> + Send>> @@ -51,24 +54,74 @@ where END: Endianness + 'static, { match shape { - Shape::Scalar => make_num_pipeline_nty_end_evs_enp::>( - EventValuesDim0Case::new(), - query, - node_config, - ), - Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::>( - EventValuesDim1Case::new(n), - query, - node_config, - ), + Shape::Scalar => { + let evs = EventValuesDim0Case::new(); + match agg_kind { + AggKind::DimXBins1 => { + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + node_config, + ) + } + AggKind::DimXBinsN(_) => { + let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + node_config, + ) + } + AggKind::Plain => { + panic!(); + } + } + } + Shape::Wave(n) => { + let evs = EventValuesDim1Case::new(n); + match agg_kind { + AggKind::DimXBins1 => { + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + node_config, + ) + } + AggKind::DimXBinsN(_) => { + let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + make_num_pipeline_nty_end_evs_enp::( + shape, + agg_kind, + evs, + events_node_proc, + query, + node_config, + ) + } + AggKind::Plain => { + panic!(); + } + } + } } } macro_rules! match_end { - ($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => { + ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $query:expr, $node_config:expr) => { match $end { - ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config), - ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config), + ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $agg_kind, $query, $node_config), + ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $agg_kind, $query, $node_config), } }; } @@ -77,20 +130,21 @@ fn make_num_pipeline( scalar_type: ScalarType, byte_order: ByteOrder, shape: Shape, + agg_kind: AggKind, query: PreBinnedQuery, node_config: &NodeConfigCached, ) -> Pin> + Send>> { match scalar_type { - ScalarType::U8 => match_end!(u8, byte_order, shape, query, node_config), - ScalarType::U16 => match_end!(u16, byte_order, shape, query, node_config), - ScalarType::U32 => match_end!(u32, byte_order, shape, query, node_config), - ScalarType::U64 => match_end!(u64, byte_order, shape, query, node_config), - ScalarType::I8 => match_end!(i8, byte_order, shape, query, node_config), - ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config), - ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config), - ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config), - ScalarType::F32 => match_end!(f32, byte_order, shape, query, node_config), - ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config), + ScalarType::U8 => match_end!(u8, byte_order, shape, agg_kind, query, node_config), + ScalarType::U16 => match_end!(u16, byte_order, shape, agg_kind, query, node_config), + ScalarType::U32 => match_end!(u32, byte_order, shape, agg_kind, query, node_config), + ScalarType::U64 => match_end!(u64, byte_order, shape, agg_kind, query, node_config), + ScalarType::I8 => match_end!(i8, byte_order, shape, agg_kind, query, node_config), + ScalarType::I16 => match_end!(i16, byte_order, shape, agg_kind, query, node_config), + ScalarType::I32 => match_end!(i32, byte_order, shape, agg_kind, query, node_config), + ScalarType::I64 => match_end!(i64, byte_order, shape, agg_kind, query, node_config), + ScalarType::F32 => match_end!(f32, byte_order, shape, agg_kind, query, node_config), + ScalarType::F64 => match_end!(f64, byte_order, shape, agg_kind, query, node_config), } } @@ -137,6 +191,7 @@ pub async fn pre_binned_bytes_for_http( entry.scalar_type.clone(), entry.byte_order.clone(), entry.to_shape()?, + query.agg_kind().clone(), query.clone(), node_config, ) diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 95d41d8..6d70709 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -1,3 +1,4 @@ +use crate::agg::binnedt::TimeBinnableType; use crate::agg::enp::{Identity, WavePlainProc}; use crate::agg::streams::{Collectable, Collector, StreamItem}; use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem}; @@ -16,6 +17,7 @@ use futures_util::future::FutureExt; use futures_util::StreamExt; use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; use std::pin::Pin; use std::time::Duration; @@ -24,36 +26,62 @@ use tokio::time::timeout_at; pub trait ChannelExecFunction { type Output; - fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + fn exec( + self, + byte_order: END, + shape: Shape, + event_value_shape: EVS, + events_node_proc: ENP, + ) -> Result where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, - EventValues: Collectable, + ENP: EventsNodeProcessor>::Output> + 'static, Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, - <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex; + <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, + <::Output as TimeBinnableType>::Output: + TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, + // TODO require these things in general? + ::Output: PushableIndex, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned; fn empty() -> Self::Output; } -fn channel_exec_nty_end_evs_enp( +fn channel_exec_nty_end_evs_enp( f: F, byte_order: END, + shape: Shape, event_value_shape: EVS, + events_node_proc: ENP, ) -> Result where F: ChannelExecFunction, NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, + + // TODO + + // TODO + + // TODO + + // TODO + + // Can I replace the PlainEventsAggMethod by EventsNodeProcessor? EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, - EventValues: Collectable, + + ENP: EventsNodeProcessor>::Output> + 'static, Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, { - Ok(f.exec::(byte_order, event_value_shape)?) + Ok(f.exec(byte_order, shape, event_value_shape, events_node_proc)?) } -fn channel_exec_nty_end(f: F, byte_order: END, shape: Shape) -> Result +fn channel_exec_nty_end(f: F, byte_order: END, shape: Shape, agg_kind: AggKind) -> Result where F: ChannelExecFunction, NTY: NumOps + NumFromBytes + 'static, @@ -61,16 +89,54 @@ where EventValues: Collectable, { match shape { - Shape::Scalar => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim0Case::new()), - Shape::Wave(n) => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim1Case::new(n)), + Shape::Scalar => { + // + match agg_kind { + AggKind::Plain => { + let evs = EventValuesDim0Case::new(); + let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + AggKind::DimXBins1 => { + let evs = EventValuesDim0Case::new(); + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + AggKind::DimXBinsN(_) => { + let evs = EventValuesDim0Case::new(); + let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + } + } + Shape::Wave(n) => { + // + match agg_kind { + AggKind::Plain => { + let evs = EventValuesDim1Case::new(n); + let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + AggKind::DimXBins1 => { + let evs = EventValuesDim1Case::new(n); + let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + AggKind::DimXBinsN(_) => { + let evs = EventValuesDim1Case::new(n); + let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc) + } + } + } } } macro_rules! match_end { - ($f:expr, $nty:ident, $end:expr, $shape:expr, $node_config:expr) => { + ($f:expr, $nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $node_config:expr) => { match $end { - ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $shape), - ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $shape), + ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $shape, $agg_kind), + ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $shape, $agg_kind), } }; } @@ -80,22 +146,23 @@ fn channel_exec_config( scalar_type: ScalarType, byte_order: ByteOrder, shape: Shape, + agg_kind: AggKind, _node_config: &NodeConfigCached, ) -> Result where F: ChannelExecFunction, { match scalar_type { - ScalarType::U8 => match_end!(f, u8, byte_order, shape, node_config), - ScalarType::U16 => match_end!(f, u16, byte_order, shape, node_config), - ScalarType::U32 => match_end!(f, u32, byte_order, shape, node_config), - ScalarType::U64 => match_end!(f, u64, byte_order, shape, node_config), - ScalarType::I8 => match_end!(f, i8, byte_order, shape, node_config), - ScalarType::I16 => match_end!(f, i16, byte_order, shape, node_config), - ScalarType::I32 => match_end!(f, i32, byte_order, shape, node_config), - ScalarType::I64 => match_end!(f, i64, byte_order, shape, node_config), - ScalarType::F32 => match_end!(f, f32, byte_order, shape, node_config), - ScalarType::F64 => match_end!(f, f64, byte_order, shape, node_config), + ScalarType::U8 => match_end!(f, u8, byte_order, shape, agg_kind, node_config), + ScalarType::U16 => match_end!(f, u16, byte_order, shape, agg_kind, node_config), + ScalarType::U32 => match_end!(f, u32, byte_order, shape, agg_kind, node_config), + ScalarType::U64 => match_end!(f, u64, byte_order, shape, agg_kind, node_config), + ScalarType::I8 => match_end!(f, i8, byte_order, shape, agg_kind, node_config), + ScalarType::I16 => match_end!(f, i16, byte_order, shape, agg_kind, node_config), + ScalarType::I32 => match_end!(f, i32, byte_order, shape, agg_kind, node_config), + ScalarType::I64 => match_end!(f, i64, byte_order, shape, agg_kind, node_config), + ScalarType::F32 => match_end!(f, f32, byte_order, shape, agg_kind, node_config), + ScalarType::F64 => match_end!(f, f64, byte_order, shape, agg_kind, node_config), } } @@ -103,6 +170,7 @@ pub async fn channel_exec( f: F, channel: &Channel, range: &NanoRange, + agg_kind: AggKind, node_config: &NodeConfigCached, ) -> Result where @@ -130,6 +198,7 @@ where entry.scalar_type.clone(), entry.byte_order.clone(), entry.to_shape()?, + agg_kind, node_config, )?; Ok(ret) @@ -166,12 +235,18 @@ impl PlainEvents { impl ChannelExecFunction for PlainEvents { type Output = Pin> + Send>>; - fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + fn exec( + self, + byte_order: END, + shape: Shape, + event_value_shape: EVS, + events_node_proc: ENP, + ) -> Result where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - EventValues: Collectable, + ENP: EventsNodeProcessor>::Output> + 'static, { let _ = byte_order; let _ = event_value_shape; @@ -297,12 +372,18 @@ where impl ChannelExecFunction for PlainEventsJson { type Output = Pin> + Send>>; - fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + fn exec( + self, + byte_order: END, + shape: Shape, + event_value_shape: EVS, + _events_node_proc: ENP, + ) -> Result where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + PlainEventsAggMethod + 'static, - EventValues: Collectable, + ENP: EventsNodeProcessor>::Output> + 'static, Sitemty<<::Method as EventsNodeProcessor>::Output>: FrameType, <::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex, { diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index fd5b1c3..4ba539f 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -2,7 +2,7 @@ use crate::agg::enp::{WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{MinMaxAvgBins, NumOps, RangeCompletableItem}; +use crate::binned::{MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, RangeCompletableItem}; use crate::decode::EventValues; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; @@ -108,7 +108,14 @@ impl FrameType for Sitemty> where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB; + const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB; +} + +impl FrameType for Sitemty> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB; } pub trait ProvidesFrameType { @@ -116,16 +123,32 @@ pub trait ProvidesFrameType { } pub trait Framable: Send { + fn typeid(&self) -> u32; fn make_frame(&self) -> Result; } +impl Framable for Sitemty { + fn typeid(&self) -> u32 { + EventQueryJsonStringFrame::FRAME_TYPE_ID + } + fn make_frame(&self) -> Result { + panic!() + } +} + impl Framable for Result>, Error> { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } } impl Framable for Result>, Error> { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -135,6 +158,9 @@ impl Framable for Result>> where NTY: NumOps + Serialize, { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -144,6 +170,9 @@ impl Framable for Result u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -153,6 +182,9 @@ impl Framable for Sitemty> where NTY: NumOps + Serialize, { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -162,6 +194,9 @@ impl Framable for Sitemty> where NTY: NumOps + Serialize, { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -171,6 +206,21 @@ impl Framable for Sitemty> where NTY: NumOps + Serialize, { + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } + fn make_frame(&self) -> Result { + make_frame(self) + } +} + +impl Framable for Sitemty> +where + NTY: NumOps + Serialize, +{ + fn typeid(&self) -> u32 { + Self::FRAME_TYPE_ID + } fn make_frame(&self) -> Result { make_frame(self) } @@ -249,3 +299,13 @@ where Err(e) => Err(e.into()), } } + +pub fn crchex(t: T) -> String +where + T: AsRef<[u8]>, +{ + let mut h = crc32fast::Hasher::new(); + h.update(t.as_ref()); + let crc = h.finalize(); + format!("{:08x}", crc) +} diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index b7b8832..ef2e8d1 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -7,6 +7,7 @@ use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; +use netpod::log::*; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -34,6 +35,7 @@ where Sitemty<::Output>: FrameType, { pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { + info!("MergedFromRemotes evq {:?}", evq); let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { let f = x_processed_stream_from_node::(evq.clone(), perf_opts.clone(), node.clone()); diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 7ad613b..3ea1bcf 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -13,6 +13,7 @@ use crate::raw::eventsfromframes::EventsFromFrames; use crate::Sitemty; use err::Error; use futures_core::Stream; +use netpod::log::*; use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts}; use serde::{Deserialize, Serialize}; use std::pin::Pin; @@ -47,6 +48,7 @@ where { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; + info!("x_processed_stream_from_node qjs {:?}", qjs); let (netin, mut netout) = net.into_split(); let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; netout.write_all(&buf).await?; @@ -58,13 +60,3 @@ where let items = EventsFromFrames::new(frames); Ok(Box::pin(items)) } - -pub fn crchex(t: T) -> String -where - T: AsRef<[u8]>, -{ - let mut h = crc32fast::Hasher::new(); - h.update(t.as_ref()); - let crc = h.finalize(); - format!("{:08x}", crc) -} diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index d76700a..e88f48c 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -105,12 +105,12 @@ where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output>, + ENP: EventsNodeProcessor>::Output> + 'static, Sitemty<::Output>: Framable + 'static, ::Output: 'static, { let decs = EventsDecodedStream::::new(event_value_shape, event_blobs); - let s2 = StreamExt::map(decs, |item| match item { + let s2 = StreamExt::map(decs, move |item| match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => { @@ -131,40 +131,19 @@ where macro_rules! pipe4 { ($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => { match $agg_kind { - AggKind::DimXBins1 => make_num_pipeline_stream_evs::< - $nty, - $end, - $evs<$nty>, - //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin, - _, - //Identity<$nty>, - >( + AggKind::DimXBins1 => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape), + <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind), $event_blobs, ), - AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::< - $nty, - $end, - $evs<$nty>, - //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins, - _, - //WaveXBinner<$nty>, - >( + AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape), + <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind), $event_blobs, ), - AggKind::Plain => make_num_pipeline_stream_evs::< - $nty, - $end, - $evs<$nty>, - //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain, - _, - //WaveXBinner<$nty>, - >( + AggKind::Plain => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape), + <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape, $agg_kind), $event_blobs, ), } @@ -186,9 +165,6 @@ macro_rules! pipe3 { ) } Shape::Wave(n) => { - // TODO - // Issue is that I try to generate too many combinations. - // e.g. I try to generic code for the combination of Shape::Scalar with WaveXBinner which does not match. pipe4!( $nty, $end, @@ -322,8 +298,13 @@ async fn events_conn_handler_inner_try( event_chunker_conf, ); let shape = entry.to_shape().unwrap(); + info!( + "+++++--- conn.rs call pipe1 shape {:?} agg_kind {:?}", + shape, evq.agg_kind + ); let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); while let Some(item) = p1.next().await { + //info!("conn.rs encode frame typeid {:x}", item.typeid()); let item = item.make_frame(); match item { Ok(buf) => match netout.write_all(&buf).await { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index e209972..06ce426 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -13,7 +13,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::{Channel, NodeConfigCached}; +use netpod::{AggKind, Channel, NodeConfigCached}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::{Deserialize, Serialize}; @@ -351,7 +351,7 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re } async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { - let ret = match disk::binned::binned_json(node_config, &query).await { + let ret = match disk::binned::binned_json(&query, node_config).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_json")))?, Err(e) => { if query.report_error() { @@ -412,7 +412,7 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) let (head, _body) = req.into_parts(); let query = PlainEventsQuery::from_request(&head)?; let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?; + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let s = s.map(|item| item.make_frame()); let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; Ok(ret) @@ -427,7 +427,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - query.timeout(), node_config.clone(), ); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?; + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; Ok(ret) } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 82234a7..a0423d2 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -661,6 +661,26 @@ pub enum AggKind { Plain, } +pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { + match agg_kind { + AggKind::DimXBins1 => 0, + AggKind::DimXBinsN(n) => { + if *n == 0 { + match shape { + Shape::Scalar => 0, + Shape::Wave(n) => *n as usize, + } + } else { + *n as usize + } + } + AggKind::Plain => match shape { + Shape::Scalar => 0, + Shape::Wave(n) => *n as usize, + }, + } +} + impl Display for AggKind { fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result { match self {