WIP on second AggKind

Dominik Werder
2021-05-19 19:33:34 +02:00
parent 3b0404f2ba
commit 2b1e68a222
5 changed files with 151 additions and 33 deletions


@@ -24,6 +24,7 @@ pub mod binnedt;
 pub mod binnedx;
 pub mod eventbatch;
 pub mod scalarbinbatch;
+pub mod streams;
 
 pub trait AggregatableXdim1Bin {
     type Output: AggregatableXdim1Bin + AggregatableTdim;

disk/src/agg/scalarbinbatch.rs

@@ -1,4 +1,5 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
+use crate::agg::streams::{Batchable, Bins};
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
 use crate::binned::MakeBytesFrame;
 use crate::frame::makeframe::make_frame;
@@ -233,6 +234,23 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
     }
 }
 
+impl Batchable for MinMaxAvgScalarBinBatch {
+    fn append(&mut self, k: &mut Self) {
+        self.ts1s.append(&mut k.ts1s);
+        self.ts2s.append(&mut k.ts2s);
+        self.counts.append(&mut k.counts);
+        self.mins.append(&mut k.mins);
+        self.maxs.append(&mut k.maxs);
+        self.avgs.append(&mut k.avgs);
+    }
+}
+
+impl Bins for MinMaxAvgScalarBinBatch {
+    fn bin_count(&self) -> u32 {
+        self.ts1s.len() as u32
+    }
+}
+
 pub struct MinMaxAvgScalarBinBatchAggregator {
     ts1: u64,
     ts2: u64,
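
The two impls added above give a batch type a uniform drain-and-count interface. A minimal usage sketch (illustrative only, not part of this commit; `merge_batches` is a hypothetical helper):

use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Batchable, Bins};

// Drains `src` into `dst` and reports the accumulated bin count.
fn merge_batches(dst: &mut MinMaxAvgScalarBinBatch, src: &mut MinMaxAvgScalarBinBatch) -> u32 {
    dst.append(src);
    // Vec::append moves the elements out, so the source batch is left empty.
    assert_eq!(src.bin_count(), 0);
    dst.bin_count()
}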

disk/src/agg/streams.rs (new file, 23 lines)

@@ -0,0 +1,23 @@
+use crate::streamlog::LogItem;
+use netpod::EventDataReadStats;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum StatsItem {
+    EventDataReadStats(EventDataReadStats),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum StreamItem<T> {
+    DataItem(T),
+    Log(LogItem),
+    Stats(StatsItem),
+}
+
+pub trait Bins {
+    fn bin_count(&self) -> u32;
+}
+
+pub trait Batchable {
+    fn append(&mut self, k: &mut Self);
+}
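
A hedged sketch of how a downstream consumer might use the new wrapper: only `DataItem` carries payload, while `Log` and `Stats` pass through as side channels (the `handle` helper is hypothetical, not from this commit):

use crate::agg::streams::{StatsItem, StreamItem};

fn handle<T>(item: StreamItem<T>, mut on_data: impl FnMut(T)) {
    match item {
        // The actual payload of the stream.
        StreamItem::DataItem(data) => on_data(data),
        // Side-channel items that any stage of the pipeline may emit;
        // both are Debug (required by the derives above), so they can be printed.
        StreamItem::Log(logitem) => println!("stream log item: {:?}", logitem),
        StreamItem::Stats(StatsItem::EventDataReadStats(stats)) => {
            println!("event data read stats: {:?}", stats);
        }
    }
}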


@@ -1,8 +1,10 @@
 use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::agg::streams::{Batchable, Bins, StatsItem, StreamItem};
 use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches};
 use crate::cache::{BinnedQuery, MergedFromRemotes};
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
+use crate::frame::makeframe::{make_frame, FrameType};
 use crate::raw::EventsQuery;
 use bytes::Bytes;
 use chrono::{TimeZone, Utc};
@@ -22,10 +24,46 @@ pub struct BinnedStreamRes<I> {
     pub range: BinnedRange,
 }
 
-pub async fn binned_stream(
+#[derive(Debug, Serialize, Deserialize)]
+pub enum BinnedScalarStreamItem {
+    Values(MinMaxAvgScalarBinBatch),
+    RangeComplete,
+}
+
+impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
+    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
+        Ok(make_frame::<Self>(self)?.freeze())
+    }
+}
+
+impl FrameType for Result<StreamItem<BinnedScalarStreamItem>, Error> {
+    const FRAME_TYPE_ID: u32 = 0x02;
+}
+
+fn adapter_to_stream_item(
+    k: Result<MinMaxAvgScalarBinBatchStreamItem, Error>,
+) -> Result<StreamItem<BinnedScalarStreamItem>, Error> {
+    match k {
+        Ok(k) => match k {
+            MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)),
+            MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => {
+                Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))
+            }
+            MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
+                Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))
+            }
+            MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
+                Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))
+            }
+        },
+        Err(e) => Err(e),
+    }
+}
+
+pub async fn binned_scalar_stream(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
-) -> Result<BinnedStreamRes<Result<MinMaxAvgScalarBinBatchStreamItem, Error>>, Error> {
+) -> Result<BinnedStreamRes<Result<StreamItem<BinnedScalarStreamItem>, Error>>, Error> {
     if query.channel().backend != node_config.node.backend {
         let err = Error::with_msg(format!(
             "backend mismatch node: {} requested: {}",
@@ -34,15 +72,11 @@
         ));
         return Err(err);
     }
-    let range = query.range();
-    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
-    let entry = extract_matching_config_entry(range, &channel_config)?;
-    info!("binned_bytes_for_http found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.bin_count())?.ok_or(Error::with_msg(format!(
-        "binned_bytes_for_http BinnedRange::covering_range returned None"
-    )))?;
+    let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
+        format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
+    ))?;
     let perf_opts = PerfOpts { inmem_bufcap: 512 };
-    let _shape = entry.to_shape()?;
+    //let _shape = entry.to_shape()?;
     match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
         Ok(Some(pre_range)) => {
             info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
@@ -61,7 +95,8 @@
                 query.cache_usage().clone(),
                 node_config,
                 query.disk_stats_every().clone(),
-            )?;
+            )?
+            .map(adapter_to_stream_item);
             let s = BinnedStream::new(Box::pin(s1))?;
             let ret = BinnedStreamRes {
                 binned_stream: s,
@@ -82,6 +117,7 @@
             // TODO do I need to set up more transformations or binning to deliver the requested data?
             let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
             let s = s.into_binned_t(range.clone());
+            let s = s.map(adapter_to_stream_item);
             let s = BinnedStream::new(Box::pin(s))?;
             let ret = BinnedStreamRes {
                 binned_stream: s,
@@ -100,10 +136,28 @@ pub async fn binned_bytes_for_http(
     query: &BinnedQuery,
 ) -> Result<BinnedStreamBox, Error> {
     // TODO must decide here already which AggKind so that I can call into the generic code.
+    // Really here?
+    // But don't I need the channel config to make a decision if the requested binning actually makes sense?
+    // In that case, the function I call here should return a boxed trait object.
+    // What traits must the returned stream fulfill?
+    // Also, the items in that case must be trait objects as well.
+    // But that is ok, because there are only few items in the stream anyways at this stage.
+    // TODO what traits do I need downstream from here?
+    // TODO use boxed trait objects from here on.
+    // Must be able to convert them to bytes frames..
+    // TODO but the json endpoint: how should that be handled?
+    // Maybe it is enough if the items can turn themselves into serde_json::Value ?
+    //todo::todo;
+    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
+    let entry = extract_matching_config_entry(query.range(), &channel_config)?;
+    info!("binned_bytes_for_http found config entry {:?}", entry);
 
     match query.agg_kind() {
         AggKind::DimXBins1 => {
-            let res = binned_stream(node_config, query).await?;
+            let res = binned_scalar_stream(node_config, query).await?;
            let ret = BinnedBytesForHttpStream::new(res.binned_stream);
            Ok(Box::pin(ret))
        }
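
The TODO block in binned_bytes_for_http above asks what a type-erased return value could look like. One hedged reading of it (a sketch under assumed names, not this commit's design; `DynFrameable` and `DynBinnedStream` are hypothetical, and `Error` is the error type already imported in this module):

use bytes::Bytes;
use futures_core::Stream;
use std::pin::Pin;

// Object-safe bound: items only need to turn themselves into byte frames
// (and, for the json endpoint, perhaps into a serde_json::Value).
pub trait DynFrameable: Send {
    fn make_bytes_frame(&self) -> Result<Bytes, Error>;
}

// Every per-AggKind stream would be erased to this single type, so
// binned_bytes_for_http could pick the concrete stream only after
// inspecting the channel config.
pub type DynBinnedStream = Pin<Box<dyn Stream<Item = Box<dyn DynFrameable>> + Send>>;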
@@ -111,6 +165,7 @@
     }
 }
 
+// TODO remove this when no longer used, gets replaced by Result<StreamItem<BinnedStreamItem>, Error>
 pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
 
 pub struct BinnedBytesForHttpStream<S> {
@@ -199,15 +254,28 @@ pub struct BinnedJsonResult {
 }
 
 pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
+    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
+    let entry = extract_matching_config_entry(query.range(), &channel_config)?;
+    info!("binned_json found config entry {:?}", entry);
+    // TODO create the matching stream based on AggKind and ConfigEntry.
+    // TODO must batch the whole result stream. Can I have a trait to append all to the first received?
+    // TODO must convert the all-batched last item together.
     let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
-    let mut batch = MinMaxAvgScalarBinBatch::empty();
-    let t = binned_stream(node_config, query).await?;
+    let t = binned_scalar_stream(node_config, query).await?;
     let bin_count_exp = t.range.count;
     let mut bin_count = 0;
     let mut items = t.binned_stream;
     let mut i1 = 0;
     let mut partial_content = false;
     let mut finalised_range = false;
+    // TODO factor out the collecting:
+    // How can I make this generic on the item type?
+    let mut main_item: Option<MinMaxAvgScalarBinBatch> = None;
     loop {
         let item = if i1 == 0 {
             items.next().await
@@ -222,25 +290,29 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
         };
         match item {
             Some(item) => {
-                use MinMaxAvgScalarBinBatchStreamItem::*;
                 match item {
                     Ok(item) => match item {
-                        Values(mut vals) => {
-                            // TODO gather stats about the batch sizes.
-                            bin_count += vals.ts1s.len() as u64;
-                            batch.ts1s.append(&mut vals.ts1s);
-                            batch.ts2s.append(&mut vals.ts2s);
-                            batch.counts.append(&mut vals.counts);
-                            batch.mins.append(&mut vals.mins);
-                            batch.maxs.append(&mut vals.maxs);
-                            batch.avgs.append(&mut vals.avgs);
-                            i1 += 1;
-                        }
-                        Log(_) => {}
-                        EventDataReadStats(_) => {}
-                        RangeComplete => {
-                            finalised_range = true;
-                        }
+                        StreamItem::Log(_) => {}
+                        StreamItem::Stats(_) => {}
+                        StreamItem::DataItem(item) => match item {
+                            BinnedScalarStreamItem::RangeComplete => {
+                                finalised_range = true;
+                            }
+                            BinnedScalarStreamItem::Values(mut vals) => {
+                                bin_count += vals.bin_count();
+                                match &mut main_item {
+                                    Some(main) => {
+                                        main.append(&mut vals);
+                                    }
+                                    None => {
+                                        main_item = Some(vals);
+                                    }
+                                }
+                                // TODO solve this via some trait to append to a batch:
+                                // TODO gather stats about the batch sizes.
+                                i1 += 1;
+                            }
+                        },
                     },
                     Err(e) => {
                         // TODO Need to use some flags to get good enough error message for remote user.
@@ -251,6 +323,9 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
             None => break,
         }
     }
+    // TODO handle the case when I didn't get any items..
+    let batch = main_item.unwrap();
+
     let mut tsa: Vec<_> = batch
         .ts1s
         .iter()
@@ -267,13 +342,13 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
     } else {
         None
     };
-    if bin_count_exp < bin_count {
+    if bin_count_exp < bin_count as u64 {
         Err(Error::with_msg("bin_count_exp < bin_count"))?
     }
     let ret = BinnedJsonResult {
         ts_bin_edges: tsa,
         counts: batch.counts,
-        missing_bins: bin_count_exp - bin_count,
+        missing_bins: bin_count_exp - bin_count as u64,
         finalised_range,
         continue_at,
     };
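
The "factor out the collecting" TODO in binned_json maps naturally onto the new Batchable and Bins traits; a minimal sketch of what that factored-out helper could look like (assumed design, not part of this commit; `collect_item` is a hypothetical name):

use crate::agg::streams::{Batchable, Bins};

// Folds one data item into the accumulated batch, generic over the
// concrete item type; returns how many bins the item contributed.
fn collect_item<T: Batchable + Bins>(acc: &mut Option<T>, mut vals: T) -> u32 {
    let n = vals.bin_count();
    match acc {
        Some(main) => main.append(&mut vals),
        None => *acc = Some(vals),
    }
    n
}

With that in place, the Values arm of the loop would shrink to roughly `bin_count += collect_item(&mut main_item, vals);`.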


@@ -15,6 +15,7 @@ pub trait FrameType {
     const FRAME_TYPE_ID: u32;
 }
 
+// TODO replaced by Result<StreamItem<BinnedStreamItem>, Error>
 impl FrameType for BinnedBytesForHttpStreamFrame {
     const FRAME_TYPE_ID: u32 = 0x02;
 }
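
Wiring any further item type into the framing layer would repeat the two-impl pattern this commit adds for the scalar items; a hedged sketch with a hypothetical `BinnedWaveStreamItem` (the distinct 0x03 id is an assumption; the new scalar type above reuses 0x02, presumably because it replaces the old frame type on the wire):

// Hypothetical: give the new concrete item type its own frame id ...
impl FrameType for Result<StreamItem<BinnedWaveStreamItem>, Error> {
    const FRAME_TYPE_ID: u32 = 0x03;
}

// ... and reuse make_frame so it can serialize itself into a bytes frame.
impl MakeBytesFrame for Result<StreamItem<BinnedWaveStreamItem>, Error> {
    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
        Ok(make_frame::<Self>(self)?.freeze())
    }
}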