#[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; use std::task::{Context, Poll}; use std::pin::Pin; use tokio::io::AsyncRead; //use std::future::Future; //use futures_core::Stream; pub async fn read_test_1() -> Result { let path = "/data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10CB01-RIQM-DCP10:FOR-AMPLT/0000000000000018714/0000000012/0000000000086400000_00000_Data"; let fin = tokio::fs::OpenOptions::new() .read(true) .open(path) .await?; let meta = fin.metadata().await; debug!("file meta {:?}", meta); let stream = netpod::BodyStream { inner: Box::new(FileReader { file: fin, nreads: 0, }), }; Ok(stream) } struct FileReader { file: tokio::fs::File, nreads: u32, } impl futures_core::Stream for FileReader { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.nreads >= 10 { return Poll::Ready(None); } let blen = 13; let mut buf2 = bytes::BytesMut::with_capacity(blen); buf2.resize(buf2.capacity(), 0); if buf2.as_mut().len() != blen { panic!("todo prepare slice"); } let mut buf = tokio::io::ReadBuf::new(buf2.as_mut()); match Pin::new(&mut self.file).poll_read(cx, &mut buf) { Poll::Ready(Ok(_)) => { info!("read from disk: {} nreads {}", buf.filled().len(), self.nreads); info!("buf2 len: {}", buf2.len()); self.nreads += 1; Poll::Ready(Some(Ok(buf2.freeze()))) } Poll::Ready(Err(e)) => { Poll::Ready(Some(Err(Error::from(e)))) } Poll::Pending => Poll::Pending } } }