Emit events in better batch sizes

This commit is contained in:
Dominik Werder
2021-09-16 20:44:07 +02:00
parent 7166e65277
commit ad7f25b4c6
24 changed files with 441 additions and 330 deletions

View File

@@ -63,6 +63,7 @@ async fn agg_x_dim_0_inner() {
file_io_buffer_size,
event_chunker_conf,
false,
true,
);
let _ = fut1;
// TODO add the binning and expectation and await the result.
@@ -114,6 +115,7 @@ async fn agg_x_dim_1_inner() {
file_io_buffer_size,
event_chunker_conf,
false,
true,
);
let _ = fut1;
// TODO add the binning and expectation and await the result.

View File

@@ -119,6 +119,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
do_decompress: true,
};
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
@@ -363,6 +364,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
range: self.query.range().clone(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
do_decompress: true,
};
let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());

View File

@@ -117,6 +117,7 @@ where
range: self.query.patch().patch_range(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
do_decompress: true,
};
if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {
let msg = format!(

View File

@@ -263,6 +263,7 @@ impl ChannelExecFunction for PlainEvents {
range: self.range,
agg_kind: self.agg_kind,
disk_io_buffer_size: self.disk_io_buffer_size,
do_decompress: true,
};
let s = MergedFromRemotes::<Identity<NTY>>::new(evq, perf_opts, self.node_config.node_config.cluster);
let s = s.map(|item| Box::new(item) as Box<dyn Framable>);
@@ -417,6 +418,7 @@ impl ChannelExecFunction for PlainEventsJson {
range: self.range,
agg_kind: self.agg_kind,
disk_io_buffer_size: self.disk_io_buffer_size,
do_decompress: true,
};
let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster);
let f = collect_plain_events_json(s, self.timeout, 0, self.do_log);

View File

@@ -34,6 +34,7 @@ pub struct EventChunkerMultifile {
seen_before_range_count: usize,
seen_after_range_count: usize,
expand: bool,
do_decompress: bool,
}
impl EventChunkerMultifile {
@@ -45,6 +46,7 @@ impl EventChunkerMultifile {
file_io_buffer_size: FileIoBufferSize,
event_chunker_conf: EventChunkerConf,
expand: bool,
do_decompress: bool,
) -> Self {
let file_chan = if expand {
open_expanded_files(&range, &channel_config, node)
@@ -67,6 +69,7 @@ impl EventChunkerMultifile {
seen_before_range_count: 0,
seen_after_range_count: 0,
expand,
do_decompress,
}
}
@@ -108,7 +111,9 @@ impl Stream for EventChunkerMultifile {
Some(evs) => match evs.poll_next_unpin(cx) {
Ready(Some(k)) => {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k {
info!("EventChunkerMultifile emit {} events", h.tss.len());
if false {
info!("EventChunkerMultifile emit {} events", h.tss.len());
};
}
Ready(Some(k))
}
@@ -144,6 +149,7 @@ impl Stream for EventChunkerMultifile {
path,
self.max_ts.clone(),
self.expand,
self.do_decompress,
);
self.evs = Some(Box::pin(chunker));
}
@@ -169,6 +175,7 @@ impl Stream for EventChunkerMultifile {
of.path,
self.max_ts.clone(),
self.expand,
self.do_decompress,
);
chunkers.push(chunker);
}
@@ -247,6 +254,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
FileIoBufferSize::new(buffer_size),
event_chunker_conf,
true,
true,
);
while let Some(item) = events.next().await {
match item {

View File

@@ -5,8 +5,10 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{
Appendable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen, WithTimestamps,
Appendable, ByteEstimate, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen,
WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
@@ -17,6 +19,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
pub struct EventChunker {
inp: NeedMinBuffer,
@@ -35,11 +38,24 @@ pub struct EventChunker {
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
do_decompress: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
unordered_warn_count: usize,
}
// TODO remove again, use it explicitly
// NOTE(review): Drop-based logging fires on every teardown path, including
// cancellation; the TODO above already marks this as temporary diagnostics.
impl Drop for EventChunker {
    /// Logs the decompression-duration and emitted-item-length histograms
    /// accumulated over this chunker's lifetime.
    fn drop(&mut self) {
        info!(
            "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
            self.decomp_dt_histo, self.item_len_emit_histo
        );
    }
}
enum DataFileState {
FileHeader,
Event,
@@ -70,6 +86,7 @@ impl EventChunker {
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
do_decompress: bool,
) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
@@ -90,6 +107,9 @@ impl EventChunker {
path,
max_ts,
expand,
do_decompress,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
unordered_warn_count: 0,
@@ -104,8 +124,18 @@ impl EventChunker {
path: PathBuf,
max_ts: Arc<AtomicU64>,
expand: bool,
do_decompress: bool,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts, expand);
let mut ret = Self::from_start(
inp,
channel_config,
range,
stats_conf,
path,
max_ts,
expand,
do_decompress,
);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -324,36 +354,47 @@ impl EventChunker {
}
}
}
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
// TODO limit the buf slice range
match bitshuffle_decompress(
&buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
Err(e) => {
Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
let decomp = {
if self.do_decompress {
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
// TODO limit the buf slice range
match bitshuffle_decompress(
&buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
decomp
}
Err(e) => {
return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
}
} else {
BytesMut::new()
}
};
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
} else {
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
@@ -506,6 +547,19 @@ impl WithTimestamps for EventFull {
}
}
impl ByteEstimate for EventFull {
    /// Rough estimate of the in-memory size of this event batch in bytes.
    ///
    /// Only the first event's blob and decompressed-buffer lengths are sampled
    /// and multiplied by the event count, so the figure is exact only when all
    /// events are homogeneous in size (the TODOs below acknowledge this).
    /// An empty batch estimates to 0.
    fn byte_estimate(&self) -> u64 {
        if self.tss.len() == 0 {
            0
        } else {
            // TODO that is clumsy... it assumes homogenous types.
            // TODO improve via a const fn on NTY
            // NOTE(review): presumably the constant 40 allows for the fixed
            // per-event fields (timestamp, pulse, flags) — confirm.
            let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len());
            self.tss.len() as u64 * (40 + self.blobs[0].len() as u64 + decomp_len as u64)
        }
    }
}
impl PushableIndex for EventFull {
// TODO check all use cases, can't we move?
fn push_index(&mut self, src: &Self, ix: usize) {
@@ -599,13 +653,14 @@ impl Stream for EventChunker {
}
let x = self.need_min;
self.inp.set_need_min(x);
{
if false {
info!(
"EventChunker emits {} events tss {:?}",
res.events.len(),
res.events.tss
);
};
self.item_len_emit_histo.ingest(res.events.len() as u32);
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
}

View File

@@ -1,19 +1,19 @@
use crate::dataopen::open_files;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{pin_mut, select, FutureExt, StreamExt};
use futures_util::StreamExt;
use netpod::{log::*, FileIoBufferSize};
use netpod::{ChannelConfig, NanoRange, Node, Shape};
use netpod::{ChannelConfig, Node, Shape};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{fmt, mem};
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};
pub mod agg;
#[cfg(test)]
@@ -91,7 +91,7 @@ impl Stream for FileReader {
}
}
struct Fopen1 {
pub struct Fopen1 {
#[allow(dead_code)]
opts: OpenOptions,
fut: Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>,
@@ -142,201 +142,130 @@ impl FusedFuture for Fopen1 {
unsafe impl Send for Fopen1 {}
#[allow(dead_code)]
fn unused_raw_concat_channel_read_stream_try_open_in_background(
query: &netpod::AggQuerySingleChannel,
node: Node,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
use tokio::io::AsyncReadExt;
let mut fopen = None;
let mut fopen_avail = false;
let mut file_prep: Option<File> = None;
let mut file: Option<File> = None;
let mut reading = None;
let mut i1 = 0;
let mut i9 = 0;
loop {
let blen = query.buffer_size as usize;
{
if !fopen_avail && file_prep.is_none() && i1 < 16 {
info!("Prepare open task for next file {}", query.timebin + i1);
fopen.replace(Fopen1::new(paths::datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
fopen_avail = true;
i1 += 1;
}
}
if !fopen_avail && file_prep.is_none() && file.is_none() && reading.is_none() {
info!("Nothing more to do");
break;
}
// TODO
// When the file is available, I can prepare the next reading.
// But next iteration, the file is not available, but reading is, so I should read!
// I can not simply drop the reading future, that would lose the request.
if let Some(read) = &mut reading {
let k: Result<(tokio::fs::File, BytesMut), Error> = read.await;
if k.is_err() {
error!("LONELY READ ERROR");
}
let k = k.unwrap();
reading = None;
file = Some(k.0);
yield Ok(k.1.freeze());
}
else if fopen.is_some() {
if file.is_some() {
if reading.is_none() {
let mut buf = BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
let a = async move {
file2.read_buf(&mut buf).await?;
Ok::<_, Error>((file2, buf))
};
let a = Box::pin(a);
reading = Some(a.fuse());
}
// TODO do I really have to take out the future while waiting on it?
// I think the issue is now with the mutex guard, can I get rid of the mutex again?
let mut fopen3 = fopen.take().unwrap();
let bufres = select! {
// TODO can I avoid the unwraps via matching already above?
f = fopen3 => {
fopen_avail = false;
// TODO feed out the potential error:
file_prep = Some(f.unwrap());
None
}
k = reading.as_mut().unwrap() => {
info!("COMBI read chunk");
reading = None;
// TODO handle the error somehow here...
if k.is_err() {
error!("READ ERROR IN COMBI");
}
let k = k.unwrap();
file = Some(k.0);
Some(k.1)
}
};
if fopen_avail {
fopen.replace(fopen3);
}
if let Some(k) = bufres {
yield Ok(k.freeze());
}
}
else {
info!("----------------- no file open yet, await only opening of the file");
// TODO try to avoid this duplicated code:
if fopen.is_none() {
error!("logic BB");
}
let fopen3 = fopen.take().unwrap();
let f = fopen3.await?;
info!("opened next file SOLO");
fopen_avail = false;
file = Some(f);
}
}
else if file.is_some() {
loop {
let mut buf = BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
let n1 = file2.read_buf(&mut buf).await?;
if n1 == 0 {
if file_prep.is_some() {
file.replace(file_prep.take().unwrap());
}
else {
info!("After read loop, next file not yet ready");
}
break;
}
else {
file.replace(file2);
yield Ok(buf.freeze());
}
}
}
i9 += 1;
if i9 > 100 {
break;
}
}
}
}
#[allow(dead_code)]
fn unused_raw_concat_channel_read_stream_file_pipe(
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
buffer_size: usize,
) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
let range = range.clone();
let channel_config = channel_config.clone();
let node = node.clone();
async_stream::stream! {
let chrx = open_files(&range, &channel_config, node);
while let Ok(file) = chrx.recv().await {
let mut file = match file {
Ok(mut k) => {
k.files.truncate(1);
k.files.pop().unwrap().file.unwrap()
}
Err(_) => break
};
loop {
let mut buf = BytesMut::with_capacity(buffer_size);
use tokio::io::AsyncReadExt;
let n1 = file.read_buf(&mut buf).await?;
if n1 == 0 {
info!("file EOF");
break;
}
else {
yield Ok(buf);
}
}
}
}
}
pub struct FileChunkRead {
buf: BytesMut,
cap0: usize,
rem0: usize,
remmut0: usize,
duration: Duration,
}
pub fn file_content_stream(
mut file: File,
impl fmt::Debug for FileChunkRead {
    /// Compact debug view: buffer length/capacity, the raw read diagnostics
    /// captured at read time (cap0, rem0, remmut0), and the read duration.
    /// Deliberately omits the buffer contents.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("FileChunkRead")
            .field("buf.len", &self.buf.len())
            .field("buf.cap", &self.buf.capacity())
            .field("cap0", &self.cap0)
            .field("rem0", &self.rem0)
            .field("remmut0", &self.remmut0)
            .field("duration", &self.duration)
            .finish()
    }
}
pub struct FileContentStream {
file: File,
file_io_buffer_size: FileIoBufferSize,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
async_stream::stream! {
use tokio::io::AsyncReadExt;
loop {
let ts1 = Instant::now();
let mut buf = BytesMut::with_capacity(file_io_buffer_size.0);
let n1 = file.read_buf(&mut buf).await?;
let ts2 = Instant::now();
if n1 == 0 {
trace!("file EOF");
break;
}
else {
let ret = FileChunkRead {
buf,
duration: ts2.duration_since(ts1),
};
yield Ok(ret);
}
read_going: bool,
buf: BytesMut,
ts1: Instant,
nlog: usize,
done: bool,
complete: bool,
}
impl FileContentStream {
    /// Creates a chunked reader over `file` that yields at most
    /// `file_io_buffer_size` bytes per item. All bookkeeping starts in the
    /// "no read in flight" state.
    pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self {
        Self {
            file,
            file_io_buffer_size,
            read_going: false,
            buf: BytesMut::new(),
            ts1: Instant::now(),
            nlog: 0,
            done: false,
            complete: false,
        }
    }
}
impl Stream for FileContentStream {
    type Item = Result<FileChunkRead, Error>;

    // Polls the file directly through AsyncRead and wraps each filled buffer
    // in a FileChunkRead. A zero-byte read signals EOF: the final (empty)
    // chunk is still yielded, then the stream terminates.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.complete {
                // Polling after Ready(None) violates the Stream contract.
                panic!("poll_next on complete")
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else {
                // Either start a fresh zero-filled buffer, or take back the
                // stored buffer of a read that is already in flight.
                // NOTE(review): read_going is never set to true in this impl
                // and the locally-taken buf is dropped on Pending, so the
                // stored-buffer branch appears unreachable and each Pending
                // re-allocates the buffer — confirm this is intended.
                let mut buf = if !self.read_going {
                    self.ts1 = Instant::now();
                    let mut buf = BytesMut::new();
                    buf.resize(self.file_io_buffer_size.0, 0);
                    buf
                } else {
                    mem::replace(&mut self.buf, BytesMut::new())
                };
                let mutsl = buf.as_mut();
                let mut rb = ReadBuf::new(mutsl);
                let f1 = &mut self.file;
                let f2 = Pin::new(f1);
                let pollres = AsyncRead::poll_read(f2, cx, &mut rb);
                match pollres {
                    Ready(Ok(_)) => {
                        // Capture diagnostics before truncating to the bytes
                        // actually read; these feed FileChunkRead's Debug output.
                        let nread = rb.filled().len();
                        let cap0 = rb.capacity();
                        let rem0 = rb.remaining();
                        let remmut0 = nread;
                        buf.truncate(nread);
                        self.read_going = false;
                        let ts2 = Instant::now();
                        if nread == 0 {
                            // EOF: emit the empty chunk once, then finish.
                            let ret = FileChunkRead {
                                buf,
                                cap0,
                                rem0,
                                remmut0,
                                duration: ts2.duration_since(self.ts1),
                            };
                            self.done = true;
                            Ready(Some(Ok(ret)))
                        } else {
                            let ret = FileChunkRead {
                                buf,
                                cap0,
                                rem0,
                                remmut0,
                                duration: ts2.duration_since(self.ts1),
                            };
                            // Disabled debug logging, kept for quick re-enable.
                            if false && self.nlog < 6 {
                                self.nlog += 1;
                                info!("{:?} ret {:?}", self.file_io_buffer_size, ret);
                            }
                            Ready(Some(Ok(ret)))
                        }
                    }
                    Ready(Err(e)) => {
                        // Surface the I/O error once and terminate the stream.
                        self.done = true;
                        Ready(Some(Err(e.into())))
                    }
                    Pending => Pending,
                }
            };
        }
    }
}
/// Convenience constructor: streams `file` as `FileChunkRead` items of at
/// most `file_io_buffer_size` bytes each, via `FileContentStream`.
pub fn file_content_stream(
    file: File,
    file_io_buffer_size: FileIoBufferSize,
) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
    FileContentStream::new(file, file_io_buffer_size)
}
pub struct NeedMinBuffer {
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
need_min: u32,
@@ -391,13 +320,22 @@ impl Stream for NeedMinBuffer {
let mut again = false;
let z = match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(fcr))) => {
const SUB: usize = 8;
let mut u = fcr.buf.len();
let mut po = 0;
while u != 0 && po < 15 {
u /= 2;
u = u >> 1;
po += 1;
}
let po = if po > 8 { po - 8 } else { 0 };
let po = if po >= self.buf_len_histo.len() + SUB {
self.buf_len_histo.len() - 1
} else {
if po > SUB {
po - SUB
} else {
0
}
};
self.buf_len_histo[po] += 1;
//info!("NeedMinBuffer got buf len {}", fcr.buf.len());
match self.left.take() {
@@ -443,64 +381,6 @@ impl Stream for NeedMinBuffer {
}
}
#[allow(dead_code)]
fn raw_concat_channel_read_stream(
query: &netpod::AggQuerySingleChannel,
node: Node,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let mut query = query.clone();
let node = node.clone();
async_stream::stream! {
let mut i1 = 0;
loop {
let timebin = 18700 + i1;
query.timebin = timebin;
let s2 = raw_concat_channel_read_stream_timebin(&query, node.clone());
pin_mut!(s2);
while let Some(item) = s2.next().await {
yield item;
}
i1 += 1;
if i1 > 15 {
break;
}
}
}
}
#[allow(dead_code)]
fn raw_concat_channel_read_stream_timebin(
query: &netpod::AggQuerySingleChannel,
node: Node,
) -> impl Stream<Item = Result<Bytes, Error>> {
let query = query.clone();
let node = node.clone();
async_stream::stream! {
let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
debug!("try path: {:?}", path);
let mut fin = OpenOptions::new().read(true).open(path).await?;
let meta = fin.metadata().await?;
debug!("file meta {:?}", meta);
let blen = query.buffer_size as usize;
use tokio::io::AsyncReadExt;
loop {
let mut buf = BytesMut::with_capacity(blen);
assert!(buf.is_empty());
if false {
buf.resize(buf.capacity(), 0);
if buf.as_mut().len() != blen {
panic!("logic");
}
}
let n1 = fin.read_buf(&mut buf).await?;
if n1 == 0 {
break;
}
yield Ok(buf.freeze());
}
}
}
pub mod dtflags {
pub const COMPRESSION: u8 = 0x80;
pub const ARRAY: u8 = 0x40;

View File

@@ -1,11 +1,14 @@
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::ByteEstimate;
use items::{
Appendable, EventsNodeProcessor, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem,
WithLen, WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -36,11 +39,26 @@ where
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
data_emit_complete: bool,
batch_size: usize,
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
// TODO get rid, log info explicitly.
impl<S, ENP> Drop for MergedStream<S, ENP>
where
    S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>>,
    ENP: EventsNodeProcessor,
{
    /// Logs the histogram of emitted batch lengths accumulated over this
    /// merged stream's lifetime; the TODO above marks this as temporary.
    fn drop(&mut self) {
        info!(
            "MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
            self.batch_len_emit_histo
        );
    }
}
impl<S, ENP> MergedStream<S, ENP>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
@@ -62,7 +80,8 @@ where
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 1,
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
@@ -137,7 +156,7 @@ impl<S, ENP> Stream for MergedStream<S, ENP>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
ENP: EventsNodeProcessor,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + ByteEstimate,
{
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
@@ -192,6 +211,7 @@ where
if self.batch.len() != 0 {
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
self.data_emit_complete = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
@@ -222,9 +242,11 @@ where
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
if self.batch.len() >= self.batch_size {
if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
trace!("emit item because over threshold len {}", self.batch.len());
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue 'outer;

View File

@@ -27,7 +27,6 @@ impl MergedBlobsFromRemotes {
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
info!("MergedBlobsFromRemotes evq {:?}", evq);
let mut tcp_establish_futs = vec![];
for node in &cluster.nodes {
let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
let f: T002<EventFull> = Box::pin(f);

View File

@@ -2,15 +2,19 @@ use crate::HasSeenBeforeRangeCount;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::ByteEstimate;
use items::{
Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen, WithTimestamps,
};
use netpod::log::*;
use netpod::histo::HistoLog2;
use netpod::EventDataReadStats;
use netpod::{log::*, ByteSize};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
const LOG_EMIT_ITEM: bool = false;
enum MergedCurVal<T> {
None,
Finish,
@@ -20,7 +24,7 @@ enum MergedCurVal<T> {
pub struct MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
inps: Vec<S>,
current: Vec<MergedCurVal<I>>,
@@ -33,15 +37,30 @@ where
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
data_emit_complete: bool,
batch_size: usize,
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
// TODO get rid, log info explicitly.
impl<S, I> Drop for MergedBlobsStream<S, I>
where
    S: Stream<Item = Sitemty<I>> + Unpin,
    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
    /// Logs the histogram of emitted batch lengths accumulated over this
    /// merged blob stream's lifetime; the TODO above marks this as temporary.
    fn drop(&mut self) {
        info!(
            "MergedBlobsStream Drop Stats:\nbatch_len_emit_histo: {:?}",
            self.batch_len_emit_histo
        );
    }
}
impl<S, I> MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
pub fn new(inps: Vec<S>) -> Self {
let n = inps.len();
@@ -58,7 +77,8 @@ where
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 1,
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
@@ -132,7 +152,7 @@ where
impl<S, I> Stream for MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
type Item = Sitemty<I>;
@@ -187,8 +207,9 @@ where
if self.batch.len() != 0 {
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
self.data_emit_complete = true;
{
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
@@ -224,10 +245,12 @@ where
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
if self.batch.len() >= self.batch_size {
if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
trace!("emit item because over threshold len {}", self.batch.len());
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
{
self.batch_len_emit_histo.ingest(ret.len() as u32);
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
@@ -254,7 +277,7 @@ where
impl<S, I> HasSeenBeforeRangeCount for MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
fn seen_before_range_count(&self) -> usize {
// TODO (only for debug)

View File

@@ -186,6 +186,7 @@ pub async fn make_event_pipe(
FileIoBufferSize::new(evq.disk_io_buffer_size),
event_chunker_conf,
true,
true,
);
let shape = entry.to_shape()?;
let pipe = pipe1!(
@@ -221,6 +222,7 @@ pub fn make_local_event_blobs_stream(
channel: Channel,
entry: &ConfigEntry,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
file_io_buffer_size: FileIoBufferSize,
node_config: &NodeConfigCached,
@@ -247,6 +249,7 @@ pub fn make_local_event_blobs_stream(
file_io_buffer_size,
event_chunker_conf,
expand,
do_decompress,
);
Ok(event_blobs)
}
@@ -256,6 +259,7 @@ pub fn make_remote_event_blobs_stream(
channel: Channel,
entry: &ConfigEntry,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
file_io_buffer_size: FileIoBufferSize,
node_config: &NodeConfigCached,
@@ -282,6 +286,7 @@ pub fn make_remote_event_blobs_stream(
file_io_buffer_size,
event_chunker_conf,
expand,
do_decompress,
);
Ok(event_blobs)
}
@@ -307,6 +312,7 @@ pub async fn make_event_blobs_pipe(
evq.channel.clone(),
&entry,
expand,
evq.do_decompress,
event_chunker_conf,
file_io_buffer_size,
node_config,
@@ -322,6 +328,7 @@ pub async fn make_event_blobs_pipe(
evq.channel.clone(),
&entry,
expand,
evq.do_decompress,
event_chunker_conf,
file_io_buffer_size,
node_config,