From b8ad53f798bc82e6729593382e412cc93bdde092 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 21 May 2021 12:56:45 +0200 Subject: [PATCH] Does not compile, type issues --- disk/src/agg/binnedt.rs | 2 +- disk/src/binned.rs | 33 +++++++++++++++++++++++++-- disk/src/binnedstream.rs | 48 ++++++++++++++++++++-------------------- disk/src/cache/pbvfs.rs | 2 +- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 302a42b..34be794 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -9,7 +9,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -pub trait AggregatorTdim { +pub trait AggregatorTdim: Sized + Unpin { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; fn ends_before(&self, inp: &Self::InputValue) -> bool; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 8547e41..60271df 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,9 +1,10 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT}; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; -use crate::agg::AggregatableXdim1Bin; +use crate::agg::{AggregatableXdim1Bin, FitsInside}; use crate::binned::scalar::{adapter_to_stream_item, binned_stream}; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; +use crate::cache::pbvfs::PreBinnedScalarItem; use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::frame::makeframe::make_frame; @@ -14,7 +15,9 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use netpod::{ + AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, +}; use num_traits::Zero; use serde::{Deserialize, Serialize, Serializer}; use std::pin::Pin; @@ -391,10 +394,35 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Ok(serde_json::to_value(ret)?) } +pub trait PreBinnedItem: Unpin { + type BinnedStreamItem: AggregatableTdim + Unpin + Send; + fn into_binned_stream_item(self, fit_range: NanoRange) -> Option; +} + +impl PreBinnedItem for PreBinnedScalarItem { + type BinnedStreamItem = BinnedScalarStreamItem; + + fn into_binned_stream_item(self, fit_range: NanoRange) -> Option { + match self { + Self::RangeComplete => Some(Self::BinnedStreamItem::RangeComplete), + Self::Batch(item) => { + use super::agg::{Fits, FitsInside}; + match item.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => { + Some(Self::BinnedStreamItem::Values(item)) + } + _ => None, + } + } + } + } +} + pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { type BinnedStreamItem: MakeBytesFrame; type BinnedStreamType: Stream + Send + 'static; type Dummy: Default + Unpin + Send; + type PreBinnedItem: PreBinnedItem + Send; fn new_binned_from_prebinned( &self, @@ -434,6 +462,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar { type BinnedStreamItem = Result, Error>; type BinnedStreamType = BinnedStream; type Dummy = u32; + type PreBinnedItem = PreBinnedScalarItem; fn new_binned_from_prebinned( &self, diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 3ebb0f7..05bf407 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,6 +1,6 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind}; +use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, PreBinnedItem}; use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream}; use crate::cache::{CacheUsage, PreBinnedQuery}; use err::Error; @@ -16,8 +16,18 @@ pub struct BinnedScalarStreamFromPreBinnedPatches where BK: BinnedStreamKind, { - inp: Pin, Error>> + Send>>, - _marker: BK::Dummy, + //inp: Pin, Error>> + Send>>, + inp: Pin< + Box< + dyn Stream< + Item = Result< + StreamItem<<::PreBinnedItem as PreBinnedItem>::BinnedStreamItem>, + Error, + >, + > + Send, + >, + >, + stream_kind: BK, } impl BinnedScalarStreamFromPreBinnedPatches @@ -75,34 +85,24 @@ where Ok(item) => match item { StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), - StreamItem::DataItem(item) => match item { - PreBinnedScalarItem::RangeComplete => { - Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))) + StreamItem::DataItem(item) => { + match crate::binned::PreBinnedItem::into_binned_stream_item(item, fit_range) { + Some(item) => Some(Ok(StreamItem::DataItem(item))), + None => None, } - PreBinnedScalarItem::Batch(item) => { - use super::agg::{Fits, FitsInside}; - match item.fits_inside(fit_range) { - Fits::Inside - | Fits::PartlyGreater - | Fits::PartlyLower - | Fits::PartlyLowerAndGreater => { - Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))) - } - _ => None, - } - } - }, + } }, Err(e) => Some(Err(e)), }; ready(g) } - }) - .into_binned_t(range); - let mm = BK::Dummy::default(); + }); + // TODO activate the T-binning via the bin-to-bin binning trait. + err::todo(); + //let inp = IntoBinnedT::into_binned_t(inp, range); Ok(Self { inp: Box::pin(inp), - _marker: mm, + stream_kind, }) } } @@ -111,7 +111,7 @@ impl Stream for BinnedScalarStreamFromPreBinnedPatches where BK: BinnedStreamKind, { - type Item = Result, Error>; + type Item = Result::PreBinnedItem as PreBinnedItem>::BinnedStreamItem>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index c2b8ee5..23e873b 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -62,7 +62,7 @@ impl Stream for PreBinnedScalarValueFetchedStream where BK: BinnedStreamKind, { - type Item = Result, Error>; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*;