WIP on adding merger impls, it checks
This commit is contained in:
@@ -2,8 +2,12 @@ use crate::agg::enp::XBinnedScalarEvents;
|
|||||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||||
use crate::agg::streams::StreamItem;
|
use crate::agg::streams::StreamItem;
|
||||||
use crate::binned::{BinsTimeBinner, EventsTimeBinner, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo};
|
use crate::binned::{
|
||||||
|
BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, MinMaxAvgAggregator, MinMaxAvgBins, NumOps,
|
||||||
|
RangeCompletableItem, RangeOverlapInfo, SingleXBinAggregator,
|
||||||
|
};
|
||||||
use crate::decode::EventValues;
|
use crate::decode::EventValues;
|
||||||
|
use crate::Sitemty;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
@@ -24,9 +28,10 @@ where
|
|||||||
{
|
{
|
||||||
type Input = EventValues<NTY>;
|
type Input = EventValues<NTY>;
|
||||||
type Output = MinMaxAvgBins<NTY>;
|
type Output = MinMaxAvgBins<NTY>;
|
||||||
|
type Aggregator = MinMaxAvgAggregator<NTY>;
|
||||||
|
|
||||||
fn process(inp: Self::Input) -> Self::Output {
|
fn aggregator(range: NanoRange) -> Self::Aggregator {
|
||||||
todo!()
|
Self::Aggregator::new(range)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -39,11 +44,13 @@ where
|
|||||||
NTY: NumOps,
|
NTY: NumOps,
|
||||||
{
|
{
|
||||||
type Input = XBinnedScalarEvents<NTY>;
|
type Input = XBinnedScalarEvents<NTY>;
|
||||||
// TODO is that output type good enough for now?
|
// TODO is that output type good enough for now? Maybe better with a new one also
|
||||||
|
// to distinguish from the earlier one.
|
||||||
type Output = MinMaxAvgBins<NTY>;
|
type Output = MinMaxAvgBins<NTY>;
|
||||||
|
type Aggregator = SingleXBinAggregator<NTY>;
|
||||||
|
|
||||||
fn process(inp: Self::Input) -> Self::Output {
|
fn aggregator(range: NanoRange) -> Self::Aggregator {
|
||||||
todo!()
|
Self::Aggregator::new(range)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,54 +141,49 @@ impl Agg3 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Aggregator3Tdim for Agg3 {
|
pub struct TBinnerStream<S, ETB>
|
||||||
type InputValue = MinMaxAvgScalarEventBatch;
|
where
|
||||||
type OutputValue = MinMaxAvgScalarBinBatch;
|
S: Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Input>> + Send + Unpin + 'static,
|
||||||
}
|
ETB: EventsTimeBinner + Send + Unpin + 'static,
|
||||||
|
{
|
||||||
pub struct BinnedT3Stream {
|
inp: S,
|
||||||
// TODO get rid of box:
|
|
||||||
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>> + Send>>,
|
|
||||||
//aggtor: Option<<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::Aggregator>,
|
|
||||||
aggtor: Option<Agg3>,
|
|
||||||
spec: BinnedRange,
|
spec: BinnedRange,
|
||||||
curbin: u32,
|
curbin: u32,
|
||||||
|
left: Option<Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>>>,
|
||||||
|
aggtor: Option<<ETB as EventsTimeBinner>::Aggregator>,
|
||||||
|
tmp_agg_results: VecDeque<<<ETB as EventsTimeBinner>::Aggregator as EventsTimeBinnerAggregator>::Output>,
|
||||||
inp_completed: bool,
|
inp_completed: bool,
|
||||||
all_bins_emitted: bool,
|
all_bins_emitted: bool,
|
||||||
range_complete_observed: bool,
|
range_complete_observed: bool,
|
||||||
range_complete_emitted: bool,
|
range_complete_emitted: bool,
|
||||||
left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>>>,
|
|
||||||
errored: bool,
|
errored: bool,
|
||||||
completed: bool,
|
completed: bool,
|
||||||
tmp_agg_results: VecDeque<MinMaxAvgScalarBinBatch>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BinnedT3Stream {
|
impl<S, ETB> TBinnerStream<S, ETB>
|
||||||
pub fn new<S>(inp: S, spec: BinnedRange) -> Self
|
where
|
||||||
where
|
S: Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Input>> + Send + Unpin + 'static,
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>> + Send + 'static,
|
ETB: EventsTimeBinner,
|
||||||
{
|
{
|
||||||
|
pub fn new(inp: S, spec: BinnedRange) -> Self {
|
||||||
let range = spec.get_range(0);
|
let range = spec.get_range(0);
|
||||||
Self {
|
Self {
|
||||||
inp: Box::pin(inp),
|
inp,
|
||||||
aggtor: Some(Agg3::new(range)),
|
|
||||||
spec,
|
spec,
|
||||||
curbin: 0,
|
curbin: 0,
|
||||||
|
left: None,
|
||||||
|
aggtor: Some(<ETB as EventsTimeBinner>::aggregator(range)),
|
||||||
|
tmp_agg_results: VecDeque::new(),
|
||||||
inp_completed: false,
|
inp_completed: false,
|
||||||
all_bins_emitted: false,
|
all_bins_emitted: false,
|
||||||
range_complete_observed: false,
|
range_complete_observed: false,
|
||||||
range_complete_emitted: false,
|
range_complete_emitted: false,
|
||||||
left: None,
|
|
||||||
errored: false,
|
errored: false,
|
||||||
completed: false,
|
completed: false,
|
||||||
tmp_agg_results: VecDeque::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cur(
|
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>> {
|
||||||
&mut self,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> Poll<Option<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>> {
|
|
||||||
if let Some(cur) = self.left.take() {
|
if let Some(cur) = self.left.take() {
|
||||||
cur
|
cur
|
||||||
} else if self.inp_completed {
|
} else if self.inp_completed {
|
||||||
@@ -197,11 +199,13 @@ impl BinnedT3Stream {
|
|||||||
let range = self.spec.get_range(self.curbin);
|
let range = self.spec.get_range(self.curbin);
|
||||||
let ret = self
|
let ret = self
|
||||||
.aggtor
|
.aggtor
|
||||||
.replace(Agg3::new(range))
|
.replace(<ETB as EventsTimeBinner>::aggregator(range))
|
||||||
// TODO handle None case, or remove Option if Agg is always present
|
// TODO handle None case, or remove Option if Agg is always present
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.result();
|
.result();
|
||||||
self.tmp_agg_results = VecDeque::from(ret);
|
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
|
||||||
|
// Only if the frequency would be high, that would require cpu time checks. Worth it? Measure..
|
||||||
|
self.tmp_agg_results.push_back(ret);
|
||||||
if self.curbin >= self.spec.count as u32 {
|
if self.curbin >= self.spec.count as u32 {
|
||||||
self.all_bins_emitted = true;
|
self.all_bins_emitted = true;
|
||||||
}
|
}
|
||||||
@@ -209,8 +213,8 @@ impl BinnedT3Stream {
|
|||||||
|
|
||||||
fn handle(
|
fn handle(
|
||||||
&mut self,
|
&mut self,
|
||||||
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>>,
|
cur: Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>>,
|
||||||
) -> Option<Poll<Option<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>>>> {
|
) -> Option<Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Output>>>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
match cur {
|
match cur {
|
||||||
Ready(Some(Ok(item))) => match item {
|
Ready(Some(Ok(item))) => match item {
|
||||||
@@ -228,9 +232,9 @@ impl BinnedT3Stream {
|
|||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let ag = self.aggtor.as_mut().unwrap();
|
let ag = self.aggtor.as_mut().unwrap();
|
||||||
if item.ends_before(ag.range.clone()) {
|
if item.ends_before(ag.range().clone()) {
|
||||||
None
|
None
|
||||||
} else if item.starts_after(ag.range.clone()) {
|
} else if item.starts_after(ag.range().clone()) {
|
||||||
self.left =
|
self.left =
|
||||||
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
|
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
|
||||||
self.cycle_current_bin();
|
self.cycle_current_bin();
|
||||||
@@ -238,7 +242,7 @@ impl BinnedT3Stream {
|
|||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
ag.ingest(&item);
|
ag.ingest(&item);
|
||||||
if item.ends_after(ag.range.clone()) {
|
if item.ends_after(ag.range().clone()) {
|
||||||
self.left =
|
self.left =
|
||||||
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
|
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
|
||||||
self.cycle_current_bin();
|
self.cycle_current_bin();
|
||||||
@@ -269,8 +273,12 @@ impl BinnedT3Stream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for BinnedT3Stream {
|
impl<S, ETB> Stream for TBinnerStream<S, ETB>
|
||||||
type Item = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>;
|
where
|
||||||
|
S: Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Input>> + Send + Unpin + 'static,
|
||||||
|
ETB: EventsTimeBinner + Send + Unpin + 'static,
|
||||||
|
{
|
||||||
|
type Item = Sitemty<<ETB as EventsTimeBinner>::Output>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
use crate::binned::{EventsNodeProcessor, NumOps};
|
use crate::agg::streams::Appendable;
|
||||||
|
use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeOverlapInfo, WithLen, WithTimestamps};
|
||||||
use crate::decode::EventValues;
|
use crate::decode::EventValues;
|
||||||
|
use netpod::NanoRange;
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
@@ -29,6 +31,83 @@ pub struct XBinnedScalarEvents<NTY> {
|
|||||||
xbincount: Vec<u32>,
|
xbincount: Vec<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> XBinnedScalarEvents<NTY> {
|
||||||
|
pub fn empty() -> Self {
|
||||||
|
Self {
|
||||||
|
tss: vec![],
|
||||||
|
mins: vec![],
|
||||||
|
maxs: vec![],
|
||||||
|
avgs: vec![],
|
||||||
|
xbincount: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> WithLen for XBinnedScalarEvents<NTY> {
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.tss.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> WithTimestamps for XBinnedScalarEvents<NTY> {
|
||||||
|
fn ts(&self, ix: usize) -> u64 {
|
||||||
|
self.tss[ix]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> RangeOverlapInfo for XBinnedScalarEvents<NTY> {
|
||||||
|
fn ends_before(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.last() {
|
||||||
|
Some(&ts) => ts < range.beg,
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ends_after(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.last() {
|
||||||
|
Some(&ts) => ts >= range.end,
|
||||||
|
None => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn starts_after(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.first() {
|
||||||
|
Some(&ts) => ts >= range.end,
|
||||||
|
None => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> PushableIndex for XBinnedScalarEvents<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
fn push_index(&mut self, src: &Self, ix: usize) {
|
||||||
|
self.tss.push(src.tss[ix]);
|
||||||
|
self.xbincount.push(src.xbincount[ix]);
|
||||||
|
self.mins.push(src.mins[ix]);
|
||||||
|
self.maxs.push(src.maxs[ix]);
|
||||||
|
self.avgs.push(src.avgs[ix]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> Appendable for XBinnedScalarEvents<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
fn empty() -> Self {
|
||||||
|
Self::empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append(&mut self, src: &Self) {
|
||||||
|
self.tss.extend_from_slice(&src.tss);
|
||||||
|
self.xbincount.extend_from_slice(&src.xbincount);
|
||||||
|
self.mins.extend_from_slice(&src.mins);
|
||||||
|
self.maxs.extend_from_slice(&src.maxs);
|
||||||
|
self.avgs.extend_from_slice(&src.avgs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct WaveXBinner<NTY> {
|
pub struct WaveXBinner<NTY> {
|
||||||
_m1: PhantomData<NTY>,
|
_m1: PhantomData<NTY>,
|
||||||
}
|
}
|
||||||
@@ -40,7 +119,60 @@ where
|
|||||||
type Input = Vec<NTY>;
|
type Input = Vec<NTY>;
|
||||||
type Output = XBinnedScalarEvents<NTY>;
|
type Output = XBinnedScalarEvents<NTY>;
|
||||||
|
|
||||||
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
|
fn process(inp: EventValues<Self::Input>) -> Self::Output {
|
||||||
todo!()
|
let nev = inp.tss.len();
|
||||||
|
let mut ret = XBinnedScalarEvents {
|
||||||
|
tss: inp.tss,
|
||||||
|
xbincount: Vec::with_capacity(nev),
|
||||||
|
mins: Vec::with_capacity(nev),
|
||||||
|
maxs: Vec::with_capacity(nev),
|
||||||
|
avgs: Vec::with_capacity(nev),
|
||||||
|
};
|
||||||
|
for i1 in 0..nev {
|
||||||
|
let mut min = None;
|
||||||
|
let mut max = None;
|
||||||
|
let mut sum = 0f32;
|
||||||
|
let mut count = 0;
|
||||||
|
let vals = &inp.values[i1];
|
||||||
|
for i2 in 0..vals.len() {
|
||||||
|
let v = vals[i2];
|
||||||
|
min = match min {
|
||||||
|
None => Some(v),
|
||||||
|
Some(min) => {
|
||||||
|
if v < min {
|
||||||
|
Some(v)
|
||||||
|
} else {
|
||||||
|
Some(min)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
max = match max {
|
||||||
|
None => Some(v),
|
||||||
|
Some(max) => {
|
||||||
|
if v > max {
|
||||||
|
Some(v)
|
||||||
|
} else {
|
||||||
|
Some(max)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let vf = v.as_();
|
||||||
|
if vf.is_nan() {
|
||||||
|
} else {
|
||||||
|
sum += vf;
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO while X-binning I expect values, otherwise it is illegal input.
|
||||||
|
ret.xbincount.push(nev as u32);
|
||||||
|
ret.mins.push(min.unwrap());
|
||||||
|
ret.maxs.push(max.unwrap());
|
||||||
|
if count == 0 {
|
||||||
|
ret.avgs.push(f32::NAN);
|
||||||
|
} else {
|
||||||
|
ret.avgs.push(sum / count as f32);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use crate::agg::binnedt::AggregatableTdim;
|
use crate::agg::binnedt::AggregatableTdim;
|
||||||
use crate::agg::binnedt2::AggregatableTdim2;
|
use crate::agg::binnedt2::AggregatableTdim2;
|
||||||
use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
|
use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
|
||||||
|
use crate::agg::enp::XBinnedScalarEvents;
|
||||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||||
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
|
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
|
||||||
@@ -559,9 +560,9 @@ pub trait EventsDecoder {
|
|||||||
fn result(&mut self) -> Self::Output;
|
fn result(&mut self) -> Self::Output;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EventsNodeProcessor {
|
pub trait EventsNodeProcessor: Send + Unpin {
|
||||||
type Input;
|
type Input;
|
||||||
type Output: Send + DeserializeOwned;
|
type Output: Send + Unpin + DeserializeOwned + WithTimestamps;
|
||||||
fn process(inp: EventValues<Self::Input>) -> Self::Output;
|
fn process(inp: EventValues<Self::Input>) -> Self::Output;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -570,10 +571,19 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable {
|
|||||||
fn ts2s(&self) -> &Vec<u64>;
|
fn ts2s(&self) -> &Vec<u64>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EventsTimeBinner {
|
pub trait EventsTimeBinner: Send + Unpin {
|
||||||
type Input;
|
type Input: Unpin + RangeOverlapInfo;
|
||||||
type Output: TimeBins;
|
type Output: TimeBins;
|
||||||
fn process(inp: Self::Input) -> Self::Output;
|
type Aggregator: EventsTimeBinnerAggregator<Input = Self::Input, Output = Self::Output> + Unpin;
|
||||||
|
fn aggregator(range: NanoRange) -> Self::Aggregator;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait EventsTimeBinnerAggregator: Send {
|
||||||
|
type Input: Unpin;
|
||||||
|
type Output: Unpin;
|
||||||
|
fn range(&self) -> &NanoRange;
|
||||||
|
fn ingest(&mut self, item: &Self::Input);
|
||||||
|
fn result(self) -> Self::Output;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait BinsTimeBinner {
|
pub trait BinsTimeBinner {
|
||||||
@@ -582,6 +592,7 @@ pub trait BinsTimeBinner {
|
|||||||
fn process(inp: Self::Input) -> Self::Output;
|
fn process(inp: Self::Input) -> Self::Output;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct MinMaxAvgBins<NTY> {
|
pub struct MinMaxAvgBins<NTY> {
|
||||||
ts1s: Vec<u64>,
|
ts1s: Vec<u64>,
|
||||||
ts2s: Vec<u64>,
|
ts2s: Vec<u64>,
|
||||||
@@ -641,6 +652,102 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> ReadableFromFile for MinMaxAvgBins<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
// TODO this function is not needed in the trait:
|
||||||
|
fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
|
||||||
|
Ok(ReadPbv::new(file))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_buf(buf: &[u8]) -> Result<Self, Error> {
|
||||||
|
let dec = serde_cbor::from_slice(&buf)?;
|
||||||
|
Ok(dec)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MinMaxAvgAggregator<NTY> {
|
||||||
|
range: NanoRange,
|
||||||
|
count: u32,
|
||||||
|
min: Option<NTY>,
|
||||||
|
max: Option<NTY>,
|
||||||
|
avg: Option<f32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> MinMaxAvgAggregator<NTY> {
|
||||||
|
pub fn new(range: NanoRange) -> Self {
|
||||||
|
Self {
|
||||||
|
range,
|
||||||
|
count: 0,
|
||||||
|
min: None,
|
||||||
|
max: None,
|
||||||
|
avg: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> EventsTimeBinnerAggregator for MinMaxAvgAggregator<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
type Input = EventValues<NTY>;
|
||||||
|
type Output = MinMaxAvgBins<NTY>;
|
||||||
|
|
||||||
|
fn range(&self) -> &NanoRange {
|
||||||
|
&self.range
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ingest(&mut self, item: &Self::Input) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn result(self) -> Self::Output {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SingleXBinAggregator<NTY> {
|
||||||
|
range: NanoRange,
|
||||||
|
count: u32,
|
||||||
|
min: Option<NTY>,
|
||||||
|
max: Option<NTY>,
|
||||||
|
avg: Option<f32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> SingleXBinAggregator<NTY> {
|
||||||
|
pub fn new(range: NanoRange) -> Self {
|
||||||
|
Self {
|
||||||
|
range,
|
||||||
|
count: 0,
|
||||||
|
min: None,
|
||||||
|
max: None,
|
||||||
|
avg: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> EventsTimeBinnerAggregator for SingleXBinAggregator<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
type Input = XBinnedScalarEvents<NTY>;
|
||||||
|
// TODO do I need another type to carry the x-bin count as well? No xbincount is static anyways.
|
||||||
|
type Output = MinMaxAvgBins<NTY>;
|
||||||
|
|
||||||
|
fn range(&self) -> &NanoRange {
|
||||||
|
&self.range
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ingest(&mut self, item: &Self::Input) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn result(self) -> Self::Output {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
|
pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
|
||||||
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> + Send;
|
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> + Send;
|
||||||
type XBinnedEvents: XBinnedEvents<Self>;
|
type XBinnedEvents: XBinnedEvents<Self>;
|
||||||
|
|||||||
@@ -1,13 +1,15 @@
|
|||||||
|
use crate::agg::binnedt4::TBinnerStream;
|
||||||
use crate::agg::streams::{Appendable, StreamItem};
|
use crate::agg::streams::{Appendable, StreamItem};
|
||||||
use crate::binned::query::{CacheUsage, PreBinnedQuery};
|
use crate::binned::query::{CacheUsage, PreBinnedQuery};
|
||||||
use crate::binned::{
|
use crate::binned::{
|
||||||
BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem,
|
BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex,
|
||||||
ReadableFromFile, StreamKind, WithLen,
|
RangeCompletableItem, ReadableFromFile, StreamKind, WithLen,
|
||||||
};
|
};
|
||||||
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
|
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
|
||||||
use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache};
|
use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache};
|
||||||
use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes};
|
use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes};
|
||||||
use crate::frame::makeframe::{make_frame, FrameType};
|
use crate::frame::makeframe::{make_frame, FrameType};
|
||||||
|
use crate::merge::mergefromremote::MergedFromRemotes2;
|
||||||
use crate::raw::EventsQuery;
|
use crate::raw::EventsQuery;
|
||||||
use crate::streamlog::Streamlog;
|
use crate::streamlog::Streamlog;
|
||||||
use crate::Sitemty;
|
use crate::Sitemty;
|
||||||
@@ -27,19 +29,18 @@ use tokio::fs::{File, OpenOptions};
|
|||||||
|
|
||||||
//pub type SomeScc = netpod::streamext::SCC<u32>;
|
//pub type SomeScc = netpod::streamext::SCC<u32>;
|
||||||
|
|
||||||
pub struct PreBinnedValueStream<NTY, END, EVS, ENP, ETB, BTB>
|
pub struct PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||||
END: Endianness + 'static,
|
END: Endianness + 'static,
|
||||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
|
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
|
||||||
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
|
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
|
||||||
BTB: BinsTimeBinner<Input = <ETB as EventsTimeBinner>::Output, Output = <ETB as EventsTimeBinner>::Output>,
|
|
||||||
{
|
{
|
||||||
query: PreBinnedQuery,
|
query: PreBinnedQuery,
|
||||||
node_config: NodeConfigCached,
|
node_config: NodeConfigCached,
|
||||||
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>>,
|
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>>,
|
||||||
fut2: Option<Pin<Box<dyn Stream<Item = Sitemty<<BTB as BinsTimeBinner>::Output>> + Send>>>,
|
fut2: Option<Pin<Box<dyn Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Output>> + Send>>>,
|
||||||
read_from_cache: bool,
|
read_from_cache: bool,
|
||||||
cache_written: bool,
|
cache_written: bool,
|
||||||
data_complete: bool,
|
data_complete: bool,
|
||||||
@@ -48,9 +49,9 @@ where
|
|||||||
errored: bool,
|
errored: bool,
|
||||||
completed: bool,
|
completed: bool,
|
||||||
streamlog: Streamlog,
|
streamlog: Streamlog,
|
||||||
values: <BTB as BinsTimeBinner>::Output,
|
values: <ETB as EventsTimeBinner>::Output,
|
||||||
write_fut: Option<Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>>,
|
write_fut: Option<Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>>,
|
||||||
read_cache_fut: Option<Pin<Box<dyn Future<Output = Sitemty<<BTB as BinsTimeBinner>::Output>> + Send>>>,
|
read_cache_fut: Option<Pin<Box<dyn Future<Output = Sitemty<<ETB as EventsTimeBinner>::Output>> + Send>>>,
|
||||||
_m1: PhantomData<NTY>,
|
_m1: PhantomData<NTY>,
|
||||||
_m2: PhantomData<END>,
|
_m2: PhantomData<END>,
|
||||||
_m3: PhantomData<EVS>,
|
_m3: PhantomData<EVS>,
|
||||||
@@ -58,15 +59,16 @@ where
|
|||||||
_m5: PhantomData<ETB>,
|
_m5: PhantomData<ETB>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY, END, EVS, ENP, ETB, BTB> PreBinnedValueStream<NTY, END, EVS, ENP, ETB, BTB>
|
impl<NTY, END, EVS, ENP, ETB> PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||||
END: Endianness + 'static,
|
END: Endianness + 'static,
|
||||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
|
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||||
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
|
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
|
||||||
BTB: BinsTimeBinner<Input = <ETB as EventsTimeBinner>::Output, Output = <ETB as EventsTimeBinner>::Output>,
|
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
|
||||||
<BTB as BinsTimeBinner>::Output: Appendable,
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
||||||
|
<ETB as EventsTimeBinner>::Output: Appendable,
|
||||||
{
|
{
|
||||||
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
|
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -82,7 +84,7 @@ where
|
|||||||
errored: false,
|
errored: false,
|
||||||
completed: false,
|
completed: false,
|
||||||
streamlog: Streamlog::new(node_config.ix as u32),
|
streamlog: Streamlog::new(node_config.ix as u32),
|
||||||
values: <<BTB as BinsTimeBinner>::Output as Appendable>::empty(),
|
values: <<ETB as EventsTimeBinner>::Output as Appendable>::empty(),
|
||||||
write_fut: None,
|
write_fut: None,
|
||||||
read_cache_fut: None,
|
read_cache_fut: None,
|
||||||
_m1: PhantomData,
|
_m1: PhantomData,
|
||||||
@@ -93,10 +95,9 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO handle errors also here via return type.
|
|
||||||
fn setup_merged_from_remotes(
|
fn setup_merged_from_remotes(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<BTB as BinsTimeBinner>::Output>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Output>> + Send>>, Error> {
|
||||||
let evq = EventsQuery {
|
let evq = EventsQuery {
|
||||||
channel: self.query.channel().clone(),
|
channel: self.query.channel().clone(),
|
||||||
range: self.query.patch().patch_range(),
|
range: self.query.patch().patch_range(),
|
||||||
@@ -113,30 +114,27 @@ where
|
|||||||
}
|
}
|
||||||
// TODO do I need to set up more transformations or binning to deliver the requested data?
|
// TODO do I need to set up more transformations or binning to deliver the requested data?
|
||||||
let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len();
|
let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len();
|
||||||
let range = BinnedRange::covering_range(evq.range.clone(), count as u32)
|
let range = BinnedRange::covering_range(evq.range.clone(), count as u32)?
|
||||||
.unwrap()
|
.ok_or(Error::with_msg("covering_range returns None"))?;
|
||||||
.ok_or(Error::with_msg("covering_range returns None"))
|
|
||||||
.unwrap();
|
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
|
|
||||||
// TODO copy the MergedFromRemotes and adapt...
|
// TODO copy the MergedFromRemotes and adapt...
|
||||||
/*let s1 = MergedFromRemotes::new(
|
let s1 = MergedFromRemotes2::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
|
||||||
evq,
|
|
||||||
perf_opts,
|
|
||||||
self.node_config.node_config.cluster.clone(),
|
|
||||||
..........,
|
|
||||||
);*/
|
|
||||||
let s1: MergedFromRemotes<BinnedStreamKindScalar> = err::todoval();
|
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
|
// Go from ENP values to a T-binned stream...
|
||||||
|
// Most of the algo is static same.
|
||||||
|
// What varies: init aggregator for next T-bin.
|
||||||
|
let ret = TBinnerStream::<_, ETB>::new(s1, range);
|
||||||
|
|
||||||
//let s1 = todo_convert_stream_to_tbinned_stream(s1, range);
|
//let s1 = todo_convert_stream_to_tbinned_stream(s1, range);
|
||||||
Ok(err::todoval())
|
Ok(Box::pin(ret))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup_from_higher_res_prebinned(
|
fn setup_from_higher_res_prebinned(
|
||||||
&mut self,
|
&mut self,
|
||||||
range: PreBinnedPatchRange,
|
range: PreBinnedPatchRange,
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<BTB as BinsTimeBinner>::Output>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Output>> + Send>>, Error> {
|
||||||
let g = self.query.patch().bin_t_len();
|
let g = self.query.patch().bin_t_len();
|
||||||
let h = range.grid_spec.bin_t_len();
|
let h = range.grid_spec.bin_t_len();
|
||||||
trace!(
|
trace!(
|
||||||
@@ -207,17 +205,18 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY, END, EVS, ENP, ETB, BTB> Stream for PreBinnedValueStream<NTY, END, EVS, ENP, ETB, BTB>
|
impl<NTY, END, EVS, ENP, ETB> Stream for PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + Unpin + 'static,
|
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + Unpin + 'static,
|
||||||
END: Endianness + Unpin + 'static,
|
END: Endianness + Unpin + 'static,
|
||||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + Unpin + 'static,
|
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + Unpin + 'static,
|
||||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + Unpin,
|
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + Unpin + 'static,
|
||||||
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + Unpin,
|
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + Unpin + 'static,
|
||||||
BTB: BinsTimeBinner<Input = <ETB as EventsTimeBinner>::Output, Output = <ETB as EventsTimeBinner>::Output>,
|
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
|
||||||
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
||||||
<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
|
<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
|
||||||
{
|
{
|
||||||
type Item = Sitemty<<BTB as BinsTimeBinner>::Output>;
|
type Item = Sitemty<<ETB as EventsTimeBinner>::Output>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
@@ -291,7 +290,7 @@ where
|
|||||||
self.values.len(),
|
self.values.len(),
|
||||||
);
|
);
|
||||||
self.streamlog.append(Level::INFO, msg);
|
self.streamlog.append(Level::INFO, msg);
|
||||||
let emp = <<BTB as BinsTimeBinner>::Output as Appendable>::empty();
|
let emp = <<ETB as EventsTimeBinner>::Output as Appendable>::empty();
|
||||||
let values = std::mem::replace(&mut self.values, emp);
|
let values = std::mem::replace(&mut self.values, emp);
|
||||||
let fut = write_pb_cache_min_max_avg_scalar(
|
let fut = write_pb_cache_min_max_avg_scalar(
|
||||||
values,
|
values,
|
||||||
@@ -344,7 +343,8 @@ where
|
|||||||
match item {
|
match item {
|
||||||
Ok(file) => {
|
Ok(file) => {
|
||||||
self.read_from_cache = true;
|
self.read_from_cache = true;
|
||||||
let fut = <<BTB as BinsTimeBinner>::Output as ReadableFromFile>::read_from_file(file)?;
|
let fut =
|
||||||
|
<<ETB as EventsTimeBinner>::Output as ReadableFromFile>::read_from_file(file)?;
|
||||||
self.read_cache_fut = Some(Box::pin(fut));
|
self.read_cache_fut = Some(Box::pin(fut));
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
use crate::agg::binnedt4::{DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner};
|
use crate::agg::binnedt4::{DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner};
|
||||||
use crate::agg::enp::{Identity, WaveXBinner};
|
use crate::agg::enp::{Identity, WaveXBinner};
|
||||||
use crate::agg::streams::StreamItem;
|
use crate::agg::streams::{Appendable, StreamItem};
|
||||||
use crate::binned::pbv2::{
|
use crate::binned::pbv2::{
|
||||||
pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream,
|
pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream,
|
||||||
};
|
};
|
||||||
use crate::binned::query::PreBinnedQuery;
|
use crate::binned::query::PreBinnedQuery;
|
||||||
use crate::binned::{BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem, StreamKind};
|
use crate::binned::{
|
||||||
|
BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem,
|
||||||
|
ReadableFromFile, StreamKind,
|
||||||
|
};
|
||||||
use crate::cache::node_ix_for_patch;
|
use crate::cache::node_ix_for_patch;
|
||||||
use crate::decode::{
|
use crate::decode::{
|
||||||
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
||||||
@@ -23,52 +26,62 @@ use parse::channelconfig::{extract_matching_config_entry, read_local_config, Mat
|
|||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
// TODO instead of EventNodeProcessor, use a T-binning processor here
|
|
||||||
// TODO might also want another stateful processor which can run on the merged event stream, like smoothing.
|
|
||||||
|
|
||||||
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
|
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
|
||||||
|
query: PreBinnedQuery,
|
||||||
event_value_shape: EVS,
|
event_value_shape: EVS,
|
||||||
|
node_config: &NodeConfigCached,
|
||||||
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
|
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||||
END: Endianness + 'static,
|
END: Endianness + 'static,
|
||||||
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
|
||||||
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
|
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
|
||||||
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
|
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
|
||||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
|
||||||
<ENP as EventsNodeProcessor>::Output: 'static,
|
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
|
||||||
|
<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
|
||||||
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
||||||
|
Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
|
||||||
{
|
{
|
||||||
// TODO
|
// TODO
|
||||||
// Use the pre-binned fetch machinery, refactored...
|
// Currently, this mod uses stuff from pbv2, therefore complete path:
|
||||||
err::todoval()
|
let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
|
||||||
|
let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
|
||||||
|
Box::pin(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_num_pipeline_nty_end<NTY, END>(shape: Shape) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
|
fn make_num_pipeline_nty_end<NTY, END>(
|
||||||
|
shape: Shape,
|
||||||
|
query: PreBinnedQuery,
|
||||||
|
node_config: &NodeConfigCached,
|
||||||
|
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
|
||||||
END: Endianness + 'static,
|
END: Endianness + 'static,
|
||||||
{
|
{
|
||||||
// TODO pass all the correct types.
|
|
||||||
err::todo();
|
|
||||||
match shape {
|
match shape {
|
||||||
Shape::Scalar => {
|
Shape::Scalar => {
|
||||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
|
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
|
||||||
|
query,
|
||||||
EventValuesDim0Case::new(),
|
EventValuesDim0Case::new(),
|
||||||
|
node_config,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
Shape::Wave(n) => {
|
Shape::Wave(n) => {
|
||||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
|
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
|
||||||
|
query,
|
||||||
EventValuesDim1Case::new(n),
|
EventValuesDim1Case::new(n),
|
||||||
|
node_config,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! match_end {
|
macro_rules! match_end {
|
||||||
($nty:ident, $end:expr, $shape:expr) => {
|
($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => {
|
||||||
match $end {
|
match $end {
|
||||||
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape),
|
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config),
|
||||||
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape),
|
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -77,10 +90,12 @@ fn make_num_pipeline(
|
|||||||
scalar_type: ScalarType,
|
scalar_type: ScalarType,
|
||||||
byte_order: ByteOrder,
|
byte_order: ByteOrder,
|
||||||
shape: Shape,
|
shape: Shape,
|
||||||
|
query: PreBinnedQuery,
|
||||||
|
node_config: &NodeConfigCached,
|
||||||
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> {
|
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> {
|
||||||
match scalar_type {
|
match scalar_type {
|
||||||
ScalarType::I32 => match_end!(i32, byte_order, shape),
|
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
|
||||||
ScalarType::F32 => match_end!(f32, byte_order, shape),
|
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
|
||||||
_ => todo!(),
|
_ => todo!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -129,6 +144,8 @@ where
|
|||||||
entry.scalar_type.clone(),
|
entry.scalar_type.clone(),
|
||||||
entry.byte_order.clone(),
|
entry.byte_order.clone(),
|
||||||
entry.to_shape().unwrap(),
|
entry.to_shape().unwrap(),
|
||||||
|
query.clone(),
|
||||||
|
node_config,
|
||||||
)
|
)
|
||||||
.map(|item| match item.make_frame() {
|
.map(|item| match item.make_frame() {
|
||||||
Ok(item) => Ok(item.freeze()),
|
Ok(item) => Ok(item.freeze()),
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||||
use crate::agg::streams::StreamItem;
|
use crate::agg::streams::{Appendable, StreamItem};
|
||||||
use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
|
use crate::binned::{
|
||||||
|
EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, WithLen, WithTimestamps,
|
||||||
|
};
|
||||||
use crate::eventblobs::EventBlobsComplete;
|
use crate::eventblobs::EventBlobsComplete;
|
||||||
use crate::eventchunker::EventFull;
|
use crate::eventchunker::EventFull;
|
||||||
use crate::frame::makeframe::{make_frame, Framable};
|
use crate::frame::makeframe::{make_frame, Framable};
|
||||||
@@ -8,6 +10,7 @@ use bytes::BytesMut;
|
|||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use netpod::NanoRange;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
@@ -111,13 +114,17 @@ impl<NTY> EventValuesDim0Case<NTY> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO get rid of this dummy:
|
||||||
pub struct ProcAA<NTY> {
|
pub struct ProcAA<NTY> {
|
||||||
_m1: PhantomData<NTY>,
|
_m1: PhantomData<NTY>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> EventsNodeProcessor for ProcAA<NTY> {
|
impl<NTY> EventsNodeProcessor for ProcAA<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
type Input = NTY;
|
type Input = NTY;
|
||||||
type Output = MinMaxAvgScalarBinBatch;
|
type Output = EventValues<NTY>;
|
||||||
|
|
||||||
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
|
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
|
||||||
todo!()
|
todo!()
|
||||||
@@ -132,6 +139,12 @@ where
|
|||||||
type NumXAggToNBins = ProcAA<NTY>;
|
type NumXAggToNBins = ProcAA<NTY>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> WithTimestamps for ProcAA<NTY> {
|
||||||
|
fn ts(&self, ix: usize) -> u64 {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct EventValuesDim1Case<NTY> {
|
pub struct EventValuesDim1Case<NTY> {
|
||||||
n: u32,
|
n: u32,
|
||||||
_m1: PhantomData<NTY>,
|
_m1: PhantomData<NTY>,
|
||||||
@@ -166,6 +179,12 @@ impl<NTY> MinMaxAvgScalarEventBatchGen<NTY> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> WithTimestamps for MinMaxAvgScalarEventBatchGen<NTY> {
|
||||||
|
fn ts(&self, ix: usize) -> u64 {
|
||||||
|
self.tss[ix]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<NTY> Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatchGen<NTY>>>, Error>
|
impl<NTY> Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatchGen<NTY>>>, Error>
|
||||||
where
|
where
|
||||||
NTY: NumOps + Serialize,
|
NTY: NumOps + Serialize,
|
||||||
@@ -183,57 +202,7 @@ where
|
|||||||
type Output = MinMaxAvgScalarEventBatchGen<NTY>;
|
type Output = MinMaxAvgScalarEventBatchGen<NTY>;
|
||||||
|
|
||||||
fn process(inp: EventValues<Self::Input>) -> Self::Output {
|
fn process(inp: EventValues<Self::Input>) -> Self::Output {
|
||||||
let nev = inp.tss.len();
|
err::todoval()
|
||||||
let mut ret = MinMaxAvgScalarEventBatchGen {
|
|
||||||
tss: inp.tss,
|
|
||||||
mins: Vec::with_capacity(nev),
|
|
||||||
maxs: Vec::with_capacity(nev),
|
|
||||||
avgs: Vec::with_capacity(nev),
|
|
||||||
};
|
|
||||||
for i1 in 0..nev {
|
|
||||||
let mut min = None;
|
|
||||||
let mut max = None;
|
|
||||||
let mut sum = 0f32;
|
|
||||||
let mut count = 0;
|
|
||||||
let vals = &inp.values[i1];
|
|
||||||
for i2 in 0..vals.len() {
|
|
||||||
let v = vals[i2];
|
|
||||||
min = match min {
|
|
||||||
None => Some(v),
|
|
||||||
Some(min) => {
|
|
||||||
if v < min {
|
|
||||||
Some(v)
|
|
||||||
} else {
|
|
||||||
Some(min)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
max = match max {
|
|
||||||
None => Some(v),
|
|
||||||
Some(max) => {
|
|
||||||
if v > max {
|
|
||||||
Some(v)
|
|
||||||
} else {
|
|
||||||
Some(max)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let vf = v.as_();
|
|
||||||
if vf.is_nan() {
|
|
||||||
} else {
|
|
||||||
sum += vf;
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ret.mins.push(min);
|
|
||||||
ret.maxs.push(max);
|
|
||||||
if count == 0 {
|
|
||||||
ret.avgs.push(None);
|
|
||||||
} else {
|
|
||||||
ret.avgs.push(Some(sum / count as f32));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -280,6 +249,65 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<VT> WithLen for EventValues<VT> {
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.tss.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<VT> WithTimestamps for EventValues<VT> {
|
||||||
|
fn ts(&self, ix: usize) -> u64 {
|
||||||
|
self.tss[ix]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<VT> RangeOverlapInfo for EventValues<VT> {
|
||||||
|
fn ends_before(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.last() {
|
||||||
|
Some(&ts) => ts < range.beg,
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ends_after(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.last() {
|
||||||
|
Some(&ts) => ts >= range.end,
|
||||||
|
None => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn starts_after(&self, range: NanoRange) -> bool {
|
||||||
|
match self.tss.first() {
|
||||||
|
Some(&ts) => ts >= range.end,
|
||||||
|
None => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> PushableIndex for EventValues<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
fn push_index(&mut self, src: &Self, ix: usize) {
|
||||||
|
self.tss.push(src.tss[ix]);
|
||||||
|
self.values.push(src.values[ix]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NTY> Appendable for EventValues<NTY>
|
||||||
|
where
|
||||||
|
NTY: NumOps,
|
||||||
|
{
|
||||||
|
fn empty() -> Self {
|
||||||
|
Self::empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append(&mut self, src: &Self) {
|
||||||
|
self.tss.extend_from_slice(&src.tss);
|
||||||
|
self.values.extend_from_slice(&src.values);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct EventsDecodedStream<NTY, END, EVS>
|
pub struct EventsDecodedStream<NTY, END, EVS>
|
||||||
where
|
where
|
||||||
NTY: NumOps + NumFromBytes<NTY, END>,
|
NTY: NumOps + NumFromBytes<NTY, END>,
|
||||||
|
|||||||
@@ -2,10 +2,11 @@ use crate::agg::enp::XBinnedScalarEvents;
|
|||||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||||
use crate::agg::streams::StreamItem;
|
use crate::agg::streams::StreamItem;
|
||||||
use crate::binned::{NumOps, RangeCompletableItem};
|
use crate::binned::{MinMaxAvgBins, NumOps, RangeCompletableItem};
|
||||||
use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen};
|
use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen};
|
||||||
use crate::frame::inmem::InMemoryFrame;
|
use crate::frame::inmem::InMemoryFrame;
|
||||||
use crate::raw::EventQueryJsonStringFrame;
|
use crate::raw::EventQueryJsonStringFrame;
|
||||||
|
use crate::Sitemty;
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
@@ -67,35 +68,42 @@ impl FrameType for EventQueryJsonStringFrame {
|
|||||||
const FRAME_TYPE_ID: u32 = 0x100;
|
const FRAME_TYPE_ID: u32 = 0x100;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
|
impl FrameType for Sitemty<MinMaxAvgScalarBinBatch> {
|
||||||
const FRAME_TYPE_ID: u32 = 0x200;
|
const FRAME_TYPE_ID: u32 = 0x200;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
|
impl FrameType for Sitemty<MinMaxAvgScalarEventBatch> {
|
||||||
const FRAME_TYPE_ID: u32 = 0x300;
|
const FRAME_TYPE_ID: u32 = 0x300;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatchGen<NTY>>>, Error>
|
impl<NTY> FrameType for Sitemty<MinMaxAvgScalarEventBatchGen<NTY>>
|
||||||
where
|
where
|
||||||
NTY: SubFrId,
|
NTY: SubFrId,
|
||||||
{
|
{
|
||||||
const FRAME_TYPE_ID: u32 = 0x400 + NTY::SUB;
|
const FRAME_TYPE_ID: u32 = 0x400 + NTY::SUB;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> FrameType for Result<StreamItem<RangeCompletableItem<EventValues<NTY>>>, Error>
|
impl<NTY> FrameType for Sitemty<EventValues<NTY>>
|
||||||
where
|
where
|
||||||
NTY: SubFrId,
|
NTY: SubFrId,
|
||||||
{
|
{
|
||||||
const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB;
|
const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> FrameType for Result<StreamItem<RangeCompletableItem<XBinnedScalarEvents<NTY>>>, Error>
|
impl<NTY> FrameType for Sitemty<XBinnedScalarEvents<NTY>>
|
||||||
where
|
where
|
||||||
NTY: SubFrId,
|
NTY: SubFrId,
|
||||||
{
|
{
|
||||||
const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB;
|
const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> FrameType for Sitemty<MinMaxAvgBins<NTY>>
|
||||||
|
where
|
||||||
|
NTY: SubFrId,
|
||||||
|
{
|
||||||
|
const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB;
|
||||||
|
}
|
||||||
|
|
||||||
pub trait ProvidesFrameType {
|
pub trait ProvidesFrameType {
|
||||||
fn frame_type_id(&self) -> u32;
|
fn frame_type_id(&self) -> u32;
|
||||||
}
|
}
|
||||||
@@ -134,6 +142,15 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<NTY> Framable for Sitemty<MinMaxAvgBins<NTY>>
|
||||||
|
where
|
||||||
|
NTY: NumOps + Serialize,
|
||||||
|
{
|
||||||
|
fn make_frame(&self) -> Result<BytesMut, Error> {
|
||||||
|
make_frame(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
|
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
|
||||||
where
|
where
|
||||||
FT: FrameType + Serialize,
|
FT: FrameType + Serialize,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use crate::agg::streams::{Appendable, StatsItem, StreamItem};
|
use crate::agg::streams::{Appendable, StatsItem, StreamItem};
|
||||||
use crate::binned::{PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps};
|
use crate::binned::{EventsNodeProcessor, PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps};
|
||||||
use crate::streamlog::LogItem;
|
use crate::streamlog::LogItem;
|
||||||
|
use crate::Sitemty;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
@@ -18,6 +19,232 @@ enum MergedCurVal<T> {
|
|||||||
Val(T),
|
Val(T),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO rename after refactor
|
||||||
|
pub struct MergedStream2<S, ENP>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>>,
|
||||||
|
ENP: EventsNodeProcessor,
|
||||||
|
{
|
||||||
|
inps: Vec<S>,
|
||||||
|
current: Vec<MergedCurVal<<ENP as EventsNodeProcessor>::Output>>,
|
||||||
|
ixs: Vec<usize>,
|
||||||
|
errored: bool,
|
||||||
|
completed: bool,
|
||||||
|
batch: <ENP as EventsNodeProcessor>::Output,
|
||||||
|
ts_last_emit: u64,
|
||||||
|
range_complete_observed: Vec<bool>,
|
||||||
|
range_complete_observed_all: bool,
|
||||||
|
range_complete_observed_all_emitted: bool,
|
||||||
|
data_emit_complete: bool,
|
||||||
|
batch_size: usize,
|
||||||
|
logitems: VecDeque<LogItem>,
|
||||||
|
event_data_read_stats_items: VecDeque<EventDataReadStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, ENP> MergedStream2<S, ENP>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
|
||||||
|
ENP: EventsNodeProcessor,
|
||||||
|
<ENP as EventsNodeProcessor>::Output: Appendable,
|
||||||
|
{
|
||||||
|
pub fn new(inps: Vec<S>) -> Self {
|
||||||
|
let n = inps.len();
|
||||||
|
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
|
||||||
|
Self {
|
||||||
|
inps,
|
||||||
|
current: current,
|
||||||
|
ixs: vec![0; n],
|
||||||
|
errored: false,
|
||||||
|
completed: false,
|
||||||
|
batch: <<ENP as EventsNodeProcessor>::Output as Appendable>::empty(),
|
||||||
|
ts_last_emit: 0,
|
||||||
|
range_complete_observed: vec![false; n],
|
||||||
|
range_complete_observed_all: false,
|
||||||
|
range_complete_observed_all_emitted: false,
|
||||||
|
data_emit_complete: false,
|
||||||
|
batch_size: 64,
|
||||||
|
logitems: VecDeque::new(),
|
||||||
|
event_data_read_stats_items: VecDeque::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||||
|
use Poll::*;
|
||||||
|
let mut pending = 0;
|
||||||
|
for i1 in 0..self.inps.len() {
|
||||||
|
match self.current[i1] {
|
||||||
|
MergedCurVal::None => {
|
||||||
|
'l1: loop {
|
||||||
|
break match self.inps[i1].poll_next_unpin(cx) {
|
||||||
|
Ready(Some(Ok(k))) => match k {
|
||||||
|
StreamItem::Log(item) => {
|
||||||
|
self.logitems.push_back(item);
|
||||||
|
continue 'l1;
|
||||||
|
}
|
||||||
|
StreamItem::Stats(item) => {
|
||||||
|
match item {
|
||||||
|
StatsItem::EventDataReadStats(item) => {
|
||||||
|
self.event_data_read_stats_items.push_back(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue 'l1;
|
||||||
|
}
|
||||||
|
StreamItem::DataItem(item) => match item {
|
||||||
|
RangeCompletableItem::RangeComplete => {
|
||||||
|
self.range_complete_observed[i1] = true;
|
||||||
|
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
|
||||||
|
if d == self.range_complete_observed.len() {
|
||||||
|
self.range_complete_observed_all = true;
|
||||||
|
debug!("MergedStream range_complete d {} COMPLETE", d);
|
||||||
|
} else {
|
||||||
|
trace!("MergedStream range_complete d {}", d);
|
||||||
|
}
|
||||||
|
continue 'l1;
|
||||||
|
}
|
||||||
|
RangeCompletableItem::Data(item) => {
|
||||||
|
self.ixs[i1] = 0;
|
||||||
|
self.current[i1] = MergedCurVal::Val(item);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Ready(Some(Err(e))) => {
|
||||||
|
// TODO emit this error, consider this stream as done, anything more to do here?
|
||||||
|
//self.current[i1] = CurVal::Err(e);
|
||||||
|
self.errored = true;
|
||||||
|
return Ready(Err(e));
|
||||||
|
}
|
||||||
|
Ready(None) => {
|
||||||
|
self.current[i1] = MergedCurVal::Finish;
|
||||||
|
}
|
||||||
|
Pending => {
|
||||||
|
pending += 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pending > 0 {
|
||||||
|
Pending
|
||||||
|
} else {
|
||||||
|
Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, ENP> Stream for MergedStream2<S, ENP>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
|
||||||
|
ENP: EventsNodeProcessor,
|
||||||
|
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
|
||||||
|
{
|
||||||
|
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
use Poll::*;
|
||||||
|
'outer: loop {
|
||||||
|
break if self.completed {
|
||||||
|
panic!("poll_next on completed");
|
||||||
|
} else if self.errored {
|
||||||
|
self.completed = true;
|
||||||
|
Ready(None)
|
||||||
|
} else if let Some(item) = self.logitems.pop_front() {
|
||||||
|
Ready(Some(Ok(StreamItem::Log(item))))
|
||||||
|
} else if let Some(item) = self.event_data_read_stats_items.pop_front() {
|
||||||
|
Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
|
||||||
|
} else if self.data_emit_complete {
|
||||||
|
if self.range_complete_observed_all {
|
||||||
|
if self.range_complete_observed_all_emitted {
|
||||||
|
self.completed = true;
|
||||||
|
Ready(None)
|
||||||
|
} else {
|
||||||
|
self.range_complete_observed_all_emitted = true;
|
||||||
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.completed = true;
|
||||||
|
Ready(None)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Can only run logic if all streams are either finished, errored or have some current value.
|
||||||
|
match self.replenish(cx) {
|
||||||
|
Ready(Ok(_)) => {
|
||||||
|
let mut lowest_ix = usize::MAX;
|
||||||
|
let mut lowest_ts = u64::MAX;
|
||||||
|
for i1 in 0..self.inps.len() {
|
||||||
|
if let MergedCurVal::Val(val) = &self.current[i1] {
|
||||||
|
let u = self.ixs[i1];
|
||||||
|
if u >= val.len() {
|
||||||
|
self.ixs[i1] = 0;
|
||||||
|
self.current[i1] = MergedCurVal::None;
|
||||||
|
continue 'outer;
|
||||||
|
} else {
|
||||||
|
let ts = val.ts(u);
|
||||||
|
if ts < lowest_ts {
|
||||||
|
lowest_ix = i1;
|
||||||
|
lowest_ts = ts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lowest_ix == usize::MAX {
|
||||||
|
if self.batch.len() != 0 {
|
||||||
|
//let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
|
||||||
|
//let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
|
||||||
|
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
|
||||||
|
let ret = std::mem::replace(&mut self.batch, emp);
|
||||||
|
self.data_emit_complete = true;
|
||||||
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
|
||||||
|
} else {
|
||||||
|
self.data_emit_complete = true;
|
||||||
|
continue 'outer;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert!(lowest_ts >= self.ts_last_emit);
|
||||||
|
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
|
||||||
|
let mut local_batch = std::mem::replace(&mut self.batch, emp);
|
||||||
|
self.ts_last_emit = lowest_ts;
|
||||||
|
let rix = self.ixs[lowest_ix];
|
||||||
|
match &self.current[lowest_ix] {
|
||||||
|
MergedCurVal::Val(val) => {
|
||||||
|
local_batch.push_index(val, rix);
|
||||||
|
}
|
||||||
|
MergedCurVal::None => panic!(),
|
||||||
|
MergedCurVal::Finish => panic!(),
|
||||||
|
}
|
||||||
|
self.batch = local_batch;
|
||||||
|
self.ixs[lowest_ix] += 1;
|
||||||
|
let curlen = match &self.current[lowest_ix] {
|
||||||
|
MergedCurVal::Val(val) => val.len(),
|
||||||
|
MergedCurVal::None => panic!(),
|
||||||
|
MergedCurVal::Finish => panic!(),
|
||||||
|
};
|
||||||
|
if self.ixs[lowest_ix] >= curlen {
|
||||||
|
self.ixs[lowest_ix] = 0;
|
||||||
|
self.current[lowest_ix] = MergedCurVal::None;
|
||||||
|
}
|
||||||
|
if self.batch.len() >= self.batch_size {
|
||||||
|
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
|
||||||
|
let ret = std::mem::replace(&mut self.batch, emp);
|
||||||
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
|
||||||
|
} else {
|
||||||
|
continue 'outer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ready(Err(e)) => {
|
||||||
|
self.errored = true;
|
||||||
|
Ready(Some(Err(e)))
|
||||||
|
}
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO remove after refactor
|
||||||
pub struct MergedStream<S, SK>
|
pub struct MergedStream<S, SK>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
use crate::binned::EventsNodeProcessor;
|
use crate::agg::streams::Appendable;
|
||||||
|
use crate::binned::{EventsNodeProcessor, PushableIndex};
|
||||||
use crate::frame::makeframe::FrameType;
|
use crate::frame::makeframe::FrameType;
|
||||||
|
use crate::merge::{MergedStream, MergedStream2};
|
||||||
use crate::raw::{x_processed_stream_from_node2, EventsQuery};
|
use crate::raw::{x_processed_stream_from_node2, EventsQuery};
|
||||||
use crate::Sitemty;
|
use crate::Sitemty;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
@@ -31,7 +33,7 @@ where
|
|||||||
<ENP as EventsNodeProcessor>::Output: Unpin,
|
<ENP as EventsNodeProcessor>::Output: Unpin,
|
||||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
||||||
{
|
{
|
||||||
pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: ENP) -> Self {
|
pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
|
||||||
let mut tcp_establish_futs = vec![];
|
let mut tcp_establish_futs = vec![];
|
||||||
for node in &cluster.nodes {
|
for node in &cluster.nodes {
|
||||||
let f = x_processed_stream_from_node2::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
|
let f = x_processed_stream_from_node2::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
|
||||||
@@ -51,7 +53,8 @@ where
|
|||||||
|
|
||||||
impl<ENP> Stream for MergedFromRemotes2<ENP>
|
impl<ENP> Stream for MergedFromRemotes2<ENP>
|
||||||
where
|
where
|
||||||
ENP: EventsNodeProcessor,
|
ENP: EventsNodeProcessor + 'static,
|
||||||
|
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
|
||||||
{
|
{
|
||||||
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
|
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
|
||||||
|
|
||||||
@@ -104,13 +107,8 @@ where
|
|||||||
} else {
|
} else {
|
||||||
if c1 == self.tcp_establish_futs.len() {
|
if c1 == self.tcp_establish_futs.len() {
|
||||||
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
|
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
|
||||||
//let s1 = MergedStream::<_, ENP>::new(inps);
|
let s1 = MergedStream2::<_, ENP>::new(inps);
|
||||||
|
self.merged = Some(Box::pin(s1));
|
||||||
// TODO
|
|
||||||
|
|
||||||
err::todo();
|
|
||||||
//let s1 = err::todoval();
|
|
||||||
//self.merged = Some(Box::pin(s1));
|
|
||||||
}
|
}
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user