Try specific read

Dominik Werder
2022-03-09 20:34:24 +01:00
parent e9b87bf9fa
commit 04def20be3
6 changed files with 483 additions and 123 deletions


@@ -17,6 +17,7 @@ pub mod merge;
pub mod paths;
pub mod raw;
pub mod read3;
pub mod read4;
pub mod streamlog;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
@@ -26,9 +27,8 @@ use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::{log::*, ReadSys};
use netpod::{ChannelConfig, DiskIoTune, Node, Shape};
use read3::ReadResult;
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
@@ -40,6 +40,7 @@ use std::time::{Duration, Instant};
use std::{fmt, mem};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
use tokio::sync::mpsc;
// TODO transform this into a self-test or remove.
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
@@ -258,39 +259,31 @@ impl Stream for FileContentStream {
}
}
pub fn file_content_stream(
file: File,
disk_io_tune: DiskIoTune,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
warn!("file_content_stream disk_io_tune {disk_io_tune:?}");
FileContentStream::new(file, disk_io_tune)
}
enum FCS2 {
enum FCS3 {
GetPosition,
ReadingSimple,
Reading,
}
enum ReadStep {
Fut(Pin<Box<dyn Future<Output = Result<ReadResult, Error>> + Send>>),
Res(Result<ReadResult, Error>),
Fut(Pin<Box<dyn Future<Output = Result<read3::ReadResult, Error>> + Send>>),
Res(Result<read3::ReadResult, Error>),
}
pub struct FileContentStream2 {
fcs2: FCS2,
pub struct FileContentStream3 {
fcs: FCS3,
file: Pin<Box<File>>,
file_pos: u64,
eof: bool,
disk_io_tune: DiskIoTune,
get_position_fut: Pin<Box<dyn Future<Output = Result<u64, Error>> + Send>>,
read_fut: Pin<Box<dyn Future<Output = Result<ReadResult, Error>> + Send>>,
read_fut: Pin<Box<dyn Future<Output = Result<read3::ReadResult, Error>> + Send>>,
reads: VecDeque<ReadStep>,
done: bool,
complete: bool,
}
impl FileContentStream2 {
impl FileContentStream3 {
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
let mut file = Box::pin(file);
let ffr = unsafe {
@@ -301,7 +294,7 @@ impl FileContentStream2 {
.seek(SeekFrom::Current(0))
.map_err(|_| Error::with_msg_no_trace(format!("Seek error")));
Self {
fcs2: FCS2::GetPosition,
fcs: FCS3::GetPosition,
file,
file_pos: 0,
eof: false,
@@ -317,7 +310,7 @@ impl FileContentStream2 {
}
}
impl Stream for FileContentStream2 {
impl Stream for FileContentStream3 {
type Item = Result<FileChunkRead, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -329,8 +322,8 @@ impl Stream for FileContentStream2 {
self.complete = true;
Ready(None)
} else {
match self.fcs2 {
FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) {
match self.fcs {
FCS3::GetPosition => match self.get_position_fut.poll_unpin(cx) {
Ready(Ok(k)) => {
info!("current file pos: {k}");
self.file_pos = k;
@@ -339,9 +332,9 @@ impl Stream for FileContentStream2 {
let count = self.disk_io_tune.read_buffer_len as u64;
self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count));
self.file_pos += count;
self.fcs2 = FCS2::ReadingSimple;
self.fcs = FCS3::ReadingSimple;
} else {
self.fcs2 = FCS2::Reading;
self.fcs = FCS3::Reading;
}
continue;
}
@@ -351,7 +344,7 @@ impl Stream for FileContentStream2 {
}
Pending => Pending,
},
FCS2::ReadingSimple => match self.read_fut.poll_unpin(cx) {
FCS3::ReadingSimple => match self.read_fut.poll_unpin(cx) {
Ready(Ok(res)) => {
if res.eof {
let item = FileChunkRead {
@@ -378,7 +371,7 @@ impl Stream for FileContentStream2 {
}
Pending => Pending,
},
FCS2::Reading => {
FCS3::Reading => {
while !self.eof && self.reads.len() < self.disk_io_tune.read_queue_len {
let fd = self.file.as_raw_fd();
let pos = self.file_pos;
@@ -448,12 +441,146 @@ impl Stream for FileContentStream2 {
}
}
pub fn file_content_stream_2(
enum FCS4 {
Init,
Setup,
Reading,
}
pub struct FileContentStream4 {
fcs: FCS4,
file: Pin<Box<File>>,
disk_io_tune: DiskIoTune,
setup_fut:
Option<Pin<Box<dyn Future<Output = Result<mpsc::Receiver<Result<read4::ReadResult, Error>>, Error>> + Send>>>,
inp: Option<mpsc::Receiver<Result<read4::ReadResult, Error>>>,
recv_fut: Pin<Box<dyn Future<Output = Option<Result<read4::ReadResult, Error>>> + Send>>,
done: bool,
complete: bool,
}
impl FileContentStream4 {
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
let file = Box::pin(file);
Self {
fcs: FCS4::Init,
file,
disk_io_tune,
setup_fut: None,
inp: None,
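// Placeholder future; it is replaced with a real recv() future during FCS4::Setup
// before FCS4::Reading ever polls it.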
recv_fut: Box::pin(futures_util::future::ready(Some(Err(Error::with_msg_no_trace(
format!("dummy"),
))))),
done: false,
complete: false,
}
}
}
impl Stream for FileContentStream4 {
type Item = Result<FileChunkRead, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.complete {
panic!("poll_next on complete")
} else if self.done {
self.complete = true;
Ready(None)
} else {
match self.fcs {
FCS4::Init => {
let read4 = read4::Read4::get();
let fd = self.file.as_raw_fd();
let buflen = self.disk_io_tune.read_buffer_len as u64;
let fut = read4.read(fd, buflen, self.disk_io_tune.read_queue_len);
self.setup_fut = Some(Box::pin(fut) as _);
self.fcs = FCS4::Setup;
continue;
}
FCS4::Setup => match self.setup_fut.as_mut().unwrap().poll_unpin(cx) {
Ready(k) => match k {
Ok(k) => {
self.setup_fut = None;
self.fcs = FCS4::Reading;
self.inp = Some(k);
// TODO avoid this transmute: it extends the receiver borrow to 'static and is only
// sound while `self.inp` stays alive and is not moved for as long as `recv_fut` is in use.
let rm = self.inp.as_mut().unwrap();
let rm = unsafe {
std::mem::transmute::<
&mut mpsc::Receiver<Result<read4::ReadResult, Error>>,
&'static mut mpsc::Receiver<Result<read4::ReadResult, Error>>,
>(rm)
};
self.recv_fut = Box::pin(rm.recv()) as _;
continue;
}
Err(e) => {
self.done = true;
let e = Error::with_msg_no_trace(format!("init failed {e:?}"));
Ready(Some(Err(e)))
}
},
Pending => Pending,
},
FCS4::Reading => match self.recv_fut.poll_unpin(cx) {
Ready(k) => match k {
Some(k) => match k {
Ok(k) => {
// TODO same 'static transmute as in FCS4::Setup; see the note there.
let rm = self.inp.as_mut().unwrap();
let rm = unsafe {
std::mem::transmute::<
&mut mpsc::Receiver<Result<read4::ReadResult, Error>>,
&'static mut mpsc::Receiver<Result<read4::ReadResult, Error>>,
>(rm)
};
self.recv_fut = Box::pin(rm.recv()) as _;
let item = FileChunkRead {
buf: k.buf,
duration: Duration::from_millis(0),
};
Ready(Some(Ok(item)))
}
Err(e) => {
self.done = true;
let e = Error::with_msg_no_trace(format!("init failed {e:?}"));
Ready(Some(Err(e)))
}
},
None => {
self.done = true;
continue;
}
},
Pending => Pending,
},
}
};
}
}
}
pub fn file_content_stream(
file: File,
disk_io_tune: DiskIoTune,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
warn!("file_content_stream_2 disk_io_tune {disk_io_tune:?}");
FileContentStream2::new(file, disk_io_tune)
) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>> {
warn!("file_content_stream disk_io_tune {disk_io_tune:?}");
match &disk_io_tune.read_sys {
ReadSys::TokioAsyncRead => {
let s = FileContentStream::new(file, disk_io_tune);
Box::pin(s) as Pin<Box<dyn Stream<Item = _> + Send>>
}
ReadSys::Read3 => {
let s = FileContentStream3::new(file, disk_io_tune);
Box::pin(s) as _
}
ReadSys::Read4 => {
let s = FileContentStream4::new(file, disk_io_tune);
Box::pin(s) as _
}
}
}
pub struct NeedMinBuffer {


@@ -188,10 +188,7 @@ impl Stream for EventChunkerMultifile {
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp = Box::pin(crate::file_content_stream_2(
file,
self.disk_io_tune.clone(),
));
let inp = crate::file_content_stream(file, self.disk_io_tune.clone());
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),

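For orientation, the dispatch above means an in-crate caller now selects the read backend purely through `DiskIoTune::read_sys` and gets a boxed stream back. A minimal sketch of such a caller (assuming `DiskIoTune` provides a usable `Default`; only the fields that appear in this diff are set explicitly):

use err::Error;
use futures_util::StreamExt;
use netpod::{DiskIoTune, ReadSys};
use tokio::fs::File;

// Sketch of an in-crate caller; DiskIoTune::default() is an assumption.
async fn count_bytes(file: File) -> Result<u64, Error> {
    let mut tune = DiskIoTune::default();
    tune.read_sys = ReadSys::Read4; // or ReadSys::Read3 / ReadSys::TokioAsyncRead
    tune.read_buffer_len = 256 * 1024; // bytes requested per read
    tune.read_queue_len = 4; // readahead / queue depth
    let mut stream = crate::file_content_stream(file, tune);
    let mut total = 0u64;
    while let Some(chunk) = stream.next().await {
        total += chunk?.buf.len() as u64;
    }
    Ok(total)
}

Because the dispatcher boxes the stream, call sites such as EventChunkerMultifile no longer need to pick between FileContentStream, FileContentStream3, or FileContentStream4 themselves.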

@@ -9,6 +9,8 @@ use tokio::sync::{mpsc, oneshot};
const DO_TRACE: bool = false;
static READ3: AtomicPtr<Read3> = AtomicPtr::new(std::ptr::null_mut());
pub struct ReadTask {
fd: RawFd,
pos: u64,
@@ -90,78 +92,7 @@ impl Read3 {
match rrx.recv() {
Ok(mut jrx) => match jrx.blocking_recv() {
Some(rt) => match self.rtx.send(jrx) {
Ok(_) => {
let ts1 = Instant::now();
let mut prc = 0;
let fd = rt.fd;
let mut rpos = rt.pos;
let mut buf = BytesMut::with_capacity(rt.count as usize);
let mut writable = rt.count as usize;
let rr = unsafe {
loop {
if DO_TRACE {
trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}");
}
let ec = libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64);
prc += 1;
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EINVAL {
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else {
warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}");
// TODO use a more structured error
let e = Error::with_msg_no_trace(format!(
"pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"
));
break Err(e);
}
} else if ec == 0 {
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else if ec > 0 {
if ec as usize > writable {
error!(
"pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"
);
break 'outer;
} else {
rpos += ec as u64;
writable -= ec as usize;
buf.set_len(buf.len() + (ec as usize));
if writable == 0 {
let ts2 = Instant::now();
let dur = ts2.duration_since(ts1);
let dms = 1e3 * dur.as_secs_f32();
if DO_TRACE {
trace!(
"pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}"
);
}
let rr = ReadResult { buf, eof: false };
break Ok(rr);
}
}
} else {
error!(
"pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}",
ec, rt.fd, writable
);
break 'outer;
}
}
};
match rt.rescell.send(rr) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
}
}
}
Ok(_) => self.read_worker_job(wid, rt),
Err(e) => {
error!("can not return the job receiver: wid {wid} {e}");
break 'outer;
@@ -179,6 +110,71 @@ impl Read3 {
}
}
}
}
static READ3: AtomicPtr<Read3> = AtomicPtr::new(std::ptr::null_mut());
fn read_worker_job(&self, wid: u32, rt: ReadTask) {
let ts1 = Instant::now();
let mut prc = 0;
let fd = rt.fd;
let mut rpos = rt.pos;
let mut buf = BytesMut::with_capacity(rt.count as usize);
let mut writable = rt.count as usize;
let rr = loop {
if DO_TRACE {
trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}");
}
let ec = unsafe { libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64) };
prc += 1;
if ec == -1 {
let errno = unsafe { *libc::__errno_location() };
if errno == libc::EINVAL {
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else {
warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}");
// TODO use a more structured error
let e = Error::with_msg_no_trace(format!(
"pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"
));
break Err(e);
}
} else if ec == 0 {
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else if ec > 0 {
if ec as usize > writable {
error!("pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}");
return;
} else {
rpos += ec as u64;
writable -= ec as usize;
unsafe { buf.set_len(buf.len() + (ec as usize)) };
if writable == 0 {
let ts2 = Instant::now();
let dur = ts2.duration_since(ts1);
let dms = 1e3 * dur.as_secs_f32();
if DO_TRACE {
trace!("pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}");
}
let rr = ReadResult { buf, eof: false };
break Ok(rr);
}
}
} else {
error!(
"pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}",
ec, rt.fd, writable
);
return;
}
};
match rt.rescell.send(rr) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
}
}
}
}

disk/src/read4.rs (new file, 244 lines added)

@@ -0,0 +1,244 @@
use bytes::BytesMut;
use err::Error;
use netpod::log::*;
use std::os::unix::prelude::RawFd;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Once;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
static READ4: AtomicPtr<Read4> = AtomicPtr::new(std::ptr::null_mut());
const DO_TRACE: bool = false;
pub struct ReadTask {
fd: RawFd,
buflen: u64,
read_queue_len: usize,
results: mpsc::Sender<Result<ReadResult, Error>>,
}
pub struct ReadResult {
pub buf: BytesMut,
pub eof: bool,
}
pub struct Read4 {
jobs_tx: mpsc::Sender<ReadTask>,
rtx: crossbeam::channel::Sender<mpsc::Receiver<ReadTask>>,
threads_max: AtomicUsize,
can_not_publish: AtomicUsize,
}
impl Read4 {
pub fn get() -> &'static Self {
static INIT: Once = Once::new();
INIT.call_once(|| {
let (jtx, jrx) = mpsc::channel(512);
let (rtx, rrx) = crossbeam::channel::bounded(32);
let read4 = Read4 {
jobs_tx: jtx,
rtx,
threads_max: AtomicUsize::new(32),
can_not_publish: AtomicUsize::new(0),
};
let b = Box::new(read4);
let ptr = Box::into_raw(b);
READ4.store(ptr, Ordering::Release);
let ptr = READ4.load(Ordering::Acquire);
let h = unsafe { &*ptr };
if let Err(_) = h.rtx.send(jrx) {
error!("Read4 INIT: can not enqueue main job reader");
}
for wid in 0..16 {
let rrx = rrx.clone();
tokio::task::spawn_blocking(move || h.read_worker(wid, rrx));
}
});
let ptr = READ4.load(Ordering::Acquire);
unsafe { &*ptr }
}
pub fn threads_max(&self) -> usize {
self.threads_max.load(Ordering::Acquire)
}
pub fn set_threads_max(&self, max: usize) {
self.threads_max.store(max, Ordering::Release);
}
pub async fn read(
&self,
fd: RawFd,
buflen: u64,
read_queue_len: usize,
) -> Result<mpsc::Receiver<Result<ReadResult, Error>>, Error> {
let (tx, rx) = mpsc::channel(32);
let rt = ReadTask {
fd,
buflen,
read_queue_len,
results: tx,
};
match self.jobs_tx.send(rt).await {
Ok(_) => Ok(rx),
Err(e) => Err(Error::with_msg(format!("can not send read job task: {e}"))),
}
}
fn read_worker(&self, wid: u32, rrx: crossbeam::channel::Receiver<mpsc::Receiver<ReadTask>>) {
loop {
while wid as usize >= self.threads_max.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(4000));
}
match rrx.recv() {
Ok(mut jrx) => match jrx.blocking_recv() {
Some(rt) => match self.rtx.send(jrx) {
Ok(_) => self.read_worker_job(wid, rt),
Err(e) => {
error!("can not return the job receiver: wid {wid} {e}");
return;
}
},
None => {
let _ = self.rtx.send(jrx);
return;
}
},
Err(e) => {
error!("read_worker sees: wid {wid} {e}");
return;
}
}
}
}
fn read_worker_job(&self, wid: u32, rt: ReadTask) {
let fd = rt.fd;
let ec = unsafe { libc::lseek(fd, 0, libc::SEEK_CUR) };
if ec == -1 {
let errno = unsafe { *libc::__errno_location() };
let msg = format!("seek error wid {wid} fd {fd} errno {errno}");
error!("{}", msg);
let e = Error::with_msg_no_trace(msg);
match rt.results.blocking_send(Err(e)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
error!("Can not publish error");
}
}
return;
}
if false {
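// NOTE: disabled experiment: madvise expects (addr, length, advice), so these
// arguments would need to be filled in before this block could be enabled.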
let ec = unsafe { libc::madvise(fd, 0, libc::SEEK_CUR) };
if ec == -1 {
let errno = unsafe { *libc::__errno_location() };
let msg = format!("madvise error wid {wid} fd {fd} errno {errno}");
error!("{}", msg);
let e = Error::with_msg_no_trace(msg);
match rt.results.blocking_send(Err(e)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
error!("Can not publish error");
}
}
return;
}
}
let mut rpos = ec as u64;
let mut apos = rpos / rt.buflen * rt.buflen;
let mut prc = 0;
loop {
let ts1 = Instant::now();
while apos < rpos + rt.read_queue_len as u64 * rt.buflen {
if DO_TRACE {
trace!("READAHEAD wid {wid} fd {fd} apos {apos}");
}
let n = unsafe { libc::readahead(fd, apos as _, rt.buflen as _) };
if n == -1 {
let errno = unsafe { *libc::__errno_location() };
let msg = format!("READAHEAD ERROR wid {wid} errno {errno} fd {fd} apos {apos}");
warn!("{}", msg);
// TODO use a more structured error
let e = Error::with_msg_no_trace(msg);
match rt.results.blocking_send(Err(e)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
}
}
} else {
apos += rt.buflen;
}
}
if DO_TRACE {
trace!("READ wid {wid} fd {fd} rpos {rpos}");
}
let mut buf = BytesMut::with_capacity(rt.buflen as usize);
let bufptr = buf.as_mut_ptr() as _;
let buflen = buf.capacity() as _;
let ec = unsafe { libc::read(fd, bufptr, buflen) };
prc += 1;
if ec == -1 {
let errno = unsafe { *libc::__errno_location() };
{
let msg = format!("READ ERROR wid {wid} errno {errno} fd {fd} offset {rpos}");
warn!("{}", msg);
// TODO use a more structured error
let e = Error::with_msg_no_trace(msg);
match rt.results.blocking_send(Err(e)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
return;
}
}
}
} else if ec == 0 {
debug!("READ EOF wid {wid} fd {fd} offset {rpos} prc {prc}");
let rr = ReadResult { buf, eof: true };
match rt.results.blocking_send(Ok(rr)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
return;
}
}
return;
} else if ec > 0 {
if ec as usize > buf.capacity() {
error!("READ TOOLARGE wid {wid} ec {ec} fd {fd} offset {rpos} prc {prc}");
return;
} else {
rpos += ec as u64;
unsafe { buf.set_len(buf.len() + (ec as usize)) };
{
let ts2 = Instant::now();
let dur = ts2.duration_since(ts1);
let dms = 1e3 * dur.as_secs_f32();
if DO_TRACE {
trace!("READ DONE wid {wid} ec {ec} fd {fd} prc {prc} dms {dms:.2}");
}
let rr = ReadResult { buf, eof: false };
match rt.results.blocking_send(Ok(rr)) {
Ok(_) => {}
Err(_) => {
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
return;
}
}
}
}
} else {
error!("READ UNEXPECTED wid {wid} ec {ec} fd {fd} offset {rpos}");
return;
}
}
}
}
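
As a usage reference, the new read4 interface could also be driven directly along these lines. This is a sketch, not part of the commit: it assumes the crate is reachable as `disk` and that it runs inside a tokio runtime, since `Read4::get` spawns its blocking workers via `tokio::task::spawn_blocking`.

use err::Error;
use std::os::unix::io::AsRawFd;
use tokio::fs::File;

// Illustrative helper: drain one file through Read4 and count the bytes delivered.
async fn read_all_via_read4(file: &File, buflen: u64, queue_len: usize) -> Result<u64, Error> {
    let fd = file.as_raw_fd();
    // read() hands back an mpsc::Receiver yielding Result<ReadResult, Error> chunks.
    let mut rx = disk::read4::Read4::get().read(fd, buflen, queue_len).await?;
    let mut total = 0u64;
    while let Some(res) = rx.recv().await {
        let rr = res?;
        total += rr.buf.len() as u64;
        if rr.eof {
            break;
        }
    }
    Ok(total)
}

FileContentStream4 above wraps exactly this receiver behind the Stream interface.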