Merger for dim-1
This commit is contained in:
@@ -11,8 +11,8 @@ serde_json = "1.0"
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0.1"
|
||||
byteorder = "1.4.3"
|
||||
futures-core = "0.3.12"
|
||||
futures-util = "0.3.13"
|
||||
futures-core = "0.3.14"
|
||||
futures-util = "0.3.14"
|
||||
async-stream = "0.3.0"
|
||||
hex = "0.4.3"
|
||||
err = { path = "../err" }
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::EventFull;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt, future::ready};
|
||||
use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
|
||||
use crate::merge::MergeDim1F32Stream;
|
||||
|
||||
pub trait AggregatorTdim {
|
||||
type InputValue;
|
||||
@@ -112,8 +113,19 @@ impl AggregatableXdim1Bin for ValuesDim1 {
|
||||
|
||||
|
||||
pub struct ValuesDim1 {
|
||||
tss: Vec<u64>,
|
||||
values: Vec<Vec<f32>>,
|
||||
pub tss: Vec<u64>,
|
||||
pub values: Vec<Vec<f32>>,
|
||||
}
|
||||
|
||||
impl ValuesDim1 {
|
||||
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValuesDim1 {
|
||||
@@ -757,19 +769,24 @@ pub struct TimeRange {
|
||||
}
|
||||
|
||||
|
||||
pub fn make_test_node(ix: u8) -> Node {
|
||||
Node {
|
||||
host: "localhost".into(),
|
||||
port: 8800 + ix as u16,
|
||||
data_base_path: format!("../tmpdata/node{:02}", ix).into(),
|
||||
split: ix,
|
||||
ksprefix: "ks".into(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn agg_x_dim_0() {
|
||||
taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
|
||||
}
|
||||
|
||||
async fn agg_x_dim_0_inner() {
|
||||
let node = Node {
|
||||
host: "localhost".into(),
|
||||
port: 8888,
|
||||
data_base_path: "../tmpdata/node0".into(),
|
||||
split: 0,
|
||||
ksprefix: "ks".into(),
|
||||
};
|
||||
let node = make_test_node(0);
|
||||
let query = netpod::AggQuerySingleChannel {
|
||||
channel_config: ChannelConfig {
|
||||
channel: Channel {
|
||||
@@ -823,13 +840,7 @@ async fn agg_x_dim_1_inner() {
|
||||
// sf-databuffer
|
||||
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
|
||||
// S10BC01-DBAM070:BAM_CH1_NORM
|
||||
let node = Node {
|
||||
host: "localhost".into(),
|
||||
port: 8888,
|
||||
data_base_path: "../tmpdata/node0".into(),
|
||||
split: 0,
|
||||
ksprefix: "ks".into(),
|
||||
};
|
||||
let node = make_test_node(0);
|
||||
let query = netpod::AggQuerySingleChannel {
|
||||
channel_config: ChannelConfig {
|
||||
channel: Channel {
|
||||
@@ -873,6 +884,47 @@ async fn agg_x_dim_1_inner() {
|
||||
fut1.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_0() {
|
||||
taskrun::run(async { merge_0_inner().await; Ok(()) }).unwrap();
|
||||
}
|
||||
|
||||
async fn merge_0_inner() {
|
||||
let nodes = vec![
|
||||
make_test_node(0),
|
||||
make_test_node(1),
|
||||
];
|
||||
let query = netpod::AggQuerySingleChannel {
|
||||
channel_config: ChannelConfig {
|
||||
channel: Channel {
|
||||
backend: "ks".into(),
|
||||
keyspace: 3,
|
||||
name: "wave1".into(),
|
||||
},
|
||||
time_bin_size: DAY,
|
||||
shape: Shape::Wave(1024),
|
||||
scalar_type: ScalarType::F64,
|
||||
big_endian: true,
|
||||
compression: true,
|
||||
},
|
||||
timebin: 0,
|
||||
tb_file_count: 1,
|
||||
buffer_size: 17,
|
||||
};
|
||||
let streams: Vec<_> = nodes.into_iter().map(|node| {
|
||||
crate::EventBlobsComplete::new(&query, &node)
|
||||
.into_dim_1_f32_stream()
|
||||
})
|
||||
.collect();
|
||||
MergeDim1F32Stream::new(streams)
|
||||
.map(|k| {
|
||||
//info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
|
||||
})
|
||||
.fold(0, |k, q| async { 0 })
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
|
||||
pub fn tmp_some_older_things() {
|
||||
let vals = ValuesDim1 {
|
||||
|
||||
@@ -53,14 +53,14 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
host: "localhost".into(),
|
||||
port: 7780,
|
||||
split: 0,
|
||||
data_base_path: data_base_path.join("node0"),
|
||||
data_base_path: data_base_path.join("node00"),
|
||||
ksprefix: ksprefix.clone(),
|
||||
};
|
||||
let node1 = Node {
|
||||
host: "localhost".into(),
|
||||
port: 7781,
|
||||
split: 1,
|
||||
data_base_path: data_base_path.join("node1"),
|
||||
data_base_path: data_base_path.join("node01"),
|
||||
ksprefix: ksprefix.clone(),
|
||||
};
|
||||
ensemble.nodes.push(node0);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod agg;
|
||||
pub mod gen;
|
||||
pub mod merge;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
|
||||
129
disk/src/merge.rs
Normal file
129
disk/src/merge.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use futures_core::Stream;
|
||||
use err::Error;
|
||||
use std::task::{Poll, Context};
|
||||
use std::pin::Pin;
|
||||
use crate::agg::{Dim1F32Stream, ValuesDim1};
|
||||
use crate::EventFull;
|
||||
use futures_util::{pin_mut, StreamExt, future::ready};
|
||||
|
||||
pub struct MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> {
|
||||
inps: Vec<Dim1F32Stream<S>>,
|
||||
current: Vec<CurVal>,
|
||||
ixs: Vec<usize>,
|
||||
emitted_complete: bool,
|
||||
batch: ValuesDim1,
|
||||
}
|
||||
|
||||
impl<S> MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> {
|
||||
|
||||
pub fn new(inps: Vec<Dim1F32Stream<S>>) -> Self {
|
||||
let n = inps.len();
|
||||
let mut current = vec![];
|
||||
for _ in 0..n {
|
||||
current.push(CurVal::None);
|
||||
}
|
||||
Self {
|
||||
inps,
|
||||
current: current,
|
||||
ixs: vec![0; n],
|
||||
emitted_complete: false,
|
||||
batch: ValuesDim1::empty(),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin {
|
||||
//type Item = <Dim1F32Stream as Stream>::Item;
|
||||
type Item = Result<ValuesDim1, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
if self.emitted_complete {
|
||||
panic!("poll on complete stream");
|
||||
}
|
||||
// can only run logic if all streams are either finished, errored or have some current value.
|
||||
for i1 in 0..self.inps.len() {
|
||||
match self.current[i1] {
|
||||
CurVal::None => {
|
||||
match self.inps[i1].poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
self.current[i1] = CurVal::Val(k);
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
//self.current[i1] = CurVal::Err(e);
|
||||
return Ready(Some(Err(e)));
|
||||
// TODO emit this error, consider this stream as done.
|
||||
todo!()
|
||||
}
|
||||
Ready(None) => {
|
||||
self.current[i1] = CurVal::Finish;
|
||||
}
|
||||
Pending => {
|
||||
// TODO is this behaviour correct?
|
||||
return Pending;
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
let mut lowest_ix = usize::MAX;
|
||||
let mut lowest_ts = u64::MAX;
|
||||
for i1 in 0..self.inps.len() {
|
||||
match &self.current[i1] {
|
||||
CurVal::Finish => {}
|
||||
CurVal::Val(val) => {
|
||||
let u = self.ixs[i1];
|
||||
if u >= val.tss.len() {
|
||||
self.ixs[i1] = 0;
|
||||
self.current[i1] = CurVal::None;
|
||||
continue 'outer;
|
||||
}
|
||||
else {
|
||||
let ts = val.tss[u];
|
||||
if ts < lowest_ts {
|
||||
lowest_ix = i1;
|
||||
lowest_ts = ts;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => panic!()
|
||||
}
|
||||
}
|
||||
if lowest_ix == usize::MAX {
|
||||
// TODO all inputs in finished state
|
||||
break Ready(None);
|
||||
}
|
||||
else {
|
||||
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
|
||||
self.batch.tss.push(lowest_ts);
|
||||
let rix = self.ixs[lowest_ix];
|
||||
match &mut self.current[lowest_ix] {
|
||||
CurVal::Val(ref mut k) => {
|
||||
let k = std::mem::replace(&mut k.values[rix], vec![]);
|
||||
self.batch.values.push(k);
|
||||
}
|
||||
_ => panic!()
|
||||
}
|
||||
self.ixs[lowest_ix] += 1;
|
||||
}
|
||||
if self.batch.tss.len() >= 64 {
|
||||
let k = std::mem::replace(&mut self.batch, ValuesDim1::empty());
|
||||
break Ready(Some(Ok(k)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
enum CurVal {
|
||||
None,
|
||||
Finish,
|
||||
Err(Error),
|
||||
Val(ValuesDim1),
|
||||
}
|
||||
Reference in New Issue
Block a user