From 370b33c6a50fe5ff8dbfff32122c559fd184c989 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 1 Sep 2021 15:49:35 +0200 Subject: [PATCH] Read events and assert one event before requested range --- disk/src/eventblobs.rs | 93 ++++++++++++++++++++++++++++++++++++++-- disk/src/eventchunker.rs | 37 ++++++++++------ err/src/lib.rs | 2 +- items/src/lib.rs | 1 - 4 files changed, 115 insertions(+), 18 deletions(-) diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 1a9038c..06e99d8 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,12 +1,12 @@ -use crate::dataopen::{open_files, OpenedFile}; +use crate::dataopen::{open_expanded_files, open_files, OpenedFile}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; use crate::file_content_stream; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::{LogItem, RangeCompletableItem, StreamItem}; -use netpod::log::*; use netpod::timeunits::SEC; +use netpod::{log::*, ByteSize}; use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; use std::sync::atomic::AtomicU64; @@ -26,6 +26,7 @@ pub struct EventChunkerMultifile { max_ts: Arc, files_count: u32, node_ix: usize, + seen_before_range_count: usize, } impl EventChunkerMultifile { @@ -37,8 +38,13 @@ impl EventChunkerMultifile { buffer_size: usize, event_chunker_conf: EventChunkerConf, ) -> Self { + let file_chan = if true { + open_expanded_files(&range, &channel_config, node) + } else { + open_files(&range, &channel_config, node) + }; Self { - file_chan: open_files(&range, &channel_config, node), + file_chan, evs: None, buffer_size, event_chunker_conf, @@ -50,6 +56,18 @@ impl EventChunkerMultifile { max_ts: Arc::new(AtomicU64::new(0)), files_count: 0, node_ix, + seen_before_range_count: 0, + } + } + + pub fn seen_before_range_count(&self) -> usize { + self.seen_before_range_count + } + + pub fn close(&mut self) { + if let Some(evs) = &mut self.evs { + self.seen_before_range_count += evs.seen_before_range_count(); + self.evs = None; } } } @@ -73,6 +91,7 @@ impl Stream for EventChunkerMultifile { Some(evs) => match evs.poll_next_unpin(cx) { Ready(Some(k)) => Ready(Some(k)), Ready(None) => { + self.seen_before_range_count += evs.seen_before_range_count(); self.evs = None; continue 'outer; } @@ -127,3 +146,71 @@ impl Stream for EventChunkerMultifile { } } } + +#[test] +fn read_expanded() { + use netpod::timeunits::*; + use netpod::Nanos; + let range = netpod::NanoRange { + beg: DAY + HOUR * 5, + end: DAY + HOUR * 8, + }; + let chn = netpod::Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }; + // TODO read config from disk. + let channel_config = ChannelConfig { + channel: chn, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: netpod::ScalarType::I32, + byte_order: netpod::ByteOrder::big_endian(), + shape: netpod::Shape::Scalar, + array: false, + compression: false, + }; + let cluster = taskrun::test_cluster(); + let node_ix = 0; + let node = cluster.nodes[node_ix].clone(); + let buffer_size = 512; + let event_chunker_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let task = async move { + let mut c1 = 0; + let mut event_count = 0; + let mut events = + EventChunkerMultifile::new(range, channel_config, node, node_ix, buffer_size, event_chunker_conf); + while let Some(item) = events.next().await { + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => { + event_count += item.tss.len(); + } + _ => {} + }, + _ => {} + }, + Err(e) => return Err(e.into()), + } + c1 += 1; + if c1 >= 6 { + break; + } + } + events.close(); + if events.seen_before_range_count() != 1 { + return Err(Error::with_msg(format!( + "seen_before_range_count error: {}", + events.seen_before_range_count(), + ))); + } + if event_count != 49 { + return Err(Error::with_msg(format!("unexpected number of events: {}", event_count))); + } + Ok(()) + }; + taskrun::run(task).unwrap(); +} diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 41bf2d9..2c8214b 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -30,6 +30,7 @@ pub struct EventChunker { parsed_bytes: u64, path: PathBuf, max_ts: Arc, + seen_before_range_count: usize, } enum DataFileState { @@ -44,7 +45,7 @@ struct ParseResult { #[derive(Clone, Debug)] pub struct EventChunkerConf { - disk_stats_every: ByteSize, + pub disk_stats_every: ByteSize, } impl EventChunkerConf { @@ -80,6 +81,7 @@ impl EventChunker { parsed_bytes: 0, path, max_ts, + seen_before_range_count: 0, } } @@ -177,18 +179,22 @@ impl EventChunker { break; } if ts < self.range.beg { - Err(Error::with_msg(format!( - "seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}", - ts / SEC, - ts % SEC, - self.range.beg / SEC, - self.range.beg % SEC, - self.range.end / SEC, - self.range.end % SEC, - pulse, - self.channel_config.shape, - self.path - )))?; + self.seen_before_range_count += 1; + if self.seen_before_range_count > 1 { + let e = Error::with_msg(format!( + "seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}", + ts / SEC, + ts % SEC, + self.range.beg / SEC, + self.range.beg % SEC, + self.range.end / SEC, + self.range.end % SEC, + pulse, + self.channel_config.shape, + self.path + )); + Err(e)?; + } } let _ioc_ts = sl.read_i64::().unwrap(); let status = sl.read_i8().unwrap(); @@ -315,8 +321,13 @@ impl EventChunker { parsed_bytes, }) } + + pub fn seen_before_range_count(&self) -> usize { + self.seen_before_range_count + } } +#[derive(Debug)] pub struct EventFull { pub tss: Vec, pub pulses: Vec, diff --git a/err/src/lib.rs b/err/src/lib.rs index ee6f25d..0345465 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -57,7 +57,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { None => false, Some(s) => { let s = s.to_str().unwrap(); - s.contains("dev/daqbuffer/") || s.contains("/retrsbld/") + s.contains("/dev/daqbuffer/") || s.contains("/data_meta/build/") } }; let name = match sy.name() { diff --git a/items/src/lib.rs b/items/src/lib.rs index 65eeb17..2e22288 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -1,4 +1,3 @@ -use crate::eventvalues::EventValues; use crate::frame::make_frame_2; use crate::numops::BoolNum; use bytes::BytesMut;