Simplify and tune defaults

This commit is contained in:
Dominik Werder
2023-07-25 14:51:39 +02:00
parent 7c26b72537
commit 952e92101c
5 changed files with 159 additions and 19 deletions

View File

@@ -19,6 +19,8 @@ pub mod streamlog;
pub use parse;
use async_channel::Receiver;
use async_channel::Sender;
use bytes::BytesMut;
use err::Error;
use futures_util::future::FusedFuture;
@@ -719,6 +721,128 @@ impl Stream for FileContentStream4 {
}
}
async fn file_tokio_to_std(file: File) -> std::fs::File {
file.into_std().await
}
/// Stream of file chunks produced by a blocking reader task.
///
/// On first poll it finishes converting the tokio file to a std file,
/// then spawns a blocking read loop which feeds chunks through a
/// bounded channel back to the async consumer.
struct BlockingTaskIntoChannel {
    // In-flight tokio->std file conversion; `None` once the reader task
    // has been spawned.
    convert_file_fut: Option<Pin<Box<dyn Future<Output = std::fs::File> + Send>>>,
    // Sender handed to the blocking task in `setup`; `None` afterwards.
    tx: Option<Sender<Result<FileChunkRead, Error>>>,
    // Receiving side polled by the `Stream` impl.
    rx: Receiver<Result<FileChunkRead, Error>>,
    // Read buffer size per chunk (from `DiskIoTune::read_buffer_len`).
    cap: usize,
    // Request id used for tracing spans.
    reqid: String,
}
impl BlockingTaskIntoChannel {
fn new(file: File, disk_io_tune: DiskIoTune, reqid: String) -> Self {
let (tx, rx) = async_channel::bounded(disk_io_tune.read_queue_len);
Self {
convert_file_fut: Some(Box::pin(file_tokio_to_std(file))),
tx: Some(tx),
rx,
cap: disk_io_tune.read_buffer_len,
reqid,
}
}
fn readloop(mut file: std::fs::File, cap: usize, tx: Sender<Result<FileChunkRead, Error>>, reqid: String) {
let span = tracing::span!(tracing::Level::DEBUG, "bticrl", reqid);
let _spg = span.entered();
loop {
use std::io::Read;
let ts1 = Instant::now();
let mut buf = BytesMut::with_capacity(cap);
unsafe {
// SAFETY we can always size up to capacity
buf.set_len(buf.capacity());
}
match file.read(&mut buf) {
Ok(n) => {
if n == 0 {
tx.close();
break;
} else if n > buf.capacity() {
let msg = format!("blocking_task_into_channel read more than buffer cap");
error!("{msg}");
match tx.send_blocking(Err(Error::with_msg_no_trace(msg))) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
}
}
break;
} else {
let ts2 = Instant::now();
unsafe {
// SAFETY we checked before that n <= capacity
buf.set_len(n);
}
let item = FileChunkRead::with_buf_dur(buf, ts2.duration_since(ts1));
match tx.send_blocking(Ok(item)) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
break;
}
}
}
}
Err(e) => {
match tx.send_blocking(Err(e.into())) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
}
}
break;
}
}
}
}
fn setup(&mut self, file: std::fs::File) {
let cap = self.cap;
let tx = self.tx.take().unwrap();
let reqid = self.reqid.clone();
taskrun::tokio::task::spawn_blocking(move || Self::readloop(file, cap, tx, reqid));
}
}
impl Stream for BlockingTaskIntoChannel {
    type Item = Result<FileChunkRead, Error>;

