Count edge events

This commit is contained in:
Dominik Werder
2021-09-03 21:57:05 +02:00
parent d9fe5259bd
commit 0a05082da8
22 changed files with 544 additions and 175 deletions

View File

@@ -112,18 +112,13 @@ where
}
// TODO handle unwrap error, or use a mem replace type instead of option:
fn cycle_current_bin(&mut self) {
fn cycle_current_bin(&mut self, expand: bool) {
self.curbin += 1;
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(<TBT as TimeBinnableType>::aggregator(
range,
self.x_bin_count,
self.do_time_weight,
))
.as_mut()
.unwrap()
.result();
.result_reset(self.spec.get_range(self.curbin), expand);
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
// Only if the frequency would be high, that would require cpu time checks. Worth it? Measure..
self.tmp_agg_results.push_back(ret);
@@ -159,7 +154,7 @@ where
} else if item.starts_after(ag.range().clone()) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
self.cycle_current_bin(true);
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
} else {
@@ -167,7 +162,7 @@ where
if item.ends_after(ag.range().clone()) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
self.cycle_current_bin(true);
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
@@ -185,7 +180,7 @@ where
if self.all_bins_emitted {
None
} else {
self.cycle_current_bin();
self.cycle_current_bin(false);
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}

View File

@@ -61,6 +61,7 @@ async fn agg_x_dim_0_inner() {
0,
query.buffer_size as usize,
event_chunker_conf,
false,
);
let _ = fut1;
// TODO add the binning and expectation and await the result.
@@ -110,6 +111,7 @@ async fn agg_x_dim_1_inner() {
0,
query.buffer_size as usize,
event_chunker_conf,
false,
);
let _ = fut1;
// TODO add the binning and expectation and await the result.

View File

@@ -27,6 +27,8 @@ pub struct EventChunkerMultifile {
files_count: u32,
node_ix: usize,
seen_before_range_count: usize,
seen_after_range_count: usize,
expand: bool,
}
impl EventChunkerMultifile {
@@ -37,8 +39,9 @@ impl EventChunkerMultifile {
node_ix: usize,
buffer_size: usize,
event_chunker_conf: EventChunkerConf,
expand: bool,
) -> Self {
let file_chan = if true {
let file_chan = if expand {
open_expanded_files(&range, &channel_config, node)
} else {
open_files(&range, &channel_config, node)
@@ -57,6 +60,8 @@ impl EventChunkerMultifile {
files_count: 0,
node_ix,
seen_before_range_count: 0,
seen_after_range_count: 0,
expand,
}
}
@@ -113,6 +118,7 @@ impl Stream for EventChunkerMultifile {
self.event_chunker_conf.clone(),
path,
self.max_ts.clone(),
self.expand,
);
self.evs = Some(chunker);
}
@@ -174,8 +180,15 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
};
let task = async move {
let mut event_count = 0;
let mut events =
EventChunkerMultifile::new(range, channel_config, node, node_ix, buffer_size, event_chunker_conf);
let mut events = EventChunkerMultifile::new(
range,
channel_config,
node,
node_ix,
buffer_size,
event_chunker_conf,
true,
);
while let Some(item) = events.next().await {
match item {
Ok(item) => match item {

View File

@@ -30,7 +30,9 @@ pub struct EventChunker {
parsed_bytes: u64,
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
seen_before_range_count: usize,
seen_after_range_count: usize,
}
enum DataFileState {
@@ -62,6 +64,7 @@ impl EventChunker {
stats_conf: EventChunkerConf,
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
@@ -81,7 +84,9 @@ impl EventChunker {
parsed_bytes: 0,
path,
max_ts,
expand,
seen_before_range_count: 0,
seen_after_range_count: 0,
}
}
@@ -92,8 +97,9 @@ impl EventChunker {
stats_conf: EventChunkerConf,
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts, expand);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -174,9 +180,12 @@ impl EventChunker {
}
self.max_ts.store(ts, Ordering::SeqCst);
if ts >= self.range.end {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
}
}
if ts < self.range.beg {
self.seen_before_range_count += 1;
@@ -276,18 +285,13 @@ impl EventChunker {
) {
Ok(c1) => {
assert!(c1 as u32 == k1);
if ts < self.range.beg {
} else if ts >= self.range.end {
Err(Error::with_msg(format!("event after range {}", ts / SEC)))?;
} else {
ret.add_event(
ts,
pulse,
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
);
}
ret.add_event(
ts,
pulse,
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
);
}
Err(e) => {
Err(Error::with_msg(format!("decompression failed {:?}", e)))?;

View File

@@ -2,7 +2,7 @@ use crate::ChannelConfigExt;
use bitshuffle::bitshuffle_compress;
use bytes::{BufMut, BytesMut};
use err::Error;
use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, Node, Shape};
use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, GenVar, Node, Shape};
use netpod::{Nanos, ScalarType};
use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions};
@@ -10,9 +10,8 @@ use tokio::io::AsyncWriteExt;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
//#[test]
#[test]
pub fn gen_test_data_test() {
std::env::set_current_dir("..").unwrap();
taskrun::run(gen_test_data()).unwrap();
}
@@ -38,6 +37,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
array: false,
compression: false,
},
gen_var: netpod::GenVar::Default,
time_spacing: MS * 500,
};
ensemble.channels.push(chn);
@@ -55,6 +55,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
byte_order: ByteOrder::big_endian(),
compression: true,
},
gen_var: netpod::GenVar::Default,
time_spacing: MS * 4000,
};
ensemble.channels.push(chn);
@@ -66,12 +67,49 @@ pub async fn gen_test_data() -> Result<(), Error> {
},
keyspace: 3,
time_bin_size: Nanos { ns: DAY },
array: true,
scalar_type: ScalarType::U16,
shape: Shape::Wave(77),
byte_order: ByteOrder::little_endian(),
shape: Shape::Wave(77),
array: true,
compression: true,
},
gen_var: netpod::GenVar::Default,
time_spacing: MS * 500,
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "tw-scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::little_endian(),
shape: Shape::Scalar,
array: false,
compression: false,
},
gen_var: netpod::GenVar::TimeWeight,
time_spacing: MS * 500,
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "const-regular-scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::little_endian(),
shape: Shape::Scalar,
array: false,
compression: false,
},
gen_var: netpod::GenVar::ConstRegular,
time_spacing: MS * 500,
};
ensemble.channels.push(chn);
@@ -105,6 +143,7 @@ struct Ensemble {
pub struct ChannelGenProps {
config: ChannelConfig,
time_spacing: u64,
gen_var: GenVar,
}
async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
@@ -138,6 +177,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
&chn.config,
node,
ensemble,
&chn.gen_var,
)
.await?;
evix = res.evix;
@@ -279,6 +319,7 @@ async fn gen_timebin(
config: &ChannelConfig,
node: &Node,
ensemble: &Ensemble,
gen_var: &GenVar,
) -> Result<GenTimebinRes, Error> {
let tb = ts.ns / config.time_bin_size.ns;
let path = channel_path
@@ -317,8 +358,25 @@ async fn gen_timebin(
ns: (tb + 1) * config.time_bin_size.ns,
};
while ts.ns < tsmax.ns {
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config).await?;
match gen_var {
GenVar::Default => {
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
}
}
GenVar::ConstRegular => {
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
}
}
GenVar::TimeWeight => {
let m = evix % 20;
if m == 0 || m == 1 {
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
}
}
}
}
evix += 1;
ts.ns += ts_spacing;
@@ -347,6 +405,7 @@ async fn gen_event(
ts: Nanos,
pulse: u64,
config: &ChannelConfig,
gen_var: &GenVar,
) -> Result<(), Error> {
let ttl = 0xcafecafe;
let ioc_ts = 0xcafecafe;
@@ -436,7 +495,20 @@ async fn gen_event(
buf.put_u8(config.scalar_type.index());
match &config.scalar_type {
ScalarType::I32 => {
let v = evix as i32;
let v = match gen_var {
GenVar::Default => evix as i32,
GenVar::ConstRegular => 42 as i32,
GenVar::TimeWeight => {
let m = evix % 20;
if m == 0 {
200
} else if m == 1 {
400
} else {
0
}
}
};
if config.byte_order.is_be() {
buf.put_i32(v);
} else {

View File

@@ -179,6 +179,7 @@ pub async fn make_event_pipe(
node_config.ix,
evq.disk_io_buffer_size,
event_chunker_conf,
true,
);
let shape = entry.to_shape()?;
let pipe = pipe1!(