More generic BinnedBytesForHttpStream, changes for status page

This commit is contained in:
Dominik Werder
2021-05-18 16:35:51 +02:00
parent 530f88cd44
commit 19ff08a4bd
10 changed files with 289 additions and 38 deletions

View File

@@ -1,7 +1,10 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::MakeBytesFrame;
use crate::frame::makeframe::make_frame;
use crate::streamlog::LogItem;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{EventDataReadStats, NanoRange};
@@ -402,6 +405,12 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
}
}
/// Serializes a batch-stream item (or its error) into one length-framed
/// byte buffer for the HTTP response stream.
impl MakeBytesFrame for Result<MinMaxAvgScalarBinBatchStreamItem, Error> {
    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
        // Frame the item first, then freeze the mutable buffer into `Bytes`.
        let framed = make_frame::<Self>(self)?;
        Ok(framed.freeze())
    }
}
/// Time-dimension aggregator for `MinMaxAvgScalarBinBatchStreamItem`s.
/// Thin wrapper that delegates the actual binning work to the inner
/// `MinMaxAvgScalarBinBatchAggregator`.
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
agg: MinMaxAvgScalarBinBatchAggregator,
}

View File

@@ -3,7 +3,6 @@ use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatc
use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches};
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame;
use crate::raw::EventsQuery;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
@@ -120,9 +119,14 @@ impl<S> BinnedBytesForHttpStream<S> {
}
}
impl<S> Stream for BinnedBytesForHttpStream<S>
/// Types that can serialize themselves into a single length-framed byte
/// buffer for transport over HTTP.
///
/// This lets `BinnedBytesForHttpStream` be generic over its input item type
/// instead of being tied to one concrete stream-item type.
pub trait MakeBytesFrame {
/// Produce the framed byte representation of `self`, or an error if
/// serialization fails.
fn make_bytes_frame(&self) -> Result<Bytes, Error>;
}
impl<S, I> Stream for BinnedBytesForHttpStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
S: Stream<Item = I> + Unpin,
I: MakeBytesFrame,
{
type Item = Result<Bytes, Error>;
@@ -136,8 +140,8 @@ where
return Ready(None);
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match make_frame::<BinnedBytesForHttpStreamFrame>(&item) {
Ok(buf) => Ready(Some(Ok(buf.freeze()))),
Ready(Some(item)) => match item.make_bytes_frame() {
Ok(buf) => Ready(Some(Ok(buf))),
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))

View File

@@ -16,6 +16,7 @@ use netpod::{
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
@@ -45,6 +46,27 @@ impl CacheUsage {
}
.into()
}
pub fn from_params(params: &BTreeMap<String, String>) -> Result<Self, Error> {
let ret = params.get("cache_usage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
if k == "use" {
Ok(CacheUsage::Use)
} else if k == "ignore" {
Ok(CacheUsage::Ignore)
} else if k == "recreate" {
Ok(CacheUsage::Recreate)
} else {
Err(Error::with_msg(format!("unexpected cache_usage {:?}", k)))?
}
})?;
Ok(ret)
}
}
/// Renders the cache usage in its query-parameter form, so it can be
/// interpolated directly into URL strings.
impl Display for CacheUsage {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let value = self.query_param_value();
        write!(f, "{}", value)
    }
}
#[derive(Clone, Debug)]
@@ -76,9 +98,13 @@ impl BinnedQuery {
.ok_or(Error::with_msg("missing bin_count"))?
.parse()
.map_err(|e| Error::with_msg(format!("can not parse bin_count {:?}", e)))?,
agg_kind: AggKind::DimXBins1,
agg_kind: params
.get("agg_kind")
.map_or("DimXBins1", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?,
channel: channel_from_params(&params)?,
cache_usage: cache_usage_from_params(&params)?,
cache_usage: CacheUsage::from_params(&params)?,
disk_stats_every: ByteSize::kb(disk_stats_every),
};
info!("BinnedQuery::from_request {:?}", ret);
@@ -158,27 +184,26 @@ impl PreBinnedQuery {
.map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?;
let ret = PreBinnedQuery {
patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
agg_kind: AggKind::DimXBins1,
agg_kind: params
.get("agg_kind")
.map_or(&format!("{:?}", AggKind::DimXBins1), |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?,
channel: channel_from_params(&params)?,
cache_usage: cache_usage_from_params(&params)?,
cache_usage: CacheUsage::from_params(&params)?,
disk_stats_every: ByteSize::kb(disk_stats_every),
};
Ok(ret)
}
// Builds the URL query string for this pre-binned patch request.
// NOTE(review): this span appears to contain BOTH sides of a diff with the
// +/- markers stripped — the `let cache_usage = match ...` block and the
// bare `cache_usage,` argument look like removed lines, while
// `self.cache_usage,` is the added replacement (relying on the new
// `Display` impl). As written there are more arguments than `{}`
// placeholders in the format string — reconcile against the original diff.
pub fn make_query_string(&self) -> String {
let cache_usage = match self.cache_usage {
CacheUsage::Use => "use",
CacheUsage::Ignore => "ignore",
CacheUsage::Recreate => "recreate",
};
format!(
"{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}&disk_stats_every_kb={}",
self.patch.to_url_params_strings(),
self.channel.backend,
self.channel.name,
self.agg_kind,
cache_usage,
self.cache_usage,
self.disk_stats_every.bytes() / 1024,
)
}
@@ -202,21 +227,6 @@ fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Err
Ok(ret)
}
fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
let ret = params.get("cache_usage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
if k == "use" {
Ok(CacheUsage::Use)
} else if k == "ignore" {
Ok(CacheUsage::Ignore)
} else if k == "recreate" {
Ok(CacheUsage::Recreate)
} else {
Err(Error::with_msg(format!("unexpected cache_usage {:?}", k)))?
}
})?;
Ok(ret)
}
// NOTE This answers a request for a single valid pre-binned patch.
// A user must first make sure that the grid spec is valid, and that this node is responsible for it.
// Otherwise it is an error.