First basic plain event fetch as binary

This commit is contained in:
Dominik Werder
2021-06-10 17:37:40 +02:00
parent 99c45985ef
commit 0c43d5367e
9 changed files with 499 additions and 18 deletions

View File

@@ -12,7 +12,7 @@ use crate::decode::{
LittleEndian, NumFromBytes,
};
use crate::frame::makeframe::{Framable, FrameType, SubFrId};
use crate::merge::mergedfromremotes::MergedFromRemotes2;
use crate::merge::mergedfromremotes::MergedFromRemotes;
use crate::raw::EventsQuery;
use crate::Sitemty;
use bytes::{Bytes, BytesMut};
@@ -30,6 +30,7 @@ use num_traits::{AsPrimitive, Bounded, Zero};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer};
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@@ -83,7 +84,6 @@ where
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
//let _shape = entry.to_shape()?;
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
Ok(Some(pre_range)) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
@@ -121,7 +121,7 @@ where
range: query.range().clone(),
agg_kind: query.agg_kind().clone(),
};
let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
let ret = BinnedResponseStat {
stream: Box::pin(s),
@@ -150,10 +150,11 @@ fn make_num_pipeline_nty_end_evs_enp<PPP, NTY, END, EVS, ENP>(
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>,
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
// TODO require these properties in general:
<ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
@@ -182,7 +183,7 @@ fn make_num_pipeline_nty_end<PPP, NTY, END>(
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<NTY>>,
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
{
match shape {
@@ -744,12 +745,23 @@ pub trait RangeOverlapInfo {
}
pub trait NumOps:
Sized + Copy + Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
Sized
+ Copy
+ Send
+ Unpin
+ Debug
+ Zero
+ AsPrimitive<f32>
+ Bounded
+ PartialOrd
+ SubFrId
+ Serialize
+ DeserializeOwned
{
}
impl<T> NumOps for T where
T: Send + Unpin + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
T: Send + Unpin + Debug + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
{
}