Adapt empty generator

This commit is contained in:
Dominik Werder
2021-10-18 21:41:35 +02:00
parent 7b79070e8a
commit dafe0a6e3b
34 changed files with 1129 additions and 639 deletions

View File

@@ -8,7 +8,6 @@ use std::time::Duration;
pub mod binnedt;
pub mod enp;
pub mod eventbatch;
pub mod scalarbinbatch;
pub mod streams;

View File

@@ -1,169 +0,0 @@
use bytes::{BufMut, Bytes, BytesMut};
use items::{Appendable, RangeOverlapInfo, SitemtyFrameType};
use netpod::log::*;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
/// One batch of scalar events in columnar (struct-of-arrays) layout.
/// All four vectors have the same length; index `i` describes one event.
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarEventBatch {
    // Event timestamps — presumably nanoseconds, since they are compared
    // against `NanoRange` bounds in `RangeOverlapInfo` (TODO confirm).
    pub tss: Vec<u64>,
    // Per-event minimum values.
    pub mins: Vec<f32>,
    // Per-event maximum values.
    pub maxs: Vec<f32>,
    // Per-event average values.
    pub avgs: Vec<f32>,
}
impl SitemtyFrameType for MinMaxAvgScalarEventBatch {
    // Wire-frame type tag identifying this item type in serialized streams.
    const FRAME_TYPE_ID: u32 = 0x300;
}
impl MinMaxAvgScalarEventBatch {
    /// Returns a batch with no events; all column vectors are empty.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }

    /// Parses a batch from the legacy full-frame wire format.
    ///
    /// Layout: a little-endian `u32` event count `n`, followed by four
    /// contiguous column arrays: `n` x `u64` timestamps, then `n` x `f32`
    /// mins, maxs and avgs (raw in-memory bit patterns).
    ///
    /// # Panics
    /// Panics if `buf` is shorter than 4 bytes. NOTE(review): the payload
    /// length is never validated against `n`; a truncated buffer leads to an
    /// out-of-bounds read in the unsafe copies below — TODO add a check that
    /// `buf.len() >= 4 + n * (8 + 3 * 4)`.
    #[allow(dead_code)]
    pub fn old_from_full_frame(buf: &Bytes) -> Self {
        info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len());
        assert!(buf.len() >= 4);
        let mut g = MinMaxAvgScalarEventBatch::empty();
        let n1;
        unsafe {
            // SAFETY: asserted above that at least 4 bytes are present.
            let ptr = (&buf[0] as *const u8) as *const [u8; 4];
            n1 = u32::from_le_bytes(*ptr);
            trace!("--- +++ --- +++ --- +++ n1: {}", n1);
        }
        if n1 == 0 {
            g
        } else {
            let n2 = n1 as usize;
            g.tss.reserve(n2);
            g.mins.reserve(n2);
            g.maxs.reserve(n2);
            g.avgs.reserve(n2);
            unsafe {
                // SAFETY: capacity for n2 elements was just reserved; every
                // slot is written below before the vectors are observed.
                g.tss.set_len(n2);
                g.mins.set_len(n2);
                g.maxs.set_len(n2);
                g.avgs.set_len(n2);
                let ptr0 = &buf[4] as *const u8;
                {
                    let ptr1 = ptr0 as *const u64;
                    for i1 in 0..n2 {
                        g.tss[i1] = *ptr1.add(i1);
                    }
                }
                {
                    let ptr1 = ptr0.add(8 * n2) as *const f32;
                    for i1 in 0..n2 {
                        g.mins[i1] = *ptr1.add(i1);
                    }
                }
                {
                    let ptr1 = ptr0.add((8 + 4) * n2) as *const f32;
                    for i1 in 0..n2 {
                        // BUG FIX: was `*ptr1` (no `.add(i1)`), which copied
                        // the first max into every slot.
                        g.maxs[i1] = *ptr1.add(i1);
                    }
                }
                {
                    let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32;
                    for i1 in 0..n2 {
                        // BUG FIX: was `*ptr1` (no `.add(i1)`), which copied
                        // the first avg into every slot.
                        g.avgs[i1] = *ptr1.add(i1);
                    }
                }
            }
            info!("CONTENT {:?}", g);
            g
        }
    }
}
impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
    /// Renders the batch on one line: event count followed by the raw column vectors.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        let count = self.tss.len();
        write!(
            fmt,
            "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}",
            count, self.tss, self.mins, self.maxs, self.avgs,
        )
    }
}
impl MinMaxAvgScalarEventBatch {
    /// Serializes the batch into the legacy full-frame layout:
    /// a little-endian `u32` event count, then the four column arrays
    /// (`u64` timestamps, `f32` mins, maxs, avgs) as raw in-memory bytes.
    /// Inverse of `old_from_full_frame`.
    #[allow(dead_code)]
    fn old_serialized(&self) -> Bytes {
        let n1 = self.tss.len();
        // Exact capacity: 4-byte count + per event 8 bytes ts + 3 * 4 bytes f32.
        let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
        g.put_u32_le(n1 as u32);
        if n1 > 0 {
            // SAFETY: each raw-parts slice covers exactly the vector's own
            // initialized storage (len * size_of element) and lives only for
            // the duration of the `put` call.
            let ptr = &self.tss[0] as *const u64 as *const u8;
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
            g.put(a);
            let ptr = &self.mins[0] as *const f32 as *const u8;
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.maxs[0] as *const f32 as *const u8;
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.avgs[0] as *const f32 as *const u8;
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
        }
        info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len());
        g.freeze()
    }
}
/*
TODO remove?
impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
fn make_bytes_frame(&self) -> Result<Bytes, Error> {
Ok(make_frame(self)?.freeze())
}
}*/
impl RangeOverlapInfo for MinMaxAvgScalarEventBatch {
    /// True when the last event lies before the start of `range`;
    /// an empty batch counts as ending before any range.
    fn ends_before(&self, range: NanoRange) -> bool {
        self.tss.last().map_or(true, |&ts| ts < range.beg)
    }

