WIP query options

This commit is contained in:
Dominik Werder
2023-02-08 16:53:18 +01:00
parent b93bb9b467
commit 4694f98758
22 changed files with 664 additions and 272 deletions

View File

@@ -78,7 +78,7 @@ where
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("Reached events_max {} abort", events_max);
warn!("TODO compute continue-at reached events_max {} abort", events_max);
break;
}
}

View File

@@ -40,6 +40,7 @@ pub struct EventChunker {
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
dbgdesc: String,
}
impl Drop for EventChunker {
@@ -83,6 +84,7 @@ impl EventChunker {
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
dbgdesc: String,
) -> Self {
trace!("EventChunker::from_start");
let mut inp = NeedMinBuffer::new(inp);
@@ -111,6 +113,7 @@ impl EventChunker {
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
dbgdesc,
}
}
@@ -123,8 +126,18 @@ impl EventChunker {
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
dbgdesc: String,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
let mut ret = Self::from_start(
inp,
channel_config,
range,
stats_conf,
dbg_path,
expand,
do_decompress,
dbgdesc,
);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -192,6 +205,7 @@ impl EventChunker {
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
info!("SEE {ts:20} {pulse:20} {0}", self.dbgdesc);
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
let msg = format!(
@@ -362,17 +376,16 @@ impl EventChunker {
}
}
}
let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)];
let decomp = {
if self.do_decompress {
assert!(data.len() > 12);
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
let mut decomp = vec![0; decomp_bytes];
// TODO limit the buf slice range
match bitshuffle_decompress(
&buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)],
&data[12..],
&mut decomp,
ele_count as usize,
ele_size as usize,
@@ -382,6 +395,7 @@ impl EventChunker {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
// TODO analyze the histo
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Some(decomp)
}
@@ -396,7 +410,7 @@ impl EventChunker {
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(data.to_vec()),
decomp,
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
@@ -409,13 +423,12 @@ impl EventChunker {
Err(Error::with_msg(msg))?;
}
let vlen = len - p1 as u32 - 4;
// TODO in this case, decomp and comp is the same and not needed.
let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]);
let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize];
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
Some(data.to_vec()),
Some(data.to_vec()),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,

View File

@@ -1,12 +1,16 @@
use super::inmem::InMemoryFrameAsyncReadStream;
use futures_util::{Stream, StreamExt};
use futures_util::Stream;
use futures_util::StreamExt;
use items::frame::decode_frame;
use items::{FrameTypeInnerStatic, Sitemty, StreamItem};
use items::FrameTypeInnerStatic;
use items::Sitemty;
use items::StreamItem;
use netpod::log::*;
use serde::de::DeserializeOwned;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
pub struct EventsFromFrames<T, I>

View File

@@ -7,6 +7,7 @@ pub mod merge;
pub mod needminbuffer;
pub mod plaineventsjson;
pub mod rangefilter;
pub mod rangefilter2;
pub mod slidebuf;
pub mod tcprawclient;
#[cfg(test)]

View File

@@ -3,6 +3,7 @@ use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::ChConf;
use netpod::Cluster;
@@ -18,24 +19,17 @@ pub async fn plain_events_json(
// TODO remove magic constant
let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000);
let events_max = query.events_max();
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?;
let mut evquery = query.clone();
evquery.adjust_for_events_query();
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, evquery.agg_kind())?;
info!("plain_events_json with empty item {}", empty.type_name());
let empty = ChannelEvents::Events(empty);
let empty = items::sitem_data(empty);
// TODO should be able to ask for data-events only, instead of mixed data and status events.
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?;
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
//let inps = open_tcp_streams::<_, Box<dyn items_2::Events>>(&query, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
#[cfg(NOTHING)]
let stream = {
let mut it = inps.into_iter();
let inp0 = it.next().unwrap();
let inp1 = it.next().unwrap();
let inp2 = it.next().unwrap();
let stream = inp0.chain(inp1).chain(inp2);
stream
};
netpod::log::info!("plain_events_json with empty item {empty:?}");
let stream = { items_2::merger::Merger::new(inps, 1) };
let stream = items_2::merger::Merger::new(inps, 1024);
let stream = stream::iter([empty]).chain(stream);
let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;

258
streams/src/rangefilter2.rs Normal file
View File