    /// Drives the tokio->std file conversion to completion first; once
    /// the blocking reader task has been spawned, forwards chunk results
    /// from the channel.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        match self.convert_file_fut.as_mut() {
            Some(fut) => match fut.poll_unpin(cx) {
                Poll::Ready(file) => {
                    self.convert_file_fut = None;
                    self.setup(file);
                    // Re-schedule immediately so the next poll reads from
                    // the channel instead of stalling.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
                Poll::Pending => Poll::Pending,
            },
            // The receiver already yields `Option<Self::Item>`, so the
            // poll result can be forwarded as-is.
            None => self.rx.poll_next_unpin(cx),
        }
    }
}
/// Builds a boxed chunk stream that reads `file` on the blocking thread
/// pool and forwards chunks through a bounded channel.
fn blocking_task_into_channel(
    // Unused once the file is already open; kept (underscored to silence
    // the unused-variable warning) for signature parity with the other
    // stream constructors dispatched from `file_content_stream`.
    _path: PathBuf,
    file: File,
    disk_io_tune: DiskIoTune,
    reqid: String,
) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>> {
    Box::pin(BlockingTaskIntoChannel::new(file, disk_io_tune, reqid)) as _
}
pub fn file_content_stream<S>(
path: PathBuf,
file: File,
@@ -728,7 +852,7 @@ pub fn file_content_stream<S>(
where
S: Into<String>,
{
if let ReadSys::TokioAsyncRead = disk_io_tune.read_sys {
if let ReadSys::BlockingTaskIntoChannel = disk_io_tune.read_sys {
} else {
warn!("reading via {:?}", disk_io_tune.read_sys);
}
@@ -755,6 +879,7 @@ where
let s = FileContentStream5::new(path, file, disk_io_tune, reqid).unwrap();
Box::pin(s) as _
}
ReadSys::BlockingTaskIntoChannel => blocking_task_into_channel(path, file, disk_io_tune, reqid),
}
}

View File

@@ -79,7 +79,7 @@ pub async fn make_event_pipe(
fetch_info.clone(),
ncc.node.clone(),
ncc.ix,
DiskIoTune::default(),
evq.disk_io_tune(),
event_chunker_conf,
one_before,
out_max_len,
@@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real(
fetch_info.clone(),
expand,
event_chunker_conf,
DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
subq.disk_io_tune(),
reqctx,
node_config,
)?;
@@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real(
fetch_info.clone(),
expand,
event_chunker_conf,
DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
subq.disk_io_tune(),
reqctx,
node_config,
)?;

View File

@@ -913,7 +913,7 @@ mod serde_shape {
{
use Shape::*;
match self {
Scalar => ser.collect_seq([0u32; 0].iter()),
Scalar => ser.collect_seq(std::iter::empty::<u32>()),
Wave(a) => ser.collect_seq([*a].iter()),
Image(a, b) => ser.collect_seq([*a, *b].iter()),
}
@@ -2353,11 +2353,12 @@ pub enum ReadSys {
Read3,
Read4,
Read5,
BlockingTaskIntoChannel,
}
impl ReadSys {
pub fn default() -> Self {
Self::TokioAsyncRead
Self::BlockingTaskIntoChannel
}
}
@@ -2373,6 +2374,8 @@ impl From<&str> for ReadSys {
Self::Read4
} else if k == "Read5" {
Self::Read5
} else if k == "BlockingTaskIntoChannel" {
Self::BlockingTaskIntoChannel
} else {
Self::default()
}
@@ -2390,7 +2393,7 @@ impl DiskIoTune {
pub fn default_for_testing() -> Self {
Self {
read_sys: ReadSys::default(),
read_buffer_len: 1024 * 4,
read_buffer_len: 1024 * 8,
read_queue_len: 4,
}
}
@@ -2398,7 +2401,7 @@ impl DiskIoTune {
pub fn default() -> Self {
Self {
read_sys: ReadSys::default(),
read_buffer_len: 1024 * 4,
read_buffer_len: 1024 * 16,
read_queue_len: 4,
}
}

View File

@@ -263,8 +263,8 @@ impl Api1Query {
if let Some(x) = &self.file_io_buffer_size {
k.read_buffer_len = x.0;
}
if let Some(io_queue_len) = self.io_queue_len {
k.read_queue_len = io_queue_len as usize;
if let Some(x) = self.io_queue_len {
k.read_queue_len = x as usize;
}
let read_sys: ReadSys = self.read_sys.as_str().into();
k.read_sys = read_sys;

View File

@@ -10,6 +10,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::AppendToUrl;
use netpod::ByteSize;
use netpod::ChannelTypeConfigGen;
use netpod::DiskIoTune;
use netpod::FromUrl;
use netpod::HasBackend;
use netpod::HasTimeout;
@@ -316,6 +317,7 @@ pub struct EventsSubQuerySettings {
event_delay: Option<Duration>,
stream_batch_len: Option<usize>,
buf_len_disk_io: Option<usize>,
queue_len_disk_io: Option<usize>,
test_do_wasm: bool,
create_errors: Vec<String>,
}
@@ -323,13 +325,14 @@ pub struct EventsSubQuerySettings {
impl Default for EventsSubQuerySettings {
fn default() -> Self {
Self {
timeout: Default::default(),
events_max: Default::default(),
event_delay: Default::default(),
stream_batch_len: Default::default(),
buf_len_disk_io: Default::default(),
test_do_wasm: Default::default(),
create_errors: Default::default(),
timeout: None,
events_max: None,
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
queue_len_disk_io: None,
test_do_wasm: false,
create_errors: Vec::new(),
}
}
}
@@ -342,6 +345,8 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
event_delay: value.event_delay,
stream_batch_len: value.stream_batch_len,
buf_len_disk_io: value.buf_len_disk_io,
// TODO add to query
queue_len_disk_io: None,
test_do_wasm: value.test_do_wasm,
create_errors: value.create_errors.clone(),
}
@@ -357,6 +362,8 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: None,
// TODO add to query
queue_len_disk_io: None,
test_do_wasm: false,
create_errors: Vec::new(),
}
@@ -373,6 +380,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
event_delay: None,
stream_batch_len: None,
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
queue_len_disk_io: Some(disk_io_tune.read_queue_len),
test_do_wasm: false,
create_errors: Vec::new(),
}
@@ -429,8 +437,12 @@ impl EventsSubQuery {
&self.settings.event_delay
}
pub fn buf_len_disk_io(&self) -> usize {
self.settings.buf_len_disk_io.unwrap_or(1024 * 8)
pub fn disk_io_tune(&self) -> DiskIoTune {
let mut tune = DiskIoTune::default();
if let Some(x) = self.settings.buf_len_disk_io {
tune.read_buffer_len = x;
}
tune
}
pub fn inmem_bufcap(&self) -> ByteSize {