From 98dbae02d55719e923224d0d7549f93a9e7ef66a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 9 Jun 2021 12:30:29 +0200
Subject: [PATCH] WIP pipeline post processor checks, but no general make frame yet

---
 disk/src/agg/scalarbinbatch.rs |   8 +-
 disk/src/agg/streams.rs        |   7 +-
 disk/src/binned.rs             | 328 +++++++++++++++++++++++----------
 httpret/src/lib.rs             |   2 +-
 4 files changed, 247 insertions(+), 98 deletions(-)

diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index 00363c3..1b79a8d 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,5 +1,5 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
-use crate::agg::streams::{Appendable, Bins, StreamItem};
+use crate::agg::streams::{Appendable, Bins, StreamItem, ToJsonBytes};
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
 use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind};
 use crate::frame::makeframe::make_frame;
@@ -331,3 +331,9 @@ impl Appendable for MinMaxAvgScalarBinBatch {
         self.avgs.extend_from_slice(&src.avgs);
     }
 }
+
+impl ToJsonBytes for MinMaxAvgScalarBinBatch {
+    fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
+        Ok(serde_json::to_vec(self)?)
+    }
+}
diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs
index 01ff468..d474619 100644
--- a/disk/src/agg/streams.rs
+++ b/disk/src/agg/streams.rs
@@ -48,9 +48,12 @@ pub trait CollectionSpecMaker2 {
     fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2>;
 }
 
+pub trait ToJsonBytes {
+    fn to_json_bytes(&self) -> Result<Vec<u8>, Error>;
+}
+
 pub trait ToJsonResult {
-    type Output;
-    fn to_json_result(&self) -> Result<Self::Output, Error>;
+    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error>;
 }
 
 pub trait Appendable: WithLen {
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 301391f..a1acb21 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -9,7 +9,8 @@
 use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::{
-    Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonResult,
+    Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonBytes,
+    ToJsonResult,
 };
 use crate::agg::{Fits, FitsInside};
 use crate::binned::binnedfrompbv::BinnedFromPreBinned;
@@ -21,11 +22,11 @@
 use crate::decode::{
     BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
     LittleEndian, NumFromBytes,
 };
-use crate::frame::makeframe::{Framable, FrameType, SubFrId};
+use crate::frame::makeframe::{make_frame, Framable, FrameType, SubFrId};
 use crate::merge::mergedfromremotes::MergedFromRemotes2;
 use crate::raw::EventsQuery;
 use crate::Sitemty;
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use chrono::{TimeZone, Utc};
 use err::Error;
 use futures_core::Stream;
@@ -121,10 +122,14 @@ pub struct MinMaxAvgScalarBinBatchCollectedJsonResult {
     continue_at: Option<IsoDateTime>,
 }
 
-impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
-    type Output = MinMaxAvgScalarBinBatchCollectedJsonResult;
+impl ToJsonBytes for MinMaxAvgScalarBinBatchCollectedJsonResult {
+    fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
+        Ok(serde_json::to_vec(self)?)
+    }
+}
 
-    fn to_json_result(&self) -> Result<Self::Output, Error> {
+impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
+    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
         let mut tsa: Vec<_> = self
             .batch
             .ts1s
@@ -152,14 +157,12 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
             ts_bin_edges: tsa,
             continue_at,
         };
-        Ok(ret)
+        Ok(Box::new(ret))
     }
 }
 
 impl ToJsonResult for MinMaxAvgScalarBinBatch {
-    type Output = MinMaxAvgScalarBinBatch;
-
-    fn to_json_result(&self) -> Result<Self::Output, Error> {
+    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
         err::todo();
         let ret = MinMaxAvgScalarBinBatch {
             ts1s: self.ts1s.clone(),
@@ -169,27 +172,23 @@
             maxs: self.maxs.clone(),
             avgs: self.avgs.clone(),
         };
-        Ok(ret)
+        Ok(Box::new(ret))
     }
 }
 
-pub trait BinnedResponseItem: Framable {}
-impl<T> BinnedResponseItem for T where T: Framable {}
-
-pub struct BinnedResponse {
-    stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
+pub struct BinnedResponseStat<T> {
+    stream: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
     bin_count: u32,
-    collection_spec: Box<dyn CollectionSpec2>,
 }
 
 // TODO Can I unify these functions with the ones from prebinned.rs?
 // They also must resolve to the same types, so would be good to unify.
-fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
+fn make_num_pipeline_nty_end_evs_enp_stat<NTY, END, EVS, ENP>(
+    event_value_shape: EVS,
     query: BinnedQuery,
-    _event_value_shape: EVS,
     node_config: &NodeConfigCached,
-) -> Result<BinnedResponse, Error>
+) -> Result<BinnedResponseStat<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>, Error>
 where
     NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
     END: Endianness + 'static,
     EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
     ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
     <ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
     <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
         TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
     Sitemty<
         <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
     >: Framable,
     // TODO require these things in general?
     Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
     Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
         FrameType + Framable + DeserializeOwned,
-    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: CollectionSpecMaker2,
+    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Sized,
 {
-    if query.channel().backend != node_config.node.backend {
-        let err = Error::with_msg(format!(
-            "backend mismatch node: {} requested: {}",
-            node_config.node.backend,
-            query.channel().backend
-        ));
-        return Err(err);
-    }
+    let _ = event_value_shape;
     let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
         format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
     ))?;
@@ -239,15 +231,10 @@ where
                 node_config,
                 query.disk_stats_every().clone(),
                 query.report_error(),
-            )?
-            .map(|item| Box::new(item) as Box<dyn BinnedResponseItem>);
-            let ret = BinnedResponse {
+            )?;
+            let ret = BinnedResponseStat {
                 stream: Box::pin(s),
                 bin_count: range.count as u32,
-                collection_spec:
-                    <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
-                        range.count as u32,
-                    ),
             };
             Ok(ret)
         }
@@ -264,14 +251,9 @@ where
             };
             let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
             let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
-            let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
-            let ret = BinnedResponse {
+            let ret = BinnedResponseStat {
                 stream: Box::pin(s),
                 bin_count,
-                collection_spec:
-                    <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
-                        range.count as u32,
-                    ),
             };
             Ok(ret)
         }
     }
 }
@@ -279,66 +261,146 @@
 }
 
-fn make_num_pipeline_nty_end<NTY, END>(
+pub trait MakeFrame2 {
+    fn make_frame_2(&self) -> Result<BytesMut, Error>;
+}
+
+impl<T> MakeFrame2 for Sitemty<T>
+where
+    Sitemty<T>: Framable,
+{
+    fn make_frame_2(&self) -> Result<BytesMut, Error> {
+        todo!()
+    }
+}
+
+pub trait DataFramable {
+    fn make_data_frame(&self) -> Result<BytesMut, Error>;
+}
+
+pub trait BinnedResponseItem: Send + ToJsonResult + DataFramable {}
+
+impl<T> BinnedResponseItem for T
+where
+    T: Send + ToJsonResult + DataFramable,
+    Sitemty<T>: Framable,
+{
+}
+
+pub struct BinnedResponseDyn {
+    stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>,
+    bin_count: u32,
+}
+
+fn make_num_pipeline_nty_end_evs_enp<PPP, NTY, END, EVS, ENP>(
+    event_value_shape: EVS,
+    query: BinnedQuery,
+    ppp: PPP,
+    node_config: &NodeConfigCached,
+) -> Result<BinnedResponseDyn, Error>
+where
+    PPP: PipelinePostProcessA,
+    PPP: PipelinePostProcessB<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>,
+    NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
+    END: Endianness + 'static,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
+    ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
+    <ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
+    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
+        TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
+    Sitemty<
+        <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
+    >: Framable,
+    // TODO require these things in general?
+    Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
+    Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
+        FrameType + Framable + DeserializeOwned,
+    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: ToJsonResult + DataFramable,
+{
+    let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?;
+    let s = PPP::convert(res.stream);
+    let ret = BinnedResponseDyn {
+        stream: Box::pin(s),
+        bin_count: res.bin_count,
+    };
+    Ok(ret)
+}
+
+fn make_num_pipeline_nty_end<PPP, NTY, END>(
     shape: Shape,
     query: BinnedQuery,
+    ppp: PPP,
     node_config: &NodeConfigCached,
-) -> Result<BinnedResponse, Error>
+) -> Result<BinnedResponseDyn, Error>
 where
+    PPP: PipelinePostProcessA,
+    PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
     NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
     END: Endianness + 'static,
 {
     match shape {
-        Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<_>>(
-            query,
+        Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, Identity<_>>(
             EventValuesDim0Case::new(),
+            query,
+            ppp,
             node_config,
         ),
-        Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<_>>(
-            query,
+        Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<_, NTY, _, _, WaveXBinner<_>>(
             EventValuesDim1Case::new(n),
+            query,
+            ppp,
             node_config,
         ),
     }
 }
 
 macro_rules! match_end {
-    ($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
+    ($nty:ident, $end:expr, $shape:expr, $query:expr, $ppp:expr, $node_config:expr) => {
         match $end {
-            ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
-            ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
+            ByteOrder::LE => make_num_pipeline_nty_end::<_, $nty, LittleEndian>($shape, $query, $ppp, $node_config),
+            ByteOrder::BE => make_num_pipeline_nty_end::<_, $nty, BigEndian>($shape, $query, $ppp, $node_config),
         }
     };
 }
 
-fn make_num_pipeline(
+fn make_num_pipeline_entry<PPP>(
     scalar_type: ScalarType,
     byte_order: ByteOrder,
     shape: Shape,
     query: BinnedQuery,
+    ppp: PPP,
     node_config: &NodeConfigCached,
-) -> Result<BinnedResponse, Error> {
+) -> Result<BinnedResponseDyn, Error>
+where
+    PPP: PipelinePostProcessA,
+    PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
+    PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
+{
     match scalar_type {
-        ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
-        ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
+        ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config),
+        ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config),
         _ => todo!(),
     }
 }
 
-pub trait PipelinePostProcess {
-    type Input;
-    type Output;
-    fn post(inp: Self::Input) -> Self::Output;
-}
-
-// TODO return impl Stream instead.
-async fn make_num_pipeline_for_entry<PPP>(
+async fn make_num_pipeline<PPP>(
     query: &BinnedQuery,
+    ppp: PPP,
     node_config: &NodeConfigCached,
-) -> Result<BinnedResponse, Error>
+) -> Result<BinnedResponseDyn, Error>
 where
-    PPP: PipelinePostProcess,
+    PPP: PipelinePostProcessA,
+    PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
+    PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
 {
+    if query.channel().backend != node_config.node.backend {
+        let err = Error::with_msg(format!(
+            "backend mismatch node: {} requested: {}",
+            node_config.node.backend,
+            query.channel().backend
+        ));
+        return Err(err);
+    }
     let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
     match extract_matching_config_entry(query.range(), &channel_config)? {
         MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
@@ -346,21 +408,21 @@ where
         MatchingConfigEntry::None => {
             // TODO can I use the same binned_stream machinery to construct the matching empty result?
             // Need the requested range all with empty/nan values and zero counts.
             let s = futures_util::stream::empty();
-            let ret = BinnedResponse {
+            let ret = BinnedResponseDyn {
                 stream: Box::pin(s),
                 bin_count: 0,
-                collection_spec: <MinMaxAvgScalarBinBatch as CollectionSpecMaker2>::spec(0),
             };
             Ok(ret)
         }
         MatchingConfigEntry::Entry(entry) => {
             // TODO make this a stream log:
             info!("binned_bytes_for_http found config entry {:?}", entry);
-            let ret = make_num_pipeline(
+            let ret = make_num_pipeline_entry(
                 entry.scalar_type.clone(),
                 entry.byte_order.clone(),
                 entry.to_shape()?,
                 query.clone(),
+                ppp,
                 node_config,
             )?;
             Ok(ret)
         }
     }
 }
@@ -368,25 +430,79 @@
 }
 
-struct ToFrameBytes {}
+pub trait PipelinePostProcessA {
+    fn unused(&self);
+}
 
-impl PipelinePostProcess for ToFrameBytes {
-    type Input = ();
-    type Output = ();
+struct Ppp1 {}
 
-    fn post(inp: Self::Input) -> Self::Output {
+impl PipelinePostProcessA for Ppp1 {
+    fn unused(&self) {
         todo!()
     }
 }
 
+pub trait PipelinePostProcessB<T> {
+    fn convert(
+        inp: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
+    ) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>;
+}
+
+impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for Ppp1
+where
+    NTY: NumOps,
+{
+    fn convert(
+        inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
+    ) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>> {
+        let s = StreamExt::map(inp, |item| match item {
+            Ok(item) => Ok(match item {
+                StreamItem::DataItem(item) => StreamItem::DataItem(match item {
+                    RangeCompletableItem::Data(item) => {
+                        RangeCompletableItem::Data(Box::new(item) as Box<dyn BinnedResponseItem>)
+                    }
+                    RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
+                }),
+                StreamItem::Log(item) => StreamItem::Log(item),
+                StreamItem::Stats(item) => StreamItem::Stats(item),
+            }),
+            Err(e) => Err(e),
+        });
+        Box::pin(s)
+    }
+}
+
 pub async fn binned_bytes_for_http(
-    node_config: &NodeConfigCached,
     query: &BinnedQuery,
+    node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
-    let pl = make_num_pipeline_for_entry::<ToFrameBytes>(query, node_config).await?;
-    let ret = pl.stream.map(|item| match item.make_frame() {
-        Ok(item) => Ok(item.freeze()),
-        Err(e) => Err(e),
+    let pl = make_num_pipeline::<Ppp1>(query, Ppp1 {}, node_config).await?;
+    let ret = pl.stream.map(|item| {
+        // Even for the "common" frame types I need the type of the inner item, because the
+        // serialization depends on the full type. The representation of the "common" variants
+        // is not necessarily the same for different inner types!
+        // Therefore, need a "make frame" on the full Sitemty<Box<dyn BinnedResponseItem>>.
+        let fr = match item {
+            Ok(item) => match item {
+                StreamItem::DataItem(item) => match item {
+                    RangeCompletableItem::Data(item) => item.make_data_frame(),
+                    RangeCompletableItem::RangeComplete => {
+                        make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))
+                    }
+                },
+                StreamItem::Log(item) => make_frame(&Ok(StreamItem::Log(item))),
+                StreamItem::Stats(item) => make_frame(&Ok(StreamItem::Stats(item))),
+            },
+            Err(e) => make_frame(&Err(e)),
+        };
+        let fr = fr?;
+        Ok(fr.freeze())
     });
     Ok(Box::pin(ret))
 }
@@ -534,11 +650,13 @@ pub struct UnusedBinnedJsonResult {
 
 pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
     err::todoval()
-    /*let pl = make_num_pipeline_for_entry(query, node_config).await?;
+    /*
+    let pl = make_num_pipeline_for_entry(query, node_config).await?;
     let collected = collect_all(pl.stream, pl.bin_count).await?;
     let ret = ToJsonResult::to_json_result(&collected)?;
     let ret = serde_json::to_value(ret)?;
-    Ok(ret)*/
+    Ok(ret)
+    */
 }
 
 pub struct ReadPbv<NTY>
 where
@@ -731,18 +849,9 @@
 pub trait NumOps:
     Sized + Copy + Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
 {
 }
+
 impl<T> NumOps for T where
-    T: Sized
-        + Copy
-        + Send
-        + Unpin
-        + Zero
-        + AsPrimitive<f32>
-        + Bounded
-        + PartialOrd
-        + SubFrId
-        + Serialize
-        + DeserializeOwned
+    T: Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
 {
 }
@@ -786,7 +895,7 @@ pub trait BinsTimeBinner {
     fn process(inp: Self::Input) -> Self::Output;
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct MinMaxAvgBins<NTY> {
     pub ts1s: Vec<u64>,
     pub ts2s: Vec<u64>,
@@ -948,6 +1057,34 @@ where
     }
 }
 
+impl<NTY> DataFramable for MinMaxAvgBins<NTY>
+where
+    NTY: NumOps,
+    Sitemty<Self>: FrameType,
+{
+    fn make_data_frame(&self) -> Result<BytesMut, Error> {
+        let item = Self {
+            ts1s: self.ts1s.clone(),
+            ts2s: self.ts2s.clone(),
+            counts: self.counts.clone(),
+            mins: self.mins.clone(),
+            maxs: self.maxs.clone(),
+            avgs: self.avgs.clone(),
+        };
+        make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))
+    }
+}
+
+impl<NTY> ToJsonResult for MinMaxAvgBins<NTY>
+where
+    NTY: NumOps,
+{
+    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
+        // not available.
+        panic!()
+    }
+}
+
 pub struct MinMaxAvgBinsCollected<NTY> {
     _m1: PhantomData<NTY>,
 }
@@ -958,7 +1095,7 @@ impl<NTY> MinMaxAvgBinsCollected<NTY> {
     }
 }
 
-impl<NTY> Collectable2 for Sitemty<MinMaxAvgBins<NTY>>
+impl<NTY> Collectable2 for MinMaxAvgBins<NTY>
 where
     NTY: 'static,
 {
@@ -976,7 +1113,10 @@ pub struct MinMaxAvgBinsCollectionSpec<NTY> {
     _m1: PhantomData<NTY>,
 }
 
-impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY> {
+impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY>
+where
+    NTY: 'static,
+{
     fn empty(&self) -> Box<dyn Collected> {
         Box::new(MinMaxAvgBins::<NTY>::empty())
     }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index f311500..c0d1344 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -330,7 +330,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<
 ) -> Result<Response<Body>, Error> {
-    let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await {
+    let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await {
         Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
         Err(e) => {
             if query.report_error() {
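
Note on the `ToJsonResult` change in disk/src/agg/streams.rs: dropping the associated `type Output` in favor of returning `Box<dyn ToJsonBytes>` is what makes the trait object-safe, so heterogeneous bin types can be held behind one trait object. A minimal, dependency-free sketch of that pattern; the `Batch` type and the hand-rolled JSON are illustrative only (the patch itself uses `serde_json::to_vec`):

```rust
// Object-safe version: the return type no longer depends on the implementor,
// so `Box<dyn ToJsonResult>` is a valid type and implementors can be mixed.
trait ToJsonBytes {
    fn to_json_bytes(&self) -> Vec<u8>;
}

trait ToJsonResult {
    fn to_json_result(&self) -> Box<dyn ToJsonBytes>;
}

// Illustrative stand-in for a concrete batch type such as MinMaxAvgScalarBinBatch.
struct Batch {
    counts: Vec<u64>,
}

impl ToJsonBytes for Batch {
    fn to_json_bytes(&self) -> Vec<u8> {
        // Hand-rolled JSON to keep the sketch dependency-free.
        format!("{{\"counts\":{:?}}}", self.counts).into_bytes()
    }
}

impl ToJsonResult for Batch {
    fn to_json_result(&self) -> Box<dyn ToJsonBytes> {
        Box::new(Batch {
            counts: self.counts.clone(),
        })
    }
}

fn main() {
    // Heterogeneous results behind one object-safe trait: with the old
    // `type Output`, `Box<dyn ToJsonResult>` would have required naming the
    // associated type and this Vec could hold only one implementor.
    let items: Vec<Box<dyn ToJsonResult>> = vec![Box::new(Batch { counts: vec![1, 2, 3] })];
    for it in &items {
        let json = it.to_json_result().to_json_bytes();
        println!("{}", String::from_utf8(json).unwrap());
    }
}
```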
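And the essence of the new `PipelinePostProcessB::convert` step, reduced to a self-contained sketch: map each concretely typed item into a boxed trait object, then box the stream itself, so every monomorphized branch can return the same concrete type (as `BinnedResponseDyn` does in the patch). All names here (`ResponseItem`, `Bins`, `convert_to_dyn`) are illustrative, the `Sitemty` result/log wrapping is omitted, and only the `futures` crate is assumed:

```rust
use futures::executor::block_on;
use futures::stream::{self, Stream, StreamExt};
use std::pin::Pin;

// Stand-in for BinnedResponseItem: the operations the HTTP layer needs.
trait ResponseItem: Send {
    fn describe(&self) -> String;
}

// A concrete, typed item, analogous to MinMaxAvgBins<NTY>.
struct Bins<NTY> {
    counts: Vec<u64>,
    avgs: Vec<NTY>,
}

impl<NTY: std::fmt::Debug + Send> ResponseItem for Bins<NTY> {
    fn describe(&self) -> String {
        format!("counts={:?} avgs={:?}", self.counts, self.avgs)
    }
}

// The convert step: erase the item type, then box the stream itself so the
// caller holds one concrete type regardless of NTY.
fn convert_to_dyn<T, S>(inp: S) -> Pin<Box<dyn Stream<Item = Box<dyn ResponseItem>> + Send>>
where
    T: ResponseItem + 'static,
    S: Stream<Item = T> + Send + 'static,
{
    Box::pin(inp.map(|item| Box::new(item) as Box<dyn ResponseItem>))
}

fn main() {
    let typed = stream::iter(vec![
        Bins { counts: vec![2, 3], avgs: vec![1.5f64, 2.5] },
        Bins { counts: vec![5], avgs: vec![4.0] },
    ]);
    let mut dyns = convert_to_dyn(typed);
    block_on(async {
        while let Some(item) = dyns.next().await {
            // The consumer sees only the trait object.
            println!("{}", item.describe());
        }
    });
}
```

The same two-level erasure appears in the patch: `Box::new(item) as Box<dyn BinnedResponseItem>` per item and `Box::pin(s)` for the stream, which is what lets `make_num_pipeline_entry` return a single `BinnedResponseDyn` from every `ScalarType`/`ByteOrder`/`Shape` branch.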