From ac5b10216b7babbc75fd3cc67c4bc4378f3272ba Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 17 May 2021 10:11:14 +0200 Subject: [PATCH] Log message about number of opened datafiles --- disk/src/dataopen.rs | 9 --------- disk/src/eventblobs.rs | 19 ++++++++++++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 977f1e4..4f0631a 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -3,7 +3,6 @@ use bytes::BytesMut; use err::Error; use futures_util::StreamExt; use netpod::log::*; -use netpod::timeunits::MS; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use std::path::PathBuf; use tokio::fs::{File, OpenOptions}; @@ -63,9 +62,6 @@ async fn open_files_inner( } } timebins.sort_unstable(); - let tgt_tb = (range.beg / MS) as f32 / (channel_config.time_bin_size.ns / MS) as f32; - info!("tgt_tb: {:?}", tgt_tb); - info!("timebins found: {:?}", timebins); for &tb in &timebins { let ts_bin = Nanos { ns: tb * channel_config.time_bin_size.ns, @@ -76,11 +72,8 @@ async fn open_files_inner( if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg { continue; } - //debug!("opening tb {:?}", &tb); let path = paths::datapath(tb, &channel_config, &node); - info!("opening path {:?}", &path); let mut file = OpenOptions::new().read(true).open(&path).await?; - //info!("opened file {:?} {:?}", &path, &file); let ret = { let index_path = paths::index_path(ts_bin, &channel_config, &node)?; match OpenOptions::new().read(true).open(&index_path).await { @@ -109,7 +102,6 @@ async fn open_files_inner( } let mut buf = BytesMut::with_capacity(meta.len() as usize); buf.resize(buf.capacity(), 0); - debug!("read exact index file {} {}", buf.len(), buf.len() % 16); index_file.read_exact(&mut buf).await?; match super::index::find_ge(range.beg, &buf[2..])? { Some(o) => { @@ -160,6 +152,5 @@ async fn open_files_inner( chtx.send(Ok(ret)).await?; } // TODO keep track of number of running - debug!("open_files_inner done"); Ok(()) } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 926fd84..6e29fee 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -6,7 +6,6 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::timeunits::SEC; use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; use std::sync::atomic::AtomicU64; @@ -20,9 +19,11 @@ pub struct EventBlobsComplete { buffer_size: usize, event_chunker_conf: EventChunkerConf, range: NanoRange, + data_completed: bool, errored: bool, completed: bool, max_ts: Arc, + files_count: u32, } impl EventBlobsComplete { @@ -33,7 +34,6 @@ impl EventBlobsComplete { buffer_size: usize, event_chunker_conf: EventChunkerConf, ) -> Self { - //info!("EventBlobsComplete::new beg {}", range.beg / SEC); Self { file_chan: open_files(&range, &channel_config, node), evs: None, @@ -41,9 +41,11 @@ impl EventBlobsComplete { event_chunker_conf, channel_config, range, + data_completed: false, errored: false, completed: false, max_ts: Arc::new(AtomicU64::new(0)), + files_count: 0, } } } @@ -59,6 +61,9 @@ impl Stream for EventBlobsComplete { } else if self.errored { self.completed = true; return Ready(None); + } else if self.data_completed { + self.completed = true; + return Ready(None); } else { match &mut self.evs { Some(evs) => match evs.poll_next_unpin(cx) { @@ -72,6 +77,7 @@ impl Stream for EventBlobsComplete { None => match self.file_chan.poll_next_unpin(cx) { Ready(Some(k)) => match k { Ok(file) => { + self.files_count += 1; let path = file.path; let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path)); match file.file { @@ -93,7 +99,6 @@ impl Stream for EventBlobsComplete { } } Ready(Some(Ok(EventChunkerItem::Log(item)))) - //continue 'outer; } Err(e) => { self.errored = true; @@ -101,8 +106,12 @@ impl Stream for EventBlobsComplete { } }, Ready(None) => { - self.completed = true; - Ready(None) + self.data_completed = true; + let item = LogItem::quick( + Level::INFO, + format!("EventBlobsComplete used {} datafiles", self.files_count), + ); + Ready(Some(Ok(EventChunkerItem::Log(item)))) } Pending => Pending, },