    /// True when the last event lies at or past the end of `range`.
    /// Panics on an empty batch (same contract as the original).
    fn ends_after(&self, range: NanoRange) -> bool {
        if let Some(&ts) = self.tss.last() {
            ts >= range.end
        } else {
            panic!()
        }
    }

    /// True when the first event lies at or past the end of `range`.
    /// Panics on an empty batch.
    fn starts_after(&self, range: NanoRange) -> bool {
        if let Some(&ts) = self.tss.first() {
            ts >= range.end
        } else {
            panic!()
        }
    }
}
impl Appendable for MinMaxAvgScalarEventBatch {
fn empty() -> Self {
Self::empty()
}
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.mins.extend_from_slice(&src.mins);
self.maxs.extend_from_slice(&src.maxs);
self.avgs.extend_from_slice(&src.avgs);
}
}

View File

@@ -1,5 +1,4 @@
use crate::agg::binnedt::TBinnerStream;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binned::query::BinnedQuery;
use crate::binnedstream::BoxedStream;
@@ -406,27 +405,6 @@ pub async fn binned_json(
Ok(Box::pin(ret))
}
impl WithLen for MinMaxAvgScalarEventBatch {
    // Number of events in the batch (all columns share this length).
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl WithTimestamps for MinMaxAvgScalarEventBatch {
    // Timestamp of the event at index `ix`; panics if `ix` is out of bounds.
    fn ts(&self, ix: usize) -> u64 {
        self.tss[ix]
    }
}
impl PushableIndex for MinMaxAvgScalarEventBatch {
    /// Copies the single event at index `ix` of `src` onto the end of
    /// every column of `self`. Panics if `ix` is out of bounds.
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.avgs.push(src.avgs[ix]);
        self.maxs.push(src.maxs[ix]);
        self.mins.push(src.mins[ix]);
        self.tss.push(src.tss[ix]);
    }
}
pub trait EventsDecoder {
type Output;
fn ingest(&mut self, event: &[u8]);

View File

@@ -11,8 +11,8 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use items::numops::NumOps;
use items::{
Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile,
Sitemty, StreamItem, TimeBinnableType,
Appendable, Clearable, EventsNodeProcessor, EventsTypeAliases, FrameType, PushableIndex, RangeCompletableItem,
ReadableFromFile, Sitemty, StreamItem, TimeBinnableType,
};
use netpod::log::*;
use netpod::query::RawEventsQuery;
@@ -41,9 +41,7 @@ where
agg_kind: AggKind,
node_config: NodeConfigCached,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>>,
fut2: Option<
Pin<Box<dyn Stream<Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
>,
fut2: Option<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
read_from_cache: bool,
cache_written: bool,
data_complete: bool,
@@ -52,15 +50,9 @@ where
errored: bool,
completed: bool,
streamlog: Streamlog,
values: <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output,
values: Option<<ENP as EventsTypeAliases>::TimeBinOutput>,
write_fut: Option<Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>>,
read_cache_fut: Option<
Pin<
Box<
dyn Future<Output = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send,
>,
>,
>,
read_cache_fut: Option<Pin<Box<dyn Future<Output = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
_m1: PhantomData<NTY>,
_m2: PhantomData<END>,
_m3: PhantomData<EVS>,
@@ -96,7 +88,8 @@ where
completed: false,
streamlog: Streamlog::new(node_config.ix as u32),
// TODO use alias via some trait associated type:
values: <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty(),
//values: <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty(),
values: None,
write_fut: None,
read_cache_fut: None,
_m1: PhantomData,
@@ -309,24 +302,26 @@ where
} else {
match self.query.cache_usage() {
CacheUsage::Use | CacheUsage::Recreate => {
let msg = format!(
"write cache file query: {:?} bin count: {}",
self.query.patch(),
self.values.len(),
);
self.streamlog.append(Level::INFO, msg);
// TODO use alias vias trait:
let emp = <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty();
let values = std::mem::replace(&mut self.values, emp);
let fut = write_pb_cache_min_max_avg_scalar(
values,
self.query.patch().clone(),
self.query.agg_kind().clone(),
self.query.channel().clone(),
self.node_config.clone(),
);
self.write_fut = Some(Box::pin(fut));
continue 'outer;
if let Some(values) = self.values.take() {
let msg = format!(
"write cache file query: {:?} bin count: {}",
self.query.patch(),
values.len(),
);
self.streamlog.append(Level::INFO, msg);
let fut = write_pb_cache_min_max_avg_scalar(
values,
self.query.patch().clone(),
self.query.agg_kind().clone(),
self.query.channel().clone(),
self.node_config.clone(),
);
self.write_fut = Some(Box::pin(fut));
continue 'outer;
} else {
warn!("no values to write to cache");
continue 'outer;
}
}
_ => {
self.cache_written = true;
@@ -346,7 +341,13 @@ where
continue 'outer;
}
RangeCompletableItem::Data(item) => {
self.values.append(&item);
if let Some(values) = &mut self.values {
values.append(&item);
} else {
let mut values = item.empty_like_self();
values.append(&item);
self.values = Some(values);
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
},

View File

@@ -212,8 +212,10 @@ where
}
}
fn decode(&mut self, ev: &EventFull) -> Result<<EVS as EventValueFromBytes<NTY, END>>::Batch, Error> {
let mut ret = <<EVS as EventValueFromBytes<NTY, END>>::Batch as Appendable>::empty();
fn decode(&mut self, ev: &EventFull) -> Result<Option<<EVS as EventValueFromBytes<NTY, END>>::Batch>, Error> {
//let mut ret = <<EVS as EventValueFromBytes<NTY, END>>::Batch as Appendable>::empty();
//let mut ret = EventValues::<<EVS as EventValueFromBytes<NTY, END>>::Output>::empty();
let mut ret = None;
//ret.tss.reserve(ev.tss.len());
//ret.values.reserve(ev.tss.len());
for i1 in 0..ev.tss.len() {
@@ -231,9 +233,9 @@ where
}
let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
let val = self.evs.convert(decomp, be)?;
<<EVS as EventValueFromBytes<NTY, END>>::Batch as EventAppendable>::append_event(&mut ret, ev.tss[i1], val);
//ret.tss.push(ev.tss[i1]);
//ret.values.push(val);
let k =
<<EVS as EventValueFromBytes<NTY, END>>::Batch as EventAppendable>::append_event(ret, ev.tss[i1], val);
ret = Some(k);
}
Ok(ret)
}
@@ -265,9 +267,14 @@ where
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
}
RangeCompletableItem::Data(item) => match self.decode(&item) {
Ok(res) => {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
}
Ok(res) => match res {
Some(res) => {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
}
None => {
continue;
}
},
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))

