Read events and assert one event before requested range

This commit is contained in:
Dominik Werder
2021-09-01 15:49:35 +02:00
parent 6e7a279a11
commit 370b33c6a5
4 changed files with 115 additions and 18 deletions

View File

@@ -1,12 +1,12 @@
use crate::dataopen::{open_files, OpenedFile};
use crate::dataopen::{open_expanded_files, open_files, OpenedFile};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::file_content_stream;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{LogItem, RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{log::*, ByteSize};
use netpod::{ChannelConfig, NanoRange, Node};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
@@ -26,6 +26,7 @@ pub struct EventChunkerMultifile {
max_ts: Arc<AtomicU64>,
files_count: u32,
node_ix: usize,
seen_before_range_count: usize,
}
impl EventChunkerMultifile {
@@ -37,8 +38,13 @@ impl EventChunkerMultifile {
buffer_size: usize,
event_chunker_conf: EventChunkerConf,
) -> Self {
let file_chan = if true {
open_expanded_files(&range, &channel_config, node)
} else {
open_files(&range, &channel_config, node)
};
Self {
file_chan: open_files(&range, &channel_config, node),
file_chan,
evs: None,
buffer_size,
event_chunker_conf,
@@ -50,6 +56,18 @@ impl EventChunkerMultifile {
max_ts: Arc::new(AtomicU64::new(0)),
files_count: 0,
node_ix,
seen_before_range_count: 0,
}
}
pub fn seen_before_range_count(&self) -> usize {
self.seen_before_range_count
}
pub fn close(&mut self) {
if let Some(evs) = &mut self.evs {
self.seen_before_range_count += evs.seen_before_range_count();
self.evs = None;
}
}
}
@@ -73,6 +91,7 @@ impl Stream for EventChunkerMultifile {
Some(evs) => match evs.poll_next_unpin(cx) {
Ready(Some(k)) => Ready(Some(k)),
Ready(None) => {
self.seen_before_range_count += evs.seen_before_range_count();
self.evs = None;
continue 'outer;
}
@@ -127,3 +146,71 @@ impl Stream for EventChunkerMultifile {
}
}
}
#[test]
fn read_expanded() {
use netpod::timeunits::*;
use netpod::Nanos;
let range = netpod::NanoRange {
beg: DAY + HOUR * 5,
end: DAY + HOUR * 8,
};
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node_ix = 0;
let node = cluster.nodes[node_ix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let task = async move {
let mut c1 = 0;
let mut event_count = 0;
let mut events =
EventChunkerMultifile::new(range, channel_config, node, node_ix, buffer_size, event_chunker_conf);
while let Some(item) = events.next().await {
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
event_count += item.tss.len();
}
_ => {}
},
_ => {}
},
Err(e) => return Err(e.into()),
}
c1 += 1;
if c1 >= 6 {
break;
}
}
events.close();
if events.seen_before_range_count() != 1 {
return Err(Error::with_msg(format!(
"seen_before_range_count error: {}",
events.seen_before_range_count(),
)));
}
if event_count != 49 {
return Err(Error::with_msg(format!("unexpected number of events: {}", event_count)));
}
Ok(())
};
taskrun::run(task).unwrap();
}

View File

@@ -30,6 +30,7 @@ pub struct EventChunker {
parsed_bytes: u64,
path: PathBuf,
max_ts: Arc<AtomicU64>,
seen_before_range_count: usize,
}
enum DataFileState {
@@ -44,7 +45,7 @@ struct ParseResult {
#[derive(Clone, Debug)]
pub struct EventChunkerConf {
disk_stats_every: ByteSize,
pub disk_stats_every: ByteSize,
}
impl EventChunkerConf {
@@ -80,6 +81,7 @@ impl EventChunker {
parsed_bytes: 0,
path,
max_ts,
seen_before_range_count: 0,
}
}
@@ -177,18 +179,22 @@ impl EventChunker {
break;
}
if ts < self.range.beg {
Err(Error::with_msg(format!(
"seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.path
)))?;
self.seen_before_range_count += 1;
if self.seen_before_range_count > 1 {
let e = Error::with_msg(format!(
"seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.path
));
Err(e)?;
}
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap();
@@ -315,8 +321,13 @@ impl EventChunker {
parsed_bytes,
})
}
pub fn seen_before_range_count(&self) -> usize {
self.seen_before_range_count
}
}
#[derive(Debug)]
pub struct EventFull {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,