From 67bca9da5e8ae6be6465fb75e6e97dc2492996e8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 11 Nov 2022 22:21:57 +0100 Subject: [PATCH] WIP merger tests --- err/src/lib.rs | 4 +- httpret/src/events.rs | 13 +- items/src/items.rs | 8 +- items_2/src/eventsdim0.rs | 4 + items_2/src/items_2.rs | 344 ++++++++----------------------------- items_2/src/merger.rs | 289 +++++++++++++++++++++++++++++++ items_2/src/test.rs | 207 ++++++++++++---------- netpod/src/netpod.rs | 14 +- nodenet/src/conn.rs | 3 - scyllaconn/src/bincache.rs | 5 +- 10 files changed, 504 insertions(+), 387 deletions(-) create mode 100644 items_2/src/merger.rs diff --git a/err/src/lib.rs b/err/src/lib.rs index 70d5bc6..58466a2 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -14,7 +14,7 @@ pub mod bt { pub use backtrace::Backtrace; } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum Reason { InternalError, BadRequest, @@ -24,7 +24,7 @@ pub enum Reason { /** The common error type for this application. 
*/ -#[derive(Serialize, Deserialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct Error { msg: String, trace_str: Option, diff --git a/httpret/src/events.rs b/httpret/src/events.rs index ae13796..052c7ad 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -4,7 +4,8 @@ use crate::{response, response_err, BodyStream, ToPublicResponse}; use futures_util::{Stream, StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; -use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents, ChannelEventsMerger}; +use items_2::merger::ChannelEventsMerger; +use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents}; use netpod::log::*; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; @@ -239,7 +240,6 @@ impl EventsHandlerScylla { cl.ingest(item.as_collectable_mut()); } ChannelEvents::Status(..) => {} - ChannelEvents::RangeComplete => {} }, Err(e) => { return Err(e.into()); @@ -360,9 +360,12 @@ impl BinnedHandlerScylla { x }) .map_err(|e| items_2::Error::from(format!("{e}"))); - let data_stream = Box::pin(data_stream) as _; - let state_stream = Box::pin(state_stream) as _; - let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); + todo!(); + type Items = Pin> + Send>>; + let data_stream = Box::pin(data_stream) as Items; + let state_stream = Box::pin(state_stream) as Items; + let merged_stream = ChannelEventsMerger::new(todo!()); + //let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); let merged_stream = Box::pin(merged_stream) as Pin + Send>>; let binned_collected = binned_collected( scalar_type.clone(), diff --git a/items/src/items.rs b/items/src/items.rs index c4a17e1..c732548 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -58,27 +58,27 @@ pub fn bool_is_false(j: &bool) -> bool { *j == false } 
-#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum RangeCompletableItem { RangeComplete, Data(T), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum StatsItem { EventDataReadStats(EventDataReadStats), RangeFilterStats(RangeFilterStats), DiskStats(DiskStats), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum StreamItem { DataItem(T), Log(LogItem), Stats(StatsItem), } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct LogItem { pub node_ix: u32, #[serde(with = "levelserde")] diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index a93b26f..0f29c88 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -573,6 +573,10 @@ impl Events for EventsDim0 { fn nty_id(&self) -> u32 { NTY::SUB } + + fn clone_dyn(&self) -> Box { + Box::new(self.clone()) + } } pub struct EventsDim0TimeBinner { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 6b538be..91d2ada 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,5 +1,6 @@ pub mod binsdim0; pub mod eventsdim0; +pub mod merger; pub mod streams; #[cfg(test)] pub mod test; @@ -9,6 +10,9 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items::FrameTypeInnerStatic; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use items::SubFrId; use netpod::log::*; use netpod::timeunits::*; @@ -17,9 +21,7 @@ use serde::{Deserialize, Serialize, Serializer}; use std::any::Any; use std::collections::VecDeque; use std::fmt; -use std::ops::ControlFlow; use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use std::time::Instant; use streams::Collectable; @@ -139,6 +141,7 @@ pub enum ErrorKind { MismatchedType, } +// TODO stack error better 
#[derive(Debug, PartialEq)] pub struct Error { #[allow(unused)] @@ -167,6 +170,23 @@ impl From for Error { } } +// TODO this discards structure +impl From for Error { + fn from(e: err::Error) -> Self { + Self { + msg: Some(format!("{e}")), + kind: ErrorKind::General, + } + } +} + +// TODO this discards structure +impl From for err::Error { + fn from(e: Error) -> Self { + err::Error::with_msg_no_trace(format!("{e}")) + } +} + impl std::error::Error for Error {} impl serde::de::Error for Error { @@ -276,6 +296,7 @@ pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_ fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; + fn clone_dyn(&self) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; fn serde_id(&self) -> &'static str; fn nty_id(&self) -> u32; @@ -483,8 +504,15 @@ impl MergableEvents for Box { pub enum ChannelEvents { Events(Box), Status(ConnStatusEvent), - // TODO the RangeComplete event would probably fit better on some outer layer: - RangeComplete, +} + +impl Clone for ChannelEvents { + fn clone(&self) -> Self { + match self { + Self::Events(arg0) => Self::Events(arg0.clone_dyn()), + Self::Status(arg0) => Self::Status(arg0.clone()), + } + } } mod serde_channel_events { @@ -510,7 +538,6 @@ mod serde_channel_events { ser.end() } ChannelEvents::Status(val) => serializer.serialize_newtype_variant(name, 1, "Status", val), - ChannelEvents::RangeComplete => serializer.serialize_unit_variant(name, 2, "RangeComplete"), } } } @@ -627,7 +654,6 @@ impl MergableEvents for ChannelEvents { match self { Events(k) => k.ts_min(), Status(k) => Some(k.ts), - RangeComplete => None, } } @@ -641,258 +667,6 @@ impl FrameTypeInnerStatic for ChannelEvents { const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; } -type MergeInp = Pin> + Send>>; - -pub struct ChannelEventsMerger { - inps: Vec>, - items: Vec>, - range_complete: bool, - done: bool, - done2: bool, - 
complete: bool, -} - -impl fmt::Debug for ChannelEventsMerger { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); - fmt.debug_struct(std::any::type_name::()) - .field("inps", &inps) - .field("items", &self.items) - .field("range_complete", &self.range_complete) - .field("done", &self.done) - .field("done2", &self.done2) - .finish() - } -} - -impl ChannelEventsMerger { - pub fn new(inps: Vec) -> Self { - let n = inps.len(); - Self { - done: false, - done2: false, - complete: false, - inps: inps.into_iter().map(|x| Some(x)).collect(), - items: (0..n).into_iter().map(|_| None).collect(), - range_complete: false, - } - } - - fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { - use ControlFlow::*; - let mut tslows = [None, None]; - for (i1, itemopt) in self.items.iter_mut().enumerate() { - if let Some(item) = itemopt { - let t1 = item.ts_min(); - if let Some(t1) = t1 { - if let Some((_, a)) = tslows[0] { - if t1 < a { - tslows[1] = tslows[0]; - tslows[0] = Some((i1, t1)); - } else { - if let Some((_, b)) = tslows[1] { - if t1 < b { - tslows[1] = Some((i1, t1)); - } else { - // nothing to do - } - } else { - tslows[1] = Some((i1, t1)); - } - } - } else { - tslows[0] = Some((i1, t1)); - } - } else { - match item { - ChannelEvents::Events(_) => { - trace!("events item without ts min discovered {item:?}"); - itemopt.take(); - return Ok(Continue(())); - } - ChannelEvents::Status(_) => { - return Err(format!("channel status without timestamp").into()); - } - ChannelEvents::RangeComplete => { - trace!("--------------------- ChannelEvents::RangeComplete \n======================"); - *itemopt = None; - self.range_complete = true; - return Ok(Continue(())); - } - } - } - } - } - if let Some((il0, _tl0)) = tslows[0] { - if let Some((_il1, tl1)) = tslows[1] { - let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(item) => { - if let Some(th0) = 
item.ts_max() { - if th0 < tl1 { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } else { - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 { - // TODO should never be here - self.items[il0] = None; - } - Ok(Break(ChannelEvents::Events(ritem))) - } - } else { - // TODO should never be here because ts-max should always exist here. - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 {} - Ok(Break(ChannelEvents::Events(ritem))) - } - } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - ChannelEvents::RangeComplete => Err(format!("RangeComplete considered in merge-lowest").into()), - } - } else { - let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - ChannelEvents::RangeComplete => Err(format!("RangeComplete considered in merge-lowest").into()), - } - } - } else { - Err(format!("after low ts search nothing found").into()) - } - } - - fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { - use ControlFlow::*; - use Poll::*; - let mut has_pending = false; - for i1 in 0..self.inps.len() { - let item = &self.items[i1]; - if item.is_none() { - if let Some(inp) = &mut self.inps[i1] { - match inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - if let ChannelEvents::Events(events) = &k { - if events.len() == 0 { - warn!("empty events item {events:?}"); - } else { - trace!("\nrefilled with events {}\nREFILLED\n{:?}\n\n", events.len(), events); - } - } - self.items[i1] = Some(k); - } - Ready(Some(Err(e))) => return Break(Ready(e)), - Ready(None) => { - self.inps[i1] = None; - } - Pending => { - has_pending = true; - } - } - } - } - } - if has_pending { - Break(Pending) - } else { - Continue(()) - } - } - - fn poll2(mut self: Pin<&mut Self>, cx: &mut 
Context) -> ControlFlow::Item>>> { - use ControlFlow::*; - use Poll::*; - let mut has_pending = false; - match Self::refill(Pin::new(&mut self), cx) { - Break(Ready(e)) => return Break(Ready(Some(Err(e)))), - Break(Pending) => { - has_pending = true; - } - Continue(()) => {} - } - let ninps = self.inps.iter().filter(|a| a.is_some()).count(); - let nitems = self.items.iter().filter(|a| a.is_some()).count(); - let nitemsmissing = self - .inps - .iter() - .zip(self.items.iter()) - .filter(|(a, b)| a.is_some() && b.is_none()) - .count(); - if ninps == 0 && nitems == 0 { - self.done = true; - Break(Ready(None)) - } else if nitemsmissing != 0 { - if !has_pending { - let e = Error::from(format!("missing but no pending")); - Break(Ready(Some(Err(e)))) - } else { - Break(Pending) - } - } else { - match Self::process(Pin::new(&mut self), cx) { - Ok(Break(item)) => Break(Ready(Some(Ok(item)))), - Ok(Continue(())) => Continue(()), - Err(e) => Break(Ready(Some(Err(e)))), - } - } - } -} - -impl Stream for ChannelEventsMerger { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - //let self_name = std::any::type_name::(); - //eprintln!("{self_name} poll_next"); - loop { - break if self.complete { - panic!("poll after complete"); - } else if self.done2 { - self.complete = true; - Ready(None) - } else if self.done { - self.done2 = true; - if self.range_complete { - trace!("MERGER EMITTING ChannelEvents::RangeComplete"); - Ready(Some(Ok(ChannelEvents::RangeComplete))) - } else { - continue; - } - } else { - match Self::poll2(self.as_mut(), cx) { - ControlFlow::Continue(()) => continue, - ControlFlow::Break(k) => { - match &k { - Ready(Some(Ok(ChannelEvents::Events(item)))) => { - trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); - } - Ready(Some(Ok(ChannelEvents::RangeComplete))) => { - trace!("\nLOGIC ERROR MERGER EMITTING PLAIN ChannelEvents::RangeComplete"); - } - Ready(Some(Err(_))) => { - self.done = true; - } - _ => {} - } 
- k - } - } - }; - } - } -} - // TODO do this with some blanket impl: impl Collectable for Box { fn new_collector(&self, bin_count_exp: u32) -> Box { @@ -911,8 +685,9 @@ pub async fn binned_collected( agg_kind: AggKind, edges: Vec, timeout: Duration, - inp: Pin> + Send>>, + inp: Pin> + Send>>, ) -> Result, Error> { + info!("binned_collected"); let deadline = Instant::now() + timeout; let mut did_timeout = false; let bin_count_exp = edges.len().max(2) as u32 - 1; @@ -925,10 +700,10 @@ pub async fn binned_collected( bin_count_exp: u32, force: bool, ) -> Result<(), Error> { - //info!("bins_ready_count: {}", binner.bins_ready_count()); + info!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); if force { if binner.bins_ready_count() == 0 { - warn!("cycle the binner"); + warn!("cycle the binner forced"); binner.cycle(); } else { warn!("binner was some ready, do nothing"); @@ -938,7 +713,7 @@ pub async fn binned_collected( let ready = binner.bins_ready(); match ready { Some(mut ready) => { - trace!("binned_collected ready {ready:?}"); + info!("binned_collected ready {ready:?}"); if coll.is_none() { *coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); } @@ -955,7 +730,9 @@ pub async fn binned_collected( let mut coll = None; let mut binner = None; let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); - let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item)))); + let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(StreamItem::DataItem( + RangeCompletableItem::Data(ChannelEvents::Events(empty_item)), + )))); let mut stream = empty_stream.chain(inp); loop { let item = futures_util::select! 
{ @@ -972,22 +749,34 @@ pub async fn binned_collected( } }; match item { - ChannelEvents::Events(events) => { - trace!("binned_collected sees\n{:?}", events); - if binner.is_none() { - let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); - binner = Some(bb); + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + warn!("binned_collected TODO RangeComplete"); + did_range_complete = true; } - let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable()); - flush_binned(binner, &mut coll, bin_count_exp, false)?; + RangeCompletableItem::Data(k) => match k { + ChannelEvents::Events(events) => { + info!("binned_collected sees\n{:?}", events); + if binner.is_none() { + let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); + binner = Some(bb); + } + let binner = binner.as_mut().unwrap(); + binner.ingest(events.as_time_binnable()); + flush_binned(binner, &mut coll, bin_count_exp, false)?; + } + ChannelEvents::Status(item) => { + info!("{:?}", item); + } + }, + }, + StreamItem::Log(item) => { + // TODO collect also errors here? 
+ info!("{:?}", item); } - ChannelEvents::Status(_) => { - warn!("binned_collected TODO Status"); - } - ChannelEvents::RangeComplete => { - warn!("binned_collected TODO RangeComplete"); - did_range_complete = true; + StreamItem::Stats(item) => { + // TODO do something with the stats + info!("{:?}", item); } } } @@ -996,6 +785,7 @@ pub async fn binned_collected( binner.set_range_complete(); } if !did_timeout { + warn!("cycle the binner"); binner.cycle(); } flush_binned(&mut binner, &mut coll, bin_count_exp, false)?; diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs new file mode 100644 index 0000000..c676701 --- /dev/null +++ b/items_2/src/merger.rs @@ -0,0 +1,289 @@ +use crate::{ChannelEvents, Error, MergableEvents}; +use futures_util::{Stream, StreamExt}; +use items::{RangeCompletableItem, Sitemty, StreamItem}; +use netpod::log::*; +use std::fmt; +use std::ops::ControlFlow; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type MergeInp = Pin> + Send>>; + +pub struct ChannelEventsMerger { + inps: Vec>, + items: Vec>, + range_complete: bool, + done: bool, + done2: bool, + complete: bool, +} + +impl fmt::Debug for ChannelEventsMerger { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); + fmt.debug_struct(std::any::type_name::()) + .field("inps", &inps) + .field("items", &self.items) + .field("range_complete", &self.range_complete) + .field("done", &self.done) + .field("done2", &self.done2) + .finish() + } +} + +impl ChannelEventsMerger { + pub fn new(inps: Vec) -> Self { + let n = inps.len(); + Self { + done: false, + done2: false, + complete: false, + inps: inps.into_iter().map(|x| Some(x)).collect(), + items: (0..n).into_iter().map(|_| None).collect(), + range_complete: false, + } + } + + fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { + use ControlFlow::*; + let mut tslows = [None, None]; + for (i1, itemopt) in self.items.iter_mut().enumerate() 
{ + if let Some(item) = itemopt { + let t1 = item.ts_min(); + if let Some(t1) = t1 { + if let Some((_, a)) = tslows[0] { + if t1 < a { + tslows[1] = tslows[0]; + tslows[0] = Some((i1, t1)); + } else { + if let Some((_, b)) = tslows[1] { + if t1 < b { + tslows[1] = Some((i1, t1)); + } else { + // nothing to do + } + } else { + tslows[1] = Some((i1, t1)); + } + } + } else { + tslows[0] = Some((i1, t1)); + } + } else { + match item { + ChannelEvents::Events(_) => { + trace!("events item without ts min discovered {item:?}"); + itemopt.take(); + return Ok(Continue(())); + } + ChannelEvents::Status(_) => { + return Err(format!("channel status without timestamp").into()); + } + } + } + } + } + if let Some((il0, _tl0)) = tslows[0] { + if let Some((_il1, tl1)) = tslows[1] { + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(item) => { + if let Some(th0) = item.ts_max() { + if th0 < tl1 { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } else { + let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 { + // TODO should never be here + self.items[il0] = None; + } + Ok(Break(ChannelEvents::Events(ritem))) + } + } else { + // TODO should never be here because ts-max should always exist here. 
+ let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 {} + Ok(Break(ChannelEvents::Events(ritem))) + } + } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + } + } else { + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + } + } + } else { + Err(format!("after low ts search nothing found").into()) + } + } + + fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { + use ControlFlow::*; + use Poll::*; + let mut has_pending = false; + for i1 in 0..self.inps.len() { + let item = &self.items[i1]; + if item.is_none() { + while let Some(inp) = &mut self.inps[i1] { + match inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + trace!("--------------------- ChannelEvents::RangeComplete \n======================"); + // TODO track range complete for all inputs, it's only complete if all inputs are complete. 
+ self.range_complete = true; + eprintln!("TODO inp RangeComplete which does not fill slot"); + } + RangeCompletableItem::Data(k) => { + match &k { + ChannelEvents::Events(events) => { + if events.len() == 0 { + warn!("empty events item {events:?}"); + } else { + trace!( + "\nrefilled with events {}\nREFILLED\n{:?}\n\n", + events.len(), + events + ); + } + } + ChannelEvents::Status(_) => { + eprintln!("TODO inp Status which does not fill slot"); + } + } + self.items[i1] = Some(k); + break; + } + }, + StreamItem::Log(_) => { + eprintln!("TODO inp Log which does not fill slot"); + } + StreamItem::Stats(_) => { + eprintln!("TODO inp Stats which does not fill slot"); + } + } + } + Ready(Some(Err(e))) => return Break(Ready(e.into())), + Ready(None) => { + self.inps[i1] = None; + } + Pending => { + has_pending = true; + } + } + } + } + } + if has_pending { + Break(Pending) + } else { + Continue(()) + } + } + + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + use ControlFlow::*; + use Poll::*; + let mut has_pending = false; + match Self::refill(Pin::new(&mut self), cx) { + Break(Ready(e)) => return Break(Ready(Some(Err(e)))), + Break(Pending) => { + has_pending = true; + } + Continue(()) => {} + } + let ninps = self.inps.iter().filter(|a| a.is_some()).count(); + let nitems = self.items.iter().filter(|a| a.is_some()).count(); + let nitemsmissing = self + .inps + .iter() + .zip(self.items.iter()) + .filter(|(a, b)| a.is_some() && b.is_none()) + .count(); + if ninps == 0 && nitems == 0 { + self.done = true; + Break(Ready(None)) + } else if nitemsmissing != 0 { + if !has_pending { + let e = Error::from(format!("missing but no pending")); + Break(Ready(Some(Err(e)))) + } else { + Break(Pending) + } + } else { + match Self::process(Pin::new(&mut self), cx) { + Ok(Break(item)) => Break(Ready(Some(Ok(item)))), + Ok(Continue(())) => Continue(()), + Err(e) => Break(Ready(Some(Err(e)))), + } + } + } +} + +impl Stream for ChannelEventsMerger { + type Item = 
Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + const NAME: &str = "ChannelEventsMerger"; + let span = span!(Level::TRACE, NAME); + let _spanguard = span.enter(); + loop { + break if self.complete { + panic!("poll after complete"); + } else if self.done2 { + self.complete = true; + Ready(None) + } else if self.done { + self.done2 = true; + if self.range_complete { + trace!("MERGER EMITTING ChannelEvents::RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue; + } + } else { + match Self::poll2(self.as_mut(), cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(k) => match k { + Ready(Some(Ok(ChannelEvents::Events(item)))) => { + trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Events(item), + ))))) + } + Ready(Some(Ok(ChannelEvents::Status(item)))) => { + trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(item), + ))))) + } + Ready(Some(Err(e))) => { + self.done = true; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + self.done = true; + continue; + } + Pending => Pending, + }, + } + }; + } + } +} diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 7e405c0..d6d2890 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -1,9 +1,11 @@ use crate::binsdim0::BinsDim0CollectedResult; use crate::eventsdim0::EventsDim0; -use crate::{binned_collected, ChannelEvents, ChannelEventsMerger, Empty, IsoDateTime}; +use crate::merger::ChannelEventsMerger; +use crate::{binned_collected, ChannelEvents, Empty, Events, IsoDateTime}; use crate::{ConnStatus, ConnStatusEvent, Error}; use chrono::{TimeZone, Utc}; use futures_util::StreamExt; +use items::{RangeCompletableItem, Sitemty, StreamItem}; use netpod::timeunits::*; use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, 
Shape}; use std::time::Duration; @@ -11,20 +13,22 @@ use std::time::Duration; #[test] fn merge01() { let fut = async { - let mut events_vec1 = Vec::new(); - let mut events_vec2 = Vec::new(); + let mut events_vec1: Vec> = Vec::new(); + let mut events_vec2: Vec> = Vec::new(); { let mut events = EventsDim0::empty(); for i in 0..10 { events.push(i * 100, i, i as f32 * 100.); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + let cev = ChannelEvents::Events(Box::new(events.clone())); + events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); + let cev = ChannelEvents::Events(Box::new(events.clone())); + events_vec2.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); - let inp2: Vec> = Vec::new(); + let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); @@ -39,91 +43,106 @@ fn merge01() { #[test] fn merge02() { let fut = async { - let mut events_vec1 = Vec::new(); - let mut events_vec2 = Vec::new(); - { + let events_vec1 = { + let mut vec = Vec::new(); let mut events = EventsDim0::empty(); for i in 0..10 { events.push(i * 100, i, i as f32 * 100.); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - } - { + push_evd0(&mut vec, Box::new(events.clone())); let mut events = EventsDim0::empty(); for i in 10..20 { events.push(i * 100, i, i as f32 * 100.); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - } + push_evd0(&mut vec, Box::new(events.clone())); + vec + }; + let exp = events_vec1.clone(); let inp1 = events_vec1; let inp1 = 
futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); - let inp2: Vec> = Vec::new(); + let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); let item = merger.next().await; - assert_eq!(item.as_ref(), events_vec2.get(0)); + assert_eq!(item.as_ref(), exp.get(0)); let item = merger.next().await; - assert_eq!(item.as_ref(), events_vec2.get(1)); + assert_eq!(item.as_ref(), exp.get(1)); let item = merger.next().await; assert_eq!(item.as_ref(), None); }; tokio::runtime::Runtime::new().unwrap().block_on(fut); } +fn push_evd0(vec: &mut Vec>, events: Box) { + let cev = ChannelEvents::Events(events); + vec.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); +} + #[test] fn merge03() { let fut = async { - let mut events_vec1 = Vec::new(); - { + let events_vec1 = { + let mut vec = Vec::new(); let mut events = EventsDim0::empty(); for i in 0..10 { events.push(i * 100, i, i as f32 * 100.); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + push_evd0(&mut vec, Box::new(events)); let mut events = EventsDim0::empty(); for i in 10..20 { events.push(i * 100, i, i as f32 * 100.); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - } - let events_vec1 = events_vec1; - let mut events_vec2 = Vec::new(); - { + push_evd0(&mut vec, Box::new(events)); + vec + }; + let events_vec2 = { + let mut vec = Vec::new(); let mut events = EventsDim0::empty(); for i in 0..10 { events.push(i * 100, i, i as f32 * 100.); } - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + push_evd0(&mut vec, Box::new(events)); let mut events = EventsDim0::empty(); for i in 10..12 { events.push(i * 100, i, i as f32 * 100.); } - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + push_evd0(&mut vec, Box::new(events)); let mut events = EventsDim0::empty(); for i in 12..20 { events.push(i * 100, i, i as f32 * 
100.); } - events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); - } - let events_vec2 = events_vec2; + push_evd0(&mut vec, Box::new(events)); + vec + }; - let inp2_events_a: Vec> = vec![Ok(ChannelEvents::Status(ConnStatusEvent { - ts: 1199, - status: ConnStatus::Disconnect, - }))]; - let inp2_events_b: Vec> = vec![Ok(ChannelEvents::Status(ConnStatusEvent { - ts: 1199, - status: ConnStatus::Disconnect, - }))]; + let inp2_events_a = { + let ev = ConnStatusEvent { + ts: 1199, + status: ConnStatus::Disconnect, + }; + let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(ev), + ))); + vec![item] + }; + + let inp2_events_b = { + let ev = ConnStatusEvent { + ts: 1199, + status: ConnStatus::Disconnect, + }; + let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(ev), + ))); + vec![item] + }; let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); - let inp2: Vec> = inp2_events_a; + let inp2: Vec> = inp2_events_a; let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); @@ -144,15 +163,17 @@ fn merge03() { #[test] fn bin01() { let fut = async { - let mut events_vec1 = Vec::new(); - for j in 0..2 { - let mut events = EventsDim0::empty(); - for i in 10 * j..10 * (1 + j) { - events.push(SEC * i, i, 17f32); + let inp1 = { + let mut vec = Vec::new(); + for j in 0..2 { + let mut events = EventsDim0::empty(); + for i in 10 * j..10 * (1 + j) { + events.push(SEC * i, i, 17f32); + } + push_evd0(&mut vec, Box::new(events)); } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); - } - let inp1 = events_vec1; + vec + }; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; @@ -165,36 +186,44 @@ fn bin01() { while let Some(item) = stream.next().await { let item = item?; match item { - 
ChannelEvents::Events(events) => { - if binner.is_none() { - let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); - binner = Some(bb); - } - let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable()); - eprintln!("bins_ready_count: {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - eprintln!("ready {ready:?}"); - if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => todo!(), + RangeCompletableItem::Data(item) => match item { + ChannelEvents::Events(events) => { + if binner.is_none() { + let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); + binner = Some(bb); } - None => { - return Err(format!("bins_ready_count but no result").into()); + let binner = binner.as_mut().unwrap(); + binner.ingest(events.as_time_binnable()); + eprintln!("bins_ready_count: {}", binner.bins_ready_count()); + if binner.bins_ready_count() > 0 { + let ready = binner.bins_ready(); + match ready { + Some(mut ready) => { + eprintln!("ready {ready:?}"); + if coll.is_none() { + coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + } + let cl = coll.as_mut().unwrap(); + cl.ingest(ready.as_collectable_mut()); + } + None => { + return Err(format!("bins_ready_count but no result").into()); + } + } } } - } + ChannelEvents::Status(_) => { + eprintln!("TODO Status"); + } + }, + }, + StreamItem::Log(_) => { + eprintln!("TODO Log"); } - ChannelEvents::Status(_) => { - eprintln!("TODO Status"); - } - ChannelEvents::RangeComplete => { - eprintln!("TODO RangeComplete"); + StreamItem::Stats(_) => { + eprintln!("TODO Stats"); } } } @@ -249,26 +278,31 @@ fn bin02() { events.push(t, t, val(t)); t += MS * 100; } - 
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + let cev = ChannelEvents::Events(Box::new(events)); + events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } - events_vec1.push(Ok(ChannelEvents::RangeComplete)); + events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; let stream = ChannelEventsMerger::new(vec![inp1, inp2]); - let range = NanoRange { - beg: TSBASE + SEC * 1, - end: TSBASE + SEC * 10, - }; - let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?; - assert_eq!(covering.edges().len(), 10); + if false { + // covering_range result is subject to adjustments, instead, manually choose bin edges + let range = NanoRange { + beg: TSBASE + SEC * 1, + end: TSBASE + SEC * 10, + }; + let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?; + assert_eq!(covering.edges().len(), 6); + } + let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect(); let stream = Box::pin(stream); let collected = binned_collected( ScalarType::F32, Shape::Scalar, AggKind::TimeWeightedScalar, - covering.edges(), + edges, Duration::from_millis(2000), stream, ) @@ -294,9 +328,10 @@ fn bin03() { events.push(t, t, val(t)); t += MS * 100; } - events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + let cev = ChannelEvents::Events(Box::new(events)); + events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } - events_vec1.push(Ok(ChannelEvents::RangeComplete)); + events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move { if i == 4 { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 2a12c6a..c5c0c8c 100644 --- a/netpod/src/netpod.rs +++ 
b/netpod/src/netpod.rs @@ -1640,7 +1640,7 @@ pub mod log { pub use tracing::{debug, error, event, info, span, trace, warn, Level}; } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EventDataReadStats { pub parsed_bytes: u64, } @@ -1655,7 +1655,7 @@ impl EventDataReadStats { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct RangeFilterStats { pub events_pre: u64, pub events_post: u64, @@ -1672,7 +1672,7 @@ impl RangeFilterStats { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum DiskStats { OpenStats(OpenStats), SeekStats(SeekStats), @@ -1680,7 +1680,7 @@ pub enum DiskStats { ReadExactStats(ReadExactStats), } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct OpenStats { pub duration: Duration, } @@ -1691,7 +1691,7 @@ impl OpenStats { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct SeekStats { pub duration: Duration, } @@ -1702,7 +1702,7 @@ impl SeekStats { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ReadStats { pub duration: Duration, } @@ -1713,7 +1713,7 @@ impl ReadStats { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ReadExactStats { pub duration: Duration, } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 65c8753..fe1e370 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -169,9 +169,6 @@ async fn events_conn_handler_inner_try( let item = items::scalarevents::ScalarEvents::::empty(); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - items_2::ChannelEvents::RangeComplete => { - 
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } items_2::ChannelEvents::Status(_item) => todo!(), }, Err(e) => Err(e), diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 0976cf0..c749ac0 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -383,6 +383,8 @@ pub async fn fetch_uncached_binned_events( ))); } }; + // TODO as soon as we encounter RangeComplete we just: + // complete = true; match item { Ok(ChannelEvents::Events(item)) => { time_binner.ingest(item.as_time_binnable()); @@ -394,9 +396,6 @@ pub async fn fetch_uncached_binned_events( "unexpected read of channel status events" ))); } - Ok(ChannelEvents::RangeComplete) => { - complete = true; - } Err(e) => return Err(e), } }