Report on I/O timeout

This commit is contained in:
Dominik Werder
2023-02-02 08:15:26 +01:00
parent f20765ec56
commit faa9158719
5 changed files with 75 additions and 34 deletions

View File

@@ -35,6 +35,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use streams::filechunkread::FileChunkRead;
@@ -235,32 +236,72 @@ impl Stream for FileContentStream {
}
fn start_read5(
path: PathBuf,
file: File,
tx: async_channel::Sender<Result<FileChunkRead, Error>>,
disk_io_tune: DiskIoTune,
) -> Result<(), Error> {
let fut = async move {
info!("start_read5 BEGIN {disk_io_tune:?}");
let mut file = file;
let pos_beg = match file.stream_position().await {
Ok(x) => x,
Err(e) => {
error!("stream_position {e} {path:?}");
if let Err(_) = tx
.send(Err(Error::with_msg_no_trace(format!("seek error {path:?}"))))
.await
{
error!("broken channel");
}
return;
}
};
let mut pos = pos_beg;
info!("start_read5 BEGIN {disk_io_tune:?}");
loop {
let mut buf = BytesMut::new();
buf.resize(1024 * 256, 0);
match file.read(&mut buf).await {
Ok(n) => {
match tokio::time::timeout(Duration::from_millis(8000), file.read(&mut buf)).await {
Ok(Ok(n)) => {
if n == 0 {
info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}");
break;
}
pos += n as u64;
buf.truncate(n);
let item = FileChunkRead::with_buf(buf);
match tx.send(Ok(item)).await {
Ok(()) => {}
Err(_e) => break,
Err(_) => {
error!("broken channel");
break;
}
}
}
Err(e) => match tx.send(Err(e.into())).await {
Ok(()) => {}
Err(_e) => break,
Ok(Err(e)) => match tx.send(Err(e.into())).await {
Ok(()) => {
break;
}
Err(_) => {
error!("broken channel");
break;
}
},
Err(_) => {
let msg = format!("I/O timeout pos_beg {pos_beg} pos {pos} path {path:?}");
error!("{msg}");
let e = Error::with_msg_no_trace(msg);
match tx.send(Err(e)).await {
Ok(()) => {}
Err(_e) => {
error!("broken channel");
break;
}
}
break;
}
}
}
info!("start_read5 DONE");
};
tokio::task::spawn(fut);
Ok(())
@@ -271,9 +312,9 @@ pub struct FileContentStream5 {
}
impl FileContentStream5 {
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Result<Self, Error> {
pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(32);
start_read5(file, tx, disk_io_tune)?;
start_read5(path, file, tx, disk_io_tune)?;
let ret = Self { rx };
Ok(ret)
}
@@ -658,6 +699,7 @@ impl Stream for FileContentStream4 {
}
pub fn file_content_stream(
path: PathBuf,
file: File,
disk_io_tune: DiskIoTune,
) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>> {
@@ -680,7 +722,7 @@ pub fn file_content_stream(
Box::pin(s) as _
}
ReadSys::Read5 => {
let s = FileContentStream5::new(file, disk_io_tune).unwrap();
let s = FileContentStream5::new(path, file, disk_io_tune).unwrap();
Box::pin(s) as _
}
}

View File

@@ -162,6 +162,7 @@ impl Stream for EventChunkerMultifile {
match file.file {
Some(file) => {
let inp = Box::pin(crate::file_content_stream(
path.clone(),
file,
self.disk_io_tune.clone(),
));
@@ -193,10 +194,14 @@ impl Stream for EventChunkerMultifile {
info!(" path {:?}", x.path);
}
let item = LogItem::quick(Level::INFO, msg);
let mut chunkers = vec![];
let mut chunkers = Vec::new();
for of in ofs.files {
if let Some(file) = of.file {
let inp = crate::file_content_stream(file, self.disk_io_tune.clone());
let inp = crate::file_content_stream(
of.path.clone(),
file,
self.disk_io_tune.clone(),
);
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),

View File

@@ -330,23 +330,23 @@ mod test {
}
async fn collect_merged_events(paths: Vec<PathBuf>, range: NanoRange) -> Result<CollectedEvents, Error> {
let mut files = vec![];
let mut files = Vec::new();
for path in paths {
let p = position_file_for_test(&path, &range, false, false).await?;
if !p.found {
return Err(Error::with_msg_no_trace("can not position file??"));
}
files.push(
p.file
.file
.ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?,
);
let file = p
.file
.file
.ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?;
files.push((path, file));
}
let inps = files
.into_iter()
.map(|file| {
.map(|(path, file)| {
let disk_io_tune = netpod::DiskIoTune::default();
let inp = file_content_stream(file, disk_io_tune);
let inp = file_content_stream(path, file, disk_io_tune);
inp
})
.map(|inp| {

View File

@@ -67,9 +67,10 @@ pub fn main() -> Result<(), Error> {
Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))),
};
eprintln!("Read config: {:?}", config);
let file = File::open(&sub.datafile).await?;
let path = sub.datafile;
let file = File::open(&path).await?;
let disk_io_tune = netpod::DiskIoTune::default();
let inp = Box::pin(disk::file_content_stream(file, disk_io_tune));
let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune));
let ce = &config.entries[0];
let channel_config = ChannelConfig {
channel: Channel {
@@ -92,15 +93,8 @@ pub fn main() -> Result<(), Error> {
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::mb(2),
};
let chunks = EventChunker::from_start(
inp,
channel_config.clone(),
range,
stats_conf,
sub.datafile.clone(),
false,
true,
);
let chunks =
EventChunker::from_start(inp, channel_config.clone(), range, stats_conf, path, false, true);
use futures_util::stream::StreamExt;
use items::WithLen;
//let evs = EventValuesDim0Case::<f64>::new();

View File

@@ -73,8 +73,8 @@ impl DownloadHandler {
// TODO wrap file operation to return a better error.
let pp = base.join(p2);
info!("Try to open {pp:?}");
let file = tokio::fs::OpenOptions::new().read(true).open(pp).await?;
let s = disk::file_content_stream(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf());
let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?;
let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf());
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
}