From 952e92101cf51661c7ee271f1b2bc105a1f1d32a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 25 Jul 2023 14:51:39 +0200
Subject: [PATCH] Simplify and tune defaults

---
 crates/disk/src/disk.rs         | 127 +++++++++++++++++++++++++++++++-
 crates/disk/src/raw/conn.rs     |   6 +-
 crates/netpod/src/netpod.rs     |  11 ++-
 crates/netpod/src/query/api1.rs |   4 +-
 crates/query/src/api4/events.rs |  30 +++++---
 5 files changed, 159 insertions(+), 19 deletions(-)

diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs
index e773d3d..7b0b201 100644
--- a/crates/disk/src/disk.rs
+++ b/crates/disk/src/disk.rs
@@ -19,6 +19,8 @@ pub mod streamlog;
 
 pub use parse;
 
+use async_channel::Receiver;
+use async_channel::Sender;
 use bytes::BytesMut;
 use err::Error;
 use futures_util::future::FusedFuture;
@@ -719,6 +721,128 @@ impl Stream for FileContentStream4 {
     }
 }
 
+async fn file_tokio_to_std(file: File) -> std::fs::File {
+    file.into_std().await
+}
+
+struct BlockingTaskIntoChannel {
+    convert_file_fut: Option<Pin<Box<dyn Future<Output = std::fs::File> + Send>>>,
+    tx: Option<Sender<Result<FileChunkRead, Error>>>,
+    rx: Receiver<Result<FileChunkRead, Error>>,
+    cap: usize,
+    reqid: String,
+}
+
+impl BlockingTaskIntoChannel {
+    fn new(file: File, disk_io_tune: DiskIoTune, reqid: String) -> Self {
+        let (tx, rx) = async_channel::bounded(disk_io_tune.read_queue_len);
+        Self {
+            convert_file_fut: Some(Box::pin(file_tokio_to_std(file))),
+            tx: Some(tx),
+            rx,
+            cap: disk_io_tune.read_buffer_len,
+            reqid,
+        }
+    }
+
+    fn readloop(mut file: std::fs::File, cap: usize, tx: Sender<Result<FileChunkRead, Error>>, reqid: String) {
+        let span = tracing::span!(tracing::Level::DEBUG, "bticrl", reqid);
+        let _spg = span.entered();
+        loop {
+            use std::io::Read;
+            let ts1 = Instant::now();
+            let mut buf = BytesMut::with_capacity(cap);
+            unsafe {
+                // SAFETY we can always size up to capacity
+                buf.set_len(buf.capacity());
+            }
+            match file.read(&mut buf) {
+                Ok(n) => {
+                    if n == 0 {
+                        tx.close();
+                        break;
+                    } else if n > buf.capacity() {
+                        let msg = format!("blocking_task_into_channel read more than buffer cap");
+                        error!("{msg}");
+                        match tx.send_blocking(Err(Error::with_msg_no_trace(msg))) {
+                            Ok(()) => (),
+                            Err(e) => {
+                                error!("blocking_task_into_channel can not send into channel {e}");
+                            }
+                        }
+                        break;
+                    } else {
+                        let ts2 = Instant::now();
+                        unsafe {
+                            // SAFETY we checked before that n <= capacity
+                            buf.set_len(n);
+                        }
+                        let item = FileChunkRead::with_buf_dur(buf, ts2.duration_since(ts1));
+                        match tx.send_blocking(Ok(item)) {
+                            Ok(()) => (),
+                            Err(e) => {
+                                error!("blocking_task_into_channel can not send into channel {e}");
+                                break;
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    match tx.send_blocking(Err(e.into())) {
+                        Ok(()) => (),
+                        Err(e) => {
+                            error!("blocking_task_into_channel can not send into channel {e}");
+                        }
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    fn setup(&mut self, file: std::fs::File) {
+        let cap = self.cap;
+        let tx = self.tx.take().unwrap();
+        let reqid = self.reqid.clone();
+        taskrun::tokio::task::spawn_blocking(move || Self::readloop(file, cap, tx, reqid));
+    }
+}
+
+impl Stream for BlockingTaskIntoChannel {
+    type Item = Result<FileChunkRead, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        if let Some(fut) = &mut self.convert_file_fut {
+            match fut.poll_unpin(cx) {
+                Ready(file) => {
+                    self.convert_file_fut = None;
+                    self.setup(file);
+                    cx.waker().wake_by_ref();
+                    Pending
+                }
+                Pending => Pending,
+            }
+        } else {
+            match self.rx.poll_next_unpin(cx) {
+                Ready(Some(Ok(item))) => Ready(Some(Ok(item))),
+                Ready(Some(Err(e))) => Ready(Some(Err(e))),
+                Ready(None) => Ready(None),
+                Pending => Pending,
+            }
+        }
+    }
+}
+
+fn blocking_task_into_channel(
+    path: PathBuf,
+    file: File,
+    disk_io_tune: DiskIoTune,
+    reqid: String,
+) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>> {
+    Box::pin(BlockingTaskIntoChannel::new(file, disk_io_tune, reqid)) as _
+}
+
 pub fn file_content_stream<S>(
     path: PathBuf,
     file: File,
@@ -728,7 +852,7 @@ pub fn file_content_stream<S>(
 where
     S: Into<String>,
 {
-    if let ReadSys::TokioAsyncRead = disk_io_tune.read_sys {
+    if let ReadSys::BlockingTaskIntoChannel = disk_io_tune.read_sys {
     } else {
         warn!("reading via {:?}", disk_io_tune.read_sys);
     }
@@ -755,6 +879,7 @@ where
            let s = FileContentStream5::new(path, file, disk_io_tune, reqid).unwrap();
            Box::pin(s) as _
        }
+        ReadSys::BlockingTaskIntoChannel => blocking_task_into_channel(path, file, disk_io_tune, reqid),
     }
 }
 
diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs
index 4cd4167..ee197f3 100644
--- a/crates/disk/src/raw/conn.rs
+++ b/crates/disk/src/raw/conn.rs
@@ -79,7 +79,7 @@ pub async fn make_event_pipe(
         fetch_info.clone(),
         ncc.node.clone(),
         ncc.ix,
-        DiskIoTune::default(),
+        evq.disk_io_tune(),
         event_chunker_conf,
         one_before,
         out_max_len,
@@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real(
         fetch_info.clone(),
         expand,
         event_chunker_conf,
-        DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
+        subq.disk_io_tune(),
         reqctx,
         node_config,
     )?;
@@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real(
         fetch_info.clone(),
         expand,
         event_chunker_conf,
-        DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
+        subq.disk_io_tune(),
         reqctx,
         node_config,
     )?;
diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index 0b04c29..c6dacc8 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -913,7 +913,7 @@ mod serde_shape {
         {
             use Shape::*;
             match self {
-                Scalar => ser.collect_seq([0u32; 0].iter()),
+                Scalar => ser.collect_seq(std::iter::empty::<u32>()),
                 Wave(a) => ser.collect_seq([*a].iter()),
                 Image(a, b) => ser.collect_seq([*a, *b].iter()),
             }
@@ -2353,11 +2353,12 @@ pub enum ReadSys {
     Read3,
     Read4,
     Read5,
+    BlockingTaskIntoChannel,
 }
 
 impl ReadSys {
     pub fn default() -> Self {
-        Self::TokioAsyncRead
+        Self::BlockingTaskIntoChannel
     }
 }
 
@@ -2373,6 +2374,8 @@ impl From<&str> for ReadSys {
             Self::Read4
         } else if k == "Read5" {
             Self::Read5
+        } else if k == "BlockingTaskIntoChannel" {
+            Self::BlockingTaskIntoChannel
         } else {
             Self::default()
         }
@@ -2390,7 +2393,7 @@ impl DiskIoTune {
     pub fn default_for_testing() -> Self {
         Self {
             read_sys: ReadSys::default(),
-            read_buffer_len: 1024 * 4,
+            read_buffer_len: 1024 * 8,
             read_queue_len: 4,
         }
     }
@@ -2398,7 +2401,7 @@ impl DiskIoTune {
     pub fn default() -> Self {
         Self {
             read_sys: ReadSys::default(),
-            read_buffer_len: 1024 * 4,
+            read_buffer_len: 1024 * 16,
             read_queue_len: 4,
         }
     }
diff --git a/crates/netpod/src/query/api1.rs b/crates/netpod/src/query/api1.rs
index 7ade34c..c645002 100644
--- a/crates/netpod/src/query/api1.rs
+++ b/crates/netpod/src/query/api1.rs
@@ -263,8 +263,8 @@ impl Api1Query {
         if let Some(x) = &self.file_io_buffer_size {
             k.read_buffer_len = x.0;
         }
-        if let Some(io_queue_len) = self.io_queue_len {
-            k.read_queue_len = io_queue_len as usize;
+        if let Some(x) = self.io_queue_len {
+            k.read_queue_len = x as usize;
         }
         let read_sys: ReadSys = self.read_sys.as_str().into();
         k.read_sys = read_sys;
diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs
index 3897610..7ab7604 100644
--- a/crates/query/src/api4/events.rs
+++ b/crates/query/src/api4/events.rs
@@ -10,6 +10,7 @@ use netpod::range::evrange::SeriesRange;
 use netpod::AppendToUrl;
 use netpod::ByteSize;
 use netpod::ChannelTypeConfigGen;
+use netpod::DiskIoTune;
 use netpod::FromUrl;
 use netpod::HasBackend;
 use netpod::HasTimeout;
@@ -316,6 +317,7 @@ pub struct EventsSubQuerySettings {
     event_delay: Option,
     stream_batch_len: Option,
     buf_len_disk_io: Option<usize>,
+    queue_len_disk_io: Option<usize>,
     test_do_wasm: bool,
     create_errors: Vec,
 }
@@ -323,13 +325,14 @@ pub struct EventsSubQuerySettings {
 impl Default for EventsSubQuerySettings {
     fn default() -> Self {
         Self {
-            timeout: Default::default(),
-            events_max: Default::default(),
-            event_delay: Default::default(),
-            stream_batch_len: Default::default(),
-            buf_len_disk_io: Default::default(),
-            test_do_wasm: Default::default(),
-            create_errors: Default::default(),
+            timeout: None,
+            events_max: None,
+            event_delay: None,
+            stream_batch_len: None,
+            buf_len_disk_io: None,
+            queue_len_disk_io: None,
+            test_do_wasm: false,
+            create_errors: Vec::new(),
         }
     }
 }
@@ -342,6 +345,8 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
             event_delay: value.event_delay,
             stream_batch_len: value.stream_batch_len,
             buf_len_disk_io: value.buf_len_disk_io,
+            // TODO add to query
+            queue_len_disk_io: None,
             test_do_wasm: value.test_do_wasm,
             create_errors: value.create_errors.clone(),
         }
@@ -357,6 +362,8 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
             event_delay: None,
             stream_batch_len: None,
             buf_len_disk_io: None,
+            // TODO add to query
+            queue_len_disk_io: None,
             test_do_wasm: false,
             create_errors: Vec::new(),
         }
@@ -373,6 +380,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
             event_delay: None,
             stream_batch_len: None,
             buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
+            queue_len_disk_io: Some(disk_io_tune.read_queue_len),
             test_do_wasm: false,
             create_errors: Vec::new(),
         }
@@ -429,8 +437,12 @@ impl EventsSubQuery {
         &self.settings.event_delay
     }
 
-    pub fn buf_len_disk_io(&self) -> usize {
-        self.settings.buf_len_disk_io.unwrap_or(1024 * 8)
+    pub fn disk_io_tune(&self) -> DiskIoTune {
+        let mut tune = DiskIoTune::default();
+        if let Some(x) = self.settings.buf_len_disk_io {
+            tune.read_buffer_len = x;
+        }
+        tune
     }
 
     pub fn inmem_bufcap(&self) -> ByteSize {
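
Usage sketch (not part of the patch): one way the new default read path could be driven end to end. This is a minimal sketch, assuming the `disk` and `netpod` crates are importable under those names from a workspace binary, that `tokio` and `futures_util` are available, and that `file_content_stream` accepts any request id convertible into `String` as the where-clause above suggests; the file path and request id are placeholders.

// Usage sketch only; not part of the patch above.
// Assumptions: `disk` and `netpod` are workspace dependencies with these paths,
// and `tokio` / `futures_util` are available.
use futures_util::StreamExt;
use netpod::DiskIoTune;
use std::path::PathBuf;

#[tokio::main]
async fn main() {
    // Placeholder file path for illustration.
    let path = PathBuf::from("/tmp/example.dat");
    let file = tokio::fs::File::open(&path).await.expect("open file");
    // With this patch, DiskIoTune::default() selects ReadSys::BlockingTaskIntoChannel,
    // a 16 KiB read buffer, and a read queue depth of 4.
    let tune = DiskIoTune::default();
    // The request id is a placeholder; it is only used for tracing.
    let mut stream = disk::file_content_stream(path, file, tune, String::from("example-reqid"));
    let mut chunks = 0usize;
    while let Some(item) = stream.next().await {
        match item {
            // Each Ok item is a FileChunkRead produced by the blocking reader task.
            Ok(_chunk) => chunks += 1,
            Err(e) => {
                eprintln!("read error: {e:?}");
                break;
            }
        }
    }
    println!("read {chunks} chunks");
}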