WIP trace i/o buffer size

This commit is contained in:
Dominik Werder
2021-09-15 16:46:14 +02:00
parent 22ba7bb0d3
commit 7166e65277
6 changed files with 87 additions and 31 deletions

View File

@@ -1,6 +1,6 @@
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use netpod::timeunits::*;
use netpod::{timeunits::*, FileIoBufferSize};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -54,12 +54,13 @@ async fn agg_x_dim_0_inner() {
let ts2 = ts1 + HOUR * 24;
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
let fut1 = EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
query.buffer_size as usize,
file_io_buffer_size,
event_chunker_conf,
false,
);
@@ -104,12 +105,13 @@ async fn agg_x_dim_1_inner() {
let ts2 = ts1 + HOUR * 24;
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
let fut1 = super::eventblobs::EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
node.clone(),
0,
query.buffer_size as usize,
file_io_buffer_size,
event_chunker_conf,
false,
);

View File

@@ -6,8 +6,8 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{log::*, FileIoBufferSize};
use netpod::{ChannelConfig, NanoRange, Node};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
@@ -22,7 +22,7 @@ pub struct EventChunkerMultifile {
channel_config: ChannelConfig,
file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
evs: Option<Pin<Box<dyn InputTraits + Send>>>,
buffer_size: usize,
file_io_buffer_size: FileIoBufferSize,
event_chunker_conf: EventChunkerConf,
range: NanoRange,
data_completed: bool,
@@ -42,7 +42,7 @@ impl EventChunkerMultifile {
channel_config: ChannelConfig,
node: Node,
node_ix: usize,
buffer_size: usize,
file_io_buffer_size: FileIoBufferSize,
event_chunker_conf: EventChunkerConf,
expand: bool,
) -> Self {
@@ -54,7 +54,7 @@ impl EventChunkerMultifile {
Self {
file_chan,
evs: None,
buffer_size,
file_io_buffer_size,
event_chunker_conf,
channel_config,
range,
@@ -132,8 +132,10 @@ impl Stream for EventChunkerMultifile {
let item = LogItem::quick(Level::INFO, msg);
match file.file {
Some(file) => {
let inp =
Box::pin(file_content_stream(file, self.buffer_size as usize));
let inp = Box::pin(file_content_stream(
file,
self.file_io_buffer_size.clone(),
));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
@@ -155,8 +157,10 @@ impl Stream for EventChunkerMultifile {
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp =
Box::pin(file_content_stream(file, self.buffer_size as usize));
let inp = Box::pin(file_content_stream(
file,
self.file_io_buffer_size.clone(),
));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
@@ -240,7 +244,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
channel_config,
node,
node_ix,
buffer_size,
FileIoBufferSize::new(buffer_size),
event_chunker_conf,
true,
);

View File

@@ -5,7 +5,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{pin_mut, select, FutureExt, StreamExt};
use netpod::log::*;
use netpod::{log::*, FileIoBufferSize};
use netpod::{ChannelConfig, NanoRange, Node, Shape};
use std::future::Future;
use std::path::PathBuf;
@@ -313,13 +313,13 @@ pub struct FileChunkRead {
pub fn file_content_stream(
mut file: File,
buffer_size: usize,
file_io_buffer_size: FileIoBufferSize,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
async_stream::stream! {
use tokio::io::AsyncReadExt;
loop {
let ts1 = Instant::now();
let mut buf = BytesMut::with_capacity(buffer_size);
let mut buf = BytesMut::with_capacity(file_io_buffer_size.0);
let n1 = file.read_buf(&mut buf).await?;
let ts2 = Instant::now();
if n1 == 0 {
@@ -341,6 +341,7 @@ pub struct NeedMinBuffer {
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
need_min: u32,
left: Option<FileChunkRead>,
buf_len_histo: [u32; 16],
errored: bool,
completed: bool,
}
@@ -351,6 +352,7 @@ impl NeedMinBuffer {
inp: inp,
need_min: 1,
left: None,
buf_len_histo: Default::default(),
errored: false,
completed: false,
}
@@ -366,6 +368,13 @@ impl NeedMinBuffer {
}
}
// TODO remove this again
impl Drop for NeedMinBuffer {
fn drop(&mut self) {
info!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
}
}
impl Stream for NeedMinBuffer {
type Item = Result<FileChunkRead, Error>;
@@ -382,7 +391,15 @@ impl Stream for NeedMinBuffer {
let mut again = false;
let z = match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(fcr))) => {
//info!("NeedMin got buf len {}", buf.len());
let mut u = fcr.buf.len();
let mut po = 0;
while u != 0 && po < 15 {
u /= 2;
po += 1;
}
let po = if po > 8 { po - 8 } else { 0 };
self.buf_len_histo[po] += 1;
//info!("NeedMinBuffer got buf len {}", fcr.buf.len());
match self.left.take() {
Some(mut lfcr) => {
// TODO measure:
@@ -413,7 +430,10 @@ impl Stream for NeedMinBuffer {
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => Ready(None),
Ready(None) => {
info!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
Ready(None)
}
Pending => Pending,
};
if !again {

View File

@@ -10,7 +10,7 @@ use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::query::RawEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, Channel, NanoRange, NodeConfigCached, ScalarType, Shape};
use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
use std::pin::Pin;
@@ -183,7 +183,7 @@ pub async fn make_event_pipe(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
evq.disk_io_buffer_size,
FileIoBufferSize::new(evq.disk_io_buffer_size),
event_chunker_conf,
true,
);
@@ -222,7 +222,7 @@ pub fn make_local_event_blobs_stream(
entry: &ConfigEntry,
expand: bool,
event_chunker_conf: EventChunkerConf,
disk_io_buffer_size: usize,
file_io_buffer_size: FileIoBufferSize,
node_config: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
let shape = match entry.to_shape() {
@@ -244,7 +244,7 @@ pub fn make_local_event_blobs_stream(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_buffer_size,
file_io_buffer_size,
event_chunker_conf,
expand,
);
@@ -257,7 +257,7 @@ pub fn make_remote_event_blobs_stream(
entry: &ConfigEntry,
expand: bool,
event_chunker_conf: EventChunkerConf,
disk_io_buffer_size: usize,
file_io_buffer_size: FileIoBufferSize,
node_config: &NodeConfigCached,
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
let shape = match entry.to_shape() {
@@ -279,7 +279,7 @@ pub fn make_remote_event_blobs_stream(
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_buffer_size,
file_io_buffer_size,
event_chunker_conf,
expand,
);
@@ -296,6 +296,7 @@ pub async fn make_event_blobs_pipe(
Err(e) => return Err(e)?,
}
}
let file_io_buffer_size = FileIoBufferSize::new(evq.disk_io_buffer_size);
let expand = evq.agg_kind.need_expand();
let range = &evq.range;
let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?;
@@ -307,7 +308,7 @@ pub async fn make_event_blobs_pipe(
&entry,
expand,
event_chunker_conf,
evq.disk_io_buffer_size,
file_io_buffer_size,
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
@@ -322,7 +323,7 @@ pub async fn make_event_blobs_pipe(
&entry,
expand,
event_chunker_conf,
evq.disk_io_buffer_size,
file_io_buffer_size,
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);