diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 8065884..ecd52c1 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,18 +1,24 @@
 use crate::agg::binnedt::AggregatableTdim;
 use crate::agg::binnedt2::AggregatableTdim2;
 use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
-use crate::agg::enp::XBinnedScalarEvents;
+use crate::agg::binnedt4::{DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner};
+use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
 use crate::agg::{Fits, FitsInside};
-use crate::binned::query::BinnedQuery;
+use crate::binned::query::{BinnedQuery, PreBinnedQuery};
 use crate::binned::scalar::binned_stream;
 use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
 use crate::cache::MergedFromRemotes;
-use crate::decode::{Endianness, EventValues};
-use crate::frame::makeframe::{FrameType, SubFrId};
+use crate::decode::{
+    BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
+    LittleEndian, NumFromBytes,
+};
+use crate::frame::makeframe::{Framable, FrameType, SubFrId};
+use crate::merge::mergefromremote::MergedFromRemotes2;
 use crate::raw::EventsQuery;
+use crate::Sitemty;
 use bytes::Bytes;
 use chrono::{TimeZone, Utc};
 use err::Error;
@@ -20,7 +26,8 @@ use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::{
-    AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
+    AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
+    PreBinnedPatchRange, ScalarType, Shape,
 };
 use num_traits::{AsPrimitive, Bounded, Zero};
 use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
@@ -158,20 +165,186 @@ impl ToJsonResult for MinMaxAvgScalarBinBatch {
 
 type BinnedBytesStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
 
+// TODO Can I unify these functions with the ones from prebinned.rs?
+// They also must resolve to the same types, so it would be good to unify them.
+
+fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
+    query: BinnedQuery,
+    _event_value_shape: EVS,
+    node_config: &NodeConfigCached,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
+where
+    NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
+    END: Endianness + 'static,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
+    ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
+    ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
+    <ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
+    <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
+    Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
+    Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
+{
+    // TODO construct the binned pipeline:
+    // Either take from the prebinned sub-stream, or directly from a merged stream.
+    //let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
+    //let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
+    //Box::pin(ret)
+    if query.channel().backend != node_config.node.backend {
+        let err = Error::with_msg(format!(
+            "backend mismatch node: {} requested: {}",
+            node_config.node.backend,
+            query.channel().backend
+        ));
+        return Err(err);
+    }
+    let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
+        format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
+    ))?;
+    let perf_opts = PerfOpts { inmem_bufcap: 512 };
+    //let _shape = entry.to_shape()?;
+    match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
+        Ok(Some(pre_range)) => {
+            info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
+            if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
+                let msg = format!(
+                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
+                    pre_range, range
+                );
+                return Err(Error::with_msg(msg));
+            }
+
+            // TODO
+            // Must generify the BinnedScalarStreamFromPreBinnedPatches.
+            // Copy the code and introduce type parameters.
+            let s = BinnedScalarStreamFromPreBinnedPatches::new(
+                PreBinnedPatchIterator::from_range(pre_range),
+                query.channel().clone(),
+                range.clone(),
+                query.agg_kind().clone(),
+                query.cache_usage().clone(),
+                node_config,
+                query.disk_stats_every().clone(),
+                query.report_error(),
+                self.clone(),
+            )?;
+
+            let s = BoxedStream::new(Box::pin(s))?;
+            let ret = BinnedStreamRes {
+                binned_stream: s,
+                range,
+            };
+            Ok(ret)
+        }
+        Ok(None) => {
+            info!(
+                "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
+                range
+            );
+            let evq = EventsQuery {
+                channel: query.channel().clone(),
+                range: query.range().clone(),
+                agg_kind: query.agg_kind().clone(),
+            };
+
+            // TODO do I need to set up more transformations or binning to deliver the requested data?
+            //let s = SK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?;
+
+            // TODO adapt the usage in the same way as in prebinned.rs:
+            let s = MergedFromRemotes2::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
+            let s = Self::xbinned_to_tbinned(s, range);
+
+            let s = BoxedStream::new(Box::pin(s))?;
+            let ret = BinnedStreamRes {
+                binned_stream: s,
+                range,
+            };
+            Ok(ret)
+        }
+        Err(e) => Err(e),
+    }
+    err::todoval()
+}
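The where-clause above wires the pipeline stages together purely through associated types: the node processor's `Input` must equal the byte decoder's `Output`, and the time binner's `Input` must equal the processor's `Output`. Below is a minimal, self-contained sketch of that technique; the trait and type names (`EventDecoder`, `Square`, `SumBin`, the simplified `EventsNodeProcessor` / `EventsTimeBinner`) are illustrative stand-ins, not the crate's real definitions.

```rust
// Stage 1: decode raw bytes into event values.
trait EventDecoder {
    type Output;
    fn decode(&self, raw: &[u8]) -> Self::Output;
}

// Stage 2: per-node processing of decoded events (simplified stand-in).
trait EventsNodeProcessor {
    type Input;
    type Output;
    fn process(&self, inp: Self::Input) -> Self::Output;
}

// Stage 3: time-binning of processed events (simplified stand-in).
trait EventsTimeBinner {
    type Input;
    type Output;
    fn bin(&self, inp: Self::Input) -> Self::Output;
}

// The pipeline is only valid if the associated types line up, which is what
// bounds like `ENP: EventsNodeProcessor<Input = <EVS as ...>::Output>` enforce.
fn run_pipeline<EVS, ENP, ETB>(dec: EVS, enp: ENP, etb: ETB, raw: &[u8]) -> ETB::Output
where
    EVS: EventDecoder,
    ENP: EventsNodeProcessor<Input = <EVS as EventDecoder>::Output>,
    ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
{
    etb.bin(enp.process(dec.decode(raw)))
}

// Tiny concrete instantiation: bytes -> u32 events -> squared values -> sum.
struct U32Decoder;
impl EventDecoder for U32Decoder {
    type Output = Vec<u32>;
    fn decode(&self, raw: &[u8]) -> Vec<u32> {
        raw.chunks_exact(4)
            .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect()
    }
}

struct Square;
impl EventsNodeProcessor for Square {
    type Input = Vec<u32>;
    type Output = Vec<u64>;
    fn process(&self, inp: Vec<u32>) -> Vec<u64> {
        inp.into_iter().map(|x| (x as u64) * (x as u64)).collect()
    }
}

struct SumBin;
impl EventsTimeBinner for SumBin {
    type Input = Vec<u64>;
    type Output = u64;
    fn bin(&self, inp: Vec<u64>) -> u64 {
        inp.into_iter().sum()
    }
}

fn main() {
    let raw: Vec<u8> = [1u32, 2, 3].iter().flat_map(|x| x.to_le_bytes()).collect();
    let total = run_pipeline(U32Decoder, Square, SumBin, &raw);
    assert_eq!(total, 1 + 4 + 9);
    println!("total = {}", total);
}
```

If the associated types do not match up, the call to `run_pipeline` fails to compile; the bounds on `make_num_pipeline_nty_end_evs_enp` give the same compile-time guarantee for the real stages.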
+
+fn make_num_pipeline_nty_end<NTY, END>(
+    shape: Shape,
+    query: BinnedQuery,
+    node_config: &NodeConfigCached,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
+where
+    NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
+    END: Endianness + 'static,
+{
+    match shape {
+        Shape::Scalar => {
+            make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
+                query,
+                EventValuesDim0Case::new(),
+                node_config,
+            )
+        }
+        Shape::Wave(n) => {
+            make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
+                query,
+                EventValuesDim1Case::new(n),
+                node_config,
+            )
+        }
+    }
+}
+
+macro_rules! match_end {
+    ($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
+        match $end {
+            ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
+            ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
+        }
+    };
+}
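`match_end!` bridges run-time metadata to compile-time types: the channel config supplies a `ByteOrder` value, and the macro expands to a `match` that calls the generic `make_num_pipeline_nty_end` with the corresponding endianness marker type, one monomorphization per combination. A reduced sketch of this two-level dispatch (runtime scalar type and byte order mapped to a concrete `<NTY, END>` instantiation); all names here are placeholders rather than the crate's API:

```rust
trait Endianness {}
struct LittleEndian;
struct BigEndian;
impl Endianness for LittleEndian {}
impl Endianness for BigEndian {}

enum ScalarType {
    I32,
    F64,
}

enum ByteOrder {
    LE,
    BE,
}

// Generic entry point; the compiler emits one instance per (NTY, END) pair.
fn make_pipeline<NTY, END: Endianness>() -> String {
    format!(
        "pipeline for {} with {}",
        std::any::type_name::<NTY>(),
        std::any::type_name::<END>()
    )
}

// Inner dispatch: the numeric type is already fixed, match on the byte order.
macro_rules! match_end {
    ($nty:ty, $end:expr) => {
        match $end {
            ByteOrder::LE => make_pipeline::<$nty, LittleEndian>(),
            ByteOrder::BE => make_pipeline::<$nty, BigEndian>(),
        }
    };
}

// Outer dispatch: pick the concrete numeric type from the runtime scalar type.
fn dispatch(scalar_type: ScalarType, byte_order: ByteOrder) -> String {
    match scalar_type {
        ScalarType::I32 => match_end!(i32, byte_order),
        ScalarType::F64 => match_end!(f64, byte_order),
    }
}

fn main() {
    println!("{}", dispatch(ScalarType::F64, ByteOrder::BE));
    println!("{}", dispatch(ScalarType::I32, ByteOrder::LE));
}
```

More scalar types are supported by adding arms to the outer `match`, which is exactly what the `_ => todo!()` arm in `make_num_pipeline` below still leaves open.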
+
+fn make_num_pipeline(
+    scalar_type: ScalarType,
+    byte_order: ByteOrder,
+    shape: Shape,
+    query: BinnedQuery,
+    node_config: &NodeConfigCached,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    match scalar_type {
+        ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
+        ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
+        _ => todo!(),
+    }
+}
+
 pub async fn binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
 ) -> Result<BinnedBytesStreamBox, Error> {
     let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
     match extract_matching_config_entry(query.range(), &channel_config)? {
+        MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
         MatchingConfigEntry::None => {
             // TODO can I use the same binned_stream machinery to construct the matching empty result?
+            // Need the requested range, all with empty/nan values and zero counts.
             let s = futures_util::stream::empty();
             Ok(Box::pin(s))
         }
-        MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
         MatchingConfigEntry::Entry(entry) => {
+            // TODO make this a stream log:
             info!("binned_bytes_for_http found config entry {:?}", entry);
+            let res = make_num_pipeline(
+                entry.scalar_type.clone(),
+                entry.byte_order.clone(),
+                entry.to_shape()?,
+                query.clone(),
+                node_config,
+            )?;
+            let res = res.map(|item| item.make_frame());
+            let res = res.map(|item| match item {
+                Ok(item) => Ok(item.freeze()),
+                Err(e) => Err(e),
+            });
+            let res = Box::pin(res);
+            return Ok(res);
             match query.agg_kind() {
                 AggKind::DimXBins1 => {
                     let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index 419bc95..22e59e4 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -28,7 +28,7 @@
 
 fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
     query: PreBinnedQuery,
-    event_value_shape: EVS,
+    _event_value_shape: EVS,
     node_config: &NodeConfigCached,
 ) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
 where
@@ -37,10 +37,9 @@ where
     EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
     ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
     ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
-    Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
     <ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
     <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
-    Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
+    Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
     Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
 {
     // TODO
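In `binned_bytes_for_http` above, the typed pipeline is erased at the HTTP boundary: each item is turned into a frame via `make_frame()` and frozen into `Bytes`, so the handler only ever sees a `Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>`. A small sketch of that step, assuming the `bytes` and `futures-util` crates already used in this diff plus a tokio runtime for the demo; the `Framable` trait, `BinBatch` item, and the length-prefixed frame layout are placeholders, not the crate's real wire format:

```rust
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::stream::{self, Stream, StreamExt};
use std::pin::Pin;

type Error = String;

// Placeholder for the crate's Framable trait: anything that can serialize
// itself into one frame.
trait Framable {
    fn make_frame(&self) -> Result<BytesMut, Error>;
}

// Stand-in item type; the real pipeline yields Sitemty<...> values.
struct BinBatch {
    counts: Vec<u64>,
}

impl Framable for BinBatch {
    fn make_frame(&self) -> Result<BytesMut, Error> {
        // Toy frame layout: a u32 element count followed by the counts.
        let mut buf = BytesMut::new();
        buf.put_u32_le(self.counts.len() as u32);
        for c in &self.counts {
            buf.put_u64_le(*c);
        }
        Ok(buf)
    }
}

type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;

// Erase the concrete item type: serialize each item and freeze it into Bytes.
fn to_byte_stream<S, T>(items: S) -> ByteStream
where
    S: Stream<Item = T> + Send + 'static,
    T: Framable + 'static,
{
    let s = items.map(|item| item.make_frame().map(BytesMut::freeze));
    Box::pin(s)
}

#[tokio::main]
async fn main() {
    let items = stream::iter(vec![
        BinBatch { counts: vec![1, 2, 3] },
        BinBatch { counts: vec![4, 5] },
    ]);
    let mut frames = to_byte_stream(items);
    while let Some(frame) = frames.next().await {
        println!("frame of {} bytes", frame.unwrap().len());
    }
}
```

Boxing to `dyn Stream` keeps the HTTP layer independent of whichever `(scalar type, byte order, shape)` combination the dispatch selected.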
diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs
index 91546e0..250a0f4 100644
--- a/disk/src/binned/scalar.rs
+++ b/disk/src/binned/scalar.rs
@@ -6,6 +6,7 @@ use err::Error;
 use netpod::log::*;
 use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};
 
+// TODO can be removed when StreamKind is no longer used.
 pub async fn binned_stream(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index c0491ef..9e32265 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -206,7 +206,7 @@ where
     }
     if frame.tyid() != <T as FrameType>::FRAME_TYPE_ID {
         return Err(Error::with_msg(format!(
-            "type id mismatch expect {} found {:?}",
+            "type id mismatch expect {:x} found {:?}",
             <T as FrameType>::FRAME_TYPE_ID,
             frame
         )));
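The `{:x}` change above prints the expected frame type id in hex, so it can be compared directly against the `FRAME_TYPE_ID` constants. A minimal sketch of such a check; the frame struct, the constant value, and the plain `String` error type are assumptions, not the actual definitions in makeframe.rs:

```rust
#[derive(Debug)]
struct InMemoryFrame {
    tyid: u32,
    payload: Vec<u8>,
}

impl InMemoryFrame {
    fn tyid(&self) -> u32 {
        self.tyid
    }
}

trait FrameType {
    const FRAME_TYPE_ID: u32;
}

// Hypothetical frame kind with a made-up id, only for the demo.
struct MinMaxAvgBins;
impl FrameType for MinMaxAvgBins {
    const FRAME_TYPE_ID: u32 = 0x500;
}

// Reject frames whose type id does not match the expected constant,
// reporting the expected id in hex.
fn check_frame<T: FrameType>(frame: &InMemoryFrame) -> Result<(), String> {
    if frame.tyid() != <T as FrameType>::FRAME_TYPE_ID {
        return Err(format!(
            "type id mismatch expect {:x} found {:?}",
            <T as FrameType>::FRAME_TYPE_ID,
            frame
        ));
    }
    Ok(())
}

fn main() {
    let frame = InMemoryFrame {
        tyid: 0x400,
        payload: vec![],
    };
    match check_frame::<MinMaxAvgBins>(&frame) {
        Ok(()) => println!("frame accepted"),
        Err(e) => println!("{}", e),
    }
}
```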