Simple binning test

This commit is contained in:
Dominik Werder
2023-04-20 19:49:38 +02:00
parent cf3a876bcc
commit 95af6c359c
10 changed files with 292 additions and 156 deletions

View File

@@ -62,7 +62,7 @@ fn time_bin_00() {
d
};
let deadline = Instant::now() + Duration::from_millis(2000000);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true);
while let Some(item) = binned_stream.next().await {
//eprintln!("{item:?}");
match item {
@@ -121,7 +121,7 @@ fn time_bin_01() {
});
let stream0 = Box::pin(stream0);
let deadline = Instant::now() + Duration::from_millis(200);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true);
while let Some(item) = binned_stream.next().await {
if true {
eprintln!("{item:?}");
@@ -192,8 +192,7 @@ fn time_bin_02() -> Result<(), Error> {
x
});
let stream = Box::pin(stream);
let mut binned_stream =
crate::timebin::TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight, deadline);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight);
// From there on it should no longer be neccessary to distinguish whether its still events or time bins.
// Then, optionally collect for output type like json, or stream as batches.
// TODO the timebinner should already provide batches to make this efficient.

View File

@@ -1,6 +1,4 @@
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
@@ -8,20 +6,15 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnableTy;
use items_0::timebin::TimeBinner;
use items_0::timebin::TimeBinnerTy;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::transform::WithTransformProperties;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::Dim0Index;
use std::any;
use std::fmt;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
#[allow(unused)]
macro_rules! trace2 {
@@ -50,10 +43,9 @@ where
inp: MergeInp<T>,
range: BinnedRangeEnum,
do_time_weight: bool,
deadline: Instant,
deadline_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
range_final: bool,
binner: Option<<T as TimeBinnableTy>::TimeBinner>,
done_first_input: bool,
done_data: bool,
done: bool,
complete: bool,
@@ -66,7 +58,6 @@ where
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct(any::type_name::<Self>())
.field("range", &self.range)
.field("deadline", &self.deadline)
.field("range_final", &self.range_final)
.field("binner", &self.binner)
.finish()
@@ -77,17 +68,14 @@ impl<T> TimeBinnedStream<T>
where
T: TimeBinnableTy,
{
pub fn new(inp: MergeInp<T>, range: BinnedRangeEnum, do_time_weight: bool, deadline: Instant) -> Self {
let deadline_fut = tokio::time::sleep_until(deadline.into());
let deadline_fut = Box::pin(deadline_fut);
pub fn new(inp: MergeInp<T>, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
Self {
inp,
range,
do_time_weight,
deadline,
deadline_fut,
range_final: false,
binner: None,
done_first_input: false,
done_data: false,
done: false,
complete: false,
@@ -95,7 +83,7 @@ where
}
fn process_item(&mut self, mut item: T) -> () {
trace!("process_item {item:?}");
info!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.range.clone(), self.do_time_weight);
@@ -105,6 +93,140 @@ where
trace!("process_item call binner ingest");
binner.ingest(&mut item);
}
fn handle_data_item(
&mut self,
item: T,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= handle_data_item");
let item_len = item.len();
self.process_item(item);
let mut do_emit = false;
if self.done_first_input == false {
info!(
"emit container after the first input len {} binner {}",
item_len,
self.binner.is_some()
);
if self.binner.is_none() {
let e = Error::with_msg_no_trace("must emit on first input but no binner");
self.done = true;
return Err(e);
}
do_emit = true;
self.done_first_input = true;
}
if let Some(binner) = self.binner.as_mut() {
trace3!("bins ready count {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
do_emit = true
}
if do_emit {
if let Some(bins) = binner.bins_ready() {
Ok(Break(Ready(sitem_data(bins))))
} else {
warn!("bins ready but got nothing");
if let Some(bins) = binner.empty() {
Ok(Break(Ready(sitem_data(bins))))
} else {
let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty A");
error!("{e}");
Err(e)
}
}
} else {
trace3!("not emit");
Ok(ControlFlow::Continue(()))
}
} else {
warn!("processed item, but no binner yet");
Ok(ControlFlow::Continue(()))
}
}
fn handle_item(
&mut self,
item: Result<StreamItem<RangeCompletableItem<T>>, Error>,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= handle_item");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
debug!("see RangeComplete");
self.range_final = true;
Ok(Continue(()))
}
RangeCompletableItem::Data(item) => self.handle_data_item(item),
},
StreamItem::Log(item) => Ok(Break(Ready(Ok(StreamItem::Log(item))))),
StreamItem::Stats(item) => Ok(Break(Ready(Ok(StreamItem::Stats(item))))),
},
Err(e) => {
self.done = true;
Err(e)
}
}
}
fn handle_none(
&mut self,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= handle_none");
let self_range_complete = self.range_final;
if let Some(binner) = self.binner.as_mut() {
trace!("bins ready count before finish {}", binner.bins_ready_count());
// TODO rework the finish logic
if self_range_complete {
binner.set_range_complete();
}
binner.push_in_progress(false);
trace!("bins ready count after finish {}", binner.bins_ready_count());
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
Ok(Break(Ready(sitem_data(bins))))
} else {
warn!("bins ready but got nothing");
if let Some(bins) = binner.empty() {
Ok(Break(Ready(sitem_data(bins))))
} else {
let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty B");
error!("{e}");
self.done_data = true;
Err(e)
}
}
} else {
warn!("input stream finished, still no binner");
self.done_data = true;
let e = Error::with_msg_no_trace(format!("input stream finished, still no binner"));
Err(e)
}
}
// TODO
// Original block inside the poll loop was able to:
// continue
// break with Poll<Option<Item>>
fn poll_input(
&mut self,
cx: &mut Context,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= poll_input");
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => self.handle_item(item),
Ready(None) => self.handle_none(),
Pending => Ok(Break(Pending)),
}
}
}
impl<T> Stream for TimeBinnedStream<T>
@@ -115,8 +237,9 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = span!(Level::INFO, "poll");
let span = span!(Level::INFO, "TimeBinner");
let _spg = span.enter();
info!("================= POLL");
loop {
break if self.complete {
panic!("poll on complete")
@@ -131,109 +254,18 @@ where
continue;
}
} else {
match self.deadline_fut.poll_unpin(cx) {
Ready(()) => {
trace2!("timeout");
let self_range_complete = self.range_final;
if let Some(binner) = self.binner.as_mut() {
trace2!("bins ready count before finish {}", binner.bins_ready_count());
// TODO rework the finish logic
if self_range_complete {
binner.set_range_complete();
}
trace2!("bins ready count after finish {}", binner.bins_ready_count());
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
return Ready(Some(sitem_data(bins)));
} else {
self.done_data = true;
continue;
}
} else {
continue;
}
}
Pending => {}
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
debug!("see RangeComplete");
self.range_final = true;
continue;
}
RangeCompletableItem::Data(item) => {
self.process_item(item);
if let Some(binner) = self.binner.as_mut() {
trace3!("bins ready count {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
if let Some(bins) = binner.bins_ready() {
Ready(Some(sitem_data(bins)))
} else {
trace2!("bins ready but got nothing");
Pending
}
} else {
trace3!("no bins ready yet");
continue;
}
} else {
trace2!("processed item, but no binner yet");
continue;
}
}
},
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
match self.poll_input(cx) {
Ok(item) => match item {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(item) => match item {
Ready(item) => break Ready(Some(item)),
Pending => break Pending,
},
Err(e) => {
self.done_data = true;
Ready(Some(Err(e)))
}
},
Ready(None) => {
trace!("finish up");
let self_range_complete = self.range_final;
if let Some(binner) = self.binner.as_mut() {
trace!("bins ready count before finish {}", binner.bins_ready_count());
// TODO rework the finish logic
if self_range_complete {
binner.set_range_complete();
}
binner.push_in_progress(false);
trace!("bins ready count after finish {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
Ready(Some(sitem_data(bins)))
} else {
trace2!("bins ready but got nothing");
self.done_data = true;
let e = Error::with_msg_no_trace(format!("bins ready but got nothing"));
Ready(Some(Err(e)))
}
} else {
if let Some(bins) = binner.empty() {
trace!("at end of stream, bin count zero, return {bins:?}");
self.done_data = true;
Ready(Some(sitem_data(bins)))
} else {
error!("at the end, no bins, can not get empty");
self.done_data = true;
let e = Error::with_msg_no_trace(format!("no bins"))
.add_public_msg(format!("unable to produce bins"));
Ready(Some(Err(e)))
}
}
} else {
trace!("input stream finished, still no binner");
self.done_data = true;
continue;
}
Err(e) => {
self.done = true;
break Ready(Some(Err(e)));
}
Pending => Pending,
}
};
}

View File

@@ -40,7 +40,7 @@ async fn timebinnable_stream(
one_before_range: bool,
cluster: Cluster,
) -> Result<TimeBinnableStreamBox, Error> {
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let evq = PlainEventsQuery::new(query.channel().clone(), range.clone()).for_time_weighted_scalar();
let mut tr = build_merged_event_transform(evq.transform())?;
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?;
@@ -66,20 +66,19 @@ async fn timebinnable_stream(
async fn timebinned_stream(
query: BinnedQuery,
binned_range: BinnedRangeEnum,
cluster: Cluster,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
let deadline = Instant::now();
let range: NanoRange = query.range().try_into()?;
let range = binned_range.binned_range_time().to_nano_range();
//let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?;
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let do_time_weight = true;
let one_before_range = true;
let stream = timebinnable_stream(query.clone(), range, one_before_range, cluster).await?;
let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
let stream = Box::pin(stream);
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline);
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight);
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
Ok(stream)
}
@@ -99,13 +98,13 @@ fn timebinned_to_collectable(
}
pub async fn timebinned_json(query: BinnedQuery, _chconf: ChConf, cluster: Cluster) -> Result<JsonValue, Error> {
let deadline = Instant::now();
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let collect_range = evq.range().clone();
let events_max = evq.events_max();
let stream = timebinned_stream(query.clone(), cluster).await?;
info!("~~~~~~~~~~~ timebinned_json");
let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let collect_max = 10000;
let stream = timebinned_stream(query.clone(), binned_range.clone(), cluster).await?;
let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None);
let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);
let collected = collected.await?;
let jsval = serde_json::to_value(&collected)?;