Rename merger
This commit is contained in:
@@ -2,7 +2,7 @@ use crate::agg::streams::StreamItem;
|
|||||||
use crate::binned::{RangeCompletableItem, StreamKind};
|
use crate::binned::{RangeCompletableItem, StreamKind};
|
||||||
use crate::cache::pbv::PreBinnedValueByteStream;
|
use crate::cache::pbv::PreBinnedValueByteStream;
|
||||||
use crate::frame::makeframe::FrameType;
|
use crate::frame::makeframe::FrameType;
|
||||||
use crate::merge::MergedMinMaxAvgScalarStream;
|
use crate::merge::MergedStream;
|
||||||
use crate::raw::EventsQuery;
|
use crate::raw::EventsQuery;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
use chrono::{DateTime, TimeZone, Utc};
|
||||||
@@ -488,7 +488,7 @@ where
|
|||||||
if c1 == self.tcp_establish_futs.len() {
|
if c1 == self.tcp_establish_futs.len() {
|
||||||
debug!("MergedFromRemotes setting up merged stream");
|
debug!("MergedFromRemotes setting up merged stream");
|
||||||
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
|
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
|
||||||
let s1 = MergedMinMaxAvgScalarStream::<_, SK>::new(inps);
|
let s1 = MergedStream::<_, SK>::new(inps);
|
||||||
self.merged = Some(Box::pin(s1));
|
self.merged = Some(Box::pin(s1));
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!(
|
||||||
|
|||||||
@@ -10,7 +10,13 @@ use std::collections::VecDeque;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
pub struct MergedMinMaxAvgScalarStream<S, SK>
|
enum MergedCurVal<T> {
|
||||||
|
None,
|
||||||
|
Finish,
|
||||||
|
Val(T),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MergedStream<S, SK>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
||||||
SK: StreamKind,
|
SK: StreamKind,
|
||||||
@@ -31,7 +37,7 @@ where
|
|||||||
event_data_read_stats_items: VecDeque<EventDataReadStats>,
|
event_data_read_stats_items: VecDeque<EventDataReadStats>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, SK> MergedMinMaxAvgScalarStream<S, SK>
|
impl<S, SK> MergedStream<S, SK>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
S: Stream<Item = Result<StreamItem<RangeCompletableItem<SK::XBinnedEvents>>, Error>> + Unpin,
|
||||||
SK: StreamKind,
|
SK: StreamKind,
|
||||||
@@ -122,8 +128,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO change name, it is generic now:
|
impl<S, SK> Stream for MergedStream<S, SK>
|
||||||
impl<S, SK> Stream for MergedMinMaxAvgScalarStream<S, SK>
|
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
|
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
|
||||||
SK: StreamKind,
|
SK: StreamKind,
|
||||||
@@ -213,18 +218,7 @@ where
|
|||||||
self.ixs[lowest_ix] = 0;
|
self.ixs[lowest_ix] = 0;
|
||||||
self.current[lowest_ix] = MergedCurVal::None;
|
self.current[lowest_ix] = MergedCurVal::None;
|
||||||
}
|
}
|
||||||
//self.batch.tss.push(lowest_ts);
|
|
||||||
/*let z = match &self.current[lowest_ix] {
|
|
||||||
MergedCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
|
|
||||||
_ => panic!(),
|
|
||||||
};
|
|
||||||
self.batch.mins.push(z.0);
|
|
||||||
self.batch.maxs.push(z.1);
|
|
||||||
self.batch.avgs.push(z.2);
|
|
||||||
*/
|
|
||||||
if self.batch.len() >= self.batch_size {
|
if self.batch.len() >= self.batch_size {
|
||||||
//let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
|
|
||||||
//let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
|
|
||||||
let emp = <<SK as StreamKind>::XBinnedEvents as Appendable>::empty();
|
let emp = <<SK as StreamKind>::XBinnedEvents as Appendable>::empty();
|
||||||
let ret = std::mem::replace(&mut self.batch, emp);
|
let ret = std::mem::replace(&mut self.batch, emp);
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
|
||||||
@@ -243,9 +237,3 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum MergedCurVal<T> {
|
|
||||||
None,
|
|
||||||
Finish,
|
|
||||||
Val(T),
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user