This commit is contained in:
Dominik Werder
2021-09-29 23:21:38 +02:00
parent cd500620aa
commit c6b9ba7154
4 changed files with 141 additions and 88 deletions

View File

@@ -81,6 +81,7 @@ impl EventChunkerMultifile {
self.seen_after_range_count
}
// TODO remove or use Drop impl?
pub fn close(&mut self) {
if let Some(evs) = &mut self.evs {
self.seen_before_range_count += evs.seen_before_range_count();
@@ -220,6 +221,8 @@ impl Stream for EventChunkerMultifile {
#[cfg(test)]
mod test {
use crate::merge::MergedStream;
use crate::rangefilter::RangeFilter;
use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
use err::Error;
use futures_util::StreamExt;
@@ -252,8 +255,8 @@ mod test {
};
let task = async move {
let mut event_count = 0;
let mut events = EventChunkerMultifile::new(
range,
let events = EventChunkerMultifile::new(
range.clone(),
channel_config,
node,
nodeix,
@@ -262,6 +265,8 @@ mod test {
true,
true,
);
//let mut events = MergedStream::new(vec![events], range.clone(), true);
let mut events = RangeFilter::new(events, range.clone(), true);
let mut tss = vec![];
while let Some(item) = events.next().await {
match item {
@@ -281,7 +286,6 @@ mod test {
Err(e) => return Err(e.into()),
}
}
events.close();
Ok((event_count, tss))
};
Ok(taskrun::run(task).unwrap())
@@ -298,6 +302,7 @@ mod test {
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500]);
Ok(())
}
@@ -308,9 +313,10 @@ mod test {
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
if res.0 != 4 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000]);
Ok(())
}
@@ -321,9 +327,7 @@ mod test {
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000]);
Ok(())
}
@@ -335,9 +339,10 @@ mod test {
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 4 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
assert_eq!(
res.1,
vec![DAY - MS * 3000, DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000]
);
Ok(())
}
}

View File

@@ -34,6 +34,7 @@ pub mod index;
pub mod merge;
pub mod mergeblobs;
pub mod paths;
pub mod rangefilter;
pub mod raw;
pub mod streamlog;

View File

