Get X-binned dim-1 with N X-bins as json

This commit is contained in:
Dominik Werder
2021-06-16 13:57:45 +02:00
parent edafc610c2
commit 99d0a97a69
8 changed files with 405 additions and 205 deletions

View File

@@ -1,5 +1,5 @@
use crate::agg::binnedt::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator};
use crate::agg::enp::{Identity, WaveXBinner};
use crate::agg::enp::{ts_offs_from_abs, Identity, WaveXBinner};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult};
@@ -7,7 +7,7 @@ use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binned::query::BinnedQuery;
use crate::binnedstream::BoxedStream;
use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction, PlainEventsAggMethod};
use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction};
use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes,
@@ -24,7 +24,7 @@ use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{
AggKind, BinnedRange, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
x_bin_count, AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
PreBinnedPatchRange, ScalarType, Shape,
};
use num_traits::{AsPrimitive, Bounded, Float, Zero};
@@ -269,6 +269,7 @@ where
}
}
#[allow(dead_code)]
fn make_num_pipeline_nty_end_old<PPP, NTY, END>(
shape: Shape,
query: BinnedQuery,
@@ -446,6 +447,7 @@ struct CollectForJson {
}
impl CollectForJson {
#[allow(dead_code)]
pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self {
Self {
timeout,
@@ -637,6 +639,12 @@ impl Serialize for IsoDateTime {
}
}
/// Converts a slice of nanosecond timestamps into `IsoDateTime` values.
///
/// Each `u64` is cast to `i64` with `as` before being passed to
/// `Utc.timestamp_nanos`, so inputs above `i64::MAX` wrap around instead of
/// producing an error. The output has the same length and order as `tss`.
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
    let mut ret = Vec::with_capacity(tss.len());
    for &ts in tss {
        let dt = Utc.timestamp_nanos(ts as i64);
        ret.push(IsoDateTime(dt));
    }
    ret
}
pub async fn collect_all<T, S>(
stream: S,
bin_count_exp: u32,
@@ -704,6 +712,10 @@ pub struct BinnedJsonChannelExec {
impl BinnedJsonChannelExec {
pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self {
info!(
"BinnedJsonChannelExec AggKind: {:?}\n--------------------------------------------------------------",
query.agg_kind()
);
Self {
query,
node_config,
@@ -717,7 +729,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
fn exec<NTY, END, EVS, ENP>(
self,
byte_order: END,
_byte_order: END,
shape: Shape,
event_value_shape: EVS,
_events_node_proc: ENP,
@@ -725,14 +737,14 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + PlainEventsAggMethod + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
Sitemty<<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output>: FrameType,
<<EVS as PlainEventsAggMethod>::Method as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: PushableIndex,
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable
+ Unpin,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
@@ -742,8 +754,9 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg(
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
))?;
let t_bin_count = range.count as u32;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
@@ -763,74 +776,37 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
&self.node_config,
self.query.disk_stats_every().clone(),
self.query.report_error(),
)?
.map(|item| match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
)?;
let f = collect_plain_events_json(s, self.timeout, t_bin_count);
let s = futures_util::stream::once(f).map(|item| match item {
Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
Err(e) => Err(e.into()),
});
// TODO remove?
/*let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count: range.count as u32,
};*/
Ok(Box::pin(s))
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
}
Ok(None) => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let bin_count = range.count as u32;
let evq = EventsQuery {
channel: self.query.channel().clone(),
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
};
let x_bin_count = if let AggKind::DimXBinsN(n) = self.query.agg_kind() {
*n as usize
} else {
0
};
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
let s =
TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count).map(|item| {
match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
}
});
/*let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count,
};*/
Ok(Box::pin(s))
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
let f = collect_plain_events_json(s, self.timeout, t_bin_count);
let s = futures_util::stream::once(f).map(|item| match item {
Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
Err(e) => Err(e.into()),
});
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
}
Err(e) => Err(e),
}
/*let perf_opts = PerfOpts { inmem_bufcap: 4096 };
let evq = EventsQuery {
channel: self.channel,
range: self.range,
agg_kind: self.agg_kind,
};
let s = MergedFromRemotes::<<EVS as PlainEventsAggMethod>::Method>::new(
evq,
perf_opts,
self.node_config.node_config.cluster,
);
let f = collect_plain_events_json(s, self.timeout);
let f = FutureExt::map(f, |item| match item {
Ok(item) => {
// TODO add channel entry info here?
//let obj = item.as_object_mut().unwrap();
//obj.insert("channelName", JsonValue::String(en));
Ok(Bytes::from(serde_json::to_vec(&item)?))
}
Err(e) => Err(e.into()),
});
let s = futures_util::stream::once(f);
Ok(Box::pin(s))*/
}?;
Ok(souter)
}
fn empty() -> Self::Output {
@@ -842,11 +818,6 @@ pub async fn binned_json(
query: &BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
// TODO try the channelexec approach.
// TODO why does channel_exec need the range, and what does it use it for?
// do I want there the user-requested range or the bin-edge-adjusted range?
// TODO currently, channel_exec resolves NTY, END, EVS but not ENP!
// can I add that or does that break other things?
let ret = channel_exec(
BinnedJsonChannelExec::new(query.clone(), node_config.clone()),
query.channel(),
@@ -855,19 +826,6 @@ pub async fn binned_json(
node_config,
)
.await?;
/*let pl = make_num_pipeline(
query,
CollectForJson::new(query.timeout(), query.abort_after_bin_count()),
node_config,
)
.await?;
let ret = pl.stream.map(|item| {
let fr = item.to_json_result()?;
let buf = fr.to_json_bytes()?;
Ok(Bytes::from(buf))
});*/
Ok(Box::pin(ret))
}
@@ -1027,6 +985,7 @@ pub trait NumOps:
fn min_or_nan() -> Self;
fn max_or_nan() -> Self;
fn is_nan(&self) -> bool;
fn fourty_two() -> Self;
}
macro_rules! impl_num_ops {
@@ -1041,6 +1000,9 @@ macro_rules! impl_num_ops {
// NaN check for the implementing type: forwards `self` to the predicate
// passed to the macro as `$is_nan` and returns its result unchanged.
fn is_nan(&self) -> bool {
$is_nan(self)
}
// Returns the literal 42 converted to the implementing type with an `as` cast.
// The spelling "fourty" is kept because it must match the method name
// declared on the `NumOps` trait.
fn fourty_two() -> Self {
42 as Self
}
}
};
}
@@ -1268,8 +1230,12 @@ impl<NTY> MinMaxAvgBinsCollected<NTY> {
#[derive(Serialize)]
pub struct MinMaxAvgBinsCollectedResult<NTY> {
ts0: u64,
tsoff: Vec<u64>,
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
//ts_bin_edges: Vec<IsoDateTime>,
counts: Vec<u64>,
mins: Vec<Option<NTY>>,
@@ -1352,9 +1318,16 @@ where
} else {
None
};
// TODO could save the copy:
let mut ts_all = self.vals.ts1s.clone();
if self.vals.ts2s.len() > 0 {
ts_all.push(*self.vals.ts2s.last().unwrap());
}
let tst = ts_offs_from_abs(&ts_all);
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
ts0,
tsoff,
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
counts: self.vals.counts,
mins: self.vals.mins,
maxs: self.vals.maxs,
@@ -1573,7 +1546,7 @@ pub enum RangeCompletableItem<T> {
Data(T),
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgWaveBins<NTY> {
pub ts1s: Vec<u64>,
pub ts2s: Vec<u64>,
@@ -1590,7 +1563,7 @@ where
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
"MinMaxAvgWaveBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
@@ -1758,9 +1731,12 @@ impl<NTY> MinMaxAvgWaveBinsCollected<NTY> {
#[derive(Serialize)]
pub struct MinMaxAvgWaveBinsCollectedResult<NTY> {
ts0: u64,
tsoff: Vec<u64>,
//ts_bin_edges: Vec<IsoDateTime>,
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
counts: Vec<u64>,
mins: Vec<Option<Vec<NTY>>>,
maxs: Vec<Option<Vec<NTY>>>,
@@ -1770,8 +1746,7 @@ pub struct MinMaxAvgWaveBinsCollectedResult<NTY> {
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
//continue_at: Option<IsoDateTime>,
continue_at: Option<u64>,
continue_at: Option<IsoDateTime>,
}
pub struct MinMaxAvgWaveBinsCollector<NTY> {
@@ -1823,34 +1798,34 @@ where
}
fn result(self) -> Result<Self::Output, Error> {
let ts0 = self.vals.ts1s.first().map_or(0, |k| *k / SEC);
let bin_count = self.vals.ts1s.len() as u32;
let mut tsoff: Vec<_> = self.vals.ts1s.iter().map(|k| *k - ts0 * SEC).collect();
if let Some(&k) = self.vals.ts2s.last() {
tsoff.push(k - ts0 * SEC);
let t_bin_count = self.vals.counts.len();
// TODO could save the copy:
let mut ts_all = self.vals.ts1s.clone();
if self.vals.ts2s.len() > 0 {
ts_all.push(*self.vals.ts2s.last().unwrap());
}
let tsoff = tsoff;
let _iso: Vec<_> = tsoff
.iter()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect();
let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
match tsoff.last() {
Some(k) => Some(k.clone()),
match ts_all.last() {
Some(&k) => {
let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
Some(iso)
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
None
};
let tst = ts_offs_from_abs(&ts_all);
let ret = MinMaxAvgWaveBinsCollectedResult {
ts0,
tsoff,
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
counts: self.vals.counts,
mins: self.vals.mins,
maxs: self.vals.maxs,
avgs: self.vals.avgs,
finalised_range: self.range_complete,
missing_bins: self.bin_count_exp - bin_count,
missing_bins: self.bin_count_exp - t_bin_count as u32,
continue_at,
};
Ok(ret)
@@ -1885,8 +1860,8 @@ where
Self {
range,
count: 0,
min: vec![NTY::min_or_nan(); x_bin_count],
max: vec![NTY::max_or_nan(); x_bin_count],
min: vec![NTY::max_or_nan(); x_bin_count],
max: vec![NTY::min_or_nan(); x_bin_count],
sum: vec![0f32; x_bin_count],
sumc: 0,
}
@@ -1916,7 +1891,7 @@ where
None => {}
Some(inp) => {
for (a, b) in self.min.iter_mut().zip(inp.iter()) {
if *b < *a {
if *b < *a || a.is_nan() {
*a = *b;
}
}
@@ -1926,7 +1901,7 @@ where
None => {}
Some(inp) => {
for (a, b) in self.max.iter_mut().zip(inp.iter()) {
if *b > *a {
if *b > *a || a.is_nan() {
*a = *b;
}
}