Remove unused merger
This commit is contained in:
@@ -1,5 +1,4 @@
|
|||||||
use super::agg::IntoDim1F32Stream;
|
use super::agg::IntoDim1F32Stream;
|
||||||
use super::merge::MergeDim1F32Stream;
|
|
||||||
use crate::agg::binnedt::IntoBinnedT;
|
use crate::agg::binnedt::IntoBinnedT;
|
||||||
use crate::agg::binnedx::IntoBinnedXBins1;
|
use crate::agg::binnedx::IntoBinnedXBins1;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
@@ -145,74 +144,3 @@ async fn agg_x_dim_1_inner() {
|
|||||||
.for_each(|_k| ready(()));
|
.for_each(|_k| ready(()));
|
||||||
fut1.await;
|
fut1.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn merge_0() {
|
|
||||||
taskrun::run(async {
|
|
||||||
merge_0_inner().await;
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn merge_0_inner() {
|
|
||||||
let query = netpod::AggQuerySingleChannel {
|
|
||||||
channel_config: ChannelConfig {
|
|
||||||
channel: Channel {
|
|
||||||
backend: "ks".into(),
|
|
||||||
name: "wave1".into(),
|
|
||||||
},
|
|
||||||
keyspace: 3,
|
|
||||||
time_bin_size: Nanos { ns: DAY },
|
|
||||||
array: true,
|
|
||||||
shape: Shape::Wave(17),
|
|
||||||
scalar_type: ScalarType::F64,
|
|
||||||
big_endian: true,
|
|
||||||
compression: true,
|
|
||||||
},
|
|
||||||
timebin: 0,
|
|
||||||
tb_file_count: 1,
|
|
||||||
buffer_size: 1024 * 8,
|
|
||||||
};
|
|
||||||
let range: NanoRange = err::todoval();
|
|
||||||
let streams = (0..13)
|
|
||||||
.into_iter()
|
|
||||||
.map(|k| make_test_node(k))
|
|
||||||
.map(|node| {
|
|
||||||
super::eventblobs::EventBlobsComplete::new(
|
|
||||||
range.clone(),
|
|
||||||
query.channel_config.clone(),
|
|
||||||
node.clone(),
|
|
||||||
query.buffer_size as usize,
|
|
||||||
)
|
|
||||||
.into_dim_1_f32_stream()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
MergeDim1F32Stream::new(streams)
|
|
||||||
.map(|_k| {
|
|
||||||
//info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
|
|
||||||
})
|
|
||||||
.fold(0, |_k, _q| ready(0))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tmp_some_older_things() {
|
|
||||||
// I want to distinguish already in the outer part between dim-0 and dim-1 and generate
|
|
||||||
// separate code for these cases...
|
|
||||||
// That means that also the reading chain itself needs to be typed on that.
|
|
||||||
// Need to supply some event-payload converter type which has that type as Output type.
|
|
||||||
// Now the T-binning:
|
|
||||||
|
|
||||||
/*
|
|
||||||
T-aggregator must be able to produce empty-values of correct type even if we never get
|
|
||||||
a single value of input data.
|
|
||||||
Therefore, it needs the bin range definition.
|
|
||||||
How do I want to drive the system?
|
|
||||||
If I write the T-binner as a Stream, then I also need to pass it the input!
|
|
||||||
Meaning, I need to pass the Stream which produces the actual numbers from disk.
|
|
||||||
|
|
||||||
readchannel() -> Stream of timestamped byte blobs
|
|
||||||
.to_f32() -> Stream ? indirection to branch on the underlying shape
|
|
||||||
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,6 +1,4 @@
|
|||||||
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
|
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
|
||||||
use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, ValuesDim1};
|
|
||||||
use crate::eventchunker::EventChunkerItem;
|
|
||||||
use crate::streamlog::LogItem;
|
use crate::streamlog::LogItem;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
@@ -12,144 +10,6 @@ use std::task::{Context, Poll};
|
|||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
use tracing::{debug, error, info, trace, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
pub struct MergeDim1F32Stream<S> {
|
|
||||||
// yields Dim1F32StreamItem
|
|
||||||
inps: Vec<Dim1F32Stream<S>>,
|
|
||||||
current: Vec<CurVal>,
|
|
||||||
ixs: Vec<usize>,
|
|
||||||
errored: bool,
|
|
||||||
completed: bool,
|
|
||||||
batch: ValuesDim1,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> MergeDim1F32Stream<S> {
|
|
||||||
pub fn new(inps: Vec<Dim1F32Stream<S>>) -> Self {
|
|
||||||
let n = inps.len();
|
|
||||||
let mut current = vec![];
|
|
||||||
for _ in 0..n {
|
|
||||||
current.push(CurVal::None);
|
|
||||||
}
|
|
||||||
Self {
|
|
||||||
inps,
|
|
||||||
current: current,
|
|
||||||
ixs: vec![0; n],
|
|
||||||
batch: ValuesDim1::empty(),
|
|
||||||
errored: false,
|
|
||||||
completed: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Stream for MergeDim1F32Stream<S>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<EventChunkerItem, Error>> + Unpin,
|
|
||||||
{
|
|
||||||
type Item = Result<Dim1F32StreamItem, Error>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
||||||
use Poll::*;
|
|
||||||
// TODO rewrite making the break the default and explicit continue.
|
|
||||||
'outer: loop {
|
|
||||||
if self.completed {
|
|
||||||
panic!("poll on complete stream");
|
|
||||||
}
|
|
||||||
if self.errored {
|
|
||||||
self.completed = true;
|
|
||||||
return Ready(None);
|
|
||||||
}
|
|
||||||
// can only run logic if all streams are either finished, errored or have some current value.
|
|
||||||
for i1 in 0..self.inps.len() {
|
|
||||||
match self.current[i1] {
|
|
||||||
CurVal::None => {
|
|
||||||
match self.inps[i1].poll_next_unpin(cx) {
|
|
||||||
Ready(Some(Ok(k))) => {
|
|
||||||
// TODO do I keep only the values as "current" or also the other kinds of items?
|
|
||||||
// Can I process the other kinds instantly?
|
|
||||||
match k {
|
|
||||||
Dim1F32StreamItem::Values(vals) => {
|
|
||||||
self.current[i1] = CurVal::Val(vals);
|
|
||||||
}
|
|
||||||
Dim1F32StreamItem::RangeComplete => {
|
|
||||||
todo!();
|
|
||||||
}
|
|
||||||
Dim1F32StreamItem::EventDataReadStats(_stats) => {
|
|
||||||
todo!();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ready(Some(Err(e))) => {
|
|
||||||
self.current[i1] = CurVal::Err(Error::with_msg(format!(
|
|
||||||
"MergeDim1F32Stream error from upstream {:?}",
|
|
||||||
e
|
|
||||||
)));
|
|
||||||
return Ready(Some(Err(e)));
|
|
||||||
}
|
|
||||||
Ready(None) => {
|
|
||||||
self.current[i1] = CurVal::Finish;
|
|
||||||
}
|
|
||||||
Pending => {
|
|
||||||
// TODO is this behaviour correct?
|
|
||||||
return Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut lowest_ix = usize::MAX;
|
|
||||||
let mut lowest_ts = u64::MAX;
|
|
||||||
for i1 in 0..self.inps.len() {
|
|
||||||
match &self.current[i1] {
|
|
||||||
CurVal::Finish => {}
|
|
||||||
CurVal::Val(val) => {
|
|
||||||
let u = self.ixs[i1];
|
|
||||||
if u >= val.tss.len() {
|
|
||||||
self.ixs[i1] = 0;
|
|
||||||
self.current[i1] = CurVal::None;
|
|
||||||
continue 'outer;
|
|
||||||
} else {
|
|
||||||
let ts = val.tss[u];
|
|
||||||
if ts < lowest_ts {
|
|
||||||
lowest_ix = i1;
|
|
||||||
lowest_ts = ts;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => panic!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if lowest_ix == usize::MAX {
|
|
||||||
// TODO all inputs in finished state
|
|
||||||
break Ready(None);
|
|
||||||
} else {
|
|
||||||
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
|
|
||||||
self.batch.tss.push(lowest_ts);
|
|
||||||
let rix = self.ixs[lowest_ix];
|
|
||||||
match &mut self.current[lowest_ix] {
|
|
||||||
CurVal::Val(ref mut k) => {
|
|
||||||
let k = std::mem::replace(&mut k.values[rix], vec![]);
|
|
||||||
self.batch.values.push(k);
|
|
||||||
}
|
|
||||||
_ => panic!(),
|
|
||||||
}
|
|
||||||
self.ixs[lowest_ix] += 1;
|
|
||||||
}
|
|
||||||
if self.batch.tss.len() >= 64 {
|
|
||||||
let k = std::mem::replace(&mut self.batch, ValuesDim1::empty());
|
|
||||||
let ret = Dim1F32StreamItem::Values(k);
|
|
||||||
break Ready(Some(Ok(ret)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum CurVal {
|
|
||||||
None,
|
|
||||||
Finish,
|
|
||||||
Err(Error),
|
|
||||||
Val(ValuesDim1),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MergedMinMaxAvgScalarStream<S>
|
pub struct MergedMinMaxAvgScalarStream<S>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>>,
|
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>>,
|
||||||
|
|||||||
Reference in New Issue
Block a user