diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 8952b63..6a430ad 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -35,6 +35,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use std::time::Duration; use std::time::Instant; use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use streams::filechunkread::FileChunkRead; @@ -235,32 +236,72 @@ impl Stream for FileContentStream { } fn start_read5( + path: PathBuf, file: File, tx: async_channel::Sender>, disk_io_tune: DiskIoTune, ) -> Result<(), Error> { let fut = async move { - info!("start_read5 BEGIN {disk_io_tune:?}"); let mut file = file; + let pos_beg = match file.stream_position().await { + Ok(x) => x, + Err(e) => { + error!("stream_position {e} {path:?}"); + if let Err(_) = tx + .send(Err(Error::with_msg_no_trace(format!("seek error {path:?}")))) + .await + { + error!("broken channel"); + } + return; + } + }; + let mut pos = pos_beg; + info!("start_read5 BEGIN {disk_io_tune:?}"); loop { let mut buf = BytesMut::new(); buf.resize(1024 * 256, 0); - match file.read(&mut buf).await { - Ok(n) => { + match tokio::time::timeout(Duration::from_millis(8000), file.read(&mut buf)).await { + Ok(Ok(n)) => { + if n == 0 { + info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}"); + break; + } + pos += n as u64; buf.truncate(n); let item = FileChunkRead::with_buf(buf); match tx.send(Ok(item)).await { Ok(()) => {} - Err(_e) => break, + Err(_) => { + error!("broken channel"); + break; + } } } - Err(e) => match tx.send(Err(e.into())).await { - Ok(()) => {} - Err(_e) => break, + Ok(Err(e)) => match tx.send(Err(e.into())).await { + Ok(()) => { + break; + } + Err(_) => { + error!("broken channel"); + break; + } }, + Err(_) => { + let msg = format!("I/O timeout pos_beg {pos_beg} pos {pos} path {path:?}"); + error!("{msg}"); + let e = Error::with_msg_no_trace(msg); + match tx.send(Err(e)).await { + Ok(()) => {} + Err(_e) => { + error!("broken channel"); + break; + } + } + break; + } } } - info!("start_read5 DONE"); }; tokio::task::spawn(fut); Ok(()) @@ -271,9 +312,9 @@ pub struct FileContentStream5 { } impl FileContentStream5 { - pub fn new(file: File, disk_io_tune: DiskIoTune) -> Result { + pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune) -> Result { let (tx, rx) = async_channel::bounded(32); - start_read5(file, tx, disk_io_tune)?; + start_read5(path, file, tx, disk_io_tune)?; let ret = Self { rx }; Ok(ret) } @@ -658,6 +699,7 @@ impl Stream for FileContentStream4 { } pub fn file_content_stream( + path: PathBuf, file: File, disk_io_tune: DiskIoTune, ) -> Pin> + Send>> { @@ -680,7 +722,7 @@ pub fn file_content_stream( Box::pin(s) as _ } ReadSys::Read5 => { - let s = FileContentStream5::new(file, disk_io_tune).unwrap(); + let s = FileContentStream5::new(path, file, disk_io_tune).unwrap(); Box::pin(s) as _ } } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 87880ca..8647d4b 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -162,6 +162,7 @@ impl Stream for EventChunkerMultifile { match file.file { Some(file) => { let inp = Box::pin(crate::file_content_stream( + path.clone(), file, self.disk_io_tune.clone(), )); @@ -193,10 +194,14 @@ impl Stream for EventChunkerMultifile { info!(" path {:?}", x.path); } let item = LogItem::quick(Level::INFO, msg); - let mut chunkers = vec![]; + let mut chunkers = Vec::new(); for of in ofs.files { if let Some(file) = of.file { - let inp = crate::file_content_stream(file, self.disk_io_tune.clone()); + let inp = crate::file_content_stream( + of.path.clone(), + file, + self.disk_io_tune.clone(), + ); let chunker = EventChunker::from_event_boundary( inp, self.channel_config.clone(), diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 3f87a13..794174c 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -330,23 +330,23 @@ mod test { } async fn collect_merged_events(paths: Vec, range: NanoRange) -> Result { - let mut files = vec![]; + let mut files = Vec::new(); for path in paths { let p = position_file_for_test(&path, &range, false, false).await?; if !p.found { return Err(Error::with_msg_no_trace("can not position file??")); } - files.push( - p.file - .file - .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?, - ); + let file = p + .file + .file + .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?; + files.push((path, file)); } let inps = files .into_iter() - .map(|file| { + .map(|(path, file)| { let disk_io_tune = netpod::DiskIoTune::default(); - let inp = file_content_stream(file, disk_io_tune); + let inp = file_content_stream(path, file, disk_io_tune); inp }) .map(|inp| { diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index a1031ee..fb0577f 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -67,9 +67,10 @@ pub fn main() -> Result<(), Error> { Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), }; eprintln!("Read config: {:?}", config); - let file = File::open(&sub.datafile).await?; + let path = sub.datafile; + let file = File::open(&path).await?; let disk_io_tune = netpod::DiskIoTune::default(); - let inp = Box::pin(disk::file_content_stream(file, disk_io_tune)); + let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune)); let ce = &config.entries[0]; let channel_config = ChannelConfig { channel: Channel { @@ -92,15 +93,8 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::mb(2), }; - let chunks = EventChunker::from_start( - inp, - channel_config.clone(), - range, - stats_conf, - sub.datafile.clone(), - false, - true, - ); + let chunks = + EventChunker::from_start(inp, channel_config.clone(), range, stats_conf, path, false, true); use futures_util::stream::StreamExt; use items::WithLen; //let evs = EventValuesDim0Case::::new(); diff --git a/httpret/src/download.rs b/httpret/src/download.rs index 04e4aab..3519033 100644 --- a/httpret/src/download.rs +++ b/httpret/src/download.rs @@ -73,8 +73,8 @@ impl DownloadHandler { // TODO wrap file operation to return a better error. let pp = base.join(p2); info!("Try to open {pp:?}"); - let file = tokio::fs::OpenOptions::new().read(true).open(pp).await?; - let s = disk::file_content_stream(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); + let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?; + let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) }