Remove old PPP-based stream in favor of channel-exec
This commit is contained in:
@@ -1,2 +1,2 @@
|
||||
[build]
|
||||
rustflags = ["-C", "force-frame-pointers"]
|
||||
#rustflags = ["-C", "force-frame-pointers"]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::agg::binnedt::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator};
|
||||
use crate::agg::enp::{ts_offs_from_abs, Identity, WaveXBinner};
|
||||
use crate::agg::enp::ts_offs_from_abs;
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult};
|
||||
@@ -8,10 +8,7 @@ use crate::binned::binnedfrompbv::BinnedFromPreBinned;
|
||||
use crate::binned::query::BinnedQuery;
|
||||
use crate::binnedstream::BoxedStream;
|
||||
use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction};
|
||||
use crate::decode::{
|
||||
Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
|
||||
NumFromBytes,
|
||||
};
|
||||
use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, EventValues, NumFromBytes};
|
||||
use crate::frame::makeframe::{Framable, FrameType, SubFrId};
|
||||
use crate::merge::mergedfromremotes::MergedFromRemotes;
|
||||
use crate::raw::RawEventsQuery;
|
||||
@@ -20,7 +17,7 @@ use bytes::Bytes;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{
|
||||
@@ -28,7 +25,6 @@ use netpod::{
|
||||
PreBinnedPatchRange, Shape,
|
||||
};
|
||||
use num_traits::{AsPrimitive, Bounded, Float, Zero};
|
||||
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::fmt;
|
||||
@@ -52,443 +48,6 @@ pub struct BinnedStreamRes<I> {
|
||||
pub range: BinnedRange,
|
||||
}
|
||||
|
||||
pub struct BinnedResponseStat<T> {
|
||||
stream: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
|
||||
bin_count: u32,
|
||||
}
|
||||
|
||||
// TODO Can I unify these functions with the ones from prebinned.rs?
|
||||
// They also must resolve to the same types, so would be good to unify.
|
||||
|
||||
fn make_num_pipeline_nty_end_evs_enp_stat<NTY, END, EVS, ENP>(
|
||||
shape: Shape,
|
||||
event_value_shape: EVS,
|
||||
query: BinnedQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseStat<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>, Error>
|
||||
where
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||
END: Endianness + 'static,
|
||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||
<ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
|
||||
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
|
||||
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
|
||||
Sitemty<
|
||||
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
|
||||
>: Framable,
|
||||
// TODO require these things in general?
|
||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
|
||||
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
|
||||
FrameType + Framable + DeserializeOwned,
|
||||
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Sized,
|
||||
{
|
||||
let _ = event_value_shape;
|
||||
let range = BinnedRange::covering_range(
|
||||
query.range().clone(),
|
||||
query.bin_count(),
|
||||
node_config.node.bin_grain_kind,
|
||||
)?
|
||||
.ok_or(Error::with_msg(format!(
|
||||
"binned_bytes_for_http BinnedRange::covering_range returned None"
|
||||
)))?;
|
||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||
match PreBinnedPatchRange::covering_range(
|
||||
query.range().clone(),
|
||||
query.bin_count(),
|
||||
node_config.node.bin_grain_kind,
|
||||
) {
|
||||
Ok(Some(pre_range)) => {
|
||||
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
|
||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||
let msg = format!(
|
||||
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
pre_range, range
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
let s = BinnedFromPreBinned::<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>::new(
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
query.channel().clone(),
|
||||
range.clone(),
|
||||
shape,
|
||||
query.agg_kind().clone(),
|
||||
query.cache_usage().clone(),
|
||||
node_config,
|
||||
query.disk_stats_every().clone(),
|
||||
query.report_error(),
|
||||
)?;
|
||||
let ret = BinnedResponseStat {
|
||||
stream: Box::pin(s),
|
||||
bin_count: range.count as u32,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
|
||||
range
|
||||
);
|
||||
let bin_count = range.count as u32;
|
||||
let evq = RawEventsQuery {
|
||||
channel: query.channel().clone(),
|
||||
range: query.range().clone(),
|
||||
agg_kind: query.agg_kind().clone(),
|
||||
};
|
||||
let x_bin_count = if let AggKind::DimXBinsN(n) = query.agg_kind() {
|
||||
*n as usize
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
|
||||
let ret = BinnedResponseStat {
|
||||
stream: Box::pin(s),
|
||||
bin_count,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub trait BinnedResponseItem: Send + ToJsonResult + Framable {}
|
||||
|
||||
impl<T> BinnedResponseItem for T where T: Send + ToJsonResult + Framable {}
|
||||
|
||||
pub struct BinnedResponseDyn {
|
||||
stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
|
||||
}
|
||||
|
||||
// TODO remove after refactor of PPP:
|
||||
fn make_num_pipeline_nty_end_evs_enp<PPP, NTY, END, EVS, ENP>(
|
||||
shape: Shape,
|
||||
_agg_kind: AggKind,
|
||||
event_value_shape: EVS,
|
||||
_events_node_proc: ENP,
|
||||
query: BinnedQuery,
|
||||
ppp: PPP,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseDyn, Error>
|
||||
where
|
||||
PPP: PipelinePostProcessA,
|
||||
PPP: PipelinePostProcessB<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>,
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
|
||||
END: Endianness + 'static,
|
||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||
// TODO require these properties in general:
|
||||
<ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
|
||||
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
|
||||
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
|
||||
Sitemty<
|
||||
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
|
||||
>: Framable,
|
||||
// TODO require these things in general?
|
||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
|
||||
// TODO is this correct? why do I want the Output to be Framable?
|
||||
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
|
||||
FrameType + Framable + DeserializeOwned,
|
||||
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: ToJsonResult + Framable,
|
||||
{
|
||||
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(shape, event_value_shape, query, node_config)?;
|
||||
let s = ppp.convert(res.stream, res.bin_count);
|
||||
let ret = BinnedResponseDyn { stream: Box::pin(s) };
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
// TODO remove after refactor of PPP:
|
||||
#[allow(dead_code)]
|
||||
fn make_num_pipeline_nty_end<PPP, NTY, END>(
|
||||
shape: Shape,
|
||||
query: BinnedQuery,
|
||||
ppp: PPP,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseDyn, Error>
|
||||
where
|
||||
PPP: PipelinePostProcessA,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||
END: Endianness + 'static,
|
||||
{
|
||||
let agg_kind = query.agg_kind().clone();
|
||||
match shape {
|
||||
Shape::Scalar => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
match agg_kind {
|
||||
AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>(
|
||||
shape,
|
||||
agg_kind,
|
||||
evs,
|
||||
events_node_proc,
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
)
|
||||
}
|
||||
AggKind::DimXBinsN(_) => {
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<_, NTY, END, _, _>(
|
||||
shape,
|
||||
agg_kind,
|
||||
evs,
|
||||
events_node_proc,
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
)
|
||||
}
|
||||
AggKind::Plain => {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
Shape::Wave(n) => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
match agg_kind {
|
||||
AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<PPP, NTY, END, _, _>(
|
||||
shape,
|
||||
agg_kind,
|
||||
evs,
|
||||
events_node_proc,
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
)
|
||||
}
|
||||
AggKind::DimXBinsN(_) => {
|
||||
let _events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
/*let yo = make_num_pipeline_nty_end_evs_enp::<PPP, NTY, END, _, _>(
|
||||
shape,
|
||||
agg_kind,
|
||||
evs,
|
||||
events_node_proc,
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
);*/
|
||||
err::todoval()
|
||||
}
|
||||
AggKind::Plain => {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO remove after refactor of PPP:
|
||||
#[allow(dead_code)]
|
||||
fn make_num_pipeline_nty_end_old<PPP, NTY, END>(
|
||||
shape: Shape,
|
||||
query: BinnedQuery,
|
||||
ppp: PPP,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseDyn, Error>
|
||||
where
|
||||
PPP: PipelinePostProcessA,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgWaveBins<NTY>>,
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
|
||||
END: Endianness + 'static,
|
||||
{
|
||||
let agg_kind = query.agg_kind().clone();
|
||||
match shape {
|
||||
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>(
|
||||
shape.clone(),
|
||||
agg_kind.clone(),
|
||||
EventValuesDim0Case::new(),
|
||||
Identity::create(shape.clone(), agg_kind.clone()),
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
),
|
||||
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>(
|
||||
shape.clone(),
|
||||
agg_kind.clone(),
|
||||
EventValuesDim1Case::new(n),
|
||||
WaveXBinner::create(shape.clone(), agg_kind.clone()),
|
||||
query,
|
||||
ppp,
|
||||
node_config,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO remove after refactor of PPP:
|
||||
#[allow(unused_macros)]
|
||||
macro_rules! match_end {
|
||||
($nty:ident, $end:expr, $shape:expr, $query:expr, $ppp:expr, $node_config:expr) => {
|
||||
match $end {
|
||||
ByteOrder::LE => make_num_pipeline_nty_end::<_, $nty, LittleEndian>($shape, $query, $ppp, $node_config),
|
||||
ByteOrder::BE => make_num_pipeline_nty_end::<_, $nty, BigEndian>($shape, $query, $ppp, $node_config),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// TODO remove after refactor of PPP
|
||||
/*fn make_num_pipeline_entry<PPP>(
|
||||
scalar_type: ScalarType,
|
||||
byte_order: ByteOrder,
|
||||
shape: Shape,
|
||||
query: BinnedQuery,
|
||||
ppp: PPP,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseDyn, Error>
|
||||
where
|
||||
PPP: PipelinePostProcessA,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<u8>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<u16>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<u32>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<u64>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<i8>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<i16>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<i64>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<f32>>,
|
||||
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
|
||||
{
|
||||
match scalar_type {
|
||||
ScalarType::U8 => match_end!(u8, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::U32 => match_end!(u32, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::U64 => match_end!(u64, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::I8 => match_end!(i8, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::I16 => match_end!(i16, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::I64 => match_end!(i64, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::F32 => match_end!(f32, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config),
|
||||
ScalarType::BOOL => match_end!(BoolNum, byte_order, shape, query, ppp, node_config),
|
||||
}
|
||||
}*/
|
||||
|
||||
async fn make_num_pipeline<PPP>(
|
||||
query: &BinnedQuery,
|
||||
_ppp: PPP,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<BinnedResponseDyn, Error> {
|
||||
if query.channel().backend != node_config.node.backend {
|
||||
let err = Error::with_msg(format!(
|
||||
"backend mismatch node: {} requested: {}",
|
||||
node_config.node.backend,
|
||||
query.channel().backend
|
||||
));
|
||||
return Err(err);
|
||||
}
|
||||
let channel_config = match read_local_config(&query.channel(), &node_config.node).await {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
if e.msg().contains("ErrorKind::NotFound") {
|
||||
let s = futures_util::stream::empty();
|
||||
let ret = BinnedResponseDyn { stream: Box::pin(s) };
|
||||
return Ok(ret);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
match extract_matching_config_entry(query.range(), &channel_config)? {
|
||||
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
|
||||
MatchingConfigEntry::None => {
|
||||
// TODO can I use the same binned_stream machinery to construct the matching empty result?
|
||||
// Need the requested range all with empty/nan values and zero counts.
|
||||
let s = futures_util::stream::empty();
|
||||
let ret = BinnedResponseDyn { stream: Box::pin(s) };
|
||||
Ok(ret)
|
||||
}
|
||||
MatchingConfigEntry::Entry(entry) => {
|
||||
// TODO make this a stream log:
|
||||
info!("binned_bytes_for_http found config entry {:?}", entry);
|
||||
/*let ret = make_num_pipeline_entry(
|
||||
entry.scalar_type.clone(),
|
||||
entry.byte_order.clone(),
|
||||
entry.to_shape()?,
|
||||
query.clone(),
|
||||
ppp,
|
||||
node_config,
|
||||
)?;
|
||||
Ok(ret)*/
|
||||
err::todoval()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PipelinePostProcessA {}
|
||||
|
||||
struct MakeBoxedItems {}
|
||||
|
||||
impl PipelinePostProcessA for MakeBoxedItems {}
|
||||
|
||||
pub trait PipelinePostProcessB<T> {
|
||||
fn convert(
|
||||
&self,
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
|
||||
bin_count_exp: u32,
|
||||
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>;
|
||||
}
|
||||
|
||||
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for MakeBoxedItems
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn convert(
|
||||
&self,
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
|
||||
_bin_count_exp: u32,
|
||||
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
|
||||
let s = StreamExt::map(inp, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
|
||||
Box::pin(s)
|
||||
}
|
||||
}
|
||||
|
||||
struct CollectForJson {
|
||||
timeout: Duration,
|
||||
abort_after_bin_count: u32,
|
||||
}
|
||||
|
||||
impl CollectForJson {
|
||||
#[allow(dead_code)]
|
||||
pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self {
|
||||
Self {
|
||||
timeout,
|
||||
abort_after_bin_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PipelinePostProcessA for CollectForJson {}
|
||||
|
||||
pub struct JsonCollector {
|
||||
fut: Pin<Box<dyn Future<Output = Result<serde_json::Value, Error>> + Send>>,
|
||||
completed: bool,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
impl JsonCollector {
|
||||
pub fn new<NTY>(
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
|
||||
bin_count_exp: u32,
|
||||
timeout: Duration,
|
||||
abort_after_bin_count: u32,
|
||||
) -> Self
|
||||
where
|
||||
NTY: NumOps + Serialize + 'static,
|
||||
{
|
||||
let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count);
|
||||
let fut = Box::pin(fut);
|
||||
Self {
|
||||
fut,
|
||||
completed: false,
|
||||
done: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToJsonBytes for serde_json::Value {
|
||||
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
|
||||
Ok(serde_json::to_vec(self)?)
|
||||
@@ -511,50 +70,109 @@ impl ToJsonResult for Sitemty<serde_json::Value> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for JsonCollector {
|
||||
type Item = Box<dyn BinnedResponseItem>;
|
||||
pub struct BinnedBinaryChannelExec {
|
||||
query: BinnedQuery,
|
||||
node_config: NodeConfigCached,
|
||||
}
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if self.completed {
|
||||
panic!("poll_next on completed")
|
||||
} else if self.done {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else {
|
||||
match self.fut.poll_unpin(cx) {
|
||||
Ready(Ok(item)) => {
|
||||
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
|
||||
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
|
||||
self.done = true;
|
||||
Ready(Some(item))
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
// TODO don't emit the error as json.
|
||||
let item = Err::<StreamItem<RangeCompletableItem<serde_json::Value>>, _>(e);
|
||||
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
|
||||
self.done = true;
|
||||
Ready(Some(item))
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
};
|
||||
}
|
||||
impl BinnedBinaryChannelExec {
|
||||
pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self {
|
||||
Self { query, node_config }
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for CollectForJson
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
fn convert(
|
||||
&self,
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
|
||||
bin_count_exp: u32,
|
||||
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
|
||||
let s = JsonCollector::new(inp, bin_count_exp, self.timeout, self.abort_after_bin_count);
|
||||
Box::pin(s)
|
||||
impl ChannelExecFunction for BinnedBinaryChannelExec {
|
||||
type Output = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
fn exec<NTY, END, EVS, ENP>(
|
||||
self,
|
||||
_byte_order: END,
|
||||
shape: Shape,
|
||||
event_value_shape: EVS,
|
||||
_events_node_proc: ENP,
|
||||
) -> Result<Self::Output, Error>
|
||||
where
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
|
||||
END: Endianness + 'static,
|
||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||
// TODO require these things in general?
|
||||
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex,
|
||||
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
|
||||
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
|
||||
+ Collectable
|
||||
+ Unpin,
|
||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
|
||||
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
|
||||
FrameType + Framable + DeserializeOwned,
|
||||
{
|
||||
let _ = event_value_shape;
|
||||
let range = BinnedRange::covering_range(
|
||||
self.query.range().clone(),
|
||||
self.query.bin_count(),
|
||||
self.node_config.node.bin_grain_kind,
|
||||
)?
|
||||
.ok_or(Error::with_msg(format!(
|
||||
"BinnedBinaryChannelExec BinnedRange::covering_range returned None"
|
||||
)))?;
|
||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||
let souter = match PreBinnedPatchRange::covering_range(
|
||||
self.query.range().clone(),
|
||||
self.query.bin_count(),
|
||||
self.node_config.node.bin_grain_kind,
|
||||
) {
|
||||
Ok(Some(pre_range)) => {
|
||||
info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range);
|
||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||
let msg = format!(
|
||||
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
pre_range, range
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
let s = BinnedFromPreBinned::<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>::new(
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
self.query.channel().clone(),
|
||||
range.clone(),
|
||||
shape,
|
||||
self.query.agg_kind().clone(),
|
||||
self.query.cache_usage().clone(),
|
||||
&self.node_config,
|
||||
self.query.disk_stats_every().clone(),
|
||||
self.query.report_error(),
|
||||
)?
|
||||
.map(|item| match item.make_frame() {
|
||||
Ok(item) => Ok(item.freeze()),
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
||||
}
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
||||
range
|
||||
);
|
||||
let evq = RawEventsQuery {
|
||||
channel: self.query.channel().clone(),
|
||||
range: self.query.range().clone(),
|
||||
agg_kind: self.query.agg_kind().clone(),
|
||||
};
|
||||
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
|
||||
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
|
||||
let s = s.map(|item| match item.make_frame() {
|
||||
Ok(item) => Ok(item.freeze()),
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}?;
|
||||
Ok(souter)
|
||||
}
|
||||
|
||||
fn empty() -> Self::Output {
|
||||
Box::pin(futures_util::stream::empty())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,12 +180,14 @@ pub async fn binned_bytes_for_http(
|
||||
query: &BinnedQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
|
||||
let pl = make_num_pipeline::<MakeBoxedItems>(query, MakeBoxedItems {}, node_config).await?;
|
||||
let ret = pl.stream.map(|item| {
|
||||
let fr = item.make_frame();
|
||||
let fr = fr?;
|
||||
Ok(fr.freeze())
|
||||
});
|
||||
let ret = channel_exec(
|
||||
BinnedBinaryChannelExec::new(query.clone(), node_config.clone()),
|
||||
query.channel(),
|
||||
query.range(),
|
||||
query.agg_kind().clone(),
|
||||
node_config,
|
||||
)
|
||||
.await?;
|
||||
Ok(Box::pin(ret))
|
||||
}
|
||||
|
||||
@@ -758,7 +378,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
||||
self.node_config.node.bin_grain_kind,
|
||||
)?
|
||||
.ok_or(Error::with_msg(format!(
|
||||
"binned_bytes_for_http BinnedRange::covering_range returned None"
|
||||
"BinnedJsonChannelExec BinnedRange::covering_range returned None"
|
||||
)))?;
|
||||
let t_bin_count = range.count as u32;
|
||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||
@@ -768,10 +388,10 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
||||
self.node_config.node.bin_grain_kind,
|
||||
) {
|
||||
Ok(Some(pre_range)) => {
|
||||
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
|
||||
info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range);
|
||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||
let msg = format!(
|
||||
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
"BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
pre_range, range
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
@@ -796,7 +416,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
||||
}
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
|
||||
"BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
||||
range
|
||||
);
|
||||
let evq = RawEventsQuery {
|
||||
|
||||
@@ -393,10 +393,9 @@ const BIN_T_LEN_OPTIONS_1: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
|
||||
|
||||
const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
|
||||
|
||||
//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32];
|
||||
|
||||
// Testing this for GLS:
|
||||
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32];
|
||||
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32];
|
||||
// Maybe alternative for GLS:
|
||||
//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32];
|
||||
|
||||
const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32];
|
||||
|
||||
@@ -517,8 +516,8 @@ impl PreBinnedPatchRange {
|
||||
if min_bin_count < 1 {
|
||||
Err(Error::with_msg("min_bin_count < 1"))?;
|
||||
}
|
||||
if min_bin_count > 6000 {
|
||||
Err(Error::with_msg("min_bin_count > 6000"))?;
|
||||
if min_bin_count > 20000 {
|
||||
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
|
||||
}
|
||||
let dt = range.delta();
|
||||
if dt > DAY * 200 {
|
||||
@@ -690,8 +689,8 @@ impl BinnedRange {
|
||||
if min_bin_count < 1 {
|
||||
Err(Error::with_msg("min_bin_count < 1"))?;
|
||||
}
|
||||
if min_bin_count > 6000 {
|
||||
Err(Error::with_msg("min_bin_count > 6000"))?;
|
||||
if min_bin_count > 20000 {
|
||||
Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?;
|
||||
}
|
||||
let dt = range.delta();
|
||||
if dt > DAY * 200 {
|
||||
|
||||
Reference in New Issue
Block a user