This commit is contained in:
Dominik Werder
2021-06-15 17:19:47 +02:00
parent bebce14f56
commit edafc610c2
15 changed files with 613 additions and 157 deletions

View File

@@ -1,6 +1,6 @@
use crate::nodes::require_test_hosts_running;
use chrono::{DateTime, Utc};
use disk::binned::query::BinnedQuery;
use disk::binned::query::{BinnedQuery, CacheUsage};
use err::Error;
use http::StatusCode;
use hyper::Body;
@@ -93,6 +93,7 @@ async fn get_binned_json_common(
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_timeout(Duration::from_millis(15000));
query.set_cache_usage(CacheUsage::Ignore);
let url = query.url(&HostPort::from_node(node0));
info!("get_binned_json_0 get {}", url);
let req = hyper::Request::builder()

View File

@@ -38,7 +38,7 @@ where
{
inp: Pin<Box<S>>,
spec: BinnedRange,
bin_count: usize,
x_bin_count: usize,
curbin: u32,
left: Option<Poll<Option<Sitemty<TBT>>>>,
aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
@@ -56,15 +56,15 @@ where
S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
TBT: TimeBinnableType,
{
pub fn new(inp: S, spec: BinnedRange, bin_count: usize) -> Self {
pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize) -> Self {
let range = spec.get_range(0);
Self {
inp: Box::pin(inp),
spec,
bin_count,
x_bin_count,
curbin: 0,
left: None,
aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, bin_count)),
aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, x_bin_count)),
tmp_agg_results: VecDeque::new(),
inp_completed: false,
all_bins_emitted: false,
@@ -92,7 +92,7 @@ where
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(<TBT as TimeBinnableType>::aggregator(range, self.bin_count))
.replace(<TBT as TimeBinnableType>::aggregator(range, self.x_bin_count))
.unwrap()
.result();
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.

View File

@@ -8,7 +8,8 @@ use crate::binned::{
};
use crate::decode::EventValues;
use err::Error;
use netpod::{NanoRange, Shape};
use netpod::log::*;
use netpod::{x_bin_count, AggKind, NanoRange, Shape};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tokio::fs::File;
@@ -24,7 +25,7 @@ where
type Input = NTY;
type Output = EventValues<NTY>;
fn create(shape: Shape) -> Self {
fn create(_shape: Shape, _agg_kind: AggKind) -> Self {
Self { _m1: PhantomData }
}
@@ -174,7 +175,7 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = XBinnedScalarEventsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range)
}
}
@@ -368,9 +369,10 @@ where
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.mins.push(src.mins[ix]);
self.maxs.push(src.maxs[ix]);
self.avgs.push(src.avgs[ix]);
// TODO not nice.
self.mins.push(src.mins[ix].clone());
self.maxs.push(src.maxs[ix].clone());
self.avgs.push(src.avgs[ix].clone());
}
}
@@ -659,7 +661,7 @@ impl<NTY> WaveEventsAggregator<NTY>
where
NTY: NumOps,
{
pub fn new(range: NanoRange, bin_count: usize) -> Self {
pub fn new(range: NanoRange, _x_bin_count: usize) -> Self {
Self {
range,
count: 0,
@@ -766,7 +768,7 @@ where
type Input = Vec<NTY>;
type Output = XBinnedScalarEvents<NTY>;
fn create(shape: Shape) -> Self {
fn create(_shape: Shape, _agg_kind: AggKind) -> Self {
Self { _m1: PhantomData }
}
@@ -831,7 +833,8 @@ where
}
pub struct WaveNBinner<NTY> {
bin_count: usize,
shape_bin_count: usize,
x_bin_count: usize,
_m1: PhantomData<NTY>,
}
@@ -842,11 +845,15 @@ where
type Input = Vec<NTY>;
type Output = XBinnedWaveEvents<NTY>;
fn create(shape: Shape) -> Self {
fn create(shape: Shape, agg_kind: AggKind) -> Self {
info!("WaveNBinner::create");
// TODO get rid of panic potential
let bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
let shape_bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
let x_bin_count = x_bin_count(&shape, &agg_kind);
info!("shape_bin_count {} x_bin_count {}", shape_bin_count, x_bin_count);
Self {
bin_count,
shape_bin_count,
x_bin_count,
_m1: PhantomData,
}
}
@@ -860,12 +867,12 @@ where
avgs: Vec::with_capacity(nev),
};
for i1 in 0..nev {
let mut min = vec![NTY::min_or_nan(); self.bin_count];
let mut max = vec![NTY::max_or_nan(); self.bin_count];
let mut sum = vec![0f32; self.bin_count];
let mut sumc = vec![0; self.bin_count];
let mut min = vec![NTY::min_or_nan(); self.x_bin_count];
let mut max = vec![NTY::max_or_nan(); self.x_bin_count];
let mut sum = vec![0f32; self.x_bin_count];
let mut sumc = vec![0u64; self.x_bin_count];
for (i2, &v) in inp.values[i1].iter().enumerate() {
let i3 = i2 * self.bin_count / inp.values[i1].len();
let i3 = i2 * self.x_bin_count / self.shape_bin_count;
if v < min[i3] {
min[i3] = v;
}
@@ -881,15 +888,9 @@ where
ret.mins.push(min);
ret.maxs.push(max);
let avg = sum
.iter()
.enumerate()
.map(|(i3, &k)| {
if sumc[i3] > 0 {
sum[i3] / sumc[i3] as f32
} else {
f32::NAN
}
})
.into_iter()
.zip(sumc.into_iter())
.map(|(j, k)| if k > 0 { j / k as f32 } else { f32::NAN })
.collect();
ret.avgs.push(avg);
}
@@ -908,7 +909,7 @@ where
type Input = Vec<NTY>;
type Output = WaveEvents<NTY>;
fn create(shape: Shape) -> Self {
fn create(_shape: Shape, _agg_kind: AggKind) -> Self {
Self { _m1: PhantomData }
}

View File

@@ -7,6 +7,7 @@ use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binned::query::BinnedQuery;
use crate::binnedstream::BoxedStream;
use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction, PlainEventsAggMethod};
use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes,
@@ -15,7 +16,7 @@ use crate::frame::makeframe::{Framable, FrameType, SubFrId};
use crate::merge::mergedfromremotes::MergedFromRemotes;
use crate::raw::EventsQuery;
use crate::Sitemty;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
@@ -23,7 +24,7 @@ use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{
AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
AggKind, BinnedRange, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
PreBinnedPatchRange, ScalarType, Shape,
};
use num_traits::{AsPrimitive, Bounded, Float, Zero};
@@ -60,6 +61,7 @@ pub struct BinnedResponseStat<T> {
// They also must resolve to the same types, so would be good to unify.
fn make_num_pipeline_nty_end_evs_enp_stat<NTY, END, EVS, ENP>(
shape: Shape,
event_value_shape: EVS,
query: BinnedQuery,
node_config: &NodeConfigCached,
@@ -100,6 +102,7 @@ where
PreBinnedPatchIterator::from_range(pre_range),
query.channel().clone(),
range.clone(),
shape,
query.agg_kind().clone(),
query.cache_usage().clone(),
node_config,
@@ -149,7 +152,10 @@ pub struct BinnedResponseDyn {
}
fn make_num_pipeline_nty_end_evs_enp<PPP, NTY, END, EVS, ENP>(
shape: Shape,
_agg_kind: AggKind,
event_value_shape: EVS,
_events_node_proc: ENP,
query: BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
@@ -175,7 +181,7 @@ where
FrameType + Framable + DeserializeOwned,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: ToJsonResult + Framable,
{
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?;
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(shape, event_value_shape, query, node_config)?;
let s = ppp.convert(res.stream, res.bin_count);
let ret = BinnedResponseDyn { stream: Box::pin(s) };
Ok(ret)
@@ -190,18 +196,108 @@ fn make_num_pipeline_nty_end<PPP, NTY, END>(
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
{
let agg_kind = query.agg_kind().clone();
match shape {
Shape::Scalar => {
let evs = EventValuesDim0Case::new();
match agg_kind {
AggKind::DimXBins1 => {
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
ppp,
node_config,
)
}
AggKind::DimXBinsN(_) => {
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
ppp,
node_config,
)
}
AggKind::Plain => {
panic!();
}
}
}
Shape::Wave(n) => {
let evs = EventValuesDim1Case::new(n);
match agg_kind {
AggKind::DimXBins1 => {
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<PPP, NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
ppp,
node_config,
)
}
AggKind::DimXBinsN(_) => {
/*let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
let yo = make_num_pipeline_nty_end_evs_enp::<PPP, NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
ppp,
node_config,
);*/
err::todoval()
}
AggKind::Plain => {
panic!();
}
}
}
}
}
fn make_num_pipeline_nty_end_old<PPP, NTY, END>(
shape: Shape,
query: BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
PPP: PipelinePostProcessB<MinMaxAvgWaveBins<NTY>>,
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
{
let agg_kind = query.agg_kind().clone();
match shape {
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>(
shape.clone(),
agg_kind.clone(),
EventValuesDim0Case::new(),
Identity::create(shape.clone(), agg_kind.clone()),
query,
ppp,
node_config,
),
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>(
shape.clone(),
agg_kind.clone(),
EventValuesDim1Case::new(n),
WaveXBinner::create(shape.clone(), agg_kind.clone()),
query,
ppp,
node_config,
@@ -386,12 +482,6 @@ impl JsonCollector {
}
}
impl Framable for Sitemty<serde_json::Value> {
fn make_frame(&self) -> Result<BytesMut, Error> {
panic!()
}
}
impl ToJsonBytes for serde_json::Value {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_vec(self)?)
@@ -606,11 +696,167 @@ where
Ok(ret)
}
pub struct BinnedJsonChannelExec {
query: BinnedQuery,
node_config: NodeConfigCached,
timeout: Duration,
}
impl BinnedJsonChannelExec {
pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self {
Self {
query,
node_config,
timeout: Duration::from_millis(3000),
}
}
}
impl ChannelExecFunction for BinnedJsonChannelExec {
type Output = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
fn exec<NTY, END, EVS, ENP>(
self,
byte_order: END,
shape: Shape,
event_value_shape: EVS,
_events_node_proc: ENP,
) -> Result<Self::Output, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output>: FrameType,
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: PushableIndex,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
{
let _ = event_value_shape;
let range =
BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg(
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
pre_range, range
);
return Err(Error::with_msg(msg));
}
let s = BinnedFromPreBinned::<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>::new(
PreBinnedPatchIterator::from_range(pre_range),
self.query.channel().clone(),
range.clone(),
shape,
self.query.agg_kind().clone(),
self.query.cache_usage().clone(),
&self.node_config,
self.query.disk_stats_every().clone(),
self.query.report_error(),
)?
.map(|item| match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
});
// TODO remove?
/*let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count: range.count as u32,
};*/
Ok(Box::pin(s))
}
Ok(None) => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let bin_count = range.count as u32;
let evq = EventsQuery {
channel: self.query.channel().clone(),
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
};
let x_bin_count = if let AggKind::DimXBinsN(n) = self.query.agg_kind() {
*n as usize
} else {
0
};
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
let s =
TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count).map(|item| {
match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
}
});
/*let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count,
};*/
Ok(Box::pin(s))
}
Err(e) => Err(e),
}
/*let perf_opts = PerfOpts { inmem_bufcap: 4096 };
let evq = EventsQuery {
channel: self.channel,
range: self.range,
agg_kind: self.agg_kind,
};
let s = MergedFromRemotes::<<EVS as PlainEventsAggMethod>::Method>::new(
evq,
perf_opts,
self.node_config.node_config.cluster,
);
let f = collect_plain_events_json(s, self.timeout);
let f = FutureExt::map(f, |item| match item {
Ok(item) => {
// TODO add channel entry info here?
//let obj = item.as_object_mut().unwrap();
//obj.insert("channelName", JsonValue::String(en));
Ok(Bytes::from(serde_json::to_vec(&item)?))
}
Err(e) => Err(e.into()),
});
let s = futures_util::stream::once(f);
Ok(Box::pin(s))*/
}
fn empty() -> Self::Output {
Box::pin(futures_util::stream::empty())
}
}
pub async fn binned_json(
node_config: &NodeConfigCached,
query: &BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline(
// TODO try the channelexec approach.
// TODO why does channel_exec need the range, and what does it use it for?
// do I want there the user-requested range or the bin-edge-adjusted range?
// TODO currently, channel_exec resolves NTY, END, EVS but not ENP!
// can I add that or does that break other things?
let ret = channel_exec(
BinnedJsonChannelExec::new(query.clone(), node_config.clone()),
query.channel(),
query.range(),
query.agg_kind().clone(),
node_config,
)
.await?;
/*let pl = make_num_pipeline(
query,
CollectForJson::new(query.timeout(), query.abort_after_bin_count()),
node_config,
@@ -620,7 +866,8 @@ pub async fn binned_json(
let fr = item.to_json_result()?;
let buf = fr.to_json_bytes()?;
Ok(Bytes::from(buf))
});
});*/
Ok(Box::pin(ret))
}
@@ -744,6 +991,7 @@ impl WithTimestamps for MinMaxAvgScalarEventBatch {
}
pub trait PushableIndex {
// TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
fn push_index(&mut self, src: &Self, ix: usize);
}
@@ -781,7 +1029,6 @@ pub trait NumOps:
fn is_nan(&self) -> bool;
}
fn tmp() {}
macro_rules! impl_num_ops {
($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => {
impl NumOps for $ty {
@@ -798,7 +1045,7 @@ macro_rules! impl_num_ops {
};
}
fn is_nan_int<T>(x: &T) -> bool {
fn is_nan_int<T>(_x: &T) -> bool {
false
}
@@ -826,7 +1073,7 @@ pub trait EventsDecoder {
pub trait EventsNodeProcessor: Send + Unpin {
type Input;
type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType;
fn create(shape: Shape) -> Self;
fn create(shape: Shape, agg_kind: AggKind) -> Self;
fn process(&self, inp: EventValues<Self::Input>) -> Self::Output;
}
@@ -993,7 +1240,7 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = MinMaxAvgBinsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range)
}
}

View File

@@ -11,7 +11,9 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use http::{StatusCode, Uri};
use netpod::log::*;
use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator};
use netpod::{
x_bin_count, AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Shape,
};
use serde::de::DeserializeOwned;
use std::future::ready;
use std::marker::PhantomData;
@@ -165,6 +167,7 @@ where
patch_it: PreBinnedPatchIterator,
channel: Channel,
range: BinnedRange,
shape: Shape,
agg_kind: AggKind,
cache_usage: CacheUsage,
node_config: &NodeConfigCached,
@@ -184,6 +187,7 @@ where
let pmax = patches.len();
let inp = futures_util::stream::iter(patches.into_iter().enumerate())
.map({
let agg_kind = agg_kind.clone();
let node_config = node_config.clone();
move |(pix, patch)| {
let query = PreBinnedQuery::new(
@@ -235,7 +239,7 @@ where
ready(g)
}
});
let inp = TBinnerStream::<_, TBT>::new(inp, range);
let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind));
Ok(Self {
inp: Box::pin(inp),
_m1: PhantomData,

View File

@@ -8,6 +8,7 @@ use crate::binned::{
use crate::Sitemty;
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
@@ -173,8 +174,8 @@ where
type Output = MinMaxAvgDim1Bins<NTY>;
type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range)
fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range, x_bin_count)
}
}
@@ -317,7 +318,7 @@ pub struct MinMaxAvgDim1BinsAggregator<NTY> {
}
impl<NTY> MinMaxAvgDim1BinsAggregator<NTY> {
pub fn new(range: NanoRange, bin_count: usize) -> Self {
pub fn new(range: NanoRange, _x_bin_count: usize) -> Self {
Self {
range,
count: 0,
@@ -439,6 +440,7 @@ pub struct WaveEventsCollector<NTY> {
impl<NTY> WaveEventsCollector<NTY> {
pub fn new(_bin_count_exp: u32) -> Self {
info!("\n\nWaveEventsCollector\n\n");
Self {
vals: WaveEvents::empty(),
range_complete: false,

View File

@@ -14,7 +14,9 @@ use err::Error;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
use netpod::{
x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::future::Future;
@@ -33,6 +35,8 @@ where
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
{
query: PreBinnedQuery,
shape: Shape,
agg_kind: AggKind,
node_config: NodeConfigCached,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>>,
fut2: Option<
@@ -73,9 +77,11 @@ where
// TODO who exactly needs this DeserializeOwned?
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned,
{
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
pub fn new(query: PreBinnedQuery, shape: Shape, agg_kind: AggKind, node_config: &NodeConfigCached) -> Self {
Self {
query,
shape,
agg_kind,
node_config: node_config.clone(),
open_check_local_file: None,
fut2: None,
@@ -124,7 +130,11 @@ where
.ok_or(Error::with_msg("covering_range returns None"))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
let ret = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
let ret = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
s,
range,
x_bin_count(&self.shape, &self.agg_kind),
);
Ok(Box::pin(ret))
}

View File

@@ -1,5 +1,4 @@
use crate::agg::binnedt::TimeBinnableType;
use crate::agg::enp::{Identity, WaveXBinner};
use crate::agg::streams::Appendable;
use crate::binned::pbv::PreBinnedValueStream;
use crate::binned::query::PreBinnedQuery;
@@ -15,14 +14,17 @@ use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape};
use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::pin::Pin;
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
shape: Shape,
agg_kind: AggKind,
_event_value_shape: EVS,
_events_node_proc: ENP,
query: PreBinnedQuery,
node_config: &NodeConfigCached,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
@@ -36,13 +38,14 @@ where
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
Framable + FrameType + DeserializeOwned,
{
let ret = PreBinnedValueStream::<NTY, END, EVS, ENP>::new(query, node_config);
let ret = PreBinnedValueStream::<NTY, END, EVS, ENP>::new(query, shape, agg_kind, node_config);
let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
Box::pin(ret)
}
fn make_num_pipeline_nty_end<NTY, END>(
shape: Shape,
agg_kind: AggKind,
query: PreBinnedQuery,
node_config: &NodeConfigCached,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
@@ -51,24 +54,74 @@ where
END: Endianness + 'static,
{
match shape {
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>>(
EventValuesDim0Case::new(),
query,
node_config,
),
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>>(
EventValuesDim1Case::new(n),
query,
node_config,
),
Shape::Scalar => {
let evs = EventValuesDim0Case::new();
match agg_kind {
AggKind::DimXBins1 => {
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
node_config,
)
}
AggKind::DimXBinsN(_) => {
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
node_config,
)
}
AggKind::Plain => {
panic!();
}
}
}
Shape::Wave(n) => {
let evs = EventValuesDim1Case::new(n);
match agg_kind {
AggKind::DimXBins1 => {
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
node_config,
)
}
AggKind::DimXBinsN(_) => {
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
shape,
agg_kind,
evs,
events_node_proc,
query,
node_config,
)
}
AggKind::Plain => {
panic!();
}
}
}
}
}
macro_rules! match_end {
($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $query:expr, $node_config:expr) => {
match $end {
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $agg_kind, $query, $node_config),
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $agg_kind, $query, $node_config),
}
};
}
@@ -77,20 +130,21 @@ fn make_num_pipeline(
scalar_type: ScalarType,
byte_order: ByteOrder,
shape: Shape,
agg_kind: AggKind,
query: PreBinnedQuery,
node_config: &NodeConfigCached,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> {
match scalar_type {
ScalarType::U8 => match_end!(u8, byte_order, shape, query, node_config),
ScalarType::U16 => match_end!(u16, byte_order, shape, query, node_config),
ScalarType::U32 => match_end!(u32, byte_order, shape, query, node_config),
ScalarType::U64 => match_end!(u64, byte_order, shape, query, node_config),
ScalarType::I8 => match_end!(i8, byte_order, shape, query, node_config),
ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config),
ScalarType::F32 => match_end!(f32, byte_order, shape, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
ScalarType::U8 => match_end!(u8, byte_order, shape, agg_kind, query, node_config),
ScalarType::U16 => match_end!(u16, byte_order, shape, agg_kind, query, node_config),
ScalarType::U32 => match_end!(u32, byte_order, shape, agg_kind, query, node_config),
ScalarType::U64 => match_end!(u64, byte_order, shape, agg_kind, query, node_config),
ScalarType::I8 => match_end!(i8, byte_order, shape, agg_kind, query, node_config),
ScalarType::I16 => match_end!(i16, byte_order, shape, agg_kind, query, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, agg_kind, query, node_config),
ScalarType::I64 => match_end!(i64, byte_order, shape, agg_kind, query, node_config),
ScalarType::F32 => match_end!(f32, byte_order, shape, agg_kind, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, agg_kind, query, node_config),
}
}
@@ -137,6 +191,7 @@ pub async fn pre_binned_bytes_for_http(
entry.scalar_type.clone(),
entry.byte_order.clone(),
entry.to_shape()?,
query.agg_kind().clone(),
query.clone(),
node_config,
)

View File

@@ -1,3 +1,4 @@
use crate::agg::binnedt::TimeBinnableType;
use crate::agg::enp::{Identity, WavePlainProc};
use crate::agg::streams::{Collectable, Collector, StreamItem};
use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem};
@@ -16,6 +17,7 @@ use futures_util::future::FutureExt;
use futures_util::StreamExt;
use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::time::Duration;
@@ -24,36 +26,62 @@ use tokio::time::timeout_at;
pub trait ChannelExecFunction {
type Output;
fn exec<NTY, END, EVS>(self, byte_order: END, event_value_shape: EVS) -> Result<Self::Output, Error>
fn exec<NTY, END, EVS, ENP>(
self,
byte_order: END,
shape: Shape,
event_value_shape: EVS,
events_node_proc: ENP,
) -> Result<Self::Output, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod + 'static,
EventValues<NTY>: Collectable,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output>: FrameType,
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex;
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: PushableIndex,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned;
fn empty() -> Self::Output;
}
fn channel_exec_nty_end_evs_enp<F, NTY, END, EVS>(
fn channel_exec_nty_end_evs_enp<F, NTY, END, EVS, ENP>(
f: F,
byte_order: END,
shape: Shape,
event_value_shape: EVS,
events_node_proc: ENP,
) -> Result<F::Output, Error>
where
F: ChannelExecFunction,
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
// TODO
// TODO
// TODO
// TODO
// Can I replace the PlainEventsAggMethod by EventsNodeProcessor?
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod + 'static,
EventValues<NTY>: Collectable,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output>: FrameType,
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
{
Ok(f.exec::<NTY, _, _>(byte_order, event_value_shape)?)
Ok(f.exec(byte_order, shape, event_value_shape, events_node_proc)?)
}
fn channel_exec_nty_end<F, NTY, END>(f: F, byte_order: END, shape: Shape) -> Result<F::Output, Error>
fn channel_exec_nty_end<F, NTY, END>(f: F, byte_order: END, shape: Shape, agg_kind: AggKind) -> Result<F::Output, Error>
where
F: ChannelExecFunction,
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
@@ -61,16 +89,54 @@ where
EventValues<NTY>: Collectable,
{
match shape {
Shape::Scalar => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim0Case::new()),
Shape::Wave(n) => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim1Case::new(n)),
Shape::Scalar => {
//
match agg_kind {
AggKind::Plain => {
let evs = EventValuesDim0Case::new();
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
AggKind::DimXBins1 => {
let evs = EventValuesDim0Case::new();
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
AggKind::DimXBinsN(_) => {
let evs = EventValuesDim0Case::new();
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
}
}
Shape::Wave(n) => {
//
match agg_kind {
AggKind::Plain => {
let evs = EventValuesDim1Case::new(n);
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
AggKind::DimXBins1 => {
let evs = EventValuesDim1Case::new(n);
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
AggKind::DimXBinsN(_) => {
let evs = EventValuesDim1Case::new(n);
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
}
}
}
}
}
macro_rules! match_end {
($f:expr, $nty:ident, $end:expr, $shape:expr, $node_config:expr) => {
($f:expr, $nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $node_config:expr) => {
match $end {
ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $shape),
ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $shape),
ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $shape, $agg_kind),
ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $shape, $agg_kind),
}
};
}
@@ -80,22 +146,23 @@ fn channel_exec_config<F>(
scalar_type: ScalarType,
byte_order: ByteOrder,
shape: Shape,
agg_kind: AggKind,
_node_config: &NodeConfigCached,
) -> Result<F::Output, Error>
where
F: ChannelExecFunction,
{
match scalar_type {
ScalarType::U8 => match_end!(f, u8, byte_order, shape, node_config),
ScalarType::U16 => match_end!(f, u16, byte_order, shape, node_config),
ScalarType::U32 => match_end!(f, u32, byte_order, shape, node_config),
ScalarType::U64 => match_end!(f, u64, byte_order, shape, node_config),
ScalarType::I8 => match_end!(f, i8, byte_order, shape, node_config),
ScalarType::I16 => match_end!(f, i16, byte_order, shape, node_config),
ScalarType::I32 => match_end!(f, i32, byte_order, shape, node_config),
ScalarType::I64 => match_end!(f, i64, byte_order, shape, node_config),
ScalarType::F32 => match_end!(f, f32, byte_order, shape, node_config),
ScalarType::F64 => match_end!(f, f64, byte_order, shape, node_config),
ScalarType::U8 => match_end!(f, u8, byte_order, shape, agg_kind, node_config),
ScalarType::U16 => match_end!(f, u16, byte_order, shape, agg_kind, node_config),
ScalarType::U32 => match_end!(f, u32, byte_order, shape, agg_kind, node_config),
ScalarType::U64 => match_end!(f, u64, byte_order, shape, agg_kind, node_config),
ScalarType::I8 => match_end!(f, i8, byte_order, shape, agg_kind, node_config),
ScalarType::I16 => match_end!(f, i16, byte_order, shape, agg_kind, node_config),
ScalarType::I32 => match_end!(f, i32, byte_order, shape, agg_kind, node_config),
ScalarType::I64 => match_end!(f, i64, byte_order, shape, agg_kind, node_config),
ScalarType::F32 => match_end!(f, f32, byte_order, shape, agg_kind, node_config),
ScalarType::F64 => match_end!(f, f64, byte_order, shape, agg_kind, node_config),
}
}
@@ -103,6 +170,7 @@ pub async fn channel_exec<F>(
f: F,
channel: &Channel,
range: &NanoRange,
agg_kind: AggKind,
node_config: &NodeConfigCached,
) -> Result<F::Output, Error>
where
@@ -130,6 +198,7 @@ where
entry.scalar_type.clone(),
entry.byte_order.clone(),
entry.to_shape()?,
agg_kind,
node_config,
)?;
Ok(ret)
@@ -166,12 +235,18 @@ impl PlainEvents {
impl ChannelExecFunction for PlainEvents {
type Output = Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
fn exec<NTY, END, EVS>(self, byte_order: END, event_value_shape: EVS) -> Result<Self::Output, Error>
fn exec<NTY, END, EVS, ENP>(
self,
byte_order: END,
shape: Shape,
event_value_shape: EVS,
events_node_proc: ENP,
) -> Result<Self::Output, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
EventValues<NTY>: Collectable,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
{
let _ = byte_order;
let _ = event_value_shape;
@@ -297,12 +372,18 @@ where
impl ChannelExecFunction for PlainEventsJson {
type Output = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
fn exec<NTY, END, EVS>(self, byte_order: END, event_value_shape: EVS) -> Result<Self::Output, Error>
fn exec<NTY, END, EVS, ENP>(
self,
byte_order: END,
shape: Shape,
event_value_shape: EVS,
_events_node_proc: ENP,
) -> Result<Self::Output, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod + 'static,
EventValues<NTY>: Collectable,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output>: FrameType,
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
{

View File

@@ -2,7 +2,7 @@ use crate::agg::enp::{WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::{MinMaxAvgBins, NumOps, RangeCompletableItem};
use crate::binned::{MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, RangeCompletableItem};
use crate::decode::EventValues;
use crate::frame::inmem::InMemoryFrame;
use crate::raw::EventQueryJsonStringFrame;
@@ -108,7 +108,14 @@ impl<NTY> FrameType for Sitemty<XBinnedWaveEvents<NTY>>
where
NTY: SubFrId,
{
const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB;
const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB;
}
impl<NTY> FrameType for Sitemty<MinMaxAvgWaveBins<NTY>>
where
NTY: SubFrId,
{
const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB;
}
pub trait ProvidesFrameType {
@@ -116,16 +123,32 @@ pub trait ProvidesFrameType {
}
pub trait Framable: Send {
fn typeid(&self) -> u32;
fn make_frame(&self) -> Result<BytesMut, Error>;
}
impl Framable for Sitemty<serde_json::Value> {
fn typeid(&self) -> u32 {
EventQueryJsonStringFrame::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
panic!()
}
}
impl Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
}
impl Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -135,6 +158,9 @@ impl<NTY> Framable for Result<StreamItem<RangeCompletableItem<EventValues<NTY>>>
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -144,6 +170,9 @@ impl<NTY> Framable for Result<StreamItem<RangeCompletableItem<XBinnedScalarEvent
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -153,6 +182,9 @@ impl<NTY> Framable for Sitemty<MinMaxAvgBins<NTY>>
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -162,6 +194,9 @@ impl<NTY> Framable for Sitemty<WaveEvents<NTY>>
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -171,6 +206,21 @@ impl<NTY> Framable for Sitemty<XBinnedWaveEvents<NTY>>
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
}
impl<NTY> Framable for Sitemty<MinMaxAvgWaveBins<NTY>>
where
NTY: NumOps + Serialize,
{
fn typeid(&self) -> u32 {
Self::FRAME_TYPE_ID
}
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
@@ -249,3 +299,13 @@ where
Err(e) => Err(e.into()),
}
}
pub fn crchex<T>(t: T) -> String
where
T: AsRef<[u8]>,
{
let mut h = crc32fast::Hasher::new();
h.update(t.as_ref());
let crc = h.finalize();
format!("{:08x}", crc)
}

View File

@@ -7,6 +7,7 @@ use crate::Sitemty;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use netpod::log::*;
use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
@@ -34,6 +35,7 @@ where
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
{
pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
info!("MergedFromRemotes evq {:?}", evq);
let mut tcp_establish_futs = vec![];
for node in &cluster.nodes {
let f = x_processed_stream_from_node::<ENP>(evq.clone(), perf_opts.clone(), node.clone());

View File

@@ -13,6 +13,7 @@ use crate::raw::eventsfromframes::EventsFromFrames;
use crate::Sitemty;
use err::Error;
use futures_core::Stream;
use netpod::log::*;
use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
@@ -47,6 +48,7 @@ where
{
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let qjs = serde_json::to_string(&query)?;
info!("x_processed_stream_from_node qjs {:?}", qjs);
let (netin, mut netout) = net.into_split();
let buf = make_frame(&EventQueryJsonStringFrame(qjs))?;
netout.write_all(&buf).await?;
@@ -58,13 +60,3 @@ where
let items = EventsFromFrames::new(frames);
Ok(Box::pin(items))
}
pub fn crchex<T>(t: T) -> String
where
T: AsRef<[u8]>,
{
let mut h = crc32fast::Hasher::new();
h.update(t.as_ref());
let crc = h.finalize();
format!("{:08x}", crc)
}

View File

@@ -105,12 +105,12 @@ where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
<ENP as EventsNodeProcessor>::Output: 'static,
{
let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_value_shape, event_blobs);
let s2 = StreamExt::map(decs, |item| match item {
let s2 = StreamExt::map(decs, move |item| match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
@@ -131,40 +131,19 @@ where
macro_rules! pipe4 {
($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
match $agg_kind {
AggKind::DimXBins1 => make_num_pipeline_stream_evs::<
$nty,
$end,
$evs<$nty>,
//<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
_,
//Identity<$nty>,
>(
AggKind::DimXBins1 => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape),
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
$event_blobs,
),
AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<
$nty,
$end,
$evs<$nty>,
//<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins,
_,
//WaveXBinner<$nty>,
>(
AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape),
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind),
$event_blobs,
),
AggKind::Plain => make_num_pipeline_stream_evs::<
$nty,
$end,
$evs<$nty>,
//<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain,
_,
//WaveXBinner<$nty>,
>(
AggKind::Plain => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape),
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape, $agg_kind),
$event_blobs,
),
}
@@ -186,9 +165,6 @@ macro_rules! pipe3 {
)
}
Shape::Wave(n) => {
// TODO
// Issue is that I try to generate too many combinations.
// e.g. I try to generic code for the combination of Shape::Scalar with WaveXBinner which does not match.
pipe4!(
$nty,
$end,
@@ -322,8 +298,13 @@ async fn events_conn_handler_inner_try(
event_chunker_conf,
);
let shape = entry.to_shape().unwrap();
info!(
"+++++--- conn.rs call pipe1 shape {:?} agg_kind {:?}",
shape, evq.agg_kind
);
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
while let Some(item) = p1.next().await {
//info!("conn.rs encode frame typeid {:x}", item.typeid());
let item = item.make_frame();
match item {
Ok(buf) => match netout.write_all(&buf).await {

View File

@@ -13,7 +13,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::log::*;
use netpod::{Channel, NodeConfigCached};
use netpod::{AggKind, Channel, NodeConfigCached};
use panic::{AssertUnwindSafe, UnwindSafe};
use pin::Pin;
use serde::{Deserialize, Serialize};
@@ -351,7 +351,7 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re
}
async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let ret = match disk::binned::binned_json(node_config, &query).await {
let ret = match disk::binned::binned_json(&query, node_config).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_json")))?,
Err(e) => {
if query.report_error() {
@@ -412,7 +412,7 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
let (head, _body) = req.into_parts();
let query = PlainEventsQuery::from_request(&head)?;
let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone());
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?;
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let s = s.map(|item| item.make_frame());
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
Ok(ret)
@@ -427,7 +427,7 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
query.timeout(),
node_config.clone(),
);
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?;
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
Ok(ret)
}

View File

@@ -661,6 +661,26 @@ pub enum AggKind {
Plain,
}
pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
match agg_kind {
AggKind::DimXBins1 => 0,
AggKind::DimXBinsN(n) => {
if *n == 0 {
match shape {
Shape::Scalar => 0,
Shape::Wave(n) => *n as usize,
}
} else {
*n as usize
}
}
AggKind::Plain => match shape {
Shape::Scalar => 0,
Shape::Wave(n) => *n as usize,
},
}
}
impl Display for AggKind {
fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
match self {