View File

@@ -2,7 +2,6 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::file_content_stream;
use crate::merge::MergedStream;
use crate::rangefilter::RangeFilter;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -14,6 +13,7 @@ use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::{Context, Poll};
use streams::rangefilter::RangeFilter;
pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
@@ -204,7 +204,6 @@ impl Stream for EventChunkerMultifile {
#[cfg(test)]
mod test {
use crate::rangefilter::RangeFilter;
use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
use err::Error;
use futures_util::StreamExt;
@@ -212,6 +211,7 @@ mod test {
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
use streams::rangefilter::RangeFilter;
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
let chn = netpod::Channel {

View File

@@ -524,7 +524,7 @@ impl WithLen for EventFull {
}
impl Appendable for EventFull {
fn empty() -> Self {
fn empty_like_self(&self) -> Self {
Self::empty()
}

View File

@@ -32,9 +32,7 @@ pub mod frame;
pub mod gen;
pub mod index;
pub mod merge;
pub mod mergeblobs;
pub mod paths;
pub mod rangefilter;
pub mod raw;
pub mod streamlog;

View File

@@ -6,7 +6,6 @@ use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, S
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -28,7 +27,7 @@ pub struct MergedStream<S, ITY> {
ixs: Vec<usize>,
errored: bool,
completed: bool,
batch: ITY,
batch: Option<ITY>,
ts_last_emit: u64,
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
@@ -37,7 +36,7 @@ pub struct MergedStream<S, ITY> {
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
stats_items: VecDeque<StatsItem>,
}
// TODO get rid, log info explicitly.
@@ -65,7 +64,7 @@ where
ixs: vec![0; n],
errored: false,
completed: false,
batch: <ITY as Appendable>::empty(),
batch: None,
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
@@ -74,7 +73,7 @@ where
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
stats_items: VecDeque::new(),
}
}
@@ -92,11 +91,7 @@ where
continue 'l1;
}
StreamItem::Stats(item) => {
match item {
StatsItem::EventDataReadStats(item) => {
self.event_data_read_stats_items.push_back(item);
}
}
self.stats_items.push_back(item);
continue 'l1;
}
StreamItem::DataItem(item) => match item {
@@ -160,8 +155,8 @@ where
Ready(None)
} else if let Some(item) = self.logitems.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(item) = self.event_data_read_stats_items.pop_front() {
Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
} else if let Some(item) = self.stats_items.pop_front() {
Ready(Some(Ok(StreamItem::Stats(item))))
} else if self.data_emit_complete {
if self.range_complete_observed_all {
if self.range_complete_observed_all_emitted {
@@ -197,18 +192,22 @@ where
}
}
if lowest_ix == usize::MAX {
if self.batch.len() != 0 {
let ret = std::mem::replace(&mut self.batch, ITY::empty());
self.batch_len_emit_histo.ingest(ret.len() as u32);
self.data_emit_complete = true;
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
if let Some(batch) = self.batch.take() {
if batch.len() != 0 {
self.batch_len_emit_histo.ingest(batch.len() as u32);
self.data_emit_complete = true;
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..batch.len() {
aa.push(batch.ts(ii));
}
info!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
} else {
self.data_emit_complete = true;
continue 'outer;
}
} else {
self.data_emit_complete = true;
continue 'outer;
@@ -216,17 +215,18 @@ where
} else {
assert!(lowest_ts >= self.ts_last_emit);
{
let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
let batch = self.batch.take();
self.ts_last_emit = lowest_ts;
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
let mut ldst = batch.unwrap_or_else(|| val.empty_like_self());
ldst.push_index(val, rix);
self.batch = Some(ldst);
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
self.batch = ldst;
}
self.ixs[lowest_ix] += 1;
let curlen = match &self.current[lowest_ix] {
@@ -238,24 +238,30 @@ where
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
let emit_packet_now = if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
true
let emit_packet_now = if let Some(batch) = &self.batch {
if batch.byte_estimate() >= self.batch_size.bytes() as u64 {
true
} else {
false
}
} else {
false
};
if emit_packet_now {
trace!("emit item because over threshold len {}", self.batch.len());
let emp = ITY::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
if let Some(batch) = self.batch.take() {
trace!("emit item because over threshold len {}", batch.len());
self.batch_len_emit_histo.ingest(batch.len() as u32);
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..batch.len() {
aa.push(batch.ts(ii));
}
info!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
} else {
continue 'outer;
}
} else {
continue 'outer;
}

View File

@@ -1,278 +0,0 @@
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::ByteEstimate;
use items::{
Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen, WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::EventDataReadStats;
use netpod::{log::*, ByteSize};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
const LOG_EMIT_ITEM: bool = false;
/// Per-input cursor state for the merge: no batch fetched yet (`None`),
/// input exhausted (`Finish`), or a current batch being drained (`Val`).
enum MergedCurVal<T> {
    None,
    Finish,
    Val(T),
}
/// Merges several timestamp-ordered item streams into one, emitting batches
/// once their byte estimate crosses `batch_size`.
pub struct MergedBlobsStream<S, I>
where
    S: Stream<Item = Sitemty<I>> + Unpin,
    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
    // Input streams being merged.
    inps: Vec<S>,
    // Per-input cursor, same length as `inps`.
    current: Vec<MergedCurVal<I>>,
    // Per-input index into the current batch.
    ixs: Vec<usize>,
    errored: bool,
    completed: bool,
    // Output batch currently being accumulated.
    batch: I,
    // Timestamp of the last merged event; used to assert monotonic order.
    ts_last_emit: u64,
    // Per-input flag: has this input reported RangeComplete?
    range_complete_observed: Vec<bool>,
    // Set once every input reported RangeComplete.
    range_complete_observed_all: bool,
    range_complete_observed_all_emitted: bool,
    data_emit_complete: bool,
    // Byte-estimate threshold at which an accumulated batch is emitted.
    batch_size: ByteSize,
    batch_len_emit_histo: HistoLog2,
    // Buffered passthrough items, re-emitted ahead of data.
    logitems: VecDeque<LogItem>,
    event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
// TODO get rid, log info explicitly.
impl<S, I> Drop for MergedBlobsStream<S, I>
where
    S: Stream<Item = Sitemty<I>> + Unpin,
    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
    /// Logs the emitted-batch-length histogram when the stream is torn down.
    fn drop(&mut self) {
        let histo = &self.batch_len_emit_histo;
        info!("MergedBlobsStream Drop Stats:\nbatch_len_emit_histo: {:?}", histo);
    }
}
impl<S, I> MergedBlobsStream<S, I>
where
    S: Stream<Item = Sitemty<I>> + Unpin,
    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
    /// Builds a merger over `inps`. Note: `err::todo()` fires immediately —
    /// this type is slated for removal and must not be constructed.
    pub fn new(inps: Vec<S>) -> Self {
        // TODO remove MergedBlobsStream
        err::todo();
        let n = inps.len();
        let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
        Self {
            inps,
            current: current,
            ixs: vec![0; n],
            errored: false,
            completed: false,
            batch: I::empty(),
            ts_last_emit: 0,
            range_complete_observed: vec![false; n],
            range_complete_observed_all: false,
            range_complete_observed_all_emitted: false,
            data_emit_complete: false,
            batch_size: ByteSize::kb(128),
            batch_len_emit_histo: HistoLog2::new(0),
            logitems: VecDeque::new(),
            event_data_read_stats_items: VecDeque::new(),
        }
    }

    /// Polls every input whose cursor is `MergedCurVal::None` until it yields
    /// a data batch, ends, or is pending.
    ///
    /// Log and stats items are diverted into side queues; `RangeComplete`
    /// markers are tallied per input and folded into
    /// `range_complete_observed_all` once every input reported one.
    /// Returns `Pending` if any polled input is still pending, `Ready(Err)`
    /// on the first input error (marking the stream errored), `Ready(Ok)`
    /// otherwise.
    fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
        use Poll::*;
        let mut pending = 0;
        for i1 in 0..self.inps.len() {
            match self.current[i1] {
                MergedCurVal::None => {
                    // Loop past non-data items until this input produces a
                    // batch, finishes, or is pending.
                    'l1: loop {
                        break match self.inps[i1].poll_next_unpin(cx) {
                            Ready(Some(Ok(k))) => match k {
                                StreamItem::Log(item) => {
                                    self.logitems.push_back(item);
                                    continue 'l1;
                                }
                                StreamItem::Stats(item) => {
                                    match item {
                                        StatsItem::EventDataReadStats(item) => {
                                            self.event_data_read_stats_items.push_back(item);
                                        }
                                    }
                                    continue 'l1;
                                }
                                StreamItem::DataItem(item) => match item {
                                    RangeCompletableItem::RangeComplete => {
                                        self.range_complete_observed[i1] = true;
                                        let d = self.range_complete_observed.iter().filter(|&&k| k).count();
                                        if d == self.range_complete_observed.len() {
                                            self.range_complete_observed_all = true;
                                            debug!("MergedStream range_complete d {} COMPLETE", d);
                                        } else {
                                            trace!("MergedStream range_complete d {}", d);
                                        }
                                        continue 'l1;
                                    }
                                    RangeCompletableItem::Data(item) => {
                                        self.ixs[i1] = 0;
                                        self.current[i1] = MergedCurVal::Val(item);
                                    }
                                },
                            },
                            Ready(Some(Err(e))) => {
                                // TODO emit this error, consider this stream as done, anything more to do here?
                                //self.current[i1] = CurVal::Err(e);
                                self.errored = true;
                                return Ready(Err(e));
                            }
                            Ready(None) => {
                                self.current[i1] = MergedCurVal::Finish;
                            }
                            Pending => {
                                pending += 1;
                            }
                        };
                    }
                }
                _ => (),
            }
        }
        if pending > 0 {
            Pending
        } else {
            Ready(Ok(()))
        }
    }
}
impl<S, I> Stream for MergedBlobsStream<S, I>
where
    S: Stream<Item = Sitemty<I>> + Unpin,
    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
    type Item = Sitemty<I>;

    /// K-way merge by timestamp across all inputs.
    ///
    /// Emission order: buffered log items, buffered stats items, merged data
    /// batches (flushed when the byte estimate crosses `batch_size`), the
    /// final partial batch, then a single `RangeComplete` if every input
    /// reported one.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // TODO remove MergedBlobsStream
        err::todo();
        use Poll::*;
        'outer: loop {
            break if self.completed {
                panic!("poll_next on completed");
            } else if self.errored {
                self.completed = true;
                Ready(None)
            } else if let Some(item) = self.logitems.pop_front() {
                Ready(Some(Ok(StreamItem::Log(item))))
            } else if let Some(item) = self.event_data_read_stats_items.pop_front() {
                Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
            } else if self.data_emit_complete {
                // All data emitted; emit RangeComplete at most once, then end.
                if self.range_complete_observed_all {
                    if self.range_complete_observed_all_emitted {
                        self.completed = true;
                        Ready(None)
                    } else {
                        self.range_complete_observed_all_emitted = true;
                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                    }
                } else {
                    self.completed = true;
                    Ready(None)
                }
            } else {
                // Can only run logic if all streams are either finished, errored or have some current value.
                match self.replenish(cx) {
                    Ready(Ok(_)) => {
                        // Select the input whose next event has the smallest timestamp.
                        let mut lowest_ix = usize::MAX;
                        let mut lowest_ts = u64::MAX;
                        for i1 in 0..self.inps.len() {
                            if let MergedCurVal::Val(val) = &self.current[i1] {
                                let u = self.ixs[i1];
                                if u >= val.len() {
                                    // Current batch drained: request a fresh one next pass.
                                    self.ixs[i1] = 0;
                                    self.current[i1] = MergedCurVal::None;
                                    continue 'outer;
                                } else {
                                    let ts = val.ts(u);
                                    if ts < lowest_ts {
                                        lowest_ix = i1;
                                        lowest_ts = ts;
                                    }
                                }
                            }
                        }
                        if lowest_ix == usize::MAX {
                            // No input has data left: flush the partial output batch, if any.
                            if self.batch.len() != 0 {
                                let emp = I::empty();
                                let ret = std::mem::replace(&mut self.batch, emp);
                                self.batch_len_emit_histo.ingest(ret.len() as u32);
                                self.data_emit_complete = true;
                                if LOG_EMIT_ITEM {
                                    let mut aa = vec![];
                                    for ii in 0..ret.len() {
                                        aa.push(ret.ts(ii));
                                    }
                                    info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
                                };
                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                            } else {
                                self.data_emit_complete = true;
                                continue 'outer;
                            }
                        } else {
                            // Timestamps must never go backwards across inputs.
                            assert!(lowest_ts >= self.ts_last_emit);
                            // Move the selected event into the output batch.
                            let emp = I::empty();
                            let mut local_batch = std::mem::replace(&mut self.batch, emp);
                            self.ts_last_emit = lowest_ts;
                            let rix = self.ixs[lowest_ix];
                            match &self.current[lowest_ix] {
                                MergedCurVal::Val(val) => {
                                    local_batch.push_index(val, rix);
                                }
                                MergedCurVal::None => panic!(),
                                MergedCurVal::Finish => panic!(),
                            }
                            self.batch = local_batch;
                            self.ixs[lowest_ix] += 1;
                            let curlen = match &self.current[lowest_ix] {
                                MergedCurVal::Val(val) => val.len(),
                                MergedCurVal::None => panic!(),
                                MergedCurVal::Finish => panic!(),
                            };
                            if self.ixs[lowest_ix] >= curlen {
                                self.ixs[lowest_ix] = 0;
                                self.current[lowest_ix] = MergedCurVal::None;
                            }
                            // Emit once the accumulated batch is estimated large enough.
                            if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
                                trace!("emit item because over threshold len {}", self.batch.len());
                                let emp = I::empty();
                                let ret = std::mem::replace(&mut self.batch, emp);
                                self.batch_len_emit_histo.ingest(ret.len() as u32);
                                if LOG_EMIT_ITEM {
                                    let mut aa = vec![];
                                    for ii in 0..ret.len() {
                                        aa.push(ret.ts(ii));
                                    }
                                    info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa);
                                };
                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                            } else {
                                continue 'outer;
                            }
                        }
                    }
                    Ready(Err(e)) => {
                        self.errored = true;
                        Ready(Some(Err(e)))
                    }
                    Pending => Pending,
                }
            };
        }
    }
}

View File

@@ -1,143 +0,0 @@
use futures_core::Stream;
use futures_util::StreamExt;
use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
use netpod::log::*;
use netpod::NanoRange;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Filters a timestamp-ordered item stream down to the events inside a time
/// range; with `expand`, also keeps one event just before and one just after
/// the range.
pub struct RangeFilter<S, ITY> {
    // Upstream item stream.
    inp: S,
    // Time range to keep: events with beg <= ts < end pass through.
    range: NanoRange,
    // Pre-formatted range, recorded into the tracing span on every poll.
    range_str: String,
    // When true, additionally emit the last pre-range event and the first
    // post-range event (bin-expansion semantics).
    expand: bool,
    // Holds the most recent event seen before range.beg (expand mode only).
    prerange: ITY,
    // Whether `prerange` currently holds an event.
    have_pre: bool,
    // Whether upstream reported RangeComplete, or we saw an event >= range.end.
    have_range_complete: bool,
    // Whether the single post-range event was already emitted (expand mode).
    emitted_post: bool,
    // Staged shutdown: data exhausted -> final items emitted -> terminal.
    data_done: bool,
    done: bool,
    complete: bool,
}
impl<S, ITY> RangeFilter<S, ITY>
where
    ITY: Appendable,
{
    /// Creates a filter over `inp` restricted to `range`.
    ///
    /// With `expand` set, one event immediately before the range and one
    /// immediately after it are also passed through.
    pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
        trace!("RangeFilter::new range: {:?} expand: {:?}", range, expand);
        let range_str = format!("{:?}", range);
        Self {
            inp,
            range,
            range_str,
            expand,
            prerange: ITY::empty(),
            have_pre: false,
            have_range_complete: false,
            emitted_post: false,
            data_done: false,
            done: false,
            complete: false,
        }
    }
}
impl<S, ITY> RangeFilter<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    /// Core filtering loop, driven by the `Stream` impl below.
    ///
    /// For each incoming data item, events are partitioned by timestamp:
    /// before the range (kept as the single `prerange` candidate when
    /// expanding, otherwise dropped), inside `[beg, end)` (always passed
    /// through), or at/after `end` (marks range completion; when expanding,
    /// exactly one such event is emitted). At end of input, a still-held
    /// pre-range event is flushed, then `RangeComplete` if observed.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
        use Poll::*;
        loop {
            break if self.complete {
                panic!("poll_next on complete");
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else if self.data_done {
                self.done = true;
                if self.have_range_complete {
                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                } else {
                    continue;
                }
            } else {
                match self.inp.poll_next_unpin(cx) {
                    Ready(Some(item)) => match item {
                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
                            // Rebuild a batch containing only the events to emit.
                            let mut ret = ITY::empty();
                            for i1 in 0..item.len() {
                                let ts = item.ts(i1);
                                if ts < self.range.beg {
                                    if self.expand {
                                        // Keep only the latest pre-range event.
                                        self.prerange.clear();
                                        self.prerange.push_index(&item, i1);
                                        self.have_pre = true;
                                    }
                                } else if ts >= self.range.end {
                                    self.have_range_complete = true;
                                    if self.expand {
                                        if self.have_pre {
                                            ret.push_index(&self.prerange, 0);
                                            self.prerange.clear();
                                            self.have_pre = false;
                                        }
                                        if !self.emitted_post {
                                            // Emit exactly one event past the range end.
                                            self.emitted_post = true;
                                            ret.push_index(&item, i1);
                                            //self.data_done = true;
                                        }
                                    } else {
                                        //self.data_done = true;
                                    }
                                } else {
                                    // In range: flush any held pre-range event first.
                                    if self.expand {
                                        if self.have_pre {
                                            ret.push_index(&self.prerange, 0);
                                            self.prerange.clear();
                                            self.have_pre = false;
                                        }
                                    }
                                    ret.push_index(&item, i1);
                                };
                            }
                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                        }
                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
                            // Defer RangeComplete until after all data is emitted.
                            self.have_range_complete = true;
                            continue;
                        }
                        // Log, stats and error items pass through unchanged.
                        k => Ready(Some(k)),
                    },
                    Ready(None) => {
                        self.data_done = true;
                        if self.have_pre {
                            // Input ended before the range started: flush the
                            // held pre-range event on its own.
                            let mut ret = ITY::empty();
                            ret.push_index(&self.prerange, 0);
                            self.have_pre = false;
                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                        } else {
                            continue;
                        }
                    }
                    Pending => Pending,
                }
            };
        }
    }
}
impl<S, ITY> Stream for RangeFilter<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    type Item = Sitemty<ITY>;

    /// Delegates to the inherent `poll_next`, wrapped in a tracing span that
    /// records the pre-formatted range string for log correlation.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
        span1.record("range", &self.range_str.as_str());
        span1.in_scope(|| Self::poll_next(self, cx))
    }
}