From b3225ae4c195780a6466e70835b928b7568a44c7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 18 Nov 2022 12:59:55 +0100 Subject: [PATCH] Add alternative time binner --- httpret/src/events.rs | 2 +- items_2/src/eventsdim0.rs | 58 +++++-- items_2/src/items_2.rs | 237 +++++++++++++++++++++++-- items_2/src/merger.rs | 308 ++++++++++++++++++++++----------- items_2/src/merger_cev.rs | 295 +++++++++++++++++++++++++++++++ items_2/src/test.rs | 118 ++++++++++++- items_2/src/testgen.rs | 23 +++ items_2/src/timebin.rs | 29 ++++ netpod/src/netpod.rs | 24 ++- streams/Cargo.toml | 3 + streams/src/collect.rs | 12 +- streams/src/lib.rs | 3 + streams/src/plaineventsjson.rs | 2 +- streams/src/test.rs | 86 +++++++++ streams/src/test/timebin.rs | 31 ++++ streams/src/timebin.rs | 198 +++++++++++++++++++++ taskrun/src/taskrun.rs | 31 ++-- 17 files changed, 1283 insertions(+), 177 deletions(-) create mode 100644 items_2/src/merger_cev.rs create mode 100644 items_2/src/testgen.rs create mode 100644 items_2/src/timebin.rs create mode 100644 streams/src/test.rs create mode 100644 streams/src/test/timebin.rs create mode 100644 streams/src/timebin.rs diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 3fa3135..7925a0f 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -4,7 +4,7 @@ use crate::{response, response_err, BodyStream, ToPublicResponse}; use futures_util::{Stream, StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; -use items_2::merger::ChannelEventsMerger; +use items_2::merger_cev::ChannelEventsMerger; use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents}; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; use netpod::{log::*, HasBackend}; diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 344ef57..bc61df4 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -5,6 +5,7 @@ use crate::{Empty, Events, ScalarOps, WithLen}; use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use err::Error; use netpod::log::*; +use netpod::timeunits::SEC; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::any::Any; @@ -53,15 +54,25 @@ where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!( - fmt, - "count {} ts {:?} .. {:?} vals {:?} .. {:?}", - self.tss.len(), - self.tss.front(), - self.tss.back(), - self.values.front(), - self.values.back(), - ) + if true { + write!( + fmt, + "EventsDim0 {{ count {} ts {:?} vals {:?} }}", + self.tss.len(), + self.tss.iter().map(|x| x / SEC).collect::>(), + self.values, + ) + } else { + write!( + fmt, + "EventsDim0 {{ count {} ts {:?} .. {:?} vals {:?} .. 
{:?} }}", + self.tss.len(), + self.tss.front().map(|x| x / SEC), + self.tss.back().map(|x| x / SEC), + self.values.front(), + self.values.back(), + ) + } } } @@ -539,7 +550,8 @@ impl Events for EventsDim0 { } fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - let n1 = self.tss.iter().take_while(|&&x| x < ts_end).count(); + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); let tss = self.tss.drain(..n1).collect(); let pulses = self.pulses.drain(..n1).collect(); let values = self.values.drain(..n1).collect(); @@ -547,6 +559,32 @@ impl Events for EventsDim0 { Box::new(ret) } + fn move_into_fresh(&mut self, ts_end: u64) -> Box { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + let tss = self.tss.drain(..n1).collect(); + let pulses = self.pulses.drain(..n1).collect(); + let values = self.values.drain(..n1).collect(); + let ret = Self { tss, pulses, values }; + Box::new(ret) + } + + fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()> { + // TODO as_any and as_any_mut are declared on unrealted traits. Simplify. + if let Some(tgt) = tgt.as_any_mut().downcast_mut::() { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + // TODO make it harder to forget new members when the struct may get modified in the future + tgt.tss.extend(self.tss.drain(..n1)); + tgt.pulses.extend(self.pulses.drain(..n1)); + tgt.values.extend(self.values.drain(..n1)); + Ok(()) + } else { + eprintln!("downcast to EventsDim0 FAILED"); + Err(()) + } + } + fn ts_min(&self) -> Option { self.tss.front().map(|&x| x) } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 77774e3..9594595 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,10 +1,14 @@ pub mod binsdim0; pub mod eventsdim0; pub mod merger; +pub mod merger_cev; pub mod streams; #[cfg(test)] pub mod test; +pub mod testgen; +pub mod timebin; +use crate::streams::Collector; use chrono::{DateTime, TimeZone, Utc}; use futures_util::FutureExt; use futures_util::Stream; @@ -14,6 +18,7 @@ use items::RangeCompletableItem; use items::Sitemty; use items::StreamItem; use items::SubFrId; +use merger_cev::MergeableCev; use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, NanoRange, ScalarType, Shape}; @@ -27,8 +32,6 @@ use std::time::Instant; use streams::Collectable; use streams::ToJsonResult; -use crate::streams::Collector; - pub fn bool_is_false(x: &bool) -> bool { *x == false } @@ -276,6 +279,8 @@ pub trait TimeBinner: Send { fn set_range_complete(&mut self); } +// TODO remove the Any bound. Factor out into custom AsAny trait. + /// Provides a time-binned representation of the implementing type. /// In contrast to `TimeBinnableType` this is meant for trait objects. pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { @@ -287,6 +292,8 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { fn to_box_to_json_result(&self) -> Box; } +// TODO can I remove the Any bound? + /// Container of some form of events, for use as trait object. 
pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_serde::Serialize { fn as_time_binnable(&self) -> &dyn TimeBinnable; @@ -296,6 +303,8 @@ pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_ fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; + fn move_into_fresh(&mut self, ts_end: u64) -> Box; + fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()>; fn clone_dyn(&self) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; fn serde_id(&self) -> &'static str; @@ -304,6 +313,35 @@ pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_ erased_serde::serialize_trait_object!(Events); +impl crate::merger::Mergeable for Box { + fn len(&self) -> usize { + self.as_ref().len() + } + + fn ts_min(&self) -> Option { + self.as_ref().ts_min() + } + + fn ts_max(&self) -> Option { + self.as_ref().ts_max() + } + + fn is_compatible_target(&self, _tgt: &Self) -> bool { + // TODO currently unused + todo!() + } + + fn move_into_fresh(&mut self, ts_end: u64) -> Self { + self.as_mut().move_into_fresh(ts_end) + } + + fn move_into_existing(&mut self, tgt: &mut Self, ts_end: u64) -> Result<(), merger::MergeError> { + self.as_mut() + .move_into_existing(tgt, ts_end) + .map_err(|()| merger::MergeError::NotCompatible) + } +} + impl PartialEq for Box { fn eq(&self, other: &Self) -> bool { Events::partial_eq_dyn(self.as_ref(), other.as_ref()) @@ -319,7 +357,7 @@ impl WithLen for EventsCollector { } impl Collector for EventsCollector { - fn ingest(&mut self, src: &mut dyn Collectable) { + fn ingest(&mut self, _src: &mut dyn Collectable) { todo!() } @@ -534,6 +572,7 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK } } +// TODO maybe rename to ChannelStatus? #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum ConnStatus { Connect, @@ -546,20 +585,9 @@ pub struct ConnStatusEvent { pub status: ConnStatus, } -trait MergableEvents: Any { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; -} - -impl MergableEvents for Box { - fn ts_min(&self) -> Option { - eprintln!("TODO MergableEvents for Box"); - err::todoval() - } - - fn ts_max(&self) -> Option { - eprintln!("TODO MergableEvents for Box"); - err::todoval() +impl ConnStatusEvent { + pub fn new(ts: u64, status: ConnStatus) -> Self { + Self { ts, status } } } @@ -717,7 +745,7 @@ impl PartialEq for ChannelEvents { } } -impl MergableEvents for ChannelEvents { +impl MergeableCev for ChannelEvents { fn ts_min(&self) -> Option { use ChannelEvents::*; match self { @@ -732,6 +760,179 @@ impl MergableEvents for ChannelEvents { } } +impl crate::merger::Mergeable for ChannelEvents { + fn len(&self) -> usize { + match self { + ChannelEvents::Events(k) => k.len(), + ChannelEvents::Status(_) => 1, + } + } + + fn ts_min(&self) -> Option { + match self { + ChannelEvents::Events(k) => k.ts_min(), + ChannelEvents::Status(k) => Some(k.ts), + } + } + + fn ts_max(&self) -> Option { + match self { + ChannelEvents::Events(k) => k.ts_max(), + ChannelEvents::Status(k) => Some(k.ts), + } + } + + fn is_compatible_target(&self, tgt: &Self) -> bool { + use ChannelEvents::*; + match self { + Events(_) => { + // TODO better to delegate this to inner type? + if let Events(_) = tgt { + true + } else { + false + } + } + Status(_) => { + // TODO better to delegate this to inner type? 
+ if let Status(_) = tgt { + true + } else { + false + } + } + } + } + + fn move_into_fresh(&mut self, ts_end: u64) -> Self { + match self { + ChannelEvents::Events(k) => ChannelEvents::Events(k.move_into_fresh(ts_end)), + ChannelEvents::Status(k) => ChannelEvents::Status(k.clone()), + } + } + + fn move_into_existing(&mut self, tgt: &mut Self, ts_end: u64) -> Result<(), merger::MergeError> { + match self { + ChannelEvents::Events(k) => match tgt { + ChannelEvents::Events(tgt) => k.move_into_existing(tgt, ts_end), + ChannelEvents::Status(_) => Err(merger::MergeError::NotCompatible), + }, + ChannelEvents::Status(_) => match tgt { + ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), + ChannelEvents::Status(_) => Err(merger::MergeError::Full), + }, + } + } +} + +impl Collectable for ChannelEvents { + fn new_collector(&self) -> Box { + match self { + ChannelEvents::Events(_item) => todo!(), + ChannelEvents::Status(_) => todo!(), + } + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +pub struct ChannelEventsTimeBinner { + // TODO `ConnStatus` contains all the changes that can happen to a connection, but + // here we would rather require a simplified current state for binning purpose. + edges: Vec, + do_time_weight: bool, + conn_state: ConnStatus, + binner: Option>, +} + +impl fmt::Debug for ChannelEventsTimeBinner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ChannelEventsTimeBinner") + .field("conn_state", &self.conn_state) + .finish() + } +} + +impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { + type Input = ChannelEvents; + type Output = Box; + + fn ingest(&mut self, item: &mut Self::Input) { + match item { + ChannelEvents::Events(item) => { + if self.binner.is_none() { + let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight); + self.binner = Some(binner); + } + match self.binner.as_mut() { + Some(binner) => binner.ingest(item.as_time_binnable()), + None => { + error!("ingest without active binner item {item:?}"); + () + } + } + } + ChannelEvents::Status(item) => { + warn!("TODO consider channel status in time binning {item:?}"); + } + } + } + + fn set_range_complete(&mut self) { + match self.binner.as_mut() { + Some(binner) => binner.set_range_complete(), + None => (), + } + } + + fn bins_ready_count(&self) -> usize { + match &self.binner { + Some(binner) => binner.bins_ready_count(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option { + match self.binner.as_mut() { + Some(binner) => binner.bins_ready(), + None => None, + } + } + + fn push_in_progress(&mut self, push_empty: bool) { + match self.binner.as_mut() { + Some(binner) => binner.push_in_progress(push_empty), + None => (), + } + } + + fn cycle(&mut self) { + match self.binner.as_mut() { + Some(binner) => binner.cycle(), + None => (), + } + } +} + +impl crate::timebin::TimeBinnable for ChannelEvents { + type TimeBinner = ChannelEventsTimeBinner; + + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Self::TimeBinner { + let (binner, status) = match self { + ChannelEvents::Events(_events) => (None, ConnStatus::Connect), + ChannelEvents::Status(status) => (None, status.status.clone()), + }; + ChannelEventsTimeBinner { + edges, + do_time_weight, + conn_state: status, + binner, + } + } +} + // TODO do this with some blanket impl: impl Collectable for Box { fn new_collector(&self) -> Box { diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index c676701..1106300 100644 --- a/items_2/src/merger.rs +++ 
b/items_2/src/merger.rs @@ -1,5 +1,6 @@ -use crate::{ChannelEvents, Error, MergableEvents}; +use crate::Error; use futures_util::{Stream, StreamExt}; +use items::sitem_data; use items::{RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use std::fmt; @@ -7,50 +8,126 @@ use std::ops::ControlFlow; use std::pin::Pin; use std::task::{Context, Poll}; -type MergeInp = Pin> + Send>>; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} -pub struct ChannelEventsMerger { - inps: Vec>, - items: Vec>, +#[allow(unused)] +macro_rules! trace3 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +#[derive(Debug)] +pub enum MergeError { + NotCompatible, + Full, +} + +impl From for err::Error { + fn from(e: MergeError) -> Self { + format!("{e:?}").into() + } +} + +pub trait Mergeable: fmt::Debug + Unpin { + fn len(&self) -> usize; + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + // TODO remove, useless. + fn is_compatible_target(&self, tgt: &Rhs) -> bool; + + // TODO rename to `append_*` to make it clear that they simply append, but not re-sort. + fn move_into_fresh(&mut self, ts_end: u64) -> Rhs; + fn move_into_existing(&mut self, tgt: &mut Rhs, ts_end: u64) -> Result<(), MergeError>; +} + +type MergeInp = Pin> + Send>>; + +pub struct Merger { + inps: Vec>>, + items: Vec>, + out: Option, + do_clear_out: bool, + out_max_len: usize, range_complete: bool, done: bool, done2: bool, + done3: bool, complete: bool, } -impl fmt::Debug for ChannelEventsMerger { +impl fmt::Debug for Merger +where + T: Mergeable, +{ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); fmt.debug_struct(std::any::type_name::()) .field("inps", &inps) .field("items", &self.items) + .field("out_max_len", &self.out_max_len) .field("range_complete", &self.range_complete) .field("done", &self.done) .field("done2", &self.done2) + .field("done3", &self.done3) .finish() } } -impl ChannelEventsMerger { - pub fn new(inps: Vec) -> Self { +impl Merger +where + T: Mergeable, +{ + pub fn new(inps: Vec>, out_max_len: usize) -> Self { let n = inps.len(); Self { - done: false, - done2: false, - complete: false, inps: inps.into_iter().map(|x| Some(x)).collect(), items: (0..n).into_iter().map(|_| None).collect(), + out: None, + do_clear_out: false, + out_max_len, range_complete: false, + done: false, + done2: false, + done3: false, + complete: false, } } - fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { + fn take_into_output_all(&mut self, src: &mut T) -> Result<(), MergeError> { + // TODO optimize the case when some large batch should be added to some existing small batch already in out. + // TODO maybe use two output slots? + self.take_into_output_upto(src, u64::MAX) + } + + fn take_into_output_upto(&mut self, src: &mut T, upto: u64) -> Result<(), MergeError> { + // TODO optimize the case when some large batch should be added to some existing small batch already in out. + // TODO maybe use two output slots? 
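+ // Drain events with ts <= `upto` from `src` into the buffered output batch:
+ // start a fresh batch if none exists yet, otherwise append to the existing one.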
+ if self.out.is_none() { + trace2!("move into fresh"); + self.out = Some(src.move_into_fresh(upto)); + Ok(()) + } else { + let out = self.out.as_mut().unwrap(); + src.move_into_existing(out, upto) + } + } + + fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { use ControlFlow::*; let mut tslows = [None, None]; for (i1, itemopt) in self.items.iter_mut().enumerate() { if let Some(item) = itemopt { - let t1 = item.ts_min(); - if let Some(t1) = t1 { + if let Some(t1) = item.ts_min() { if let Some((_, a)) = tslows[0] { if t1 < a { tslows[1] = tslows[0]; @@ -70,58 +147,72 @@ impl ChannelEventsMerger { tslows[0] = Some((i1, t1)); } } else { - match item { - ChannelEvents::Events(_) => { - trace!("events item without ts min discovered {item:?}"); - itemopt.take(); - return Ok(Continue(())); - } - ChannelEvents::Status(_) => { - return Err(format!("channel status without timestamp").into()); - } - } + // the item seems empty. + trace2!("empty item, something to do here?"); + *itemopt = None; + return Ok(Continue(())); } } } + trace4!("tslows {tslows:?}"); if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { + // There is a second input, take only up to the second highest timestamp let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(item) => { - if let Some(th0) = item.ts_max() { - if th0 < tl1 { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } else { - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 { - // TODO should never be here - self.items[il0] = None; - } - Ok(Break(ChannelEvents::Events(ritem))) + if let Some(th0) = item.ts_max() { + if th0 <= tl1 { + // Can take the whole item + let mut item = self.items[il0].take().unwrap(); + trace3!("Take all from item {item:?}"); + match self.take_into_output_all(&mut item) { + Ok(()) => Ok(Break(())), + Err(MergeError::Full) | Err(MergeError::NotCompatible) => { + // TODO count for stats + trace3!("Put item back"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) + } + } + } else { + // Take only up to the lowest ts of the second-lowest input + let mut item = self.items[il0].take().unwrap(); + trace3!("Take up to {tl1} from item {item:?}"); + match self.take_into_output_upto(&mut item, tl1) { + Ok(()) => { + if item.len() == 0 { + // TODO should never be here because we should have taken the whole item + Err(format!("Should have taken the whole item instead").into()) + } else { + self.items[il0] = Some(item); + Ok(Break(())) + } + } + Err(MergeError::Full) | Err(MergeError::NotCompatible) => { + // TODO count for stats + trace3!("Put item back"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) } - } else { - // TODO should never be here because ts-max should always exist here. - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 {} - Ok(Break(ChannelEvents::Events(ritem))) } } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } + } else { + // TODO should never be here because ts-max should always exist here. 
+ Err(format!("selected input without max ts").into()) } } else { - let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) + // No other input, take the whole item + let mut item = self.items[il0].take().unwrap(); + trace3!("Take all from item (no other input) {item:?}"); + match self.take_into_output_all(&mut item) { + Ok(()) => Ok(Break(())), + Err(_) => { + // TODO count for stats + trace3!("Put item back"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) } } } @@ -131,6 +222,7 @@ impl ChannelEventsMerger { } fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { + trace4!("refill"); use ControlFlow::*; use Poll::*; let mut has_pending = false; @@ -138,6 +230,7 @@ impl ChannelEventsMerger { let item = &self.items[i1]; if item.is_none() { while let Some(inp) = &mut self.inps[i1] { + trace4!("refill while"); match inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { match k { @@ -149,22 +242,6 @@ impl ChannelEventsMerger { eprintln!("TODO inp RangeComplete which does not fill slot"); } RangeCompletableItem::Data(k) => { - match &k { - ChannelEvents::Events(events) => { - if events.len() == 0 { - warn!("empty events item {events:?}"); - } else { - trace!( - "\nrefilled with events {}\nREFILLED\n{:?}\n\n", - events.len(), - events - ); - } - } - ChannelEvents::Status(_) => { - eprintln!("TODO inp Status which does not fill slot"); - } - } self.items[i1] = Some(k); break; } @@ -186,6 +263,8 @@ impl ChannelEventsMerger { } } } + } else { + trace4!("refill inp {} has {}", i1, item.as_ref().unwrap().len()); } } if has_pending { @@ -195,17 +274,13 @@ impl ChannelEventsMerger { } } - fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + fn poll3( + mut self: Pin<&mut Self>, + cx: &mut Context, + has_pending: bool, + ) -> ControlFlow>>> { use ControlFlow::*; use Poll::*; - let mut has_pending = false; - match Self::refill(Pin::new(&mut self), cx) { - Break(Ready(e)) => return Break(Ready(Some(Err(e)))), - Break(Pending) => { - has_pending = true; - } - Continue(()) => {} - } let ninps = self.inps.iter().filter(|a| a.is_some()).count(); let nitems = self.items.iter().filter(|a| a.is_some()).count(); let nitemsmissing = self @@ -214,6 +289,7 @@ impl ChannelEventsMerger { .zip(self.items.iter()) .filter(|(a, b)| a.is_some() && b.is_none()) .count(); + trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}"); if ninps == 0 && nitems == 0 { self.done = true; Break(Ready(None)) @@ -226,33 +302,74 @@ impl ChannelEventsMerger { } } else { match Self::process(Pin::new(&mut self), cx) { - Ok(Break(item)) => Break(Ready(Some(Ok(item)))), - Ok(Continue(())) => Continue(()), + Ok(Break(())) => { + if let Some(o) = self.out.as_ref() { + // A good threshold varies according to scalar type and shape. + // TODO replace this magic number by a bound on the bytes estimate. 
+ if o.len() >= self.out_max_len || self.do_clear_out { + trace3!("decide to output"); + self.do_clear_out = false; + Break(Ready(Some(Ok(self.out.take().unwrap())))) + } else { + trace4!("output not yet"); + Continue(()) + } + } else { + trace3!("no output candidate"); + Continue(()) + } + } + Ok(Continue(())) => { + trace2!("process returned with Continue"); + Continue(()) + } Err(e) => Break(Ready(Some(Err(e)))), } } } + + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + use ControlFlow::*; + use Poll::*; + match Self::refill(Pin::new(&mut self), cx) { + Break(Ready(e)) => Break(Ready(Some(Err(e)))), + Break(Pending) => Self::poll3(self, cx, true), + Continue(()) => Self::poll3(self, cx, false), + } + } } -impl Stream for ChannelEventsMerger { - type Item = Sitemty; +impl Stream for Merger +where + T: Mergeable, +{ + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - const NAME: &str = "ChannelEventsMerger"; + const NAME: &str = "Merger_mergeable"; let span = span!(Level::TRACE, NAME); let _spanguard = span.enter(); loop { + trace3!("{NAME} poll"); break if self.complete { panic!("poll after complete"); - } else if self.done2 { + } else if self.done3 { self.complete = true; Ready(None) + } else if self.done2 { + self.done3 = true; + if self.range_complete { + warn!("TODO emit range complete only if all inputs signaled complete"); + trace!("{NAME} emit RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue; + } } else if self.done { self.done2 = true; - if self.range_complete { - trace!("MERGER EMITTING ChannelEvents::RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + if let Some(out) = self.out.take() { + Ready(Some(sitem_data(out))) } else { continue; } @@ -260,18 +377,7 @@ impl Stream for ChannelEventsMerger { match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { - Ready(Some(Ok(ChannelEvents::Events(item)))) => { - trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Events(item), - ))))) - } - Ready(Some(Ok(ChannelEvents::Status(item)))) => { - trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(item), - ))))) - } + Ready(Some(Ok(item))) => Ready(Some(sitem_data(item))), Ready(Some(Err(e))) => { self.done = true; Ready(Some(Err(e.into()))) diff --git a/items_2/src/merger_cev.rs b/items_2/src/merger_cev.rs new file mode 100644 index 0000000..5c55848 --- /dev/null +++ b/items_2/src/merger_cev.rs @@ -0,0 +1,295 @@ +use crate::{ChannelEvents, Error}; +use futures_util::{Stream, StreamExt}; +use items::{RangeCompletableItem, Sitemty, StreamItem}; +use netpod::log::*; +use std::any::Any; +use std::fmt; +use std::ops::ControlFlow; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub trait MergeableCev: Any { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; +} + +type MergeInp = Pin> + Send>>; + +pub struct ChannelEventsMerger { + inps: Vec>, + items: Vec>, + range_complete: bool, + done: bool, + done2: bool, + complete: bool, +} + +impl fmt::Debug for ChannelEventsMerger { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); + fmt.debug_struct(std::any::type_name::()) + .field("inps", &inps) + 
.field("items", &self.items) + .field("range_complete", &self.range_complete) + .field("done", &self.done) + .field("done2", &self.done2) + .finish() + } +} + +impl ChannelEventsMerger { + pub fn new(inps: Vec) -> Self { + let n = inps.len(); + Self { + done: false, + done2: false, + complete: false, + inps: inps.into_iter().map(|x| Some(x)).collect(), + items: (0..n).into_iter().map(|_| None).collect(), + range_complete: false, + } + } + + fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { + use ControlFlow::*; + let mut tslows = [None, None]; + for (i1, itemopt) in self.items.iter_mut().enumerate() { + if let Some(item) = itemopt { + let t1 = item.ts_min(); + if let Some(t1) = t1 { + if let Some((_, a)) = tslows[0] { + if t1 < a { + tslows[1] = tslows[0]; + tslows[0] = Some((i1, t1)); + } else { + if let Some((_, b)) = tslows[1] { + if t1 < b { + tslows[1] = Some((i1, t1)); + } else { + // nothing to do + } + } else { + tslows[1] = Some((i1, t1)); + } + } + } else { + tslows[0] = Some((i1, t1)); + } + } else { + match item { + ChannelEvents::Events(_) => { + trace!("events item without ts min discovered {item:?}"); + itemopt.take(); + return Ok(Continue(())); + } + ChannelEvents::Status(_) => { + return Err(format!("channel status without timestamp").into()); + } + } + } + } + } + if let Some((il0, _tl0)) = tslows[0] { + if let Some((_il1, tl1)) = tslows[1] { + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(item) => { + if let Some(th0) = item.ts_max() { + if th0 < tl1 { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } else { + let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 { + // TODO should never be here + self.items[il0] = None; + } + Ok(Break(ChannelEvents::Events(ritem))) + } + } else { + // TODO should never be here because ts-max should always exist here. + let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 {} + Ok(Break(ChannelEvents::Events(ritem))) + } + } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + } + } else { + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + } + } + } else { + Err(format!("after low ts search nothing found").into()) + } + } + + fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { + use ControlFlow::*; + use Poll::*; + let mut has_pending = false; + for i1 in 0..self.inps.len() { + let item = &self.items[i1]; + if item.is_none() { + while let Some(inp) = &mut self.inps[i1] { + match inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + trace!("--------------------- ChannelEvents::RangeComplete \n======================"); + // TODO track range complete for all inputs, it's only complete if all inputs are complete. 
+ self.range_complete = true; + eprintln!("TODO inp RangeComplete which does not fill slot"); + } + RangeCompletableItem::Data(k) => { + match &k { + ChannelEvents::Events(events) => { + if events.len() == 0 { + warn!("empty events item {events:?}"); + } else { + trace!( + "\nrefilled with events {}\nREFILLED\n{:?}\n\n", + events.len(), + events + ); + } + } + ChannelEvents::Status(_) => { + eprintln!("TODO inp Status which does not fill slot"); + } + } + self.items[i1] = Some(k); + break; + } + }, + StreamItem::Log(_) => { + eprintln!("TODO inp Log which does not fill slot"); + } + StreamItem::Stats(_) => { + eprintln!("TODO inp Stats which does not fill slot"); + } + } + } + Ready(Some(Err(e))) => return Break(Ready(e.into())), + Ready(None) => { + self.inps[i1] = None; + } + Pending => { + has_pending = true; + } + } + } + } + } + if has_pending { + Break(Pending) + } else { + Continue(()) + } + } + + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + use ControlFlow::*; + use Poll::*; + let mut has_pending = false; + match Self::refill(Pin::new(&mut self), cx) { + Break(Ready(e)) => return Break(Ready(Some(Err(e)))), + Break(Pending) => { + has_pending = true; + } + Continue(()) => {} + } + let ninps = self.inps.iter().filter(|a| a.is_some()).count(); + let nitems = self.items.iter().filter(|a| a.is_some()).count(); + let nitemsmissing = self + .inps + .iter() + .zip(self.items.iter()) + .filter(|(a, b)| a.is_some() && b.is_none()) + .count(); + if ninps == 0 && nitems == 0 { + self.done = true; + Break(Ready(None)) + } else if nitemsmissing != 0 { + if !has_pending { + let e = Error::from(format!("missing but no pending")); + Break(Ready(Some(Err(e)))) + } else { + Break(Pending) + } + } else { + match Self::process(Pin::new(&mut self), cx) { + Ok(Break(item)) => Break(Ready(Some(Ok(item)))), + Ok(Continue(())) => Continue(()), + Err(e) => Break(Ready(Some(Err(e)))), + } + } + } +} + +impl Stream for ChannelEventsMerger { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + const NAME: &str = "ChannelEventsMerger"; + let span = span!(Level::TRACE, NAME); + let _spanguard = span.enter(); + loop { + break if self.complete { + panic!("poll after complete"); + } else if self.done2 { + self.complete = true; + Ready(None) + } else if self.done { + self.done2 = true; + if self.range_complete { + trace!("MERGER EMITTING ChannelEvents::RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue; + } + } else { + match Self::poll2(self.as_mut(), cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(k) => match k { + Ready(Some(Ok(ChannelEvents::Events(item)))) => { + trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Events(item), + ))))) + } + Ready(Some(Ok(ChannelEvents::Status(item)))) => { + trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(item), + ))))) + } + Ready(Some(Err(e))) => { + self.done = true; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + self.done = true; + continue; + } + Pending => Pending, + }, + } + }; + } + } +} diff --git a/items_2/src/test.rs b/items_2/src/test.rs index a328714..daa63fe 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -1,16 +1,128 @@ use crate::binsdim0::BinsDim0CollectedResult; use crate::eventsdim0::EventsDim0; -use 
crate::merger::ChannelEventsMerger; +use crate::merger::{Mergeable, Merger}; +use crate::merger_cev::ChannelEventsMerger; +use crate::testgen::make_some_boxed_d0_f32; use crate::{binned_collected, runfut, ChannelEvents, Empty, Events, IsoDateTime}; use crate::{ConnStatus, ConnStatusEvent, Error}; use chrono::{TimeZone, Utc}; -use futures_util::StreamExt; -use items::{RangeCompletableItem, Sitemty, StreamItem}; +use futures_util::{stream, StreamExt}; +use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape}; use std::time::Duration; +#[test] +fn items_move_events() { + let evs = make_some_boxed_d0_f32(10, SEC, SEC, 0, 1846713782); + let v0 = ChannelEvents::Events(evs); + let mut v1 = v0.clone(); + eprintln!("{v1:?}"); + eprintln!("{}", v1.len()); + let mut v2 = v1.move_into_fresh(4); + eprintln!("{}", v1.len()); + eprintln!("{}", v2.len()); + v1.move_into_existing(&mut v2, u64::MAX).unwrap(); + eprintln!("{}", v1.len()); + eprintln!("{}", v2.len()); + eprintln!("{v1:?}"); + eprintln!("{v2:?}"); + assert_eq!(v1.len(), 0); + assert_eq!(v2.len(), 10); + assert_eq!(v2, v0); +} + +#[test] +fn items_merge_00() { + let fut = async { + use crate::merger::Merger; + let evs0 = make_some_boxed_d0_f32(10, SEC * 1, SEC * 2, 0, 1846713782); + let evs1 = make_some_boxed_d0_f32(10, SEC * 2, SEC * 2, 0, 828764893); + let v0 = ChannelEvents::Events(evs0); + let v1 = ChannelEvents::Events(evs1); + let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); + let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); + let mut merger = Merger::new(vec![stream0, stream1], 8); + while let Some(item) = merger.next().await { + eprintln!("{item:?}"); + } + Ok(()) + }; + runfut(fut).unwrap(); +} + +#[test] +fn items_merge_01() { + let fut = async { + use crate::merger::Merger; + let evs0 = make_some_boxed_d0_f32(10, SEC * 1, SEC * 2, 0, 1846713782); + let evs1 = make_some_boxed_d0_f32(10, SEC * 2, SEC * 2, 0, 828764893); + let v0 = ChannelEvents::Events(evs0); + let v1 = ChannelEvents::Events(evs1); + let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); + let v3 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect)); + let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect)); + let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); + let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); + let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); + let mut merger = Merger::new(vec![stream0, stream1, stream2], 8); + let mut total_event_count = 0; + while let Some(item) = merger.next().await { + eprintln!("{item:?}"); + let item = item?; + match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => {} + RangeCompletableItem::Data(item) => { + total_event_count += item.len(); + } + }, + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + } + } + assert_eq!(total_event_count, 23); + Ok(()) + }; + runfut(fut).unwrap(); +} + +#[test] +fn items_merge_02() { + let fut = async { + let evs0 = make_some_boxed_d0_f32(100, SEC * 1, SEC * 2, 0, 1846713782); + let evs1 = make_some_boxed_d0_f32(100, SEC * 2, SEC * 2, 0, 828764893); + let v0 = ChannelEvents::Events(evs0); + let v1 = ChannelEvents::Events(evs1); + let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); + let v3 = 
ChannelEvents::Status(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect)); + let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect)); + let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); + let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); + let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); + let mut merger = Merger::new(vec![stream0, stream1, stream2], 8); + let mut total_event_count = 0; + while let Some(item) = merger.next().await { + eprintln!("{item:?}"); + let item = item.unwrap(); + match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => {} + RangeCompletableItem::Data(item) => { + total_event_count += item.len(); + } + }, + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + } + } + assert_eq!(total_event_count, 203); + Ok(()) + }; + runfut(fut).unwrap(); +} + #[test] fn merge01() { let fut = async { diff --git a/items_2/src/testgen.rs b/items_2/src/testgen.rs new file mode 100644 index 0000000..3389f9b --- /dev/null +++ b/items_2/src/testgen.rs @@ -0,0 +1,23 @@ +use crate::eventsdim0::EventsDim0; +use crate::{Empty, Events}; + +#[allow(unused)] +fn xorshift32(state: u32) -> u32 { + let mut x = state; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + x +} + +pub fn make_some_boxed_d0_f32(n: usize, t0: u64, tstep: u64, tmask: u64, seed: u32) -> Box { + let mut vstate = seed; + let mut events = EventsDim0::empty(); + for i in 0..n { + vstate = xorshift32(vstate); + let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask); + let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.; + events.push(ts, ts, value); + } + Box::new(events.clone()) +} diff --git a/items_2/src/timebin.rs b/items_2/src/timebin.rs new file mode 100644 index 0000000..21d0550 --- /dev/null +++ b/items_2/src/timebin.rs @@ -0,0 +1,29 @@ +use std::fmt; + +pub trait TimeBinner: fmt::Debug + Unpin { + type Input; + type Output; + + fn ingest(&mut self, item: &mut Self::Input); + + fn set_range_complete(&mut self); + + fn bins_ready_count(&self) -> usize; + + fn bins_ready(&mut self) -> Option; + + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); +} + +pub trait TimeBinnable: fmt::Debug + Sized { + type TimeBinner: TimeBinner; + + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Self::TimeBinner; +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index c5c0c8c..26b4a62 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -816,8 +816,8 @@ impl<'de> serde::de::Visitor<'de> for ShapeVis { A: serde::de::MapAccess<'de>, { use serde::de::Error; - while let Some(key) = map.next_key::()? { - return if key == "Wave" { + if let Some(key) = map.next_key::()? 
{ + if key == "Wave" { let n: u32 = map.next_value()?; Ok(Shape::Wave(n)) } else if key == "Image" { @@ -825,9 +825,10 @@ impl<'de> serde::de::Visitor<'de> for ShapeVis { Ok(Shape::Image(a[0], a[1])) } else { Err(A::Error::custom(format!("unexpected key {key:?}"))) - }; + } + } else { + Err(A::Error::custom(format!("invalid shape format"))) } - Err(A::Error::custom(format!("invalid shape format"))) } fn visit_seq(self, mut seq: A) -> Result @@ -1186,7 +1187,7 @@ impl PreBinnedPatchRange { let bs = dt / min_bin_count as u64; let mut i1 = bin_t_len_options.len(); loop { - if i1 <= 0 { + if i1 == 0 { break Ok(None); } else { i1 -= 1; @@ -1409,8 +1410,9 @@ impl BinnedRange { let bs = dt / min_bin_count as u64; let mut i1 = thresholds.len(); loop { - if i1 <= 0 { - panic!(); + if i1 == 0 { + let msg = format!("covering_range thresholds bad i {i1}"); + return Err(Error::with_msg_no_trace(msg)); } else { i1 -= 1; let t = thresholds[i1]; @@ -2027,9 +2029,7 @@ pub struct ChannelInfo { } pub fn f32_close(a: f32, b: f32) -> bool { - if (a - b).abs() < 1e-5 { - true - } else if a / b > 0.9999 && a / b < 1.0001 { + if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) { true } else { false @@ -2037,9 +2037,7 @@ pub fn f32_close(a: f32, b: f32) -> bool { } pub fn f64_close(a: f64, b: f64) -> bool { - if (a - b).abs() < 1e-5 { - true - } else if a / b > 0.9999 && a / b < 1.0001 { + if (a - b).abs() < 1e-5 || (a / b > 0.9999 && a / b < 1.0001) { true } else { false diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 8f9156c..a381951 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -24,3 +24,6 @@ items = { path = "../items" } items_2 = { path = "../items_2" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } + +[dev-dependencies] +taskrun = { path = "../taskrun" } diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 6af8faa..f416980 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -3,7 +3,6 @@ use futures_util::{Stream, StreamExt}; use items::{RangeCompletableItem, Sitemty, StreamItem}; use items_2::streams::{Collectable, Collector}; use netpod::log::*; -use serde::Serialize; use serde_json::Value as JsonValue; use std::fmt; use std::time::Duration; @@ -11,12 +10,7 @@ use std::time::Duration; // This is meant to work with trait object event containers (crate items_2) // TODO rename, it is also used for binned: -pub async fn collect_plain_events_json( - stream: S, - timeout: Duration, - events_max: u64, - do_log: bool, -) -> Result +pub async fn collect_plain_events_json(stream: S, timeout: Duration, events_max: u64) -> Result where S: Stream> + Unpin, T: Collectable + fmt::Debug, @@ -52,9 +46,7 @@ where match item { Ok(item) => match item { StreamItem::Log(item) => { - if do_log { - debug!("collect_plain_events_json log {:?}", item); - } + trace!("collect_plain_events_json log {:?}", item); } StreamItem::Stats(item) => { use items::StatsItem; diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 751ac3f..3203392 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -8,3 +8,6 @@ pub mod needminbuffer; pub mod plaineventsjson; pub mod rangefilter; pub mod tcprawclient; +#[cfg(test)] +pub mod test; +pub mod timebin; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 2659c29..55e9afb 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -27,7 +27,7 @@ where SER: Serialize, { let inps = open_tcp_streams(&query, cluster).await?; - let mut merged = 
items_2::merger::ChannelEventsMerger::new(inps); + let mut merged = items_2::merger_cev::ChannelEventsMerger::new(inps); let timeout = Duration::from_millis(2000); let events_max = 100; let do_log = false; diff --git a/streams/src/test.rs b/streams/src/test.rs new file mode 100644 index 0000000..cae6246 --- /dev/null +++ b/streams/src/test.rs @@ -0,0 +1,86 @@ +#[cfg(test)] +mod timebin; + +use err::Error; +use futures_util::{stream, Stream, StreamExt}; +use items::{sitem_data, Sitemty}; +use items_2::eventsdim0::EventsDim0; +use items_2::merger_cev::ChannelEventsMerger; +use items_2::{ChannelEvents, Empty}; +use netpod::timeunits::SEC; +use std::pin::Pin; +use std::time::Duration; + +type BoxedEventStream = Pin> + Send>>; + +// TODO use some xorshift generator. + +fn inmem_test_events_d0_i32_00() -> BoxedEventStream { + let mut evs = EventsDim0::empty(); + evs.push(SEC * 1, 1, 10001); + evs.push(SEC * 4, 4, 10004); + let cev = ChannelEvents::Events(Box::new(evs)); + let item = sitem_data(cev); + let stream = stream::iter(vec![item]); + Box::pin(stream) +} + +fn inmem_test_events_d0_i32_01() -> BoxedEventStream { + let mut evs = EventsDim0::empty(); + evs.push(SEC * 2, 2, 10002); + let cev = ChannelEvents::Events(Box::new(evs)); + let item = sitem_data(cev); + let stream = stream::iter(vec![item]); + Box::pin(stream) +} + +#[test] +fn empty_input() -> Result<(), Error> { + // TODO with a pipeline of x-binning, merging, t-binning and collection, how do I get a meaningful + // result even if there is no input data at all? + Err(Error::with_msg_no_trace("TODO")) +} + +#[test] +fn merge_channel_events() -> Result<(), Error> { + let fut = async { + let inp0 = inmem_test_events_d0_i32_00(); + let inp1 = inmem_test_events_d0_i32_01(); + let mut merged = ChannelEventsMerger::new(vec![inp0, inp1]); + while let Some(item) = merged.next().await { + eprintln!("item {item:?}"); + } + let timeout = Duration::from_millis(4000); + let events_max = 10000; + // TODO add event collection + let collected = crate::collect::collect_plain_events_json(merged, timeout, events_max).await?; + Ok(()) + }; + runfut(fut) +} + +#[test] +fn merge_mergeable_00() -> Result<(), Error> { + let fut = async { + let inp0 = inmem_test_events_d0_i32_00(); + let inp1 = inmem_test_events_d0_i32_01(); + let mut merger = items_2::merger::Merger::new(vec![inp0, inp1], 4); + Ok(()) + }; + runfut(fut) +} + +#[test] +fn timeout() -> Result<(), Error> { + // TODO expand from items_2::test + Err(Error::with_msg_no_trace("TODO")) +} + +fn runfut(fut: F) -> Result +where + F: std::future::Future>, +{ + use futures_util::TryFutureExt; + let fut = fut.map_err(|e| e.into()); + taskrun::run(fut) +} diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs new file mode 100644 index 0000000..657f61e --- /dev/null +++ b/streams/src/test/timebin.rs @@ -0,0 +1,31 @@ +use crate::test::runfut; +use futures_util::{stream, StreamExt}; +use items::sitem_data; +use items_2::testgen::make_some_boxed_d0_f32; +use items_2::{ChannelEvents, ConnStatus, ConnStatusEvent}; +use netpod::timeunits::{MS, SEC}; +use std::time::{Duration, Instant}; + +#[test] +fn time_bin_00() { + let fut = async { + let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect(); + let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782); + let v0 = ChannelEvents::Events(evs0); + let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); + let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 6000, 
ConnStatus::Disconnect)); + let stream0 = Box::pin(stream::iter(vec![ + // + sitem_data(v2), + sitem_data(v0), + sitem_data(v4), + ])); + let deadline = Instant::now() + Duration::from_millis(2000); + let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline); + while let Some(item) = binned_stream.next().await { + eprintln!("{item:?}"); + } + Ok(()) + }; + runfut(fut).unwrap() +} diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs new file mode 100644 index 0000000..c489026 --- /dev/null +++ b/streams/src/timebin.rs @@ -0,0 +1,198 @@ +use err::Error; +use futures_util::{Future, FutureExt, Stream, StreamExt}; +use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; +use items_2::timebin::{TimeBinnable, TimeBinner}; +use netpod::log::*; +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +#[allow(unused)] +macro_rules! trace2 { + (D$($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +#[allow(unused)] +macro_rules! trace3 { + (D$($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +#[allow(unused)] +macro_rules! trace4 { + (D$($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +type MergeInp = Pin> + Send>>; + +pub struct TimeBinnedStream +where + T: TimeBinnable, +{ + inp: MergeInp, + edges: Vec, + do_time_weight: bool, + deadline: Instant, + deadline_fut: Pin + Send>>, + range_complete: bool, + binner: Option<::TimeBinner>, + done_data: bool, + done: bool, + complete: bool, +} + +impl fmt::Debug for TimeBinnedStream +where + T: TimeBinnable, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("TimeBinnedStream") + .field("edges", &self.edges) + .field("deadline", &self.deadline) + .field("range_complete", &self.range_complete) + .field("binner", &self.binner) + .finish() + } +} + +impl TimeBinnedStream +where + T: TimeBinnable, +{ + pub fn new(inp: MergeInp, edges: Vec, do_time_weight: bool, deadline: Instant) -> Self { + let deadline_fut = tokio::time::sleep_until(deadline.into()); + let deadline_fut = Box::pin(deadline_fut); + Self { + inp, + edges, + do_time_weight, + deadline, + deadline_fut, + range_complete: false, + binner: None, + done_data: false, + done: false, + complete: false, + } + } + + fn process_item(&mut self, mut item: T) -> () { + if self.binner.is_none() { + let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight); + self.binner = Some(binner); + } + let binner = self.binner.as_mut().unwrap(); + binner.ingest(&mut item); + } +} + +impl Stream for TimeBinnedStream +where + T: TimeBinnable + Unpin, +{ + type Item = Sitemty<<::TimeBinner as TimeBinner>::Output>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let span = tracing::span!(tracing::Level::TRACE, "poll"); + let _spg = span.enter(); + loop { + break if self.complete { + panic!("poll on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else if self.done_data { + self.done = true; + if self.range_complete { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue; + } + } else { + match self.deadline_fut.poll_unpin(cx) { + Ready(()) => { + // TODO add timeout behavior + todo!(); + } + Pending => {} + } + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + debug!("see RangeComplete"); + 
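+ // Remember the flag; it is forwarded as a RangeComplete item only once this
+ // stream winds down, after the final bins have been emitted.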
self.range_complete = true; + continue; + } + RangeCompletableItem::Data(item) => { + self.process_item(item); + if let Some(binner) = self.binner.as_mut() { + trace3!("bins ready count {}", binner.bins_ready_count()); + if binner.bins_ready_count() > 0 { + if let Some(bins) = binner.bins_ready() { + Ready(Some(sitem_data(bins))) + } else { + trace2!("bins ready but got nothing"); + Pending + } + } else { + trace3!("no bins ready yet"); + continue; + } + } else { + trace2!("processed item, but no binner yet"); + continue; + } + } + }, + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + }, + Err(e) => { + self.done_data = true; + Ready(Some(Err(e))) + } + }, + Ready(None) => { + trace2!("finish up"); + let self_range_complete = self.range_complete; + if let Some(binner) = self.binner.as_mut() { + trace2!("bins ready count before finish {}", binner.bins_ready_count()); + // TODO rework the finish logic + if self_range_complete { + binner.set_range_complete(); + } + binner.push_in_progress(false); + trace2!("bins ready count after finish {}", binner.bins_ready_count()); + if binner.bins_ready_count() > 0 { + if let Some(bins) = binner.bins_ready() { + self.done_data = true; + Ready(Some(sitem_data(bins))) + } else { + trace2!("bins ready but got nothing"); + self.done_data = true; + let e = Error::with_msg_no_trace(format!("bins ready but got nothing")); + Ready(Some(Err(e))) + } + } else { + trace2!("no bins ready yet"); + self.done_data = true; + continue; + } + } else { + trace2!("input stream finished, still no binner"); + self.done_data = true; + continue; + } + } + Pending => Pending, + } + }; + } + } +} diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index 8757069..21f2fdc 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -4,21 +4,17 @@ use crate::log::*; use err::Error; use std::future::Future; use std::panic; -use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; -static INIT_TRACING_ONCE: Once = Once::new(); - pub mod log { #[allow(unused_imports)] pub use tracing::{debug, error, info, trace, warn}; } -lazy_static::lazy_static! { - static ref RUNTIME: Mutex>> = Mutex::new(None); -} +static INIT_TRACING_ONCE: Mutex = Mutex::new(0); +static RUNTIME: Mutex>> = Mutex::new(None); pub fn get_runtime() -> Arc { get_runtime_opts(24, 128) @@ -153,24 +149,19 @@ fn tracing_init_inner() -> Result<(), Error> { } pub fn tracing_init() -> Result<(), ()> { - use std::sync::atomic::Ordering; - let is_good = Arc::new(AtomicUsize::new(0)); - { - let is_good = is_good.clone(); - INIT_TRACING_ONCE.call_once(move || match tracing_init_inner() { + let mut initg = INIT_TRACING_ONCE.lock().unwrap(); + if *initg == 0 { + match tracing_init_inner() { Ok(_) => { - is_good.store(1, Ordering::Release); + *initg = 1; } Err(e) => { - is_good.store(2, Ordering::Release); + *initg = 2; eprintln!("tracing_init_inner gave error {e}"); } - }); - } - let n = is_good.load(Ordering::Acquire); - if n == 2 { - Err(()) - } else if n == 1 { + } + Ok(()) + } else if *initg == 1 { Ok(()) } else { eprintln!("ERROR Unknown tracing state");