Log message about number of opened datafiles

This commit is contained in:
Dominik Werder
2021-05-17 10:11:14 +02:00
parent f63624f8e2
commit ac5b10216b
2 changed files with 14 additions and 14 deletions
-9
View File
@@ -3,7 +3,6 @@ use bytes::BytesMut;
use err::Error; use err::Error;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::MS;
use netpod::{ChannelConfig, NanoRange, Nanos, Node}; use netpod::{ChannelConfig, NanoRange, Nanos, Node};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
@@ -63,9 +62,6 @@ async fn open_files_inner(
} }
} }
timebins.sort_unstable(); timebins.sort_unstable();
let tgt_tb = (range.beg / MS) as f32 / (channel_config.time_bin_size.ns / MS) as f32;
info!("tgt_tb: {:?}", tgt_tb);
info!("timebins found: {:?}", timebins);
for &tb in &timebins { for &tb in &timebins {
let ts_bin = Nanos { let ts_bin = Nanos {
ns: tb * channel_config.time_bin_size.ns, ns: tb * channel_config.time_bin_size.ns,
@@ -76,11 +72,8 @@ async fn open_files_inner(
if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg { if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
continue; continue;
} }
//debug!("opening tb {:?}", &tb);
let path = paths::datapath(tb, &channel_config, &node); let path = paths::datapath(tb, &channel_config, &node);
info!("opening path {:?}", &path);
let mut file = OpenOptions::new().read(true).open(&path).await?; let mut file = OpenOptions::new().read(true).open(&path).await?;
//info!("opened file {:?} {:?}", &path, &file);
let ret = { let ret = {
let index_path = paths::index_path(ts_bin, &channel_config, &node)?; let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
match OpenOptions::new().read(true).open(&index_path).await { match OpenOptions::new().read(true).open(&index_path).await {
@@ -109,7 +102,6 @@ async fn open_files_inner(
} }
let mut buf = BytesMut::with_capacity(meta.len() as usize); let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0); buf.resize(buf.capacity(), 0);
debug!("read exact index file {} {}", buf.len(), buf.len() % 16);
index_file.read_exact(&mut buf).await?; index_file.read_exact(&mut buf).await?;
match super::index::find_ge(range.beg, &buf[2..])? { match super::index::find_ge(range.beg, &buf[2..])? {
Some(o) => { Some(o) => {
@@ -160,6 +152,5 @@ async fn open_files_inner(
chtx.send(Ok(ret)).await?; chtx.send(Ok(ret)).await?;
} }
// TODO keep track of number of running // TODO keep track of number of running
debug!("open_files_inner done");
Ok(()) Ok(())
} }
+14 -5
View File
@@ -6,7 +6,6 @@ use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, NanoRange, Node}; use netpod::{ChannelConfig, NanoRange, Node};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
@@ -20,9 +19,11 @@ pub struct EventBlobsComplete {
buffer_size: usize, buffer_size: usize,
event_chunker_conf: EventChunkerConf, event_chunker_conf: EventChunkerConf,
range: NanoRange, range: NanoRange,
data_completed: bool,
errored: bool, errored: bool,
completed: bool, completed: bool,
max_ts: Arc<AtomicU64>, max_ts: Arc<AtomicU64>,
files_count: u32,
} }
impl EventBlobsComplete { impl EventBlobsComplete {
@@ -33,7 +34,6 @@ impl EventBlobsComplete {
buffer_size: usize, buffer_size: usize,
event_chunker_conf: EventChunkerConf, event_chunker_conf: EventChunkerConf,
) -> Self { ) -> Self {
//info!("EventBlobsComplete::new beg {}", range.beg / SEC);
Self { Self {
file_chan: open_files(&range, &channel_config, node), file_chan: open_files(&range, &channel_config, node),
evs: None, evs: None,
@@ -41,9 +41,11 @@ impl EventBlobsComplete {
event_chunker_conf, event_chunker_conf,
channel_config, channel_config,
range, range,
data_completed: false,
errored: false, errored: false,
completed: false, completed: false,
max_ts: Arc::new(AtomicU64::new(0)), max_ts: Arc::new(AtomicU64::new(0)),
files_count: 0,
} }
} }
} }
@@ -59,6 +61,9 @@ impl Stream for EventBlobsComplete {
} else if self.errored { } else if self.errored {
self.completed = true; self.completed = true;
return Ready(None); return Ready(None);
} else if self.data_completed {
self.completed = true;
return Ready(None);
} else { } else {
match &mut self.evs { match &mut self.evs {
Some(evs) => match evs.poll_next_unpin(cx) { Some(evs) => match evs.poll_next_unpin(cx) {
@@ -72,6 +77,7 @@ impl Stream for EventBlobsComplete {
None => match self.file_chan.poll_next_unpin(cx) { None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k { Ready(Some(k)) => match k {
Ok(file) => { Ok(file) => {
self.files_count += 1;
let path = file.path; let path = file.path;
let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path)); let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
match file.file { match file.file {
@@ -93,7 +99,6 @@ impl Stream for EventBlobsComplete {
} }
} }
Ready(Some(Ok(EventChunkerItem::Log(item)))) Ready(Some(Ok(EventChunkerItem::Log(item))))
//continue 'outer;
} }
Err(e) => { Err(e) => {
self.errored = true; self.errored = true;
@@ -101,8 +106,12 @@ impl Stream for EventBlobsComplete {
} }
}, },
Ready(None) => { Ready(None) => {
self.completed = true; self.data_completed = true;
Ready(None) let item = LogItem::quick(
Level::INFO,
format!("EventBlobsComplete used {} datafiles", self.files_count),
);
Ready(Some(Ok(EventChunkerItem::Log(item))))
} }
Pending => Pending, Pending => Pending,
}, },