Add I/O query parameters and file download test

This commit is contained in:
Dominik Werder
2022-03-04 19:37:54 +01:00
parent d67608fabc
commit e9b87bf9fa
20 changed files with 579 additions and 183 deletions

View File

@@ -1,7 +1,6 @@
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use netpod::{test_data_base_path_databuffer, FileIoBufferSize};
use netpod::{timeunits::*, SfDatabuffer};
use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -58,13 +57,15 @@ async fn agg_x_dim_0_inner() {
let ts2 = ts1 + HOUR * 24;
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
// TODO let upstream already provide DiskIoTune:
let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
file_io_buffer_size,
disk_io_tune,
event_chunker_conf,
false,
true,
@@ -110,13 +111,15 @@ async fn agg_x_dim_1_inner() {
let ts2 = ts1 + HOUR * 24;
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
// TODO let upstream already provide DiskIoTune:
let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = super::eventblobs::EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
file_io_buffer_size,
disk_io_tune,
event_chunker_conf,
false,
true,

View File

@@ -111,11 +111,14 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
debug!(
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {range:?}"
);
// TODO let BinnedQuery provide the DiskIoTune.
let mut disk_io_tune = netpod::DiskIoTune::default();
disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize;
let evq = RawEventsQuery {
channel: self.query.channel().clone(),
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
disk_io_tune,
do_decompress: true,
};
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
@@ -360,11 +363,14 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
}
Ok(None) => {
debug!("BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {range:?}");
// TODO let BinnedQuery provide the DiskIoTune.
let mut disk_io_tune = netpod::DiskIoTune::default();
disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize;
let evq = RawEventsQuery {
channel: self.query.channel().clone(),
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
disk_io_tune,
do_decompress: true,
};
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());

View File

@@ -105,11 +105,14 @@ where
Pin<Box<dyn Stream<Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
Error,
> {
// TODO let PreBinnedQuery provide the tune:
let mut disk_io_tune = netpod::DiskIoTune::default();
disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size();
let evq = RawEventsQuery {
channel: self.query.channel().clone(),
range: self.query.patch().patch_range(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
disk_io_tune,
do_decompress: true,
};
if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {

View File

@@ -274,11 +274,14 @@ impl ChannelExecFunction for PlainEvents {
let _ = byte_order;
let _ = event_value_shape;
let perf_opts = PerfOpts { inmem_bufcap: 4096 };
// TODO let upstream provide DiskIoTune
let mut disk_io_tune = netpod::DiskIoTune::default();
disk_io_tune.read_buffer_len = self.disk_io_buffer_size;
let evq = RawEventsQuery {
channel: self.channel,
range: self.range,
agg_kind: self.agg_kind,
disk_io_buffer_size: self.disk_io_buffer_size,
disk_io_tune,
do_decompress: true,
};
let s = MergedFromRemotes::<Identity<NTY>>::new(evq, perf_opts, self.node_config.node_config.cluster);
@@ -452,11 +455,14 @@ impl ChannelExecFunction for PlainEventsJson {
let _ = byte_order;
let _ = event_value_shape;
let perf_opts = PerfOpts { inmem_bufcap: 4096 };
// TODO let upstream provide DiskIoTune
let mut disk_io_tune = netpod::DiskIoTune::default();
disk_io_tune.read_buffer_len = self.disk_io_buffer_size;
let evq = RawEventsQuery {
channel: self.channel,
range: self.range,
agg_kind: self.agg_kind,
disk_io_buffer_size: self.disk_io_buffer_size,
disk_io_tune,
do_decompress: true,
};
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster);

View File

@@ -1,25 +1,3 @@
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use netpod::histo::HistoLog2;
use netpod::{log::*, FileIoBufferSize};
use netpod::{ChannelConfig, Node, Shape};
use readat::ReadResult;
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
use std::os::unix::prelude::AsRawFd;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{fmt, mem};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
pub mod agg;
#[cfg(test)]
pub mod aggtest;
@@ -38,9 +16,31 @@ pub mod index;
pub mod merge;
pub mod paths;
pub mod raw;
pub mod readat;
pub mod read3;
pub mod streamlog;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::{ChannelConfig, DiskIoTune, Node, Shape};
use read3::ReadResult;
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
use std::os::unix::prelude::AsRawFd;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{fmt, mem};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
// TODO transform this into a self-test or remove.
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node);
@@ -152,20 +152,20 @@ unsafe impl Send for Fopen1 {}
pub struct FileChunkRead {
buf: BytesMut,
cap0: usize,
rem0: usize,
remmut0: usize,
duration: Duration,
}
impl FileChunkRead {
pub fn into_buf(self) -> BytesMut {
self.buf
}
}
impl fmt::Debug for FileChunkRead {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FileChunkRead")
.field("buf.len", &self.buf.len())
.field("buf.cap", &self.buf.capacity())
.field("cap0", &self.cap0)
.field("rem0", &self.rem0)
.field("remmut0", &self.remmut0)
.field("duration", &self.duration)
.finish()
}
@@ -173,7 +173,7 @@ impl fmt::Debug for FileChunkRead {
pub struct FileContentStream {
file: File,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
read_going: bool,
buf: BytesMut,
ts1: Instant,
@@ -183,10 +183,10 @@ pub struct FileContentStream {
}
impl FileContentStream {
pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self {
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
Self {
file,
file_io_buffer_size,
disk_io_tune,
read_going: false,
buf: BytesMut::new(),
ts1: Instant::now(),
@@ -212,7 +212,7 @@ impl Stream for FileContentStream {
let mut buf = if !self.read_going {
self.ts1 = Instant::now();
let mut buf = BytesMut::new();
buf.resize(self.file_io_buffer_size.0, 0);
buf.resize(self.disk_io_tune.read_buffer_len, 0);
buf
} else {
mem::replace(&mut self.buf, BytesMut::new())
@@ -225,18 +225,12 @@ impl Stream for FileContentStream {
match pollres {
Ready(Ok(_)) => {
let nread = rb.filled().len();
let cap0 = rb.capacity();
let rem0 = rb.remaining();
let remmut0 = nread;
buf.truncate(nread);
self.read_going = false;
let ts2 = Instant::now();
if nread == 0 {
let ret = FileChunkRead {
buf,
cap0,
rem0,
remmut0,
duration: ts2.duration_since(self.ts1),
};
self.done = true;
@@ -244,14 +238,11 @@ impl Stream for FileContentStream {
} else {
let ret = FileChunkRead {
buf,
cap0,
rem0,
remmut0,
duration: ts2.duration_since(self.ts1),
};
if false && self.nlog < 6 {
self.nlog += 1;
info!("{:?} ret {:?}", self.file_io_buffer_size, ret);
info!("{:?} ret {:?}", self.disk_io_tune, ret);
}
Ready(Some(Ok(ret)))
}
@@ -269,13 +260,15 @@ impl Stream for FileContentStream {
pub fn file_content_stream(
file: File,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
FileContentStream::new(file, file_io_buffer_size)
warn!("file_content_stream disk_io_tune {disk_io_tune:?}");
FileContentStream::new(file, disk_io_tune)
}
enum FCS2 {
GetPosition,
ReadingSimple,
Reading,
}
@@ -288,16 +281,17 @@ pub struct FileContentStream2 {
fcs2: FCS2,
file: Pin<Box<File>>,
file_pos: u64,
file_io_buffer_size: FileIoBufferSize,
eof: bool,
disk_io_tune: DiskIoTune,
get_position_fut: Pin<Box<dyn Future<Output = Result<u64, Error>> + Send>>,
read_fut: Pin<Box<dyn Future<Output = Result<ReadResult, Error>> + Send>>,
reads: VecDeque<ReadStep>,
nlog: usize,
done: bool,
complete: bool,
}
impl FileContentStream2 {
pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self {
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
let mut file = Box::pin(file);
let ffr = unsafe {
let ffr = Pin::get_unchecked_mut(file.as_mut());
@@ -305,15 +299,18 @@ impl FileContentStream2 {
};
let ff = ffr
.seek(SeekFrom::Current(0))
.map_err(|e| Error::with_msg_no_trace(format!("Seek error")));
.map_err(|_| Error::with_msg_no_trace(format!("Seek error")));
Self {
fcs2: FCS2::GetPosition,
file,
file_pos: 0,
file_io_buffer_size,
eof: false,
disk_io_tune,
get_position_fut: Box::pin(ff),
read_fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(format!(
"dummy"
))))),
reads: VecDeque::new(),
nlog: 0,
done: false,
complete: false,
}
@@ -335,7 +332,17 @@ impl Stream for FileContentStream2 {
match self.fcs2 {
FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) {
Ready(Ok(k)) => {
info!("current file pos: {k}");
self.file_pos = k;
if false {
let fd = self.file.as_raw_fd();
let count = self.disk_io_tune.read_buffer_len as u64;
self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count));
self.file_pos += count;
self.fcs2 = FCS2::ReadingSimple;
} else {
self.fcs2 = FCS2::Reading;
}
continue;
}
Ready(Err(e)) => {
@@ -344,43 +351,96 @@ impl Stream for FileContentStream2 {
}
Pending => Pending,
},
FCS2::ReadingSimple => match self.read_fut.poll_unpin(cx) {
Ready(Ok(res)) => {
if res.eof {
let item = FileChunkRead {
buf: res.buf,
duration: Duration::from_millis(0),
};
self.done = true;
Ready(Some(Ok(item)))
} else {
let item = FileChunkRead {
buf: res.buf,
duration: Duration::from_millis(0),
};
let fd = self.file.as_raw_fd();
let count = self.disk_io_tune.read_buffer_len as u64;
self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count));
self.file_pos += count;
Ready(Some(Ok(item)))
}
}
Ready(Err(e)) => {
self.done = true;
Ready(Some(Err(e)))
}
Pending => Pending,
},
FCS2::Reading => {
// TODO Keep the read queue full.
// TODO Do not add more reads when EOF is encountered.
while self.reads.len() < 4 {
let count = self.file_io_buffer_size.bytes() as u64;
let r3 = readat::Read3::get();
let x = r3.read(self.file.as_raw_fd(), self.file_pos, count);
self.reads.push_back(ReadStep::Fut(Box::pin(x)));
while !self.eof && self.reads.len() < self.disk_io_tune.read_queue_len {
let fd = self.file.as_raw_fd();
let pos = self.file_pos;
let count = self.disk_io_tune.read_buffer_len as u64;
trace!("create ReadTask fd {fd} pos {pos} count {count}");
let r3 = read3::Read3::get();
let fut = r3.read(fd, pos, count);
self.reads.push_back(ReadStep::Fut(Box::pin(fut)));
self.file_pos += count;
}
// TODO must poll all futures to make progress... but if they resolve, must poll no more!
// therefore, need some enum type for the pending futures list to also store the resolved ones.
for e in &mut self.reads {
match e {
ReadStep::Fut(k) => match k.poll_unpin(cx) {
Ready(k) => {
trace!("received a result");
*e = ReadStep::Res(k);
}
Pending => {}
},
ReadStep::Res(_k) => {}
ReadStep::Res(_) => {}
}
}
// TODO Check the front if something is ready.
if let Some(ReadStep::Res(_)) = self.reads.front() {
if let Some(ReadStep::Res(res)) = self.reads.pop_front() {
// TODO check for error or return the read data.
// TODO if read data contains EOF flag, raise EOF flag also in self,
// and abort.
// TODO make sure that everything runs stable even if this Stream is simply dropped
// or read results are not waited for and channels or oneshots get dropped.
trace!("pop front result");
match res {
Ok(rr) => {
if rr.eof {
if self.eof {
trace!("see EOF in ReadResult AGAIN");
} else {
debug!("see EOF in ReadResult SET OUR FLAG");
self.eof = true;
}
}
let res = FileChunkRead {
buf: rr.buf,
duration: Duration::from_millis(0),
};
Ready(Some(Ok(res)))
}
Err(e) => {
error!("received ReadResult error: {e}");
self.done = true;
let e = Error::with_msg(format!("I/O error: {e}"));
Ready(Some(Err(e)))
}
}
} else {
// TODO return error, this should never happen because we check before.
self.done = true;
let e = Error::with_msg(format!("logic error"));
error!("{e}");
Ready(Some(Err(e)))
}
} else if let None = self.reads.front() {
debug!("empty read fut queue, end");
self.done = true;
continue;
} else {
trace!("read fut queue Pending");
Pending
}
// TODO handle case that self.reads is empty.
todo!()
}
}
};
@@ -390,9 +450,10 @@ impl Stream for FileContentStream2 {
pub fn file_content_stream_2(
file: File,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
FileContentStream2::new(file, file_io_buffer_size)
warn!("file_content_stream_2 disk_io_tune {disk_io_tune:?}");
FileContentStream2::new(file, disk_io_tune)
}
pub struct NeedMinBuffer {

View File

@@ -1,14 +1,13 @@
use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::file_content_stream;
use crate::merge::MergedStream;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{log::*, FileIoBufferSize};
use netpod::{ChannelConfig, NanoRange, Node};
use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node};
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::rangefilter::RangeFilter;
@@ -21,7 +20,7 @@ pub struct EventChunkerMultifile {
channel_config: ChannelConfig,
file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
evs: Option<Pin<Box<dyn InputTraits + Send>>>,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
event_chunker_conf: EventChunkerConf,
range: NanoRange,
data_completed: bool,
@@ -41,7 +40,7 @@ impl EventChunkerMultifile {
channel_config: ChannelConfig,
node: Node,
node_ix: usize,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
event_chunker_conf: EventChunkerConf,
expand: bool,
do_decompress: bool,
@@ -54,7 +53,7 @@ impl EventChunkerMultifile {
Self {
file_chan,
evs: None,
file_io_buffer_size,
disk_io_tune,
event_chunker_conf,
channel_config,
range,
@@ -157,9 +156,9 @@ impl Stream for EventChunkerMultifile {
let item = LogItem::quick(Level::INFO, msg);
match file.file {
Some(file) => {
let inp = Box::pin(file_content_stream(
let inp = Box::pin(crate::file_content_stream(
file,
self.file_io_buffer_size.clone(),
self.disk_io_tune.clone(),
));
let chunker = EventChunker::from_event_boundary(
inp,
@@ -184,14 +183,14 @@ impl Stream for EventChunkerMultifile {
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let msg = format!("handle OFS MERGED {:?}", ofs);
debug!("{}", msg);
info!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp = Box::pin(file_content_stream(
let inp = Box::pin(crate::file_content_stream_2(
file,
self.file_io_buffer_size.clone(),
self.disk_io_tune.clone(),
));
let chunker = EventChunker::from_event_boundary(
inp,
@@ -245,9 +244,9 @@ mod test {
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
use netpod::{log::*, DiskIoTune};
use netpod::{ByteSize, ChannelConfig, Nanos};
use streams::rangefilter::RangeFilter;
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
@@ -268,10 +267,10 @@ mod test {
};
let cluster = netpod::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let disk_io_tune = DiskIoTune::default_for_testing();
let task = async move {
let mut event_count = 0;
let events = EventChunkerMultifile::new(
@@ -279,7 +278,7 @@ mod test {
channel_config,
node,
nodeix,
FileIoBufferSize::new(buffer_size),
disk_io_tune,
event_chunker_conf,
true,
true,

View File

@@ -299,7 +299,7 @@ mod test {
use netpod::log::*;
use netpod::test_data_base_path_databuffer;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
use std::path::PathBuf;
fn scalar_file_path() -> PathBuf {
@@ -336,8 +336,8 @@ mod test {
let inps = files
.into_iter()
.map(|file| {
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(file, file_io_buffer_size);
let disk_io_tune = netpod::DiskIoTune::default();
let inp = file_content_stream(file, disk_io_tune);
inp
})
.map(|inp| {

View File

@@ -11,7 +11,7 @@ use items::numops::{BoolNum, NumOps, StringNum};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape};
use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
use std::pin::Pin;
@@ -194,7 +194,7 @@ pub async fn make_event_pipe(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
FileIoBufferSize::new(evq.disk_io_buffer_size),
evq.disk_io_tune.clone(),
event_chunker_conf,
evq.agg_kind.need_expand(),
true,
@@ -235,10 +235,10 @@ pub fn make_local_event_blobs_stream(
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
node_config: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
info!("make_local_event_blobs_stream do_decompress {do_decompress} file_io_buffer_size {file_io_buffer_size:?}");
info!("make_local_event_blobs_stream do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}");
if do_decompress {
warn!("Possible issue: decompress central storage event blob stream");
}
@@ -261,7 +261,7 @@ pub fn make_local_event_blobs_stream(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
file_io_buffer_size,
disk_io_tune,
event_chunker_conf,
expand,
do_decompress,
@@ -276,7 +276,7 @@ pub fn make_remote_event_blobs_stream(
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
file_io_buffer_size: FileIoBufferSize,
disk_io_tune: DiskIoTune,
node_config: &NodeConfigCached,
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
let shape = match entry.to_shape() {
@@ -298,7 +298,7 @@ pub fn make_remote_event_blobs_stream(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
file_io_buffer_size,
disk_io_tune,
event_chunker_conf,
expand,
do_decompress,
@@ -316,7 +316,6 @@ pub async fn make_event_blobs_pipe(
Err(e) => return Err(e)?,
}
}
let file_io_buffer_size = FileIoBufferSize::new(evq.disk_io_buffer_size);
let expand = evq.agg_kind.need_expand();
let range = &evq.range;
let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?;
@@ -329,7 +328,7 @@ pub async fn make_event_blobs_pipe(
expand,
evq.do_decompress,
event_chunker_conf,
file_io_buffer_size,
evq.disk_io_tune.clone(),
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
@@ -345,7 +344,7 @@ pub async fn make_event_blobs_pipe(
expand,
evq.do_decompress,
event_chunker_conf,
file_io_buffer_size,
evq.disk_io_tune.clone(),
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);

View File

@@ -2,10 +2,13 @@ use bytes::BytesMut;
use err::Error;
use netpod::log::*;
use std::os::unix::prelude::RawFd;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Once;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
const DO_TRACE: bool = false;
pub struct ReadTask {
fd: RawFd,
pos: u64,
@@ -18,40 +21,50 @@ pub struct ReadResult {
pub eof: bool,
}
/*
Async code must be able to interact with the Read3 system via async methods.
The async code must be able to enqueue a read in non-blocking fashion.
Since the queue of pending read requests must be bounded, this must be able to async-block.
*/
pub struct Read3 {
jobs_tx: mpsc::Sender<ReadTask>,
rtx: crossbeam::channel::Sender<mpsc::Receiver<ReadTask>>,
threads_max: AtomicUsize,
can_not_publish: AtomicUsize,
}
impl Read3 {
pub fn get() -> &'static Self {
static INIT: Once = Once::new();
INIT.call_once(|| {
let (jtx, jrx) = mpsc::channel(32);
let (rtx, rrx) = crossbeam::channel::bounded(16);
let read3 = Read3 { jobs_tx: jtx, rtx };
let (jtx, jrx) = mpsc::channel(512);
let (rtx, rrx) = crossbeam::channel::bounded(32);
let read3 = Read3 {
jobs_tx: jtx,
rtx,
threads_max: AtomicUsize::new(32),
can_not_publish: AtomicUsize::new(0),
};
let b = Box::new(read3);
let ptr = Box::into_raw(b);
READ3.store(ptr, Ordering::SeqCst);
let ptr = READ3.load(Ordering::SeqCst);
READ3.store(ptr, Ordering::Release);
let ptr = READ3.load(Ordering::Acquire);
let h = unsafe { &*ptr };
if let Err(_) = h.rtx.send(jrx) {
error!("Read3 INIT: can not enqueue main job reader");
}
for _ in 0..2 {
for wid in 0..128 {
let rrx = rrx.clone();
tokio::task::spawn_blocking(move || h.read_worker(rrx));
tokio::task::spawn_blocking(move || h.read_worker(wid, rrx));
}
});
let ptr = READ3.load(Ordering::SeqCst);
let ptr = READ3.load(Ordering::Acquire);
unsafe { &*ptr }
}
pub fn threads_max(&self) -> usize {
self.threads_max.load(Ordering::Acquire)
}
pub fn set_threads_max(&self, max: usize) {
self.threads_max.store(max, Ordering::Release);
}
pub async fn read(&self, fd: RawFd, pos: u64, count: u64) -> Result<ReadResult, Error> {
let (tx, rx) = oneshot::channel();
let rt = ReadTask {
@@ -69,58 +82,73 @@ impl Read3 {
}
}
fn read_worker(&self, rrx: crossbeam::channel::Receiver<mpsc::Receiver<ReadTask>>) {
fn read_worker(&self, wid: u32, rrx: crossbeam::channel::Receiver<mpsc::Receiver<ReadTask>>) {
'outer: loop {
while wid as usize >= self.threads_max.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(4000));
}
match rrx.recv() {
Ok(mut jrx) => match jrx.blocking_recv() {
Some(rt) => match self.rtx.send(jrx) {
Ok(_) => {
let ts1 = Instant::now();
let mut prc = 0;
let fd = rt.fd;
let mut rpos = rt.pos;
let mut buf = BytesMut::with_capacity(rt.count as usize);
let mut writable = rt.count as usize;
let rr = unsafe {
loop {
info!("do pread fd {} count {} offset {}", rt.fd, writable, rt.pos);
let ec = libc::pread(rt.fd, buf.as_mut_ptr() as _, writable, rt.pos as i64);
if DO_TRACE {
trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}");
}
let ec = libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64);
prc += 1;
if ec == -1 {
let errno = *libc::__errno_location();
if errno == libc::EINVAL {
info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else {
warn!(
"pread ERROR errno {} fd {} count {} offset {}",
errno, rt.fd, writable, rt.pos
);
warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}");
// TODO use a more structured error
let e = Error::with_msg_no_trace(format!(
"pread ERROR errno {} fd {} count {} offset {}",
errno, rt.fd, writable, rt.pos
"pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"
));
break Err(e);
}
} else if ec == 0 {
info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}");
let rr = ReadResult { buf, eof: true };
break Ok(rr);
} else if ec > 0 {
buf.set_len(ec as usize);
if ec as usize > writable {
error!(
"pread TOOLARGE ec {} fd {} count {} offset {}",
ec, rt.fd, writable, rt.pos
"pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"
);
break 'outer;
}
writable -= ec as usize;
if writable == 0 {
let rr = ReadResult { buf, eof: false };
break Ok(rr);
} else {
rpos += ec as u64;
writable -= ec as usize;
buf.set_len(buf.len() + (ec as usize));
if writable == 0 {
let ts2 = Instant::now();
let dur = ts2.duration_since(ts1);
let dms = 1e3 * dur.as_secs_f32();
if DO_TRACE {
trace!(
"pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}"
);
}
let rr = ReadResult { buf, eof: false };
break Ok(rr);
}
}
} else {
error!(
"pread UNEXPECTED ec {} fd {} count {} offset {}",
ec, rt.fd, writable, rt.pos
"pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}",
ec, rt.fd, writable
);
break 'outer;
}
@@ -129,20 +157,23 @@ impl Read3 {
match rt.rescell.send(rr) {
Ok(_) => {}
Err(_) => {
error!("can not publish the read result");
break 'outer;
self.can_not_publish.fetch_add(1, Ordering::AcqRel);
warn!("can not publish the read result wid {wid}");
}
}
}
Err(e) => {
error!("can not return the job receiver: {e}");
error!("can not return the job receiver: wid {wid} {e}");
break 'outer;
}
},
None => break 'outer,
None => {
let _ = self.rtx.send(jrx);
break 'outer;
}
},
Err(e) => {
error!("read_worker sees: {e}");
error!("read_worker sees: wid {wid} {e}");
break 'outer;
}
}