From 61000f23363cf605df4990d7d688129341b46651 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 11 Apr 2021 15:26:24 +0200 Subject: [PATCH] Merger for dim-1 --- disk/Cargo.toml | 4 +- disk/src/agg.rs | 84 ++++++++++++++++++++++++------ disk/src/gen.rs | 4 +- disk/src/lib.rs | 1 + disk/src/merge.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 20 deletions(-) create mode 100644 disk/src/merge.rs diff --git a/disk/Cargo.toml b/disk/Cargo.toml index cc134b3..7bae80c 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -11,8 +11,8 @@ serde_json = "1.0" async-channel = "1.6" bytes = "1.0.1" byteorder = "1.4.3" -futures-core = "0.3.12" -futures-util = "0.3.13" +futures-core = "0.3.14" +futures-util = "0.3.14" async-stream = "0.3.0" hex = "0.4.3" err = { path = "../err" } diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 281a8ae..aaa6a21 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -7,6 +7,7 @@ use crate::EventFull; use futures_core::Stream; use futures_util::{pin_mut, StreamExt, future::ready}; use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*}; +use crate::merge::MergeDim1F32Stream; pub trait AggregatorTdim { type InputValue; @@ -112,8 +113,19 @@ impl AggregatableXdim1Bin for ValuesDim1 { pub struct ValuesDim1 { - tss: Vec, - values: Vec>, + pub tss: Vec, + pub values: Vec>, +} + +impl ValuesDim1 { + + pub fn empty() -> Self { + Self { + tss: vec![], + values: vec![], + } + } + } impl std::fmt::Debug for ValuesDim1 { @@ -757,19 +769,24 @@ pub struct TimeRange { } +pub fn make_test_node(ix: u8) -> Node { + Node { + host: "localhost".into(), + port: 8800 + ix as u16, + data_base_path: format!("../tmpdata/node{:02}", ix).into(), + split: ix, + ksprefix: "ks".into(), + } +} + + #[test] fn agg_x_dim_0() { taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap(); } async fn agg_x_dim_0_inner() { - let node = Node { - host: "localhost".into(), - port: 8888, - data_base_path: "../tmpdata/node0".into(), - split: 0, - ksprefix: "ks".into(), - }; + let node = make_test_node(0); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -823,13 +840,7 @@ async fn agg_x_dim_1_inner() { // sf-databuffer // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* // S10BC01-DBAM070:BAM_CH1_NORM - let node = Node { - host: "localhost".into(), - port: 8888, - data_base_path: "../tmpdata/node0".into(), - split: 0, - ksprefix: "ks".into(), - }; + let node = make_test_node(0); let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -873,6 +884,47 @@ async fn agg_x_dim_1_inner() { fut1.await; } +#[test] +fn merge_0() { + taskrun::run(async { merge_0_inner().await; Ok(()) }).unwrap(); +} + +async fn merge_0_inner() { + let nodes = vec![ + make_test_node(0), + make_test_node(1), + ]; + let query = netpod::AggQuerySingleChannel { + channel_config: ChannelConfig { + channel: Channel { + backend: "ks".into(), + keyspace: 3, + name: "wave1".into(), + }, + time_bin_size: DAY, + shape: Shape::Wave(1024), + scalar_type: ScalarType::F64, + big_endian: true, + compression: true, + }, + timebin: 0, + tb_file_count: 1, + buffer_size: 17, + }; + let streams: Vec<_> = nodes.into_iter().map(|node| { + crate::EventBlobsComplete::new(&query, &node) + .into_dim_1_f32_stream() + }) + .collect(); + MergeDim1F32Stream::new(streams) + .map(|k| { + //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); + }) + .fold(0, |k, q| async { 0 }) + .await; +} + + pub fn tmp_some_older_things() { let vals = ValuesDim1 { diff --git a/disk/src/gen.rs b/disk/src/gen.rs index e066aab..e5866d4 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -53,14 +53,14 @@ pub async fn gen_test_data() -> Result<(), Error> { host: "localhost".into(), port: 7780, split: 0, - data_base_path: data_base_path.join("node0"), + data_base_path: data_base_path.join("node00"), ksprefix: ksprefix.clone(), }; let node1 = Node { host: "localhost".into(), port: 7781, split: 1, - data_base_path: data_base_path.join("node1"), + data_base_path: data_base_path.join("node01"), ksprefix: ksprefix.clone(), }; ensemble.nodes.push(node0); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 0466f18..1eccbea 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,5 +1,6 @@ pub mod agg; pub mod gen; +pub mod merge; #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; diff --git a/disk/src/merge.rs b/disk/src/merge.rs new file mode 100644 index 0000000..6ab7eab --- /dev/null +++ b/disk/src/merge.rs @@ -0,0 +1,129 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; +use futures_core::Stream; +use err::Error; +use std::task::{Poll, Context}; +use std::pin::Pin; +use crate::agg::{Dim1F32Stream, ValuesDim1}; +use crate::EventFull; +use futures_util::{pin_mut, StreamExt, future::ready}; + +pub struct MergeDim1F32Stream where S: Stream> { + inps: Vec>, + current: Vec, + ixs: Vec, + emitted_complete: bool, + batch: ValuesDim1, +} + +impl MergeDim1F32Stream where S: Stream> { + + pub fn new(inps: Vec>) -> Self { + let n = inps.len(); + let mut current = vec![]; + for _ in 0..n { + current.push(CurVal::None); + } + Self { + inps, + current: current, + ixs: vec![0; n], + emitted_complete: false, + batch: ValuesDim1::empty(), + } + } + +} + +impl Stream for MergeDim1F32Stream where S: Stream> + Unpin { + //type Item = ::Item; + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + if self.emitted_complete { + panic!("poll on complete stream"); + } + // can only run logic if all streams are either finished, errored or have some current value. + for i1 in 0..self.inps.len() { + match self.current[i1] { + CurVal::None => { + match self.inps[i1].poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + self.current[i1] = CurVal::Val(k); + } + Ready(Some(Err(e))) => { + //self.current[i1] = CurVal::Err(e); + return Ready(Some(Err(e))); + // TODO emit this error, consider this stream as done. + todo!() + } + Ready(None) => { + self.current[i1] = CurVal::Finish; + } + Pending => { + // TODO is this behaviour correct? + return Pending; + todo!() + } + } + } + _ => (), + } + } + let mut lowest_ix = usize::MAX; + let mut lowest_ts = u64::MAX; + for i1 in 0..self.inps.len() { + match &self.current[i1] { + CurVal::Finish => {} + CurVal::Val(val) => { + let u = self.ixs[i1]; + if u >= val.tss.len() { + self.ixs[i1] = 0; + self.current[i1] = CurVal::None; + continue 'outer; + } + else { + let ts = val.tss[u]; + if ts < lowest_ts { + lowest_ix = i1; + lowest_ts = ts; + } + } + } + _ => panic!() + } + } + if lowest_ix == usize::MAX { + // TODO all inputs in finished state + break Ready(None); + } + else { + //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); + self.batch.tss.push(lowest_ts); + let rix = self.ixs[lowest_ix]; + match &mut self.current[lowest_ix] { + CurVal::Val(ref mut k) => { + let k = std::mem::replace(&mut k.values[rix], vec![]); + self.batch.values.push(k); + } + _ => panic!() + } + self.ixs[lowest_ix] += 1; + } + if self.batch.tss.len() >= 64 { + let k = std::mem::replace(&mut self.batch, ValuesDim1::empty()); + break Ready(Some(Ok(k))); + } + } + } + +} + +enum CurVal { + None, + Finish, + Err(Error), + Val(ValuesDim1), +}