From 04def20be3bf918cadc0cff494166911a74ceda2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Mar 2022 20:34:24 +0100 Subject: [PATCH] Try specific read --- disk/src/disk.rs | 185 +++++++++++++++++++++++++----- disk/src/eventblobs.rs | 5 +- disk/src/read3.rs | 144 ++++++++++++------------ disk/src/read4.rs | 244 ++++++++++++++++++++++++++++++++++++++++ httpret/src/download.rs | 25 ++-- netpod/src/netpod.rs | 3 + 6 files changed, 483 insertions(+), 123 deletions(-) create mode 100644 disk/src/read4.rs diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 0f43c55..56647b0 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -17,6 +17,7 @@ pub mod merge; pub mod paths; pub mod raw; pub mod read3; +pub mod read4; pub mod streamlog; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; @@ -26,9 +27,8 @@ use futures_core::Stream; use futures_util::future::FusedFuture; use futures_util::{FutureExt, StreamExt, TryFutureExt}; use netpod::histo::HistoLog2; -use netpod::log::*; +use netpod::{log::*, ReadSys}; use netpod::{ChannelConfig, DiskIoTune, Node, Shape}; -use read3::ReadResult; use std::collections::VecDeque; use std::future::Future; use std::io::SeekFrom; @@ -40,6 +40,7 @@ use std::time::{Duration, Instant}; use std::{fmt, mem}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf}; +use tokio::sync::mpsc; // TODO transform this into a self-test or remove. pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result { @@ -258,39 +259,31 @@ impl Stream for FileContentStream { } } -pub fn file_content_stream( - file: File, - disk_io_tune: DiskIoTune, -) -> impl Stream> + Send { - warn!("file_content_stream disk_io_tune {disk_io_tune:?}"); - FileContentStream::new(file, disk_io_tune) -} - -enum FCS2 { +enum FCS3 { GetPosition, ReadingSimple, Reading, } enum ReadStep { - Fut(Pin> + Send>>), - Res(Result), + Fut(Pin> + Send>>), + Res(Result), } -pub struct FileContentStream2 { - fcs2: FCS2, +pub struct FileContentStream3 { + fcs: FCS3, file: Pin>, file_pos: u64, eof: bool, disk_io_tune: DiskIoTune, get_position_fut: Pin> + Send>>, - read_fut: Pin> + Send>>, + read_fut: Pin> + Send>>, reads: VecDeque, done: bool, complete: bool, } -impl FileContentStream2 { +impl FileContentStream3 { pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self { let mut file = Box::pin(file); let ffr = unsafe { @@ -301,7 +294,7 @@ impl FileContentStream2 { .seek(SeekFrom::Current(0)) .map_err(|_| Error::with_msg_no_trace(format!("Seek error"))); Self { - fcs2: FCS2::GetPosition, + fcs: FCS3::GetPosition, file, file_pos: 0, eof: false, @@ -317,7 +310,7 @@ impl FileContentStream2 { } } -impl Stream for FileContentStream2 { +impl Stream for FileContentStream3 { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { @@ -329,8 +322,8 @@ impl Stream for FileContentStream2 { self.complete = true; Ready(None) } else { - match self.fcs2 { - FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) { + match self.fcs { + FCS3::GetPosition => match self.get_position_fut.poll_unpin(cx) { Ready(Ok(k)) => { info!("current file pos: {k}"); self.file_pos = k; @@ -339,9 +332,9 @@ impl Stream for FileContentStream2 { let count = self.disk_io_tune.read_buffer_len as u64; self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count)); self.file_pos += count; - self.fcs2 = FCS2::ReadingSimple; + self.fcs = FCS3::ReadingSimple; } else { - self.fcs2 = FCS2::Reading; + self.fcs = FCS3::Reading; } continue; } @@ -351,7 +344,7 @@ impl Stream for FileContentStream2 { } Pending => Pending, }, - FCS2::ReadingSimple => match self.read_fut.poll_unpin(cx) { + FCS3::ReadingSimple => match self.read_fut.poll_unpin(cx) { Ready(Ok(res)) => { if res.eof { let item = FileChunkRead { @@ -378,7 +371,7 @@ impl Stream for FileContentStream2 { } Pending => Pending, }, - FCS2::Reading => { + FCS3::Reading => { while !self.eof && self.reads.len() < self.disk_io_tune.read_queue_len { let fd = self.file.as_raw_fd(); let pos = self.file_pos; @@ -448,12 +441,146 @@ impl Stream for FileContentStream2 { } } -pub fn file_content_stream_2( +enum FCS4 { + Init, + Setup, + Reading, +} + +pub struct FileContentStream4 { + fcs: FCS4, + file: Pin>, + disk_io_tune: DiskIoTune, + setup_fut: + Option>, Error>> + Send>>>, + inp: Option>>, + recv_fut: Pin>> + Send>>, + done: bool, + complete: bool, +} + +impl FileContentStream4 { + pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self { + let file = Box::pin(file); + Self { + fcs: FCS4::Init, + file, + disk_io_tune, + setup_fut: None, + inp: None, + recv_fut: Box::pin(futures_util::future::ready(Some(Err(Error::with_msg_no_trace( + format!("dummy"), + ))))), + done: false, + complete: false, + } + } +} + +impl Stream for FileContentStream4 { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.complete { + panic!("poll_next on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else { + match self.fcs { + FCS4::Init => { + let read4 = read4::Read4::get(); + let fd = self.file.as_raw_fd(); + let buflen = self.disk_io_tune.read_buffer_len as u64; + let fut = read4.read(fd, buflen, self.disk_io_tune.read_queue_len); + self.setup_fut = Some(Box::pin(fut) as _); + self.fcs = FCS4::Setup; + continue; + } + FCS4::Setup => match self.setup_fut.as_mut().unwrap().poll_unpin(cx) { + Ready(k) => match k { + Ok(k) => { + self.setup_fut = None; + self.fcs = FCS4::Reading; + self.inp = Some(k); + // TODO + let rm = self.inp.as_mut().unwrap(); + let rm = unsafe { + std::mem::transmute::< + &mut mpsc::Receiver>, + &'static mut mpsc::Receiver>, + >(rm) + }; + self.recv_fut = Box::pin(rm.recv()) as _; + continue; + } + Err(e) => { + self.done = true; + let e = Error::with_msg_no_trace(format!("init failed {e:?}")); + Ready(Some(Err(e))) + } + }, + Pending => Pending, + }, + FCS4::Reading => match self.recv_fut.poll_unpin(cx) { + Ready(k) => match k { + Some(k) => match k { + Ok(k) => { + // TODO + let rm = self.inp.as_mut().unwrap(); + let rm = unsafe { + std::mem::transmute::< + &mut mpsc::Receiver>, + &'static mut mpsc::Receiver>, + >(rm) + }; + self.recv_fut = Box::pin(rm.recv()) as _; + let item = FileChunkRead { + buf: k.buf, + duration: Duration::from_millis(0), + }; + Ready(Some(Ok(item))) + } + Err(e) => { + self.done = true; + let e = Error::with_msg_no_trace(format!("init failed {e:?}")); + Ready(Some(Err(e))) + } + }, + None => { + self.done = true; + continue; + } + }, + Pending => Pending, + }, + } + }; + } + } +} + +pub fn file_content_stream( file: File, disk_io_tune: DiskIoTune, -) -> impl Stream> + Send { - warn!("file_content_stream_2 disk_io_tune {disk_io_tune:?}"); - FileContentStream2::new(file, disk_io_tune) +) -> Pin> + Send>> { + warn!("file_content_stream disk_io_tune {disk_io_tune:?}"); + match &disk_io_tune.read_sys { + ReadSys::TokioAsyncRead => { + let s = FileContentStream::new(file, disk_io_tune); + Box::pin(s) as Pin + Send>> + } + ReadSys::Read3 => { + let s = FileContentStream3::new(file, disk_io_tune); + Box::pin(s) as _ + } + ReadSys::Read4 => { + let s = FileContentStream4::new(file, disk_io_tune); + Box::pin(s) as _ + } + } } pub struct NeedMinBuffer { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index bd53926..b1b080d 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -188,10 +188,7 @@ impl Stream for EventChunkerMultifile { let mut chunkers = vec![]; for of in ofs.files { if let Some(file) = of.file { - let inp = Box::pin(crate::file_content_stream_2( - file, - self.disk_io_tune.clone(), - )); + let inp = crate::file_content_stream(file, self.disk_io_tune.clone()); let chunker = EventChunker::from_event_boundary( inp, self.channel_config.clone(), diff --git a/disk/src/read3.rs b/disk/src/read3.rs index 1daa19b..28fed68 100644 --- a/disk/src/read3.rs +++ b/disk/src/read3.rs @@ -9,6 +9,8 @@ use tokio::sync::{mpsc, oneshot}; const DO_TRACE: bool = false; +static READ3: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); + pub struct ReadTask { fd: RawFd, pos: u64, @@ -90,78 +92,7 @@ impl Read3 { match rrx.recv() { Ok(mut jrx) => match jrx.blocking_recv() { Some(rt) => match self.rtx.send(jrx) { - Ok(_) => { - let ts1 = Instant::now(); - let mut prc = 0; - let fd = rt.fd; - let mut rpos = rt.pos; - let mut buf = BytesMut::with_capacity(rt.count as usize); - let mut writable = rt.count as usize; - let rr = unsafe { - loop { - if DO_TRACE { - trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}"); - } - let ec = libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64); - prc += 1; - if ec == -1 { - let errno = *libc::__errno_location(); - if errno == libc::EINVAL { - debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}"); - let rr = ReadResult { buf, eof: true }; - break Ok(rr); - } else { - warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"); - // TODO use a more structured error - let e = Error::with_msg_no_trace(format!( - "pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}" - )); - break Err(e); - } - } else if ec == 0 { - debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"); - let rr = ReadResult { buf, eof: true }; - break Ok(rr); - } else if ec > 0 { - if ec as usize > writable { - error!( - "pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}" - ); - break 'outer; - } else { - rpos += ec as u64; - writable -= ec as usize; - buf.set_len(buf.len() + (ec as usize)); - if writable == 0 { - let ts2 = Instant::now(); - let dur = ts2.duration_since(ts1); - let dms = 1e3 * dur.as_secs_f32(); - if DO_TRACE { - trace!( - "pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}" - ); - } - let rr = ReadResult { buf, eof: false }; - break Ok(rr); - } - } - } else { - error!( - "pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}", - ec, rt.fd, writable - ); - break 'outer; - } - } - }; - match rt.rescell.send(rr) { - Ok(_) => {} - Err(_) => { - self.can_not_publish.fetch_add(1, Ordering::AcqRel); - warn!("can not publish the read result wid {wid}"); - } - } - } + Ok(_) => self.read_worker_job(wid, rt), Err(e) => { error!("can not return the job receiver: wid {wid} {e}"); break 'outer; @@ -179,6 +110,71 @@ impl Read3 { } } } -} -static READ3: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); + fn read_worker_job(&self, wid: u32, rt: ReadTask) { + let ts1 = Instant::now(); + let mut prc = 0; + let fd = rt.fd; + let mut rpos = rt.pos; + let mut buf = BytesMut::with_capacity(rt.count as usize); + let mut writable = rt.count as usize; + let rr = loop { + if DO_TRACE { + trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}"); + } + let ec = unsafe { libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64) }; + prc += 1; + if ec == -1 { + let errno = unsafe { *libc::__errno_location() }; + if errno == libc::EINVAL { + debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}"); + let rr = ReadResult { buf, eof: true }; + break Ok(rr); + } else { + warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"); + // TODO use a more structured error + let e = Error::with_msg_no_trace(format!( + "pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}" + )); + break Err(e); + } + } else if ec == 0 { + debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"); + let rr = ReadResult { buf, eof: true }; + break Ok(rr); + } else if ec > 0 { + if ec as usize > writable { + error!("pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"); + return; + } else { + rpos += ec as u64; + writable -= ec as usize; + unsafe { buf.set_len(buf.len() + (ec as usize)) }; + if writable == 0 { + let ts2 = Instant::now(); + let dur = ts2.duration_since(ts1); + let dms = 1e3 * dur.as_secs_f32(); + if DO_TRACE { + trace!("pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}"); + } + let rr = ReadResult { buf, eof: false }; + break Ok(rr); + } + } + } else { + error!( + "pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}", + ec, rt.fd, writable + ); + return; + } + }; + match rt.rescell.send(rr) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); + } + } + } +} diff --git a/disk/src/read4.rs b/disk/src/read4.rs new file mode 100644 index 0000000..5ef6cd0 --- /dev/null +++ b/disk/src/read4.rs @@ -0,0 +1,244 @@ +use bytes::BytesMut; +use err::Error; +use netpod::log::*; +use std::os::unix::prelude::RawFd; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::sync::Once; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; + +static READ4: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); + +const DO_TRACE: bool = false; + +pub struct ReadTask { + fd: RawFd, + buflen: u64, + read_queue_len: usize, + results: mpsc::Sender>, +} + +pub struct ReadResult { + pub buf: BytesMut, + pub eof: bool, +} + +pub struct Read4 { + jobs_tx: mpsc::Sender, + rtx: crossbeam::channel::Sender>, + threads_max: AtomicUsize, + can_not_publish: AtomicUsize, +} + +impl Read4 { + pub fn get() -> &'static Self { + static INIT: Once = Once::new(); + INIT.call_once(|| { + let (jtx, jrx) = mpsc::channel(512); + let (rtx, rrx) = crossbeam::channel::bounded(32); + let read4 = Read4 { + jobs_tx: jtx, + rtx, + threads_max: AtomicUsize::new(32), + can_not_publish: AtomicUsize::new(0), + }; + let b = Box::new(read4); + let ptr = Box::into_raw(b); + READ4.store(ptr, Ordering::Release); + let ptr = READ4.load(Ordering::Acquire); + let h = unsafe { &*ptr }; + if let Err(_) = h.rtx.send(jrx) { + error!("Read4 INIT: can not enqueue main job reader"); + } + for wid in 0..16 { + let rrx = rrx.clone(); + tokio::task::spawn_blocking(move || h.read_worker(wid, rrx)); + } + }); + let ptr = READ4.load(Ordering::Acquire); + unsafe { &*ptr } + } + + pub fn threads_max(&self) -> usize { + self.threads_max.load(Ordering::Acquire) + } + + pub fn set_threads_max(&self, max: usize) { + self.threads_max.store(max, Ordering::Release); + } + + pub async fn read( + &self, + fd: RawFd, + buflen: u64, + read_queue_len: usize, + ) -> Result>, Error> { + let (tx, rx) = mpsc::channel(32); + let rt = ReadTask { + fd, + buflen, + read_queue_len, + results: tx, + }; + match self.jobs_tx.send(rt).await { + Ok(_) => Ok(rx), + Err(e) => Err(Error::with_msg(format!("can not send read job task: {e}"))), + } + } + + fn read_worker(&self, wid: u32, rrx: crossbeam::channel::Receiver>) { + loop { + while wid as usize >= self.threads_max.load(Ordering::Acquire) { + std::thread::sleep(Duration::from_millis(4000)); + } + match rrx.recv() { + Ok(mut jrx) => match jrx.blocking_recv() { + Some(rt) => match self.rtx.send(jrx) { + Ok(_) => self.read_worker_job(wid, rt), + Err(e) => { + error!("can not return the job receiver: wid {wid} {e}"); + return; + } + }, + None => { + let _ = self.rtx.send(jrx); + return; + } + }, + Err(e) => { + error!("read_worker sees: wid {wid} {e}"); + return; + } + } + } + } + + fn read_worker_job(&self, wid: u32, rt: ReadTask) { + let fd = rt.fd; + let ec = unsafe { libc::lseek(fd, 0, libc::SEEK_CUR) }; + if ec == -1 { + let errno = unsafe { *libc::__errno_location() }; + let msg = format!("seek error wid {wid} fd {fd} errno {errno}"); + error!("{}", msg); + let e = Error::with_msg_no_trace(msg); + match rt.results.blocking_send(Err(e)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + error!("Can not publish error"); + } + } + return; + } + if false { + let ec = unsafe { libc::madvise(fd, 0, libc::SEEK_CUR) }; + if ec == -1 { + let errno = unsafe { *libc::__errno_location() }; + let msg = format!("seek error wid {wid} fd {fd} errno {errno}"); + error!("{}", msg); + let e = Error::with_msg_no_trace(msg); + match rt.results.blocking_send(Err(e)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + error!("Can not publish error"); + } + } + return; + } + } + let mut rpos = ec as u64; + let mut apos = rpos / rt.buflen * rt.buflen; + let mut prc = 0; + loop { + let ts1 = Instant::now(); + while apos < rpos + rt.read_queue_len as u64 * rt.buflen { + if DO_TRACE { + trace!("READAHEAD wid {wid} fd {fd} apos {apos}"); + } + let n = unsafe { libc::readahead(fd, apos as _, rt.buflen as _) }; + if n == -1 { + let errno = unsafe { *libc::__errno_location() }; + let msg = format!("READAHEAD ERROR wid {wid} errno {errno} fd {fd} apos {apos}"); + warn!("{}", msg); + // TODO use a more structured error + let e = Error::with_msg_no_trace(msg); + match rt.results.blocking_send(Err(e)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); + } + } + } else { + apos += rt.buflen; + } + } + if DO_TRACE { + trace!("READ wid {wid} fd {fd} rpos {rpos}"); + } + let mut buf = BytesMut::with_capacity(rt.buflen as usize); + let bufptr = buf.as_mut_ptr() as _; + let buflen = buf.capacity() as _; + let ec = unsafe { libc::read(fd, bufptr, buflen) }; + prc += 1; + if ec == -1 { + let errno = unsafe { *libc::__errno_location() }; + { + let msg = format!("READ ERROR wid {wid} errno {errno} fd {fd} offset {rpos}"); + warn!("{}", msg); + // TODO use a more structured error + let e = Error::with_msg_no_trace(msg); + match rt.results.blocking_send(Err(e)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); + return; + } + } + } + } else if ec == 0 { + debug!("READ EOF wid {wid} prc {prc} fd {fd} offset {rpos} prc {prc}"); + let rr = ReadResult { buf, eof: true }; + match rt.results.blocking_send(Ok(rr)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); + return; + } + } + return; + } else if ec > 0 { + if ec as usize > buf.capacity() { + error!("READ TOOLARGE wid {wid} ec {ec} fd {fd} offset {rpos} prc {prc}"); + return; + } else { + rpos += ec as u64; + unsafe { buf.set_len(buf.len() + (ec as usize)) }; + { + let ts2 = Instant::now(); + let dur = ts2.duration_since(ts1); + let dms = 1e3 * dur.as_secs_f32(); + if DO_TRACE { + trace!("READ DONE wid {wid} ec {ec} fd {fd} prc {prc} dms {dms:.2}"); + } + let rr = ReadResult { buf, eof: false }; + match rt.results.blocking_send(Ok(rr)) { + Ok(_) => {} + Err(_) => { + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); + return; + } + } + } + } + } else { + error!("READ UNEXPECTED wid {wid} ec {ec} fd {fd} offset {rpos}"); + return; + } + } + } +} diff --git a/httpret/src/download.rs b/httpret/src/download.rs index 462f8fa..2d36f88 100644 --- a/httpret/src/download.rs +++ b/httpret/src/download.rs @@ -1,11 +1,10 @@ -use std::pin::Pin; - use crate::err::Error; use crate::response; -use futures_util::{Stream, TryStreamExt}; +use futures_util::TryStreamExt; use http::{Method, StatusCode}; use hyper::{Body, Request, Response}; -use netpod::{get_url_query_pairs, DiskIoTune, FromUrl, NodeConfigCached, ReadSys}; +use netpod::log::*; +use netpod::{get_url_query_pairs, DiskIoTune, FromUrl, NodeConfigCached}; use url::Url; #[derive(Clone, Debug)] @@ -67,18 +66,12 @@ impl DownloadHandler { }; let url = url::Url::parse(&format!("http://dummy{}", head.uri))?; let query = DownloadQuery::from_url(&url)?; - let file = tokio::fs::OpenOptions::new().read(true).open(base.join(p2)).await?; - let s = match query.disk_io_tune.read_sys { - ReadSys::TokioAsyncRead => { - let s = disk::file_content_stream(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); - Box::pin(s) as Pin + Send>> - } - ReadSys::Read3 => { - let s = disk::file_content_stream_2(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); - Box::pin(s) as _ - } - }; - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::wrap_stream(s))?) + // TODO wrap file operation to return a better error. + let pp = base.join(p2); + info!("Try to open {pp:?}"); + let file = tokio::fs::OpenOptions::new().read(true).open(pp).await?; + let s = disk::file_content_stream(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); + Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) } pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 8bfcc09..980b653 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1336,6 +1336,7 @@ impl Default for FileIoBufferSize { pub enum ReadSys { TokioAsyncRead, Read3, + Read4, } impl ReadSys { @@ -1350,6 +1351,8 @@ impl From<&str> for ReadSys { Self::TokioAsyncRead } else if k == "Read3" { Self::Read3 + } else if k == "Read4" { + Self::Read4 } else { Self::default() }