@@ -0,0 +1,258 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items::Appendable;
use items::Clearable;
use items::PushableIndex;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StatsItem;
use items::StreamItem;
use items::WithTimestamps;
use netpod::log::*;
use netpod::NanoRange;
use netpod::Nanos;
use netpod::RangeFilterStats;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Stream adapter that restricts an event stream to a `NanoRange`.
///
/// In `expand` mode it additionally keeps the last event before the range
/// begin and the first event at-or-after the range end, so consumers can
/// interpolate across the range boundaries.
pub struct RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    // Upstream stream of event batches.
    inp: S,
    // The filter range; events with beg <= ts < end pass through.
    range: NanoRange,
    // Pre-formatted Debug of `range`, recorded into the tracing span on each poll.
    range_str: String,
    // When true, emit the one-before and one-after boundary events as well.
    expand: bool,
    // Highest timestamp seen so far; used to detect unordered input.
    ts_max: u64,
    // Counters emitted as a final Stats item when the data phase ends.
    stats: RangeFilterStats,
    // Holds (at most) the latest event seen before range.beg while in expand mode.
    prerange: Option<ITY>,
    // True while `prerange` holds a not-yet-emitted pre-range event.
    have_pre: bool,
    // Set when the input signals RangeComplete or an event at/after range.end is seen.
    have_range_complete: bool,
    // Expand mode: the single post-range event has already been emitted.
    emitted_post: bool,
    // Shutdown pipeline flags, advanced one step per poll iteration:
    // data_done -> raco_done -> done -> complete.
    data_done: bool,
    raco_done: bool,
    done: bool,
    complete: bool,
    // Diagnostics reported via the Debug impl (logged on drop).
    items_with_pre: usize,
    items_with_post: usize,
    items_with_unordered: usize,
}
impl<S, ITY> RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    /// Short type name used in log and Debug output.
    fn selfname() -> &'static str {
        "RangeFilter2"
    }

    /// Creates a filter over `inp` restricted to `range`.
    ///
    /// With `expand` set, the events directly adjacent to the range
    /// boundaries are retained as well.
    pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
        trace!("{}::new range: {:?} expand: {:?}", Self::selfname(), range, expand);
        // Precompute the Debug rendering once; it is recorded into the
        // tracing span on every poll.
        let range_str = format!("{:?}", range);
        Self {
            inp,
            range_str,
            range,
            expand,
            ts_max: 0,
            stats: RangeFilterStats::new(),
            prerange: None,
            have_pre: false,
            have_range_complete: false,
            emitted_post: false,
            // Shutdown pipeline starts fully open.
            data_done: false,
            raco_done: false,
            done: false,
            complete: false,
            // Diagnostic counters.
            items_with_pre: 0,
            items_with_post: 0,
            items_with_unordered: 0,
        }
    }
}
impl<S, ITY> RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    // Core state machine. The flags form a shutdown pipeline that is walked
    // one step per loop iteration once the input ends:
    //   data_done -> (optional RangeComplete) -> raco_done -> Stats -> done -> Ready(None) -> complete.
    // While the input is live, each data batch is filtered event-by-event
    // into a fresh output batch `ret`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
        use Poll::*;
        loop {
            // `break` yields the chosen Poll value from the loop;
            // `continue` re-enters the state machine without returning.
            break if self.complete {
                panic!("poll_next on complete");
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else if self.raco_done {
                // Last real item: hand out the accumulated filter statistics,
                // leaving a fresh default in place.
                self.done = true;
                let k = std::mem::replace(&mut self.stats, RangeFilterStats::new());
                let k = StatsItem::RangeFilterStats(k);
                Ready(Some(Ok(StreamItem::Stats(k))))
            } else if self.data_done {
                // Data phase over: forward RangeComplete if it was observed,
                // otherwise skip straight to the stats step.
                self.raco_done = true;
                if self.have_range_complete {
                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                } else {
                    continue;
                }
            } else {
                match self.inp.poll_next_unpin(cx) {
                    Ready(Some(item)) => match item {
                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
                            // Per-batch flags so each counter increments at most once per batch.
                            let mut contains_pre = false;
                            let mut contains_post = false;
                            let mut contains_unordered = false;
                            let mut ret = item.empty_like_self();
                            for i1 in 0..item.len() {
                                let ts = item.ts(i1);
                                if ts < self.ts_max {
                                    // Event goes backwards in time: counted, but the
                                    // hard-error path below is deliberately disabled.
                                    contains_unordered = true;
                                    if false {
                                        self.done = true;
                                        let msg = format!(
                                            "unordered event i1 {} / {} ts {:?} ts_max {:?}",
                                            i1,
                                            item.len(),
                                            Nanos::from_ns(ts),
                                            Nanos::from_ns(self.ts_max)
                                        );
                                        error!("{}", msg);
                                        return Ready(Some(Err(Error::with_msg(msg))));
                                    }
                                } else {
                                    self.ts_max = ts;
                                    if ts < self.range.beg {
                                        contains_pre = true;
                                        if self.expand {
                                            // Keep only the most recent pre-range event:
                                            // clear the buffer, then store this one.
                                            let mut prerange = if let Some(prerange) = self.prerange.take() {
                                                prerange
                                            } else {
                                                item.empty_like_self()
                                            };
                                            prerange.clear();
                                            prerange.push_index(&item, i1);
                                            self.prerange = Some(prerange);
                                            self.have_pre = true;
                                        }
                                    } else if ts >= self.range.end {
                                        contains_post = true;
                                        // Seeing an event past the end proves the range is covered.
                                        self.have_range_complete = true;
                                        if self.expand {
                                            // Flush the buffered pre-range event first, if any.
                                            if self.have_pre {
                                                let prerange = if let Some(prerange) = &mut self.prerange {
                                                    prerange
                                                } else {
                                                    panic!()
                                                };
                                                ret.push_index(prerange, 0);
                                                prerange.clear();
                                                self.have_pre = false;
                                            }
                                            // Emit exactly one post-range event.
                                            if !self.emitted_post {
                                                self.emitted_post = true;
                                                ret.push_index(&item, i1);
                                                //self.data_done = true;
                                            }
                                        } else {
                                            //self.data_done = true;
                                        }
                                    } else {
                                        // In-range event: flush a pending pre-range event
                                        // ahead of it (expand mode), then pass it through.
                                        if self.expand {
                                            if self.have_pre {
                                                let prerange = if let Some(prerange) = &mut self.prerange {
                                                    prerange
                                                } else {
                                                    panic!()
                                                };
                                                ret.push_index(prerange, 0);
                                                prerange.clear();
                                                self.have_pre = false;
                                            }
                                        }
                                        ret.push_index(&item, i1);
                                    }
                                }
                            }
                            if contains_pre {
                                self.items_with_pre += 1;
                            }
                            if contains_post {
                                self.items_with_post += 1;
                            }
                            if contains_unordered {
                                self.items_with_unordered += 1;
                            }
                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                        }
                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
                            // Remember it; re-emitted after the data phase ends.
                            self.have_range_complete = true;
                            continue;
                        }
                        // Errors, logs, stats and other passthrough items.
                        k => Ready(Some(k)),
                    },
                    Ready(None) => {
                        // Input exhausted: emit a still-buffered pre-range event
                        // before entering the shutdown pipeline.
                        self.data_done = true;
                        if self.have_pre {
                            let prerange = if let Some(prerange) = &mut self.prerange {
                                prerange
                            } else {
                                panic!()
                            };
                            let mut ret = prerange.empty_like_self();
                            ret.push_index(&prerange, 0);
                            prerange.clear();
                            self.have_pre = false;
                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                        } else {
                            continue;
                        }
                    }
                    Pending => Pending,
                }
            };
        }
    }
}
impl<S, ITY> Stream for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    type Item = Sitemty<ITY>;

    /// Delegates to the inherent `poll_next`, wrapped in a tracing span
    /// that carries the target range for log context.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let span = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
        span.record("range", &self.range_str.as_str());
        let _enter = span.enter();
        Self::poll_next(self, cx)
    }
}
impl<S, ITY> fmt::Debug for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    /// Reports only the diagnostic counters, not the stream state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut dbg = f.debug_struct(Self::selfname());
        dbg.field("items_with_pre", &self.items_with_pre);
        dbg.field("items_with_post", &self.items_with_post);
        dbg.field("items_with_unordered", &self.items_with_unordered);
        dbg.finish()
    }
}
impl<S, ITY> Drop for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
{
    /// Logs the diagnostic counters (via the Debug impl) when the filter is dropped.
    fn drop(&mut self) {
        let name = Self::selfname();
        debug!("drop {} {:?}", name, self);
    }
}