WIP pipeline post processor checks, but no general make frame yet

This commit is contained in:
Dominik Werder
2021-06-09 12:30:29 +02:00
parent ce86eb9870
commit 98dbae02d5
4 changed files with 247 additions and 98 deletions

View File

@@ -1,5 +1,5 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::streams::{Appendable, Bins, StreamItem};
use crate::agg::streams::{Appendable, Bins, StreamItem, ToJsonBytes};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind};
use crate::frame::makeframe::make_frame;
@@ -331,3 +331,9 @@ impl Appendable for MinMaxAvgScalarBinBatch {
self.avgs.extend_from_slice(&src.avgs);
}
}
impl ToJsonBytes for MinMaxAvgScalarBinBatch {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_vec(self)?)
}
}

View File

@@ -48,9 +48,12 @@ pub trait CollectionSpecMaker2 {
fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2>;
}
pub trait ToJsonBytes {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error>;
}
pub trait ToJsonResult {
type Output;
fn to_json_result(&self) -> Result<Self::Output, Error>;
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error>;
}
pub trait Appendable: WithLen {

View File

@@ -9,7 +9,8 @@ use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{
Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonResult,
Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonBytes,
ToJsonResult,
};
use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
@@ -21,11 +22,11 @@ use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes,
};
use crate::frame::makeframe::{Framable, FrameType, SubFrId};
use crate::frame::makeframe::{make_frame, Framable, FrameType, SubFrId};
use crate::merge::mergedfromremotes::MergedFromRemotes2;
use crate::raw::EventsQuery;
use crate::Sitemty;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
@@ -121,10 +122,14 @@ pub struct MinMaxAvgScalarBinBatchCollectedJsonResult {
continue_at: Option<IsoDateTime>,
}
impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
type Output = MinMaxAvgScalarBinBatchCollectedJsonResult;
impl ToJsonBytes for MinMaxAvgScalarBinBatchCollectedJsonResult {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_vec(self)?)
}
}
fn to_json_result(&self) -> Result<Self::Output, Error> {
impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
let mut tsa: Vec<_> = self
.batch
.ts1s
@@ -152,14 +157,12 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
ts_bin_edges: tsa,
continue_at,
};
Ok(ret)
Ok(Box::new(ret))
}
}
impl ToJsonResult for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinBatch;
fn to_json_result(&self) -> Result<Self::Output, Error> {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
err::todo();
let ret = MinMaxAvgScalarBinBatch {
ts1s: self.ts1s.clone(),
@@ -169,27 +172,23 @@ impl ToJsonResult for MinMaxAvgScalarBinBatch {
maxs: self.maxs.clone(),
avgs: self.avgs.clone(),
};
Ok(ret)
Ok(Box::new(ret))
}
}
pub trait BinnedResponseItem: Framable {}
impl<T> BinnedResponseItem for T where T: Framable {}
pub struct BinnedResponse {
stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
pub struct BinnedResponseStat<T> {
stream: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
bin_count: u32,
collection_spec: Box<dyn CollectionSpec2>,
}
// TODO Can I unify these functions with the ones from prebinned.rs?
// They also must resolve to the same types, so would be good to unify.
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
fn make_num_pipeline_nty_end_evs_enp_stat<NTY, END, EVS, ENP>(
event_value_shape: EVS,
query: BinnedQuery,
_event_value_shape: EVS,
node_config: &NodeConfigCached,
) -> Result<BinnedResponse, Error>
) -> Result<BinnedResponseStat<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
@@ -205,16 +204,9 @@ where
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: CollectionSpecMaker2,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Sized,
{
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
node_config.node.backend,
query.channel().backend
));
return Err(err);
}
let _ = event_value_shape;
let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
))?;
@@ -239,15 +231,10 @@ where
node_config,
query.disk_stats_every().clone(),
query.report_error(),
)?
.map(|item| Box::new(item) as Box<dyn BinnedResponseItem>);
let ret = BinnedResponse {
)?;
let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count: range.count as u32,
collection_spec:
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
range.count as u32,
),
};
Ok(ret)
}
@@ -264,14 +251,9 @@ where
};
let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
let ret = BinnedResponse {
let ret = BinnedResponseStat {
stream: Box::pin(s),
bin_count,
collection_spec:
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
range.count as u32,
),
};
Ok(ret)
}
@@ -279,66 +261,146 @@ where
}
}
fn make_num_pipeline_nty_end<NTY, END>(
pub trait MakeFrame2 {
fn make_frame_2(&self) -> Result<BytesMut, Error>;
}
impl<T> MakeFrame2 for Sitemty<T>
where
Sitemty<T>: Framable,
{
fn make_frame_2(&self) -> Result<BytesMut, Error> {
todo!()
}
}
pub trait DataFramable {
fn make_data_frame(&self) -> Result<BytesMut, Error>;
}
pub trait BinnedResponseItem: Send + ToJsonResult + DataFramable {}
impl<T> BinnedResponseItem for T
where
T: Send + ToJsonResult + DataFramable,
Sitemty<T>: Framable,
{
}
pub struct BinnedResponseDyn {
stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>,
bin_count: u32,
}
fn make_num_pipeline_nty_end_evs_enp<PPP, NTY, END, EVS, ENP>(
event_value_shape: EVS,
query: BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>,
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
<ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
Sitemty<
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
>: Framable,
// TODO require these things in general?
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: ToJsonResult + DataFramable,
{
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?;
let s = PPP::convert(res.stream);
let ret = BinnedResponseDyn {
stream: Box::pin(s),
bin_count: res.bin_count,
};
Ok(ret)
}
fn make_num_pipeline_nty_end<PPP, NTY, END>(
shape: Shape,
query: BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
) -> Result<BinnedResponse, Error>
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
{
match shape {
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>>(
query,
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>(
EventValuesDim0Case::new(),
query,
ppp,
node_config,
),
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>>(
query,
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>(
EventValuesDim1Case::new(n),
query,
ppp,
node_config,
),
}
}
macro_rules! match_end {
($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
($nty:ident, $end:expr, $shape:expr, $query:expr, $ppp:expr, $node_config:expr) => {
match $end {
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
ByteOrder::LE => make_num_pipeline_nty_end::<_, $nty, LittleEndian>($shape, $query, $ppp, $node_config),
ByteOrder::BE => make_num_pipeline_nty_end::<_, $nty, BigEndian>($shape, $query, $ppp, $node_config),
}
};
}
fn make_num_pipeline(
fn make_num_pipeline_entry<PPP>(
scalar_type: ScalarType,
byte_order: ByteOrder,
shape: Shape,
query: BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
) -> Result<BinnedResponse, Error> {
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
match scalar_type {
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config),
_ => todo!(),
}
}
pub trait PipelinePostProcess {
type Input;
type Output;
fn post(inp: Self::Input) -> Self::Output;
}
// TODO return impl Stream instead.
async fn make_num_pipeline_for_entry<PPP>(
async fn make_num_pipeline<PPP>(
query: &BinnedQuery,
ppp: PPP,
node_config: &NodeConfigCached,
) -> Result<BinnedResponse, Error>
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcess,
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
node_config.node.backend,
query.channel().backend
));
return Err(err);
}
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
match extract_matching_config_entry(query.range(), &channel_config)? {
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
@@ -346,21 +408,21 @@ where
// TODO can I use the same binned_stream machinery to construct the matching empty result?
// Need the requested range all with empty/nan values and zero counts.
let s = futures_util::stream::empty();
let ret = BinnedResponse {
let ret = BinnedResponseDyn {
stream: Box::pin(s),
bin_count: 0,
collection_spec: <MinMaxAvgBins<u8> as CollectionSpecMaker2>::spec(0),
};
Ok(ret)
}
MatchingConfigEntry::Entry(entry) => {
// TODO make this a stream log:
info!("binned_bytes_for_http found config entry {:?}", entry);
let ret = make_num_pipeline(
let ret = make_num_pipeline_entry(
entry.scalar_type.clone(),
entry.byte_order.clone(),
entry.to_shape()?,
query.clone(),
ppp,
node_config,
)?;
Ok(ret)
@@ -368,25 +430,79 @@ where
}
}
struct ToFrameBytes {}
pub trait PipelinePostProcessA {
fn unused(&self);
}
impl PipelinePostProcess for ToFrameBytes {
type Input = ();
type Output = ();
struct Ppp1 {}
fn post(inp: Self::Input) -> Self::Output {
impl PipelinePostProcessA for Ppp1 {
fn unused(&self) {
todo!()
}
}
pub trait PipelinePostProcessB<T> {
fn convert(
inp: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>;
}
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for Ppp1
where
NTY: NumOps,
{
fn convert(
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>> {
let s = StreamExt::map(inp, |item| match item {
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::Data(item) => {
RangeCompletableItem::Data(Box::new(item) as Box<dyn BinnedResponseItem>)
}
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),
}),
Err(e) => Err(e),
});
Box::pin(s)
}
}
pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached,
query: &BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline_for_entry::<ToFrameBytes>(query, node_config).await?;
let ret = pl.stream.map(|item| match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
let pl = make_num_pipeline::<Ppp1>(query, Ppp1 {}, node_config).await?;
let ret = pl.stream.map(|item| {
// TODO
// TODO
// Even for the "common" frame types I need the type of the inner item because the serialization
// depends on the full type. The representation of the "common" variants are not necessarily
// the same for different inner type!
// Therefore, need a "make frame" on the full Sitemty<Box<BinnedResponseItem>>
let fr = match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => item.make_data_frame(),
RangeCompletableItem::RangeComplete => {
make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))
}
},
StreamItem::Log(item) => make_frame(&Ok(StreamItem::Log(item))),
StreamItem::Stats(item) => make_frame(&Ok(StreamItem::Stats(item))),
},
Err(e) => make_frame(&Err(e)),
};
let fr = fr?;
Ok(fr.freeze())
});
Ok(Box::pin(ret))
}
@@ -534,11 +650,13 @@ pub struct UnusedBinnedJsonResult {
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
err::todoval()
/*let pl = make_num_pipeline_for_entry(query, node_config).await?;
/*
let pl = make_num_pipeline_for_entry(query, node_config).await?;
let collected = collect_all(pl.stream, pl.bin_count).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
let ret = serde_json::to_value(ret)?;
Ok(ret)*/
Ok(ret)
*/
}
pub struct ReadPbv<T>
@@ -731,18 +849,9 @@ pub trait NumOps:
Sized + Copy + Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
{
}
impl<T> NumOps for T where
T: Sized
+ Copy
+ Send
+ Unpin
+ Zero
+ AsPrimitive<f32>
+ Bounded
+ PartialOrd
+ SubFrId
+ Serialize
+ DeserializeOwned
T: Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
{
}
@@ -786,7 +895,7 @@ pub trait BinsTimeBinner {
fn process(inp: Self::Input) -> Self::Output;
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct MinMaxAvgBins<NTY> {
pub ts1s: Vec<u64>,
pub ts2s: Vec<u64>,
@@ -948,6 +1057,34 @@ where
}
}
impl<NTY> DataFramable for MinMaxAvgBins<NTY>
where
NTY: NumOps,
Sitemty<Self>: FrameType,
{
fn make_data_frame(&self) -> Result<BytesMut, Error> {
let item = Self {
ts1s: self.ts1s.clone(),
ts2s: self.ts2s.clone(),
counts: self.counts.clone(),
mins: self.mins.clone(),
maxs: self.maxs.clone(),
avgs: self.avgs.clone(),
};
make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))
}
}
impl<NTY> ToJsonResult for MinMaxAvgBins<NTY>
where
NTY: NumOps,
{
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
// not available.
panic!()
}
}
pub struct MinMaxAvgBinsCollected<NTY> {
_m1: PhantomData<NTY>,
}
@@ -958,7 +1095,7 @@ impl<NTY> MinMaxAvgBinsCollected<NTY> {
}
}
impl<NTY> Collectable2 for Sitemty<MinMaxAvgBins<NTY>>
impl<NTY> Collectable2 for MinMaxAvgBins<NTY>
where
NTY: 'static,
{
@@ -976,7 +1113,10 @@ pub struct MinMaxAvgBinsCollectionSpec<NTY> {
_m1: PhantomData<NTY>,
}
impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY> {
impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY>
where
NTY: 'static,
{
fn empty(&self) -> Box<dyn Collectable2> {
Box::new(MinMaxAvgBins::<NTY>::empty())
}

View File

@@ -330,7 +330,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
}
async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await {
let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
Err(e) => {
if query.report_error() {