From 2df0be8ed231e43fad7147cab691b5dba26eb231 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 8 May 2021 14:28:51 +0200 Subject: [PATCH] Remove unused merger --- disk/src/aggtest.rs | 72 ----------------------- disk/src/merge.rs | 140 -------------------------------------------- 2 files changed, 212 deletions(-) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 027e161..3d218ea 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,5 +1,4 @@ use super::agg::IntoDim1F32Stream; -use super::merge::MergeDim1F32Stream; use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedx::IntoBinnedXBins1; use futures_util::StreamExt; @@ -145,74 +144,3 @@ async fn agg_x_dim_1_inner() { .for_each(|_k| ready(())); fut1.await; } - -#[test] -fn merge_0() { - taskrun::run(async { - merge_0_inner().await; - Ok(()) - }) - .unwrap(); -} - -async fn merge_0_inner() { - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { - channel: Channel { - backend: "ks".into(), - name: "wave1".into(), - }, - keyspace: 3, - time_bin_size: Nanos { ns: DAY }, - array: true, - shape: Shape::Wave(17), - scalar_type: ScalarType::F64, - big_endian: true, - compression: true, - }, - timebin: 0, - tb_file_count: 1, - buffer_size: 1024 * 8, - }; - let range: NanoRange = err::todoval(); - let streams = (0..13) - .into_iter() - .map(|k| make_test_node(k)) - .map(|node| { - super::eventblobs::EventBlobsComplete::new( - range.clone(), - query.channel_config.clone(), - node.clone(), - query.buffer_size as usize, - ) - .into_dim_1_f32_stream() - }) - .collect(); - MergeDim1F32Stream::new(streams) - .map(|_k| { - //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); - }) - .fold(0, |_k, _q| ready(0)) - .await; -} - -pub fn tmp_some_older_things() { - // I want to distinguish already in the outer part between dim-0 and dim-1 and generate - // separate code for these cases... - // That means that also the reading chain itself needs to be typed on that. - // Need to supply some event-payload converter type which has that type as Output type. - // Now the T-binning: - - /* - T-aggregator must be able to produce empty-values of correct type even if we never get - a single value of input data. - Therefore, it needs the bin range definition. - How do I want to drive the system? - If I write the T-binner as a Stream, then I also need to pass it the input! - Meaning, I need to pass the Stream which produces the actual numbers from disk. - - readchannel() -> Stream of timestamped byte blobs - .to_f32() -> Stream ? indirection to branch on the underlying shape - .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level? - */ -} diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 84079f7..37cf04e 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,6 +1,4 @@ use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem}; -use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, ValuesDim1}; -use crate::eventchunker::EventChunkerItem; use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; @@ -12,144 +10,6 @@ use std::task::{Context, Poll}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; -pub struct MergeDim1F32Stream { - // yields Dim1F32StreamItem - inps: Vec>, - current: Vec, - ixs: Vec, - errored: bool, - completed: bool, - batch: ValuesDim1, -} - -impl MergeDim1F32Stream { - pub fn new(inps: Vec>) -> Self { - let n = inps.len(); - let mut current = vec![]; - for _ in 0..n { - current.push(CurVal::None); - } - Self { - inps, - current: current, - ixs: vec![0; n], - batch: ValuesDim1::empty(), - errored: false, - completed: false, - } - } -} - -impl Stream for MergeDim1F32Stream -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - // TODO rewrite making the break the default and explicit continue. - 'outer: loop { - if self.completed { - panic!("poll on complete stream"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - // can only run logic if all streams are either finished, errored or have some current value. - for i1 in 0..self.inps.len() { - match self.current[i1] { - CurVal::None => { - match self.inps[i1].poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - // TODO do I keep only the values as "current" or also the other kinds of items? - // Can I process the other kinds instantly? - match k { - Dim1F32StreamItem::Values(vals) => { - self.current[i1] = CurVal::Val(vals); - } - Dim1F32StreamItem::RangeComplete => { - todo!(); - } - Dim1F32StreamItem::EventDataReadStats(_stats) => { - todo!(); - } - } - } - Ready(Some(Err(e))) => { - self.current[i1] = CurVal::Err(Error::with_msg(format!( - "MergeDim1F32Stream error from upstream {:?}", - e - ))); - return Ready(Some(Err(e))); - } - Ready(None) => { - self.current[i1] = CurVal::Finish; - } - Pending => { - // TODO is this behaviour correct? - return Pending; - } - } - } - _ => (), - } - } - let mut lowest_ix = usize::MAX; - let mut lowest_ts = u64::MAX; - for i1 in 0..self.inps.len() { - match &self.current[i1] { - CurVal::Finish => {} - CurVal::Val(val) => { - let u = self.ixs[i1]; - if u >= val.tss.len() { - self.ixs[i1] = 0; - self.current[i1] = CurVal::None; - continue 'outer; - } else { - let ts = val.tss[u]; - if ts < lowest_ts { - lowest_ix = i1; - lowest_ts = ts; - } - } - } - _ => panic!(), - } - } - if lowest_ix == usize::MAX { - // TODO all inputs in finished state - break Ready(None); - } else { - //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); - self.batch.tss.push(lowest_ts); - let rix = self.ixs[lowest_ix]; - match &mut self.current[lowest_ix] { - CurVal::Val(ref mut k) => { - let k = std::mem::replace(&mut k.values[rix], vec![]); - self.batch.values.push(k); - } - _ => panic!(), - } - self.ixs[lowest_ix] += 1; - } - if self.batch.tss.len() >= 64 { - let k = std::mem::replace(&mut self.batch, ValuesDim1::empty()); - let ret = Dim1F32StreamItem::Values(k); - break Ready(Some(Ok(ret))); - } - } - } -} - -enum CurVal { - None, - Finish, - Err(Error), - Val(ValuesDim1), -} - pub struct MergedMinMaxAvgScalarStream where S: Stream>,