This commit is contained in:
Dominik Werder
2023-02-10 11:37:17 +01:00
parent 4694f98758
commit 4d9a33b77f
25 changed files with 609 additions and 502 deletions

View File

@@ -205,7 +205,6 @@ impl EventChunker {
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
info!("SEE {ts:20} {pulse:20} {0}", self.dbgdesc);
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
let msg = format!(

View File

@@ -5,6 +5,7 @@ use futures_util::StreamExt;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::AggKind;
use netpod::ChConf;
use netpod::Cluster;
use serde_json::Value as JsonValue;
@@ -19,9 +20,11 @@ pub async fn plain_events_json(
// TODO remove magic constant
let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000);
let events_max = query.events_max();
let mut evquery = query.clone();
evquery.adjust_for_events_query();
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, evquery.agg_kind())?;
let evquery = query.clone();
info!("plain_events_json evquery {:?}", evquery);
let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone());
info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind);
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &ev_agg_kind)?;
info!("plain_events_json with empty item {}", empty.type_name());
let empty = ChannelEvents::Events(empty);
let empty = items::sitem_data(empty);
@@ -30,6 +33,7 @@ pub async fn plain_events_json(
//let inps = open_tcp_streams::<_, Box<dyn items_2::Events>>(&query, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = items_2::merger::Merger::new(inps, 1024);
let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
let stream = stream::iter([empty]).chain(stream);
let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;

View File

@@ -1,17 +1,13 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items::Appendable;
use items::Clearable;
use items::PushableIndex;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StatsItem;
use items::StreamItem;
use items::WithTimestamps;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::NanoRange;
use netpod::Nanos;
use netpod::RangeFilterStats;
use std::fmt;
use std::pin::Pin;
@@ -21,64 +17,125 @@ use std::task::Poll;
pub struct RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
inp: S,
range: NanoRange,
range_str: String,
expand: bool,
ts_max: u64,
one_before_range: bool,
stats: RangeFilterStats,
prerange: Option<ITY>,
have_pre: bool,
slot1: Option<ITY>,
slot2: Option<ITY>,
have_range_complete: bool,
emitted_post: bool,
data_done: bool,
raco_done: bool,
done: bool,
complete: bool,
items_with_pre: usize,
items_with_post: usize,
items_with_unordered: usize,
}
impl<S, ITY> RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
fn selfname() -> &'static str {
const fn selfname() -> &'static str {
"RangeFilter2"
}
pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
trace!("{}::new range: {:?} expand: {:?}", Self::selfname(), range, expand);
pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self {
trace!(
"{}::new range: {:?} one_before_range: {:?}",
Self::selfname(),
range,
one_before_range
);
Self {
inp,
range_str: format!("{:?}", range),
range,
expand,
ts_max: 0,
one_before_range,
stats: RangeFilterStats::new(),
prerange: None,
have_pre: false,
slot1: None,
slot2: None,
have_range_complete: false,
emitted_post: false,
data_done: false,
raco_done: false,
done: false,
complete: false,
items_with_pre: 0,
items_with_post: 0,
items_with_unordered: 0,
}
}
fn prune_high(&mut self, mut item: ITY, ts: u64) -> Result<ITY, Error> {
let ret = match item.find_highest_index_lt(ts) {
Some(ihlt) => {
let n = item.len();
if ihlt + 1 == n {
// TODO gather stats, this should be the most common case.
self.stats.items_no_prune_high += 1;
item
} else {
self.stats.items_part_prune_high += 1;
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (ihlt + 1, n))?;
item
}
}
None => {
self.stats.items_all_prune_high += 1;
item.new_empty()
}
};
Ok(ret)
}
fn handle_item(&mut self, item: ITY) -> Result<ITY, Error> {
let mut item = self.prune_high(item, self.range.end)?;
let ret = if self.one_before_range {
match item.find_lowest_index_ge(self.range.beg) {
Some(ilge) => {
if ilge == 0 {
if let Some(sl1) = self.slot1.take() {
self.slot2 = Some(item);
sl1
} else {
item
}
} else {
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, ilge - 1))?;
self.slot1 = None;
item
}
}
None => {
let n = item.len();
let mut keep = item.new_empty();
item.drain_into(&mut keep, (n.max(1) - 1, n))?;
self.slot1 = Some(keep);
item.new_empty()
}
}
} else {
match item.find_lowest_index_ge(self.range.beg) {
Some(ilge) => {
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, ilge))?;
item
}
None => {
// TODO count case for stats
item.new_empty()
}
}
};
Ok(ret)
}
}
impl<S, ITY> RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
use Poll::*;
@@ -100,95 +157,18 @@ where
} else {
continue;
}
} else if let Some(sl2) = self.slot2.take() {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl2)))))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match item {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
let mut contains_pre = false;
let mut contains_post = false;
let mut contains_unordered = false;
let mut ret = item.empty_like_self();
for i1 in 0..item.len() {
let ts = item.ts(i1);
if ts < self.ts_max {
contains_unordered = true;
if false {
self.done = true;
let msg = format!(
"unordered event i1 {} / {} ts {:?} ts_max {:?}",
i1,
item.len(),
Nanos::from_ns(ts),
Nanos::from_ns(self.ts_max)
);
error!("{}", msg);
return Ready(Some(Err(Error::with_msg(msg))));
}
} else {
self.ts_max = ts;
if ts < self.range.beg {
contains_pre = true;
if self.expand {
let mut prerange = if let Some(prerange) = self.prerange.take() {
prerange
} else {
item.empty_like_self()
};
prerange.clear();
prerange.push_index(&item, i1);
self.prerange = Some(prerange);
self.have_pre = true;
}
} else if ts >= self.range.end {
contains_post = true;
self.have_range_complete = true;
if self.expand {
if self.have_pre {
let prerange = if let Some(prerange) = &mut self.prerange {
prerange
} else {
panic!()
};
ret.push_index(prerange, 0);
prerange.clear();
self.have_pre = false;
}
if !self.emitted_post {
self.emitted_post = true;
ret.push_index(&item, i1);
//self.data_done = true;
}
} else {
//self.data_done = true;
}
} else {
if self.expand {
if self.have_pre {
let prerange = if let Some(prerange) = &mut self.prerange {
prerange
} else {
panic!()
};
ret.push_index(prerange, 0);
prerange.clear();
self.have_pre = false;
}
}
ret.push_index(&item, i1);
}
}
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) {
Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))),
Err(e) => {
self.data_done = true;
Ready(Some(Err(e)))
}
if contains_pre {
self.items_with_pre += 1;
}
if contains_post {
self.items_with_post += 1;
}
if contains_unordered {
self.items_with_unordered += 1;
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
}
},
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
self.have_range_complete = true;
continue;
@@ -197,20 +177,7 @@ where
},
Ready(None) => {
self.data_done = true;
if self.have_pre {
let prerange = if let Some(prerange) = &mut self.prerange {
prerange
} else {
panic!()
};
let mut ret = prerange.empty_like_self();
ret.push_index(&prerange, 0);
prerange.clear();
self.have_pre = false;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue;
}
continue;
}
Pending => Pending,
}
@@ -222,35 +189,32 @@ where
impl<S, ITY> Stream for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
type Item = Sitemty<ITY>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
let span1 = span!(Level::TRACE, "RangeFilter2", range = tracing::field::Empty);
span1.record("range", &self.range_str.as_str());
span1.in_scope(|| Self::poll_next(self, cx))
let _spg = span1.enter();
Self::poll_next(self, cx)
}
}
impl<S, ITY> fmt::Debug for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct(Self::selfname())
.field("items_with_pre", &self.items_with_pre)
.field("items_with_post", &self.items_with_post)
.field("items_with_unordered", &self.items_with_unordered)
.finish()
f.debug_struct(Self::selfname()).finish()
}
}
impl<S, ITY> Drop for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
ITY: Mergeable,
{
fn drop(&mut self) {
debug!("drop {} {:?}", Self::selfname(), self);

View File

@@ -11,31 +11,29 @@ use netpod::BinnedRange;
use netpod::ChConf;
use netpod::Cluster;
use serde_json::Value as JsonValue;
use std::time::Duration;
use std::time::Instant;
pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result<JsonValue, Error> {
let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?;
let bins_max = 10000;
let do_time_weight = query.agg_kind().do_time_weighted();
let timeout = Duration::from_millis(7500);
let deadline = Instant::now() + timeout;
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?;
let deadline = Instant::now() + query.timeout_value();
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &query.agg_kind())?;
let empty = ChannelEvents::Events(empty);
let empty = items::sitem_data(empty);
let rawquery = PlainEventsQuery::new(
let evquery = PlainEventsQuery::new(
query.channel().clone(),
query.range().clone(),
query.agg_kind().clone(),
timeout,
Some(query.agg_kind()),
query.timeout(),
None,
true,
);
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?;
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
info!("timebinned_json with empty item {empty:?}");
let stream = items_2::merger::Merger::new(inps, 128);
let stream = stream::iter([empty]).chain(stream);
let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
let stream = Box::pin(stream);
let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
if false {