@@ -30,8 +30,6 @@ pub struct MergedStream<S, ITY> {
errored: bool,
completed: bool,
batch: ITY,
range: NanoRange,
expand: bool,
ts_last_emit: u64,
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
@@ -39,8 +37,6 @@ pub struct MergedStream<S, ITY> {
data_emit_complete: bool,
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
emitted_after_range: usize,
pre_range_buf: ITY,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
@@ -70,8 +66,6 @@ where
errored: false,
completed: false,
batch: <ITY as Appendable>::empty(),
range,
expand,
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
@@ -79,8 +73,6 @@ where
data_emit_complete: false,
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
emitted_after_range: 0,
pre_range_buf: ITY::empty(),
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
@@ -154,7 +146,7 @@ where
impl<S, ITY> Stream for MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: PushableIndex + Appendable + Clearable + ByteEstimate + WithTimestamps + Unpin,
ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin,
{
type Item = Sitemty<ITY>;
@@ -205,21 +197,8 @@ where
}
}
if lowest_ix == usize::MAX {
if self.pre_range_buf.len() == 1 {
let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
let ts4 = self.pre_range_buf.ts(0);
info!("\n\nMERGER enqueue after exhausted from stash {}", ts4);
ldst.push_index(&self.pre_range_buf, 0);
self.pre_range_buf.clear();
self.ts_last_emit = ts4;
self.batch = ldst;
} else if self.pre_range_buf.len() > 1 {
panic!();
} else {
};
if self.batch.len() != 0 {
let emp = ITY::empty();
let ret = std::mem::replace(&mut self.batch, emp);
let ret = std::mem::replace(&mut self.batch, ITY::empty());
self.batch_len_emit_histo.ingest(ret.len() as u32);
self.data_emit_complete = true;
if LOG_EMIT_ITEM {
@@ -236,58 +215,8 @@ where
}
} else {
assert!(lowest_ts >= self.ts_last_emit);
let do_emit_event;
let emit_packet_now_2;
if lowest_ts < self.range.beg {
do_emit_event = false;
emit_packet_now_2 = false;
if self.expand {
info!("\n\nMERGER stash {}", lowest_ts);
let mut ldst = std::mem::replace(&mut self.pre_range_buf, ITY::empty());
ldst.clear();
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => ldst.push_index(val, rix),
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
self.pre_range_buf = ldst;
} else {
};
} else if lowest_ts >= self.range.end {
if self.expand {
if self.emitted_after_range == 0 {
do_emit_event = true;
emit_packet_now_2 = true;
self.emitted_after_range += 1;
self.range_complete_observed_all = true;
self.data_emit_complete = true;
} else {
do_emit_event = false;
emit_packet_now_2 = false;
};
} else {
do_emit_event = false;
emit_packet_now_2 = false;
self.data_emit_complete = true;
};
} else {
do_emit_event = true;
emit_packet_now_2 = false;
};
if do_emit_event {
{
let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
if self.pre_range_buf.len() == 1 {
let ts4 = self.pre_range_buf.ts(0);
info!("\n\nMERGER enqueue from stash {}", ts4);
ldst.push_index(&self.pre_range_buf, 0);
self.pre_range_buf.clear();
} else if self.pre_range_buf.len() > 1 {
panic!();
} else {
info!("\n\nMERGER nothing in stash");
};
info!("\n\nMERGER enqueue {}", lowest_ts);
self.ts_last_emit = lowest_ts;
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
@@ -309,11 +238,10 @@ where
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
let emit_packet_now;
if emit_packet_now_2 || self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
emit_packet_now = true;
let emit_packet_now = if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
true
} else {
emit_packet_now = false;
false
};
if emit_packet_now {
trace!("emit item because over threshold len {}", self.batch.len());

119
disk/src/rangefilter.rs Normal file
View File

@@ -0,0 +1,119 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use futures_util::StreamExt;
use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
use netpod::NanoRange;
/// Stream adapter that restricts a timestamped event stream to `range`.
///
/// With `expand == true` it additionally keeps the latest event *before*
/// `range.beg` and emits one event *at or after* `range.end`, so consumers
/// can interpolate across the range boundaries.
pub struct RangeFilter<S, ITY> {
    // Underlying stream of Sitemty<ITY> batches.
    inp: S,
    // Filter window: `beg` inclusive, `end` exclusive (see poll_next comparisons).
    range: NanoRange,
    // When true, keep one boundary event on each side of the range.
    expand: bool,
    // Stash holding the most recent event seen before `range.beg` (expand mode).
    prerange: ITY,
    // True while `prerange` holds a stashed event.
    have_pre: bool,
    // NOTE(review): initialized but never set or read elsewhere in this file —
    // looks unused; confirm before relying on it.
    emitted_pre: bool,
    // True once the single after-range event has been emitted (expand mode).
    emitted_post: bool,
    // Logically finished; the next poll returns None.
    done: bool,
    // Stream already returned None; polling again panics by contract.
    complete: bool,
}
impl<S, ITY> RangeFilter<S, ITY>
where
    ITY: Appendable,
{
    /// Build a filter over `inp` restricted to `range`.
    ///
    /// `expand` selects the expanded mode that also retains one event on
    /// each side of the range. All bookkeeping flags start cleared and the
    /// pre-range stash starts empty.
    pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
        let prerange = ITY::empty();
        Self {
            inp,
            range,
            expand,
            prerange,
            done: false,
            complete: false,
            have_pre: false,
            emitted_pre: false,
            emitted_post: false,
        }
    }
}
impl<S, ITY> Stream for RangeFilter<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    type Item = Sitemty<ITY>;

    /// Filter each incoming data batch down to events inside `range`
    /// (plus boundary events in expand mode); pass all non-data items through.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        if self.complete {
            // Contract: polling after the stream has yielded None is a caller bug.
            panic!("poll_next on complete");
        } else if self.done {
            self.complete = true;
            Ready(None)
        } else {
            match self.inp.poll_next_unpin(cx) {
                Ready(Some(item)) => match item {
                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
                        // Rebuild the batch, keeping only events that pass the filter.
                        let mut ret = ITY::empty();
                        for i1 in 0..item.len() {
                            let ts = item.ts(i1);
                            if ts < self.range.beg {
                                if self.expand {
                                    // Keep only the latest pre-range event:
                                    // each new one replaces the stash.
                                    self.prerange.clear();
                                    self.prerange.push_index(&item, i1);
                                    self.have_pre = true;
                                } else {
                                };
                            } else if ts >= self.range.end {
                                if self.expand {
                                    if self.have_pre {
                                        // Flush the stashed pre-range event before
                                        // emitting anything at/after the range end.
                                        ret.push_index(&self.prerange, 0);
                                        self.prerange.clear();
                                        self.have_pre = false;
                                    };
                                    if !self.emitted_post {
                                        // Emit exactly one event past range.end, then finish.
                                        self.emitted_post = true;
                                        ret.push_index(&item, i1);
                                        self.done = true;
                                    } else {
                                        // NOTE(review): a second after-range event in the same
                                        // batch lands here — assumes at most one per batch
                                        // after `done` is set; confirm this invariant.
                                        panic!();
                                    };
                                } else {
                                    // Non-expand mode drops everything at/after range.end.
                                    self.done = true;
                                };
                            } else {
                                // Event lies within [range.beg, range.end).
                                if self.expand {
                                    if self.have_pre {
                                        // First in-range event: emit the stashed
                                        // pre-range event ahead of it.
                                        ret.push_index(&self.prerange, 0);
                                        self.prerange.clear();
                                        self.have_pre = false;
                                    }
                                    ret.push_index(&item, i1);
                                } else {
                                    ret.push_index(&item, i1);
                                };
                            };
                        }
                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                    }
                    // Non-data items (logs, stats, range-complete, errors) pass through unchanged.
                    k => Ready(Some(k)),
                },
                Ready(None) => {
                    // Input exhausted. If a pre-range event is still stashed (no
                    // in-range or after-range event ever flushed it), emit it now.
                    if self.have_pre {
                        let mut ret = ITY::empty();
                        ret.push_index(&self.prerange, 0);
                        self.have_pre = false;
                        self.done = true;
                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                    } else {
                        self.done = true;
                        self.complete = true;
                        Ready(None)
                    }
                }
                Pending => Pending,
            }
        }
    }
}