Add AggKind variant for N bins in X-dim
This commit is contained in:
@@ -10,7 +10,7 @@ use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
|
||||
use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
|
||||
use num_traits::Zero;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::pin::Pin;
|
||||
@@ -100,12 +100,15 @@ pub async fn binned_bytes_for_http(
|
||||
query: &BinnedQuery,
|
||||
) -> Result<BinnedStreamBox, Error> {
|
||||
// TODO must decide here already which AggKind so that I can call into the generic code.
|
||||
|
||||
todo::todo;
|
||||
|
||||
let res = binned_stream(node_config, query).await?;
|
||||
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
|
||||
Ok(Box::pin(ret))
|
||||
//todo::todo;
|
||||
match query.agg_kind() {
|
||||
AggKind::DimXBins1 => {
|
||||
let res = binned_stream(node_config, query).await?;
|
||||
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
|
||||
Ok(Box::pin(ret))
|
||||
}
|
||||
AggKind::DimXBinsN(_) => err::todoval(),
|
||||
}
|
||||
}
|
||||
|
||||
pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
|
||||
|
||||
@@ -186,7 +186,7 @@ impl PreBinnedQuery {
|
||||
patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
|
||||
agg_kind: params
|
||||
.get("agg_kind")
|
||||
.map_or(&format!("{:?}", AggKind::DimXBins1), |k| k)
|
||||
.map_or(&format!("{}", AggKind::DimXBins1), |k| k)
|
||||
.parse()
|
||||
.map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?,
|
||||
channel: channel_from_params(¶ms)?,
|
||||
@@ -198,7 +198,7 @@ impl PreBinnedQuery {
|
||||
|
||||
pub fn make_query_string(&self) -> String {
|
||||
format!(
|
||||
"{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}&disk_stats_every_kb={}",
|
||||
"{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}",
|
||||
self.patch.to_url_params_strings(),
|
||||
self.channel.backend,
|
||||
self.channel.name,
|
||||
@@ -452,7 +452,7 @@ impl CacheFileDesc {
|
||||
h.update(b"V000");
|
||||
h.update(self.channel.backend.as_bytes());
|
||||
h.update(self.channel.name.as_bytes());
|
||||
h.update(format!("{:?}", self.agg_kind).as_bytes());
|
||||
h.update(format!("{}", self.agg_kind).as_bytes());
|
||||
h.update(&self.patch.spec().bin_t_len().to_le_bytes());
|
||||
h.update(&self.patch.spec().patch_t_len().to_le_bytes());
|
||||
h.update(&self.patch.ix().to_le_bytes());
|
||||
@@ -481,7 +481,7 @@ impl CacheFileDesc {
|
||||
.join(&hc[0..3])
|
||||
.join(&hc[3..6])
|
||||
.join(&self.channel.name)
|
||||
.join(format!("{:?}", self.agg_kind))
|
||||
.join(format!("{}", self.agg_kind))
|
||||
.join(format!(
|
||||
"{:010}-{:010}",
|
||||
self.patch.spec().bin_t_len() / SEC,
|
||||
|
||||
@@ -3,6 +3,7 @@ use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
@@ -591,17 +592,41 @@ impl BinnedRange {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub enum AggKind {
|
||||
DimXBins1,
|
||||
DimXBinsN(u32),
|
||||
}
|
||||
|
||||
impl Display for AggKind {
|
||||
fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::DimXBins1 => {
|
||||
write!(fmt, "DimXBins1")
|
||||
}
|
||||
Self::DimXBinsN(n) => {
|
||||
write!(fmt, "DimXBinsN{}", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for AggKind {
|
||||
fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
|
||||
Display::fmt(self, fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for AggKind {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let nmark = "DimXBinsN";
|
||||
if s == "DimXBins1" {
|
||||
Ok(AggKind::DimXBins1)
|
||||
} else if s.starts_with(nmark) {
|
||||
let nbins: u32 = s[nmark.len()..].parse()?;
|
||||
Ok(AggKind::DimXBinsN(nbins))
|
||||
} else {
|
||||
Err(Error::with_msg(format!("can not parse {} as AggKind", s)))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user