Dominik Werder
2021-06-08 23:25:06 +02:00
parent 7b40938427
commit ce86eb9870
4 changed files with 185 additions and 104 deletions

View File

@@ -1,11 +1,11 @@
use crate::spawn_test_hosts;
use bytes::BytesMut;
use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::{Bins, StatsItem, StreamItem};
use disk::binned::query::{BinnedQuery, CacheUsage};
use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen};
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::frame::makeframe::FrameType;
use disk::streamlog::Streamlog;
use err::Error;
use futures_util::StreamExt;
@@ -227,8 +227,10 @@ where
None
}
StreamItem::DataItem(frame) => {
info!("test receives tyid {:x}", frame.tyid());
type ExpectedType = Result<StreamItem<RangeCompletableItem<MinMaxAvgBins<f64>>>, Error>;
if frame.tyid() != <ExpectedType as FrameType>::FRAME_TYPE_ID {
error!("test receives unexpected tyid {:x}", frame.tyid());
}
match bincode::deserialize::<ExpectedType>(frame.buf()) {
Ok(item) => match item {
Ok(item) => match item {
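The hunk above adds a guard before deserializing: the frame's type id must match the FRAME_TYPE_ID of the expected payload type. A minimal sketch of the pattern, assuming a simplified FrameType trait (the real trait lives in disk::frame::makeframe):

// Hypothetical, simplified form of the frame-type check used in the test.
trait FrameType {
    const FRAME_TYPE_ID: u32;
}

struct Frame {
    tyid: u32,
    buf: Vec<u8>,
}

// Decode only if the wire type id matches the expected Rust type.
fn decode_expected<T>(frame: &Frame) -> Option<T>
where
    T: FrameType + serde::de::DeserializeOwned,
{
    if frame.tyid != T::FRAME_TYPE_ID {
        return None;
    }
    bincode::deserialize(&frame.buf).ok()
}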

View File

@@ -21,6 +21,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
// TODO no longer needed
pub struct DefaultScalarEventsTimeBinner<VT> {
_m1: PhantomData<VT>,
}
@@ -38,6 +39,7 @@ where
}
}
// TODO no longer needed
pub struct DefaultSingleXBinTimeBinner<VT> {
_m1: PhantomData<VT>,
}

View File

@@ -3,6 +3,7 @@ use crate::streamlog::LogItem;
use err::Error;
use netpod::EventDataReadStats;
use serde::{Deserialize, Serialize};
use std::any::Any;
#[derive(Debug, Serialize, Deserialize)]
pub enum StatsItem {
@@ -32,6 +33,21 @@ pub trait Collectable {
fn append_to(&self, collected: &mut Self::Collected);
}
pub trait Collectable2: Any {
fn as_any_ref(&self) -> &dyn Any;
fn append(&mut self, src: &dyn Any);
}
pub trait CollectionSpec2 {
// TODO Can I use associated types here and return concrete types?
// Probably not object safe.
fn empty(&self) -> Box<dyn Collectable2>;
}
pub trait CollectionSpecMaker2 {
fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2>;
}
pub trait ToJsonResult {
type Output;
fn to_json_result(&self) -> Result<Self::Output, Error>;
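The TODO inside CollectionSpec2 asks whether empty() could use an associated type and return a concrete collected type. The obstacle is object safety in the sense needed here: a trait object with an associated type must pin it down (dyn Spec<Out = T>), which defeats passing the spec around as one erased Box. A sketch of the trade-off, with hypothetical trait names:

use std::any::Any;

// Erased variant: object-safe, callers hold a plain Box<dyn SpecErased>.
trait SpecErased {
    fn empty(&self) -> Box<dyn Any>;
}

// Typed variant: fine with generics, but as a trait object the associated
// type must be named, e.g. dyn SpecTyped<Out = Foo>.
trait SpecTyped {
    type Out;
    fn empty(&self) -> Self::Out;
}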

View File

@@ -8,7 +8,9 @@ use crate::agg::binnedt4::{
use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
use crate::agg::streams::{
Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonResult,
};
use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binned::query::{BinnedQuery, PreBinnedQuery};
@@ -39,6 +41,7 @@ use parse::channelconfig::{extract_matching_config_entry, read_local_config, Mat
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::Map;
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@@ -49,6 +52,7 @@ use tokio::io::{AsyncRead, ReadBuf};
pub mod binnedfrompbv;
pub mod pbv;
// TODO get rid of the whole pbv2 mod?
pub mod pbv2;
pub mod prebinned;
pub mod query;
@@ -169,41 +173,40 @@ impl ToJsonResult for MinMaxAvgScalarBinBatch {
}
}
type BinnedBytesStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
pub trait BinnedResponseItem: Framable {}
impl<T> BinnedResponseItem for T where T: Framable {}
pub struct BinnedResponse {
stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
bin_count: u32,
collection_spec: Box<dyn CollectionSpec2>,
}
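// Usage sketch (an assumption; names as defined above): BinnedResponse
// bundles the framed item stream with the metadata a JSON collector
// needs later, roughly:
//
//     let resp = make_num_pipeline(...)?;
//     let mut collector = resp.collection_spec.empty();
//     // drive resp.stream and feed data items into `collector`,
//     // expecting about resp.bin_count bins in total.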
// TODO Can I unify these functions with the ones from prebinned.rs?
// They must also resolve to the same types, so it would be good to unify them.
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
query: BinnedQuery,
_event_value_shape: EVS,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
) -> Result<BinnedResponse, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
<ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
Sitemty<
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
>: Framable,
// TODO require these things in general?
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<ETB as EventsTimeBinner>::Output>: FrameType + Framable + DeserializeOwned,
Sitemty<
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
>: FrameType + Framable + DeserializeOwned,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: CollectionSpecMaker2,
{
// TODO construct the binned pipeline:
// Either take from the prebinned sub-stream, or directly from a merged stream.
//let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
//let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
//Box::pin(ret)
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
@@ -237,14 +240,23 @@ where
query.disk_stats_every().clone(),
query.report_error(),
)?
.map(|item| Box::new(item) as Box<dyn Framable>);
Ok(Box::pin(s))
.map(|item| Box::new(item) as Box<dyn BinnedResponseItem>);
let ret = BinnedResponse {
stream: Box::pin(s),
bin_count: range.count as u32,
collection_spec:
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
range.count as u32,
),
};
Ok(ret)
}
Ok(None) => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let bin_count = range.count as u32;
let evq = EventsQuery {
channel: query.channel().clone(),
range: query.range().clone(),
@@ -252,8 +264,16 @@ where
};
let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn Framable>);
Ok(Box::pin(s))
let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
let ret = BinnedResponse {
stream: Box::pin(s),
bin_count,
collection_spec:
<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
range.count as u32,
),
};
Ok(ret)
}
Err(e) => Err(e),
}
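Both arms above end with a deeply nested fully-qualified call: two associated-type hops (EventsNodeProcessor::Output, then TimeBinnableType::Output) before CollectionSpecMaker2::spec is invoked. A reduced sketch of the same syntax, with hypothetical traits:

trait Proc {
    type Output;
}
trait Binnable {
    type Output;
}
trait SpecMaker {
    fn spec(n: u32) -> u32;
}

// Each hop is spelled out explicitly at the call site, just like in the
// collection_spec assignments above.
fn make_spec<P>(n: u32) -> u32
where
    P: Proc,
    P::Output: Binnable,
    <P::Output as Binnable>::Output: SpecMaker,
{
    <<<P as Proc>::Output as Binnable>::Output as SpecMaker>::spec(n)
}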
@@ -263,26 +283,22 @@ fn make_num_pipeline_nty_end<NTY, END>(
shape: Shape,
query: BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
) -> Result<BinnedResponse, Error>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
{
match shape {
Shape::Scalar => {
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
query,
EventValuesDim0Case::new(),
node_config,
)
}
Shape::Wave(n) => {
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
query,
EventValuesDim1Case::new(n),
node_config,
)
}
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>>(
query,
EventValuesDim0Case::new(),
node_config,
),
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>>(
query,
EventValuesDim1Case::new(n),
node_config,
),
}
}
@@ -301,7 +317,7 @@ fn make_num_pipeline(
shape: Shape,
query: BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
) -> Result<BinnedResponse, Error> {
match scalar_type {
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
@@ -309,10 +325,20 @@ fn make_num_pipeline(
}
}
pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached,
pub trait PipelinePostProcess {
type Input;
type Output;
fn post(inp: Self::Input) -> Self::Output;
}
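// Usage sketch (an assumption; only the stub ToFrameBytes below exists so
// far): each HTTP endpoint would plug in its own post-processing step, e.g.
//
//     struct ToJson;
//     impl PipelinePostProcess for ToJson {
//         type Input = BinnedResponse;
//         type Output = Result<serde_json::Value, Error>;
//         fn post(inp: Self::Input) -> Self::Output {
//             // collect the stream, then serialize it
//             todo!()
//         }
//     }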
// TODO return impl Stream instead.
async fn make_num_pipeline_for_entry<PPP>(
query: &BinnedQuery,
) -> Result<BinnedBytesStreamBox, Error> {
node_config: &NodeConfigCached,
) -> Result<BinnedResponse, Error>
where
PPP: PipelinePostProcess,
{
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
match extract_matching_config_entry(query.range(), &channel_config)? {
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
@@ -320,43 +346,51 @@ pub async fn binned_bytes_for_http(
// TODO can I use the same binned_stream machinery to construct the matching empty result?
// Need the full requested range, with empty/NaN values and zero counts.
let s = futures_util::stream::empty();
Ok(Box::pin(s))
let ret = BinnedResponse {
stream: Box::pin(s),
bin_count: 0,
collection_spec: <MinMaxAvgBins<u8> as CollectionSpecMaker2>::spec(0),
};
Ok(ret)
}
MatchingConfigEntry::Entry(entry) => {
// TODO make this a stream log:
info!("binned_bytes_for_http found config entry {:?}", entry);
let res = make_num_pipeline(
let ret = make_num_pipeline(
entry.scalar_type.clone(),
entry.byte_order.clone(),
entry.to_shape()?,
query.clone(),
node_config,
)?;
let res = res.map(|item| item.make_frame());
let res = res.map(|item| match item {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
});
let res = Box::pin(res);
return Ok(res);
match query.agg_kind() {
AggKind::DimXBins1 => {
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
AggKind::DimXBinsN(_) => {
// TODO pass a different stream kind here:
err::todo();
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
}
Ok(ret)
}
}
}
struct ToFrameBytes {}
impl PipelinePostProcess for ToFrameBytes {
type Input = ();
type Output = ();
fn post(inp: Self::Input) -> Self::Output {
todo!()
}
}
pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline_for_entry::<ToFrameBytes>(query, node_config).await?;
let ret = pl.stream.map(|item| match item.make_frame() {
Ok(item) => Ok(item.freeze()),
Err(e) => Err(e),
});
Ok(Box::pin(ret))
}
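// What the map above does per item (sketch, assuming the Framable API seen
// in this file): make_frame() serializes the item into a framed BytesMut,
// and BytesMut::freeze() turns it into an immutable Bytes that the HTTP
// response body can share without copying:
//
//     let framed: BytesMut = item.make_frame()?;
//     let bytes: bytes::Bytes = framed.freeze();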
pub struct BinnedBytesForHttpStream<S> {
inp: S,
errored: bool,
@@ -430,15 +464,13 @@ impl Serialize for IsoDateTime {
}
}
pub async fn collect_all<T>(
stream: impl Stream<Item = Result<StreamItem<RangeCompletableItem<T>>, Error>> + Unpin,
bin_count_exp: u32,
) -> Result<<T as Collectable>::Collected, Error>
pub async fn collect_all<S>(stream: S, collection_spec: Box<dyn CollectionSpec2>) -> Result<serde_json::Value, Error>
where
T: Collectable,
S: Stream<Item = Sitemty<Box<dyn Any>>> + Unpin,
{
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
//let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
let mut main_item = collection_spec.empty();
let mut i1 = 0;
let mut stream = stream;
loop {
@@ -449,7 +481,13 @@ where
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
main_item.timed_out(true);
// TODO mark the result as timed out:
//main_item.timed_out(true);
None
}
}
@@ -463,7 +501,8 @@ where
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {}
RangeCompletableItem::Data(item) => {
item.append_to(&mut main_item);
main_item.append(&item);
//item.append_to(&mut main_item);
i1 += 1;
}
},
@@ -477,11 +516,12 @@ where
None => break,
}
}
Ok(main_item)
Ok(err::todoval())
}
// TODO remove
#[derive(Debug, Serialize, Deserialize)]
pub struct BinnedJsonResult {
pub struct UnusedBinnedJsonResult {
ts_bin_edges: Vec<IsoDateTime>,
counts: Vec<u64>,
#[serde(skip_serializing_if = "Bool::is_false")]
@@ -493,37 +533,12 @@ pub struct BinnedJsonResult {
}
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
match extract_matching_config_entry(query.range(), &channel_config)? {
MatchingConfigEntry::None => {
// TODO can I use the same binned_stream machinery to construct the matching empty result?
Ok(serde_json::Value::Object(Map::new()))
}
MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => {
info!("binned_json found config entry {:?}", entry);
match query.agg_kind() {
AggKind::DimXBins1 => {
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
//let ret = BinnedBytesForHttpStream::new(res.binned_stream);
//Ok(Box::pin(ret))
// TODO also need to collect timeout, number of missing expected bins, ...
let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
Ok(serde_json::to_value(ret)?)
}
AggKind::DimXBinsN(_xbincount) => {
// TODO pass a different stream kind here:
err::todo();
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
// TODO also need to collect timeout, number of missing expected bins, ...
let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
Ok(serde_json::to_value(ret)?)
}
}
}
}
err::todoval()
/*let pl = make_num_pipeline_for_entry(query, node_config).await?;
let collected = collect_all(pl.stream, pl.bin_count).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
let ret = serde_json::to_value(ret)?;
Ok(ret)*/
}
pub struct ReadPbv<T>
@@ -933,6 +948,52 @@ where
}
}
pub struct MinMaxAvgBinsCollected<NTY> {
_m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgBinsCollected<NTY> {
pub fn new() -> Self {
Self { _m1: PhantomData }
}
}
impl<NTY> Collectable2 for Sitemty<MinMaxAvgBins<NTY>>
where
NTY: 'static,
{
fn as_any_ref(&self) -> &dyn Any {
self
}
fn append(&mut self, src: &dyn Any) {
todo!()
}
}
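// append is still todo!(); a plausible shape (an assumption, not yet in
// this commit) is to downcast the type-erased source back to the concrete
// item type before merging:
//
//     fn append(&mut self, src: &dyn Any) {
//         if let Some(src) = src.downcast_ref::<Sitemty<MinMaxAvgBins<NTY>>>() {
//             // merge src's bins into self
//         }
//     }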
pub struct MinMaxAvgBinsCollectionSpec<NTY> {
bin_count_exp: u32,
_m1: PhantomData<NTY>,
}
impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY> {
fn empty(&self) -> Box<dyn Collectable2> {
Box::new(MinMaxAvgBins::<NTY>::empty())
}
}
impl<NTY> CollectionSpecMaker2 for MinMaxAvgBins<NTY>
where
NTY: 'static,
{
fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2> {
Box::new(MinMaxAvgBinsCollectionSpec::<NTY> {
bin_count_exp,
_m1: PhantomData,
})
}
}
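// Usage sketch (an assumption), mirroring the spec(...) call sites in
// make_num_pipeline_nty_end_evs_enp above:
//
//     let spec = <MinMaxAvgBins<f64> as CollectionSpecMaker2>::spec(100);
//     let collector: Box<dyn Collectable2> = spec.empty();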
pub struct MinMaxAvgAggregator<NTY> {
range: NanoRange,
count: u32,