WIP Read3
This commit is contained in:
@@ -17,6 +17,7 @@ tokio = { version = "1.11.0", features = ["rt-multi-thread", "io-util", "net", "
|
||||
tokio-stream = {version = "0.1.5", features = ["fs"]}
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
||||
async-channel = "1.6"
|
||||
crossbeam = "0.8"
|
||||
bytes = "1.0.1"
|
||||
crc32fast = "1.2.1"
|
||||
arrayref = "0.3.6"
|
||||
|
||||
130
disk/src/disk.rs
130
disk/src/disk.rs
@@ -3,18 +3,22 @@ use bytes::{Bytes, BytesMut};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::future::FusedFuture;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::{FutureExt, StreamExt, TryFutureExt};
|
||||
use netpod::histo::HistoLog2;
|
||||
use netpod::{log::*, FileIoBufferSize};
|
||||
use netpod::{ChannelConfig, Node, Shape};
|
||||
use readat::ReadResult;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::io::SeekFrom;
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{fmt, mem};
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::{AsyncRead, ReadBuf};
|
||||
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
|
||||
|
||||
pub mod agg;
|
||||
#[cfg(test)]
|
||||
@@ -34,6 +38,7 @@ pub mod index;
|
||||
pub mod merge;
|
||||
pub mod paths;
|
||||
pub mod raw;
|
||||
pub mod readat;
|
||||
pub mod streamlog;
|
||||
|
||||
// TODO transform this into a self-test or remove.
|
||||
@@ -269,6 +274,127 @@ pub fn file_content_stream(
|
||||
FileContentStream::new(file, file_io_buffer_size)
|
||||
}
|
||||
|
||||
enum FCS2 {
|
||||
GetPosition,
|
||||
Reading,
|
||||
}
|
||||
|
||||
enum ReadStep {
|
||||
Fut(Pin<Box<dyn Future<Output = Result<ReadResult, Error>> + Send>>),
|
||||
Res(Result<ReadResult, Error>),
|
||||
}
|
||||
|
||||
pub struct FileContentStream2 {
|
||||
fcs2: FCS2,
|
||||
file: Pin<Box<File>>,
|
||||
file_pos: u64,
|
||||
file_io_buffer_size: FileIoBufferSize,
|
||||
get_position_fut: Pin<Box<dyn Future<Output = Result<u64, Error>> + Send>>,
|
||||
reads: VecDeque<ReadStep>,
|
||||
nlog: usize,
|
||||
done: bool,
|
||||
complete: bool,
|
||||
}
|
||||
|
||||
impl FileContentStream2 {
|
||||
pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self {
|
||||
let mut file = Box::pin(file);
|
||||
let ffr = unsafe {
|
||||
let ffr = Pin::get_unchecked_mut(file.as_mut());
|
||||
std::mem::transmute::<&mut File, &'static mut File>(ffr)
|
||||
};
|
||||
let ff = ffr
|
||||
.seek(SeekFrom::Current(0))
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("Seek error")));
|
||||
Self {
|
||||
fcs2: FCS2::GetPosition,
|
||||
file,
|
||||
file_pos: 0,
|
||||
file_io_buffer_size,
|
||||
get_position_fut: Box::pin(ff),
|
||||
reads: VecDeque::new(),
|
||||
nlog: 0,
|
||||
done: false,
|
||||
complete: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for FileContentStream2 {
|
||||
type Item = Result<FileChunkRead, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if self.complete {
|
||||
panic!("poll_next on complete")
|
||||
} else if self.done {
|
||||
self.complete = true;
|
||||
Ready(None)
|
||||
} else {
|
||||
match self.fcs2 {
|
||||
FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) {
|
||||
Ready(Ok(k)) => {
|
||||
self.file_pos = k;
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.done = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
FCS2::Reading => {
|
||||
// TODO Keep the read queue full.
|
||||
// TODO Do not add more reads when EOF is encountered.
|
||||
while self.reads.len() < 4 {
|
||||
let count = self.file_io_buffer_size.bytes() as u64;
|
||||
let r3 = readat::Read3::get();
|
||||
let x = r3.read(self.file.as_raw_fd(), self.file_pos, count);
|
||||
self.reads.push_back(ReadStep::Fut(Box::pin(x)));
|
||||
self.file_pos += count;
|
||||
}
|
||||
// TODO must poll all futures to make progress... but if they resolve, must poll no more!
|
||||
// therefore, need some enum type for the pending futures list to also store the resolved ones.
|
||||
for e in &mut self.reads {
|
||||
match e {
|
||||
ReadStep::Fut(k) => match k.poll_unpin(cx) {
|
||||
Ready(k) => {
|
||||
*e = ReadStep::Res(k);
|
||||
}
|
||||
Pending => {}
|
||||
},
|
||||
ReadStep::Res(_k) => {}
|
||||
}
|
||||
}
|
||||
// TODO Check the front if something is ready.
|
||||
if let Some(ReadStep::Res(_)) = self.reads.front() {
|
||||
if let Some(ReadStep::Res(res)) = self.reads.pop_front() {
|
||||
// TODO check for error or return the read data.
|
||||
// TODO if read data contains EOF flag, raise EOF flag also in self,
|
||||
// and abort.
|
||||
// TODO make sure that everything runs stable even if this Stream is simply dropped
|
||||
// or read results are not waited for and channels or oneshots get dropped.
|
||||
} else {
|
||||
// TODO return error, this should never happen because we check before.
|
||||
}
|
||||
}
|
||||
// TODO handle case that self.reads is empty.
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn file_content_stream_2(
|
||||
file: File,
|
||||
file_io_buffer_size: FileIoBufferSize,
|
||||
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
|
||||
FileContentStream2::new(file, file_io_buffer_size)
|
||||
}
|
||||
|
||||
pub struct NeedMinBuffer {
|
||||
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
|
||||
need_min: u32,
|
||||
|
||||
@@ -32,6 +32,7 @@ pub struct EventChunkerMultifile {
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
max_ts: u64,
|
||||
emit_count: usize,
|
||||
}
|
||||
|
||||
impl EventChunkerMultifile {
|
||||
@@ -65,6 +66,7 @@ impl EventChunkerMultifile {
|
||||
expand,
|
||||
do_decompress,
|
||||
max_ts: 0,
|
||||
emit_count: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,10 +108,25 @@ impl Stream for EventChunkerMultifile {
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
self.max_ts = g;
|
||||
if true {
|
||||
info!("EventChunkerMultifile emit {} events", h.tss.len());
|
||||
const EMIT_COUNT_MAX: usize = 10;
|
||||
if self.emit_count < EMIT_COUNT_MAX {
|
||||
info!(
|
||||
"EventChunkerMultifile emit {}/{} events {}",
|
||||
self.emit_count,
|
||||
EMIT_COUNT_MAX,
|
||||
h.tss.len()
|
||||
);
|
||||
self.emit_count += 1;
|
||||
Ready(Some(k))
|
||||
} else if (self.range.beg % 1000000) / 1000 == 666 {
|
||||
// TODO move this test feature into some other query parameter.
|
||||
warn!("GENERATE ERROR FOR TESTING PURPOSE");
|
||||
let e = Error::with_msg(format!("Private-error-message"));
|
||||
let e = e.add_public_msg(format!("Public-error-message"));
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
Ready(Some(k))
|
||||
}
|
||||
Ready(Some(k))
|
||||
}
|
||||
} else {
|
||||
Ready(Some(k))
|
||||
|
||||
@@ -238,6 +238,10 @@ pub fn make_local_event_blobs_stream(
|
||||
file_io_buffer_size: FileIoBufferSize,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<EventChunkerMultifile, Error> {
|
||||
info!("make_local_event_blobs_stream do_decompress {do_decompress} file_io_buffer_size {file_io_buffer_size:?}");
|
||||
if do_decompress {
|
||||
warn!("Possible issue: decompress central storage event blob stream");
|
||||
}
|
||||
let shape = match entry.to_shape() {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err(e)?,
|
||||
|
||||
153
disk/src/readat.rs
Normal file
153
disk/src/readat.rs
Normal file
@@ -0,0 +1,153 @@
|
||||
use bytes::BytesMut;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use std::os::unix::prelude::RawFd;
|
||||
use std::sync::atomic::{AtomicPtr, Ordering};
|
||||
use std::sync::Once;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
pub struct ReadTask {
|
||||
fd: RawFd,
|
||||
pos: u64,
|
||||
count: u64,
|
||||
rescell: oneshot::Sender<Result<ReadResult, Error>>,
|
||||
}
|
||||
|
||||
pub struct ReadResult {
|
||||
pub buf: BytesMut,
|
||||
pub eof: bool,
|
||||
}
|
||||
|
||||
/*
|
||||
Async code must be able to interact with the Read3 system via async methods.
|
||||
The async code must be able to enqueue a read in non-blocking fashion.
|
||||
Since the queue of pending read requests must be bounded, this must be able to async-block.
|
||||
*/
|
||||
pub struct Read3 {
|
||||
jobs_tx: mpsc::Sender<ReadTask>,
|
||||
rtx: crossbeam::channel::Sender<mpsc::Receiver<ReadTask>>,
|
||||
}
|
||||
|
||||
impl Read3 {
|
||||
pub fn get() -> &'static Self {
|
||||
static INIT: Once = Once::new();
|
||||
INIT.call_once(|| {
|
||||
let (jtx, jrx) = mpsc::channel(32);
|
||||
let (rtx, rrx) = crossbeam::channel::bounded(16);
|
||||
let read3 = Read3 { jobs_tx: jtx, rtx };
|
||||
let b = Box::new(read3);
|
||||
let ptr = Box::into_raw(b);
|
||||
READ3.store(ptr, Ordering::SeqCst);
|
||||
let ptr = READ3.load(Ordering::SeqCst);
|
||||
let h = unsafe { &*ptr };
|
||||
if let Err(_) = h.rtx.send(jrx) {
|
||||
error!("Read3 INIT: can not enqueue main job reader");
|
||||
}
|
||||
for _ in 0..2 {
|
||||
let rrx = rrx.clone();
|
||||
tokio::task::spawn_blocking(move || h.read_worker(rrx));
|
||||
}
|
||||
});
|
||||
let ptr = READ3.load(Ordering::SeqCst);
|
||||
unsafe { &*ptr }
|
||||
}
|
||||
|
||||
pub async fn read(&self, fd: RawFd, pos: u64, count: u64) -> Result<ReadResult, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let rt = ReadTask {
|
||||
fd,
|
||||
pos,
|
||||
count,
|
||||
rescell: tx,
|
||||
};
|
||||
match self.jobs_tx.send(rt).await {
|
||||
Ok(_) => match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(e) => Err(Error::with_msg(format!("can not receive read task result: {e}"))),
|
||||
},
|
||||
Err(e) => Err(Error::with_msg(format!("can not send read job task: {e}"))),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_worker(&self, rrx: crossbeam::channel::Receiver<mpsc::Receiver<ReadTask>>) {
|
||||
'outer: loop {
|
||||
match rrx.recv() {
|
||||
Ok(mut jrx) => match jrx.blocking_recv() {
|
||||
Some(rt) => match self.rtx.send(jrx) {
|
||||
Ok(_) => {
|
||||
let mut buf = BytesMut::with_capacity(rt.count as usize);
|
||||
let mut writable = rt.count as usize;
|
||||
let rr = unsafe {
|
||||
loop {
|
||||
info!("do pread fd {} count {} offset {}", rt.fd, writable, rt.pos);
|
||||
let ec = libc::pread(rt.fd, buf.as_mut_ptr() as _, writable, rt.pos as i64);
|
||||
if ec == -1 {
|
||||
let errno = *libc::__errno_location();
|
||||
if errno == libc::EINVAL {
|
||||
info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
|
||||
let rr = ReadResult { buf, eof: true };
|
||||
break Ok(rr);
|
||||
} else {
|
||||
warn!(
|
||||
"pread ERROR errno {} fd {} count {} offset {}",
|
||||
errno, rt.fd, writable, rt.pos
|
||||
);
|
||||
// TODO use a more structured error
|
||||
let e = Error::with_msg_no_trace(format!(
|
||||
"pread ERROR errno {} fd {} count {} offset {}",
|
||||
errno, rt.fd, writable, rt.pos
|
||||
));
|
||||
break Err(e);
|
||||
}
|
||||
} else if ec == 0 {
|
||||
info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
|
||||
let rr = ReadResult { buf, eof: true };
|
||||
break Ok(rr);
|
||||
} else if ec > 0 {
|
||||
buf.set_len(ec as usize);
|
||||
if ec as usize > writable {
|
||||
error!(
|
||||
"pread TOOLARGE ec {} fd {} count {} offset {}",
|
||||
ec, rt.fd, writable, rt.pos
|
||||
);
|
||||
break 'outer;
|
||||
}
|
||||
writable -= ec as usize;
|
||||
if writable == 0 {
|
||||
let rr = ReadResult { buf, eof: false };
|
||||
break Ok(rr);
|
||||
}
|
||||
} else {
|
||||
error!(
|
||||
"pread UNEXPECTED ec {} fd {} count {} offset {}",
|
||||
ec, rt.fd, writable, rt.pos
|
||||
);
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
};
|
||||
match rt.rescell.send(rr) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("can not publish the read result");
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can not return the job receiver: {e}");
|
||||
break 'outer;
|
||||
}
|
||||
},
|
||||
None => break 'outer,
|
||||
},
|
||||
Err(e) => {
|
||||
error!("read_worker sees: {e}");
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static READ3: AtomicPtr<Read3> = AtomicPtr::new(std::ptr::null_mut());
|
||||
Reference in New Issue
Block a user