Basic time-weighted binning
This commit is contained in:
@@ -3,13 +3,28 @@ use futures_util::StreamExt;
|
||||
use items::{RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, TimeBinnableTypeAggregator};
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::NanoRange;
|
||||
use std::collections::VecDeque;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub struct DefaultBinsTimeBinner<NTY> {
|
||||
_m1: PhantomData<NTY>,
|
||||
/// Ties an input type to the output type and aggregator used to time-bin it.
///
/// The implementing type (`Self`) is the aggregator's input; the aggregator's
/// output is constrained to be `Self::Output`.
///
/// NOTE(review): no implementor or caller of this trait is visible in this
/// view — confirm whether it is already wired in or still a design sketch.
pub trait TimeBinningChoice {
|
||||
/// The type produced by the aggregator; it must itself be time-binnable.
type Output: TimeBinnableType;
|
||||
/// The aggregator type: it takes `Self` as input and yields `Self::Output`.
/// It must be `Send + Unpin`.
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
|
||||
/// Creates an aggregator for the given time `range`.
///
/// `bin_count` presumably has the same meaning as the `x_bin_count` passed
/// to `TimeBinnableType::aggregator` elsewhere — TODO confirm.
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
|
||||
}
|
||||
|
||||
/// Field-less marker type.
///
/// NOTE(review): judging by the name this is meant to select the
/// time-weighted binning method at the type level; no use of it is visible
/// here — confirm the intended usage.
pub struct TimeWeightedBinMethodMarker {}
|
||||
|
||||
/// State for a stream adapter over an input stream of `Sitemty<TBT>` items.
///
/// NOTE(review): the name and the commented-out `aggtor` field suggest this is
/// an experimental counterpart of `TBinnerStream`; no `impl` for it is visible
/// in this view — confirm whether it is still needed.
pub struct TBinnerStreamPlay<S, TBT>
|
||||
where
|
||||
S: Stream<Item = Sitemty<TBT>>,
|
||||
TBT: TimeBinnableType,
|
||||
{
|
||||
// The input stream, boxed and pinned.
inp: Pin<Box<S>>,
|
||||
// An optionally stored poll result of the input's item type; presumably an
// item carried over between polls — TODO confirm.
left: Option<Poll<Option<Sitemty<TBT>>>>,
|
||||
//aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
|
||||
// An optional value of the binnable type; its role is not evident from the
// declaration alone — TODO document once the implementation exists.
a: Option<TBT>,
|
||||
}
|
||||
|
||||
pub struct TBinnerStream<S, TBT>
|
||||
@@ -30,6 +45,7 @@ where
|
||||
range_complete_emitted: bool,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
do_time_weight: bool,
|
||||
}
|
||||
|
||||
impl<S, TBT> TBinnerStream<S, TBT>
|
||||
@@ -37,7 +53,7 @@ where
|
||||
S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
|
||||
TBT: TimeBinnableType,
|
||||
{
|
||||
pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize) -> Self {
|
||||
pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize, do_time_weight: bool) -> Self {
|
||||
let range = spec.get_range(0);
|
||||
Self {
|
||||
inp: Box::pin(inp),
|
||||
@@ -45,7 +61,11 @@ where
|
||||
x_bin_count,
|
||||
curbin: 0,
|
||||
left: None,
|
||||
aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, x_bin_count)),
|
||||
aggtor: Some(<TBT as TimeBinnableType>::aggregator(
|
||||
range,
|
||||
x_bin_count,
|
||||
do_time_weight,
|
||||
)),
|
||||
tmp_agg_results: VecDeque::new(),
|
||||
inp_completed: false,
|
||||
all_bins_emitted: false,
|
||||
@@ -53,6 +73,7 @@ where
|
||||
range_complete_emitted: false,
|
||||
errored: false,
|
||||
completed: false,
|
||||
do_time_weight,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,7 +117,11 @@ where
|
||||
let range = self.spec.get_range(self.curbin);
|
||||
let ret = self
|
||||
.aggtor
|
||||
.replace(<TBT as TimeBinnableType>::aggregator(range, self.x_bin_count))
|
||||
.replace(<TBT as TimeBinnableType>::aggregator(
|
||||
range,
|
||||
self.x_bin_count,
|
||||
self.do_time_weight,
|
||||
))
|
||||
.unwrap()
|
||||
.result();
|
||||
// TODO: should we accumulate bins before emitting? Maybe not; we want to stay responsive.
|
||||
|
||||
@@ -122,7 +122,12 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
|
||||
};
|
||||
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
|
||||
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
|
||||
s,
|
||||
range,
|
||||
x_bin_count,
|
||||
self.query.agg_kind().do_time_weighted(),
|
||||
);
|
||||
let s = s.map(|item| match item.make_frame() {
|
||||
Ok(item) => Ok(item.freeze()),
|
||||
Err(e) => Err(e),
|
||||
@@ -361,7 +366,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
||||
};
|
||||
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
|
||||
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
|
||||
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
|
||||
s,
|
||||
range,
|
||||
x_bin_count,
|
||||
self.query.agg_kind().do_time_weighted(),
|
||||
);
|
||||
let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log());
|
||||
let s = futures_util::stream::once(f).map(|item| match item {
|
||||
Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
|
||||
|
||||
@@ -235,7 +235,7 @@ where
|
||||
ready(g)
|
||||
}
|
||||
});
|
||||
let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind));
|
||||
let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind), agg_kind.do_time_weighted());
|
||||
Ok(Self {
|
||||
inp: Box::pin(inp),
|
||||
_m1: PhantomData,
|
||||
|
||||
@@ -137,6 +137,7 @@ where
|
||||
s,
|
||||
range,
|
||||
x_bin_count(&self.shape, &self.agg_kind),
|
||||
self.agg_kind.do_time_weighted(),
|
||||
);
|
||||
Ok(Box::pin(ret))
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ where
|
||||
Shape::Scalar => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
match agg_kind {
|
||||
AggKind::DimXBins1 => {
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
|
||||
shape,
|
||||
@@ -83,7 +83,7 @@ where
|
||||
Shape::Wave(n) => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
match agg_kind {
|
||||
AggKind::DimXBins1 => {
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
|
||||
shape,
|
||||
|
||||
@@ -69,7 +69,7 @@ impl PreBinnedQuery {
|
||||
let ret = Self {
|
||||
patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
|
||||
channel: channel_from_pairs(&pairs)?,
|
||||
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
|
||||
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar),
|
||||
cache_usage: CacheUsage::from_pairs(&pairs)?,
|
||||
disk_io_buffer_size: pairs
|
||||
.get("diskIoBufferSize")
|
||||
@@ -312,7 +312,7 @@ impl FromUrl for BinnedQuery {
|
||||
.ok_or(Error::with_msg("missing binCount"))?
|
||||
.parse()
|
||||
.map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
|
||||
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
|
||||
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar),
|
||||
cache_usage: CacheUsage::from_pairs(&pairs)?,
|
||||
disk_io_buffer_size: pairs
|
||||
.get("diskIoBufferSize")
|
||||
@@ -382,11 +382,14 @@ impl AppendToUrl for BinnedQuery {
|
||||
fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
|
||||
let mut g = url.query_pairs_mut();
|
||||
match agg_kind {
|
||||
AggKind::TimeWeightedScalar => {
|
||||
g.append_pair("binningScheme", "timeWeightedScalar");
|
||||
}
|
||||
AggKind::Plain => {
|
||||
g.append_pair("binningScheme", "fullValue");
|
||||
}
|
||||
AggKind::DimXBins1 => {
|
||||
g.append_pair("binningScheme", "toScalarX");
|
||||
g.append_pair("binningScheme", "unweightedScalar");
|
||||
}
|
||||
AggKind::DimXBinsN(n) => {
|
||||
g.append_pair("binningScheme", "toScalarX");
|
||||
@@ -402,7 +405,9 @@ fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<AggK
|
||||
.map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
|
||||
let ret = if s == "fullValue" {
|
||||
AggKind::Plain
|
||||
} else if s == "toScalarX" {
|
||||
} else if s == "timeWeightedScalar" {
|
||||
AggKind::TimeWeightedScalar
|
||||
} else if s == "unweightedScalar" {
|
||||
AggKind::DimXBins1
|
||||
} else if s == "binnedX" {
|
||||
let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?;
|
||||
|
||||
@@ -98,6 +98,11 @@ where
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
|
||||
}
|
||||
AggKind::TimeWeightedScalar => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
|
||||
}
|
||||
AggKind::DimXBins1 => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
@@ -118,6 +123,11 @@ where
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
|
||||
}
|
||||
AggKind::TimeWeightedScalar => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
|
||||
}
|
||||
AggKind::DimXBins1 => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
|
||||
@@ -3,7 +3,7 @@ Delivers event data.
|
||||
|
||||
Delivers event data (not yet time-binned) from local storage and provides client functions
|
||||
to request such data from nodes.
|
||||
*/
|
||||
*/
|
||||
|
||||
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||
use crate::raw::eventsfromframes::EventsFromFrames;
|
||||
|
||||
@@ -49,11 +49,13 @@ where
|
||||
macro_rules! pipe4 {
|
||||
($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
|
||||
match $agg_kind {
|
||||
AggKind::DimXBins1 => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
|
||||
$evsv,
|
||||
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
|
||||
$event_blobs,
|
||||
),
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
|
||||
$evsv,
|
||||
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
|
||||
$event_blobs,
|
||||
)
|
||||
}
|
||||
AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
|
||||
$evsv,
|
||||
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind),
|
||||
@@ -128,9 +130,11 @@ pub async fn make_event_pipe(
|
||||
evq: &RawEventsQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
|
||||
match dbconn::channel_exists(&evq.channel, &node_config).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e)?,
|
||||
if false {
|
||||
match dbconn::channel_exists(&evq.channel, &node_config).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e)?,
|
||||
}
|
||||
}
|
||||
let range = &evq.range;
|
||||
let channel_config = match read_local_config(&evq.channel, &node_config.node).await {
|
||||
@@ -176,7 +180,7 @@ pub async fn make_event_pipe(
|
||||
evq.disk_io_buffer_size,
|
||||
event_chunker_conf,
|
||||
);
|
||||
let shape = entry.to_shape().unwrap();
|
||||
let shape = entry.to_shape()?;
|
||||
let pipe = pipe1!(
|
||||
entry.scalar_type,
|
||||
entry.byte_order,
|
||||
|
||||
Reference in New Issue
Block a user