WIP on binned endpoint
This commit is contained in:
@@ -1,18 +1,24 @@
|
||||
use crate::agg::binnedt::AggregatableTdim;
|
||||
use crate::agg::binnedt2::AggregatableTdim2;
|
||||
use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
|
||||
use crate::agg::enp::XBinnedScalarEvents;
|
||||
use crate::agg::binnedt4::{DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner};
|
||||
use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
|
||||
use crate::agg::{Fits, FitsInside};
|
||||
use crate::binned::query::BinnedQuery;
|
||||
use crate::binned::query::{BinnedQuery, PreBinnedQuery};
|
||||
use crate::binned::scalar::binned_stream;
|
||||
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
|
||||
use crate::cache::MergedFromRemotes;
|
||||
use crate::decode::{Endianness, EventValues};
|
||||
use crate::frame::makeframe::{FrameType, SubFrId};
|
||||
use crate::decode::{
|
||||
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
|
||||
LittleEndian, NumFromBytes,
|
||||
};
|
||||
use crate::frame::makeframe::{Framable, FrameType, SubFrId};
|
||||
use crate::merge::mergefromremote::MergedFromRemotes2;
|
||||
use crate::raw::EventsQuery;
|
||||
use crate::Sitemty;
|
||||
use bytes::Bytes;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
@@ -20,7 +26,8 @@ use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::{
|
||||
AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
|
||||
AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
|
||||
PreBinnedPatchRange, ScalarType, Shape,
|
||||
};
|
||||
use num_traits::{AsPrimitive, Bounded, Zero};
|
||||
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
|
||||
@@ -158,20 +165,186 @@ impl ToJsonResult for MinMaxAvgScalarBinBatch {
|
||||
|
||||
type BinnedBytesStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
// TODO Can I unify these functions with the ones from prebinned.rs?
|
||||
// They also must resolve to the same types, so would be good to unify.
|
||||
|
||||
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
|
||||
query: BinnedQuery,
|
||||
_event_value_shape: EVS,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
|
||||
where
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||
END: Endianness + 'static,
|
||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
|
||||
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
|
||||
<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
|
||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
|
||||
Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
|
||||
{
|
||||
// TODO construct the binned pipeline:
|
||||
// Either take from prebinned sub sstream, or directly from a merged.
|
||||
//let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
|
||||
//let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
|
||||
//Box::pin(ret)
|
||||
if query.channel().backend != node_config.node.backend {
|
||||
let err = Error::with_msg(format!(
|
||||
"backend mismatch node: {} requested: {}",
|
||||
node_config.node.backend,
|
||||
query.channel().backend
|
||||
));
|
||||
return Err(err);
|
||||
}
|
||||
let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
|
||||
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
|
||||
))?;
|
||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||
//let _shape = entry.to_shape()?;
|
||||
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
|
||||
Ok(Some(pre_range)) => {
|
||||
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
|
||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||
let msg = format!(
|
||||
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
pre_range, range
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
|
||||
// TODO
|
||||
// Must generify the BinnedScalarStreamFromPreBinnedPatches.
|
||||
// Copy code and introduce type parameters.
|
||||
let s = BinnedScalarStreamFromPreBinnedPatches::new(
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
query.channel().clone(),
|
||||
range.clone(),
|
||||
query.agg_kind().clone(),
|
||||
query.cache_usage().clone(),
|
||||
node_config,
|
||||
query.disk_stats_every().clone(),
|
||||
query.report_error(),
|
||||
self.clone(),
|
||||
)?;
|
||||
|
||||
let s = BoxedStream::new(Box::pin(s))?;
|
||||
let ret = BinnedStreamRes {
|
||||
binned_stream: s,
|
||||
range,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
|
||||
range
|
||||
);
|
||||
let evq = EventsQuery {
|
||||
channel: query.channel().clone(),
|
||||
range: query.range().clone(),
|
||||
agg_kind: query.agg_kind().clone(),
|
||||
};
|
||||
|
||||
// TODO do I need to set up more transformations or binning to deliver the requested data?
|
||||
//let s = SK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?;
|
||||
|
||||
// TODO adapt the usage the same way how I do in prebinned.rs:
|
||||
let s = MergedFromRemotes2::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
|
||||
let s = Self::xbinned_to_tbinned(s, range);
|
||||
|
||||
let s = BoxedStream::new(Box::pin(s))?;
|
||||
let ret = BinnedStreamRes {
|
||||
binned_stream: s,
|
||||
range,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
err::todoval()
|
||||
}
|
||||
|
||||
fn make_num_pipeline_nty_end<NTY, END>(
|
||||
shape: Shape,
|
||||
query: BinnedQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
|
||||
where
|
||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||
END: Endianness + 'static,
|
||||
{
|
||||
match shape {
|
||||
Shape::Scalar => {
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
|
||||
query,
|
||||
EventValuesDim0Case::new(),
|
||||
node_config,
|
||||
)
|
||||
}
|
||||
Shape::Wave(n) => {
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
|
||||
query,
|
||||
EventValuesDim1Case::new(n),
|
||||
node_config,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! match_end {
|
||||
($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
|
||||
match $end {
|
||||
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
|
||||
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn make_num_pipeline(
|
||||
scalar_type: ScalarType,
|
||||
byte_order: ByteOrder,
|
||||
shape: Shape,
|
||||
query: BinnedQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
|
||||
match scalar_type {
|
||||
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
|
||||
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn binned_bytes_for_http(
|
||||
node_config: &NodeConfigCached,
|
||||
query: &BinnedQuery,
|
||||
) -> Result<BinnedBytesStreamBox, Error> {
|
||||
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
|
||||
match extract_matching_config_entry(query.range(), &channel_config)? {
|
||||
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
|
||||
MatchingConfigEntry::None => {
|
||||
// TODO can I use the same binned_stream machinery to construct the matching empty result?
|
||||
// Need the requested range all with empty/nan values and zero counts.
|
||||
let s = futures_util::stream::empty();
|
||||
Ok(Box::pin(s))
|
||||
}
|
||||
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
|
||||
MatchingConfigEntry::Entry(entry) => {
|
||||
// TODO make this a stream log:
|
||||
info!("binned_bytes_for_http found config entry {:?}", entry);
|
||||
let res = make_num_pipeline(
|
||||
entry.scalar_type.clone(),
|
||||
entry.byte_order.clone(),
|
||||
entry.to_shape()?,
|
||||
query.clone(),
|
||||
node_config,
|
||||
)?;
|
||||
let res = res.map(|item| item.make_frame());
|
||||
let res = res.map(|item| match item {
|
||||
Ok(item) => Ok(item.freeze()),
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
let res = Box::pin(res);
|
||||
return Ok(res);
|
||||
match query.agg_kind() {
|
||||
AggKind::DimXBins1 => {
|
||||
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
|
||||
|
||||
Reference in New Issue
Block a user