Add alternative time binner
This commit is contained in:
@@ -3,7 +3,6 @@ use futures_util::{Stream, StreamExt};
|
||||
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
||||
use items_2::streams::{Collectable, Collector};
|
||||
use netpod::log::*;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
@@ -11,12 +10,7 @@ use std::time::Duration;
|
||||
// This is meant to work with trait object event containers (crate items_2)
|
||||
|
||||
// TODO rename, it is also used for binned:
|
||||
pub async fn collect_plain_events_json<T, S>(
|
||||
stream: S,
|
||||
timeout: Duration,
|
||||
events_max: u64,
|
||||
do_log: bool,
|
||||
) -> Result<JsonValue, Error>
|
||||
pub async fn collect_plain_events_json<T, S>(stream: S, timeout: Duration, events_max: u64) -> Result<JsonValue, Error>
|
||||
where
|
||||
S: Stream<Item = Sitemty<T>> + Unpin,
|
||||
T: Collectable + fmt::Debug,
|
||||
@@ -52,9 +46,7 @@ where
|
||||
match item {
|
||||
Ok(item) => match item {
|
||||
StreamItem::Log(item) => {
|
||||
if do_log {
|
||||
debug!("collect_plain_events_json log {:?}", item);
|
||||
}
|
||||
trace!("collect_plain_events_json log {:?}", item);
|
||||
}
|
||||
StreamItem::Stats(item) => {
|
||||
use items::StatsItem;
|
||||
|
||||
@@ -8,3 +8,6 @@ pub mod needminbuffer;
|
||||
pub mod plaineventsjson;
|
||||
pub mod rangefilter;
|
||||
pub mod tcprawclient;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod timebin;
|
||||
|
||||
@@ -27,7 +27,7 @@ where
|
||||
SER: Serialize,
|
||||
{
|
||||
let inps = open_tcp_streams(&query, cluster).await?;
|
||||
let mut merged = items_2::merger::ChannelEventsMerger::new(inps);
|
||||
let mut merged = items_2::merger_cev::ChannelEventsMerger::new(inps);
|
||||
let timeout = Duration::from_millis(2000);
|
||||
let events_max = 100;
|
||||
let do_log = false;
|
||||
|
||||
86
streams/src/test.rs
Normal file
86
streams/src/test.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
#[cfg(test)]
|
||||
mod timebin;
|
||||
|
||||
use err::Error;
|
||||
use futures_util::{stream, Stream, StreamExt};
|
||||
use items::{sitem_data, Sitemty};
|
||||
use items_2::eventsdim0::EventsDim0;
|
||||
use items_2::merger_cev::ChannelEventsMerger;
|
||||
use items_2::{ChannelEvents, Empty};
|
||||
use netpod::timeunits::SEC;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
type BoxedEventStream = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
|
||||
|
||||
// TODO use some xorshift generator.
|
||||
|
||||
fn inmem_test_events_d0_i32_00() -> BoxedEventStream {
|
||||
let mut evs = EventsDim0::empty();
|
||||
evs.push(SEC * 1, 1, 10001);
|
||||
evs.push(SEC * 4, 4, 10004);
|
||||
let cev = ChannelEvents::Events(Box::new(evs));
|
||||
let item = sitem_data(cev);
|
||||
let stream = stream::iter(vec![item]);
|
||||
Box::pin(stream)
|
||||
}
|
||||
|
||||
fn inmem_test_events_d0_i32_01() -> BoxedEventStream {
|
||||
let mut evs = EventsDim0::empty();
|
||||
evs.push(SEC * 2, 2, 10002);
|
||||
let cev = ChannelEvents::Events(Box::new(evs));
|
||||
let item = sitem_data(cev);
|
||||
let stream = stream::iter(vec![item]);
|
||||
Box::pin(stream)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_input() -> Result<(), Error> {
|
||||
// TODO with a pipeline of x-binning, merging, t-binning and collection, how do I get a meaningful
|
||||
// result even if there is no input data at all?
|
||||
Err(Error::with_msg_no_trace("TODO"))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_channel_events() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let inp0 = inmem_test_events_d0_i32_00();
|
||||
let inp1 = inmem_test_events_d0_i32_01();
|
||||
let mut merged = ChannelEventsMerger::new(vec![inp0, inp1]);
|
||||
while let Some(item) = merged.next().await {
|
||||
eprintln!("item {item:?}");
|
||||
}
|
||||
let timeout = Duration::from_millis(4000);
|
||||
let events_max = 10000;
|
||||
// TODO add event collection
|
||||
let collected = crate::collect::collect_plain_events_json(merged, timeout, events_max).await?;
|
||||
Ok(())
|
||||
};
|
||||
runfut(fut)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_mergeable_00() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let inp0 = inmem_test_events_d0_i32_00();
|
||||
let inp1 = inmem_test_events_d0_i32_01();
|
||||
let mut merger = items_2::merger::Merger::new(vec![inp0, inp1], 4);
|
||||
Ok(())
|
||||
};
|
||||
runfut(fut)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timeout() -> Result<(), Error> {
|
||||
// TODO expand from items_2::test
|
||||
Err(Error::with_msg_no_trace("TODO"))
|
||||
}
|
||||
|
||||
fn runfut<T, F>(fut: F) -> Result<T, err::Error>
|
||||
where
|
||||
F: std::future::Future<Output = Result<T, Error>>,
|
||||
{
|
||||
use futures_util::TryFutureExt;
|
||||
let fut = fut.map_err(|e| e.into());
|
||||
taskrun::run(fut)
|
||||
}
|
||||
31
streams/src/test/timebin.rs
Normal file
31
streams/src/test/timebin.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
use crate::test::runfut;
|
||||
use futures_util::{stream, StreamExt};
|
||||
use items::sitem_data;
|
||||
use items_2::testgen::make_some_boxed_d0_f32;
|
||||
use items_2::{ChannelEvents, ConnStatus, ConnStatusEvent};
|
||||
use netpod::timeunits::{MS, SEC};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[test]
|
||||
fn time_bin_00() {
|
||||
let fut = async {
|
||||
let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect();
|
||||
let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
|
||||
let v0 = ChannelEvents::Events(evs0);
|
||||
let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect));
|
||||
let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect));
|
||||
let stream0 = Box::pin(stream::iter(vec![
|
||||
//
|
||||
sitem_data(v2),
|
||||
sitem_data(v0),
|
||||
sitem_data(v4),
|
||||
]));
|
||||
let deadline = Instant::now() + Duration::from_millis(2000);
|
||||
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
|
||||
while let Some(item) = binned_stream.next().await {
|
||||
eprintln!("{item:?}");
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
runfut(fut).unwrap()
|
||||
}
|
||||
198
streams/src/timebin.rs
Normal file
198
streams/src/timebin.rs
Normal file
@@ -0,0 +1,198 @@
|
||||
use err::Error;
|
||||
use futures_util::{Future, FutureExt, Stream, StreamExt};
|
||||
use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem};
|
||||
use items_2::timebin::{TimeBinnable, TimeBinner};
|
||||
use netpod::log::*;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Instant;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace3 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
}
|
||||
|
||||
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
|
||||
|
||||
pub struct TimeBinnedStream<T>
|
||||
where
|
||||
T: TimeBinnable,
|
||||
{
|
||||
inp: MergeInp<T>,
|
||||
edges: Vec<u64>,
|
||||
do_time_weight: bool,
|
||||
deadline: Instant,
|
||||
deadline_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
range_complete: bool,
|
||||
binner: Option<<T as TimeBinnable>::TimeBinner>,
|
||||
done_data: bool,
|
||||
done: bool,
|
||||
complete: bool,
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for TimeBinnedStream<T>
|
||||
where
|
||||
T: TimeBinnable,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("TimeBinnedStream")
|
||||
.field("edges", &self.edges)
|
||||
.field("deadline", &self.deadline)
|
||||
.field("range_complete", &self.range_complete)
|
||||
.field("binner", &self.binner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TimeBinnedStream<T>
|
||||
where
|
||||
T: TimeBinnable,
|
||||
{
|
||||
pub fn new(inp: MergeInp<T>, edges: Vec<u64>, do_time_weight: bool, deadline: Instant) -> Self {
|
||||
let deadline_fut = tokio::time::sleep_until(deadline.into());
|
||||
let deadline_fut = Box::pin(deadline_fut);
|
||||
Self {
|
||||
inp,
|
||||
edges,
|
||||
do_time_weight,
|
||||
deadline,
|
||||
deadline_fut,
|
||||
range_complete: false,
|
||||
binner: None,
|
||||
done_data: false,
|
||||
done: false,
|
||||
complete: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn process_item(&mut self, mut item: T) -> () {
|
||||
if self.binner.is_none() {
|
||||
let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight);
|
||||
self.binner = Some(binner);
|
||||
}
|
||||
let binner = self.binner.as_mut().unwrap();
|
||||
binner.ingest(&mut item);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for TimeBinnedStream<T>
|
||||
where
|
||||
T: TimeBinnable + Unpin,
|
||||
{
|
||||
type Item = Sitemty<<<T as TimeBinnable>::TimeBinner as TimeBinner>::Output>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
let span = tracing::span!(tracing::Level::TRACE, "poll");
|
||||
let _spg = span.enter();
|
||||
loop {
|
||||
break if self.complete {
|
||||
panic!("poll on complete")
|
||||
} else if self.done {
|
||||
self.complete = true;
|
||||
Ready(None)
|
||||
} else if self.done_data {
|
||||
self.done = true;
|
||||
if self.range_complete {
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
match self.deadline_fut.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
// TODO add timeout behavior
|
||||
todo!();
|
||||
}
|
||||
Pending => {}
|
||||
}
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(item)) => match item {
|
||||
Ok(item) => match item {
|
||||
StreamItem::DataItem(item) => match item {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
debug!("see RangeComplete");
|
||||
self.range_complete = true;
|
||||
continue;
|
||||
}
|
||||
RangeCompletableItem::Data(item) => {
|
||||
self.process_item(item);
|
||||
if let Some(binner) = self.binner.as_mut() {
|
||||
trace3!("bins ready count {}", binner.bins_ready_count());
|
||||
if binner.bins_ready_count() > 0 {
|
||||
if let Some(bins) = binner.bins_ready() {
|
||||
Ready(Some(sitem_data(bins)))
|
||||
} else {
|
||||
trace2!("bins ready but got nothing");
|
||||
Pending
|
||||
}
|
||||
} else {
|
||||
trace3!("no bins ready yet");
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
trace2!("processed item, but no binner yet");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
},
|
||||
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
|
||||
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
|
||||
},
|
||||
Err(e) => {
|
||||
self.done_data = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
trace2!("finish up");
|
||||
let self_range_complete = self.range_complete;
|
||||
if let Some(binner) = self.binner.as_mut() {
|
||||
trace2!("bins ready count before finish {}", binner.bins_ready_count());
|
||||
// TODO rework the finish logic
|
||||
if self_range_complete {
|
||||
binner.set_range_complete();
|
||||
}
|
||||
binner.push_in_progress(false);
|
||||
trace2!("bins ready count after finish {}", binner.bins_ready_count());
|
||||
if binner.bins_ready_count() > 0 {
|
||||
if let Some(bins) = binner.bins_ready() {
|
||||
self.done_data = true;
|
||||
Ready(Some(sitem_data(bins)))
|
||||
} else {
|
||||
trace2!("bins ready but got nothing");
|
||||
self.done_data = true;
|
||||
let e = Error::with_msg_no_trace(format!("bins ready but got nothing"));
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
} else {
|
||||
trace2!("no bins ready yet");
|
||||
self.done_data = true;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
trace2!("input stream finished, still no binner");
|
||||
self.done_data = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user