Clean up leftover todo items

This commit is contained in:
Dominik Werder
2021-06-10 10:38:51 +02:00
parent da50d772a0
commit 9c1cf3bba3
14 changed files with 288 additions and 143 deletions

View File

@@ -1,5 +1,6 @@
use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator};
use crate::agg::streams::Appendable;
use crate::agg::{Fits, FitsInside};
use crate::binned::{
EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, WithLen, WithTimestamps,
@@ -83,9 +84,36 @@ impl<NTY> RangeOverlapInfo for XBinnedScalarEvents<NTY> {
}
}
impl<NTY> FitsInside for XBinnedScalarEvents<NTY> {
    /// Classify how this batch's timestamp span relates to `range`.
    ///
    /// Returns `Fits::Empty` for a batch with no events; otherwise compares the
    /// first and last timestamps against the range bounds to report whether the
    /// batch lies fully below, fully above, fully inside, or partly overlaps.
    // NOTE(review): assumes `tss` is sorted ascending so first/last bound the span — confirm invariant.
    fn fits_inside(&self, range: NanoRange) -> Fits {
        let (t1, t2) = match (self.tss.first(), self.tss.last()) {
            (Some(&a), Some(&b)) => (a, b),
            _ => return Fits::Empty,
        };
        if t2 < range.beg {
            Fits::Lower
        } else if t1 > range.end {
            Fits::Greater
        } else {
            // Overlapping: distinguish by which side(s) stick out of the range.
            match (t1 < range.beg, t2 > range.end) {
                (true, true) => Fits::PartlyLowerAndGreater,
                (true, false) => Fits::PartlyLower,
                (false, true) => Fits::PartlyGreater,
                (false, false) => Fits::Inside,
            }
        }
    }
}
impl<NTY> FilterFittingInside for XBinnedScalarEvents<NTY> {
fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option<Self> {
todo!()
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
match self.fits_inside(fit_range) {
Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
_ => None,
}
}
}

View File

@@ -16,7 +16,7 @@ pub enum StreamItem<T> {
Stats(StatsItem),
}
pub trait Collector {
pub trait Collector: WithLen {
type Input: Collectable;
type Output: Serialize;
fn ingest(&mut self, src: &Self::Input);

View File

@@ -220,16 +220,28 @@ fn make_num_pipeline_entry<PPP>(
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<u8>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u64>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i8>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i64>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
match scalar_type {
ScalarType::U8 => match_end!(u8, byte_order, shape, query, ppp, node_config),
ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config),
ScalarType::U32 => match_end!(u32, byte_order, shape, query, ppp, node_config),
ScalarType::U64 => match_end!(u64, byte_order, shape, query, ppp, node_config),
ScalarType::I8 => match_end!(i8, byte_order, shape, query, ppp, node_config),
ScalarType::I16 => match_end!(i16, byte_order, shape, query, ppp, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config),
ScalarType::I64 => match_end!(i64, byte_order, shape, query, ppp, node_config),
ScalarType::F32 => match_end!(f32, byte_order, shape, query, ppp, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config),
// TODO complete set
_ => todo!(),
}
}
@@ -240,8 +252,15 @@ async fn make_num_pipeline<PPP>(
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
PPP: PipelinePostProcessB<MinMaxAvgBins<u8>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<u64>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i8>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i64>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
if query.channel().backend != node_config.node.backend {
@@ -306,11 +325,17 @@ where
}
}
/// Post-processing target that collects binned results for JSON output,
/// carrying the per-request timeout and early-abort threshold down to the collector.
struct CollectForJson {
    // Wall-clock budget for collecting items before giving up.
    timeout: Duration,
    // Stop collecting once this many bins are gathered; 0 disables the limit.
    abort_after_bin_count: u32,
}

impl CollectForJson {
    /// Create a collector configuration with the given timeout and bin-count limit.
    pub fn new(timeout: Duration, abort_after_bin_count: u32) -> Self {
        Self {
            timeout,
            abort_after_bin_count,
        }
    }
}
@@ -322,11 +347,16 @@ pub struct JsonCollector {
}
impl JsonCollector {
pub fn new<NTY>(inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>, bin_count_exp: u32) -> Self
pub fn new<NTY>(
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
bin_count_exp: u32,
timeout: Duration,
abort_after_bin_count: u32,
) -> Self
where
NTY: NumOps + Serialize + 'static,
{
let fut = collect_all(inp, bin_count_exp);
let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count);
let fut = Box::pin(fut);
Self { fut, done: false }
}
@@ -397,7 +427,7 @@ where
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
bin_count_exp: u32,
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
let s = JsonCollector::new(inp, bin_count_exp);
let s = JsonCollector::new(inp, bin_count_exp, self.timeout, self.abort_after_bin_count);
Box::pin(s)
}
}
@@ -488,12 +518,17 @@ impl Serialize for IsoDateTime {
}
}
pub async fn collect_all<T, S>(stream: S, bin_count_exp: u32) -> Result<serde_json::Value, Error>
pub async fn collect_all<T, S>(
stream: S,
bin_count_exp: u32,
timeout: Duration,
abort_after_bin_count: u32,
) -> Result<serde_json::Value, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable,
{
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
let deadline = tokio::time::Instant::now() + timeout;
let mut collector = <T as Collectable>::new_collector(bin_count_exp);
let mut i1 = 0;
let mut stream = stream;
@@ -501,11 +536,15 @@ where
let item = if i1 == 0 {
stream.next().await
} else {
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
collector.set_timed_out();
None
if abort_after_bin_count > 0 && collector.len() >= abort_after_bin_count as usize {
None
} else {
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
collector.set_timed_out();
None
}
}
}
};
@@ -542,7 +581,12 @@ pub async fn binned_json(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline(query, CollectForJson::new(), node_config).await?;
let pl = make_num_pipeline(
query,
CollectForJson::new(query.timeout(), query.abort_after_bin_count()),
node_config,
)
.await?;
let ret = pl.stream.map(|item| {
let fr = item.to_json_result()?;
let buf = fr.to_json_bytes()?;
@@ -933,6 +977,15 @@ impl<NTY> MinMaxAvgBinsCollector<NTY> {
}
}
/// Number of collected entries, measured by the `ts1s` axis of the
/// accumulated values. Used by `collect_all` to enforce `abort_after_bin_count`.
impl<NTY> WithLen for MinMaxAvgBinsCollector<NTY>
where
NTY: NumOps + Serialize,
{
fn len(&self) -> usize {
self.vals.ts1s.len()
}
}
impl<NTY> Collector for MinMaxAvgBinsCollector<NTY>
where
NTY: NumOps + Serialize,
@@ -974,7 +1027,7 @@ where
};
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
ts_bin_edges: tsa,
counts: vec![],
counts: self.vals.counts,
mins: self.vals.mins,
maxs: self.vals.maxs,
avgs: self.vals.avgs,

View File

@@ -3,6 +3,7 @@ use err::Error;
use netpod::log::*;
use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PreBinnedPatchCoord, ToNanos};
use std::collections::BTreeMap;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct PreBinnedQuery {
@@ -171,6 +172,8 @@ pub struct BinnedQuery {
cache_usage: CacheUsage,
disk_stats_every: ByteSize,
report_error: bool,
timeout: Duration,
abort_after_bin_count: u32,
}
impl BinnedQuery {
@@ -183,6 +186,8 @@ impl BinnedQuery {
cache_usage: CacheUsage::Use,
disk_stats_every: ByteSize(1024 * 1024 * 4),
report_error: false,
timeout: Duration::from_millis(2000),
abort_after_bin_count: 0,
}
}
@@ -217,6 +222,17 @@ impl BinnedQuery {
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
timeout: params
.get("timeout")
.map_or("2000", |k| k)
.parse::<u64>()
.map(|k| Duration::from_millis(k))
.map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
abort_after_bin_count: params
.get("abortAfterBinCount")
.map_or("0", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?,
};
info!("BinnedQuery::from_request {:?}", ret);
Ok(ret)
@@ -250,6 +266,14 @@ impl BinnedQuery {
self.report_error
}
/// Wall-clock budget for collecting results for this query.
pub fn timeout(&self) -> Duration {
self.timeout
}
/// Bin-count limit after which collection stops early; 0 means no limit.
pub fn abort_after_bin_count(&self) -> u32 {
self.abort_after_bin_count
}
/// Override the cache usage policy for this query.
pub fn set_cache_usage(&mut self, k: CacheUsage) {
self.cache_usage = k;
}
@@ -258,12 +282,16 @@ impl BinnedQuery {
self.disk_stats_every = k;
}
/// Override the collection timeout for this query.
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
// TODO the BinnedQuery itself should maybe already carry the full HostPort?
// On the other hand, want to keep the flexibility for the fail over possibility..
pub fn url(&self, host: &HostPort) -> String {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
format!(
"http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}",
"http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}",
host.host,
host.port,
self.cache_usage,
@@ -273,6 +301,8 @@ impl BinnedQuery {
Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
self.disk_stats_every.bytes() / 1024,
self.timeout.as_millis(),
self.abort_after_bin_count,
)
}
}

View File

@@ -76,15 +76,6 @@ impl AsyncRead for HttpBodyAsAsyncRead {
}
}
pub struct BytesWrap {}
impl From<BytesWrap> for Bytes {
fn from(_k: BytesWrap) -> Self {
error!("TODO convert result to octets");
todo!("TODO convert result to octets")
}
}
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
let mut hash = tiny_keccak::Sha3::v256();
hash.update(channel.backend.as_bytes());

View File

@@ -1,6 +1,7 @@
use crate::agg::binnedt::TimeBinnableType;
use crate::agg::enp::{Identity, WaveXBinner};
use crate::agg::streams::{Appendable, StreamItem};
use crate::agg::{Fits, FitsInside};
use crate::binned::{
EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex,
RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps,
@@ -213,9 +214,36 @@ impl<VT> RangeOverlapInfo for EventValues<VT> {
}
}
impl<VT> FitsInside for EventValues<VT> {
/// Classify how this batch's timestamp span relates to `range`:
/// empty, fully below/above, fully inside, or partly overlapping on
/// one or both sides.
fn fits_inside(&self, range: NanoRange) -> Fits {
// No events: nothing to compare against the range.
if self.tss.is_empty() {
Fits::Empty
} else {
// First/last timestamp bound the span.
// NOTE(review): assumes `tss` is sorted ascending — confirm invariant at call sites.
let t1 = *self.tss.first().unwrap();
let t2 = *self.tss.last().unwrap();
if t2 < range.beg {
Fits::Lower
} else if t1 > range.end {
Fits::Greater
} else if t1 < range.beg && t2 > range.end {
// Overhangs the range on both sides.
Fits::PartlyLowerAndGreater
} else if t1 < range.beg {
Fits::PartlyLower
} else if t2 > range.end {
Fits::PartlyGreater
} else {
Fits::Inside
}
}
}
}
impl<VT> FilterFittingInside for EventValues<VT> {
fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option<Self> {
todo!()
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
match self.fits_inside(fit_range) {
Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
_ => None,
}
}
}

View File

@@ -91,8 +91,8 @@ impl Stream for FileReader {
}
}
#[allow(dead_code)]
struct Fopen1 {
#[allow(dead_code)]
opts: OpenOptions,
fut: Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>,
term: bool,
@@ -104,8 +104,6 @@ impl Fopen1 {
let mut o1 = OpenOptions::new();
let o2 = o1.read(true);
let res = o2.open(path);
//() == res;
//todo!()
res.await
}) as Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>;
let _fut2: Box<dyn Future<Output = u32>> = Box::new(async { 123 });