From 0f29eac2b57ee16074b4be1311d88f727d0ea451 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 13 Feb 2023 09:56:53 +0100 Subject: [PATCH] Remove unused --- items_2/src/channelevents.rs | 19 --- items_2/src/eventsdim0.rs | 25 +++ items_2/src/items_2.rs | 1 - items_2/src/merger_cev.rs | 295 ----------------------------------- items_2/src/test.rs | 11 +- 5 files changed, 30 insertions(+), 321 deletions(-) delete mode 100644 items_2/src/merger_cev.rs diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 28c6b5b..fc77ad8 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,5 +1,4 @@ use crate::merger; -use crate::merger_cev::MergeableCev; use crate::Events; use items::FrameType; use items::FrameTypeInnerStatic; @@ -471,24 +470,6 @@ impl PartialEq for ChannelEvents { } } -impl MergeableCev for ChannelEvents { - fn ts_min(&self) -> Option { - use ChannelEvents::*; - match self { - Events(k) => k.ts_min(), - Status(k) => match k { - Some(k) => Some(k.ts), - None => None, - }, - } - } - - fn ts_max(&self) -> Option { - error!("TODO impl MergableEvents for ChannelEvents"); - err::todoval() - } -} - impl crate::merger::Mergeable for ChannelEvents { fn len(&self) -> usize { match self { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 54b5e14..acbdca5 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1218,6 +1218,31 @@ mod test_frame { } } +#[cfg(test)] +mod test_serde_opt { + use super::*; + + #[derive(Serialize)] + struct A { + a: Option, + #[serde(default)] + b: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + c: Option, + } + + #[test] + fn test_a() { + let s = serde_json::to_string(&A { + a: None, + b: None, + c: None, + }) + .unwrap(); + assert_eq!(s, r#"{"a":null,"b":null}"#); + } +} + /* TODO adapt and enable #[test] diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index dcce32b..04784e8 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -6,7 +6,6 @@ pub mod eventsdim0; pub mod eventsdim1; pub mod eventsxbindim0; pub mod merger; -pub mod merger_cev; pub mod streams; #[cfg(test)] pub mod test; diff --git a/items_2/src/merger_cev.rs b/items_2/src/merger_cev.rs deleted file mode 100644 index 5c55848..0000000 --- a/items_2/src/merger_cev.rs +++ /dev/null @@ -1,295 +0,0 @@ -use crate::{ChannelEvents, Error}; -use futures_util::{Stream, StreamExt}; -use items::{RangeCompletableItem, Sitemty, StreamItem}; -use netpod::log::*; -use std::any::Any; -use std::fmt; -use std::ops::ControlFlow; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub trait MergeableCev: Any { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; -} - -type MergeInp = Pin> + Send>>; - -pub struct ChannelEventsMerger { - inps: Vec>, - items: Vec>, - range_complete: bool, - done: bool, - done2: bool, - complete: bool, -} - -impl fmt::Debug for ChannelEventsMerger { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); - fmt.debug_struct(std::any::type_name::()) - .field("inps", &inps) - .field("items", &self.items) - .field("range_complete", &self.range_complete) - .field("done", &self.done) - .field("done2", &self.done2) - .finish() - } -} - -impl ChannelEventsMerger { - pub fn new(inps: Vec) -> Self { - let n = inps.len(); - Self { - done: false, - done2: false, - complete: false, - inps: inps.into_iter().map(|x| Some(x)).collect(), - items: (0..n).into_iter().map(|_| None).collect(), - range_complete: false, - } - } - - fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { - use ControlFlow::*; - let mut tslows = [None, None]; - for (i1, itemopt) in self.items.iter_mut().enumerate() { - if let Some(item) = itemopt { - let t1 = item.ts_min(); - if let Some(t1) = t1 { - if let Some((_, a)) = tslows[0] { - if t1 < a { - tslows[1] = tslows[0]; - tslows[0] = Some((i1, t1)); - } else { - if let Some((_, b)) = tslows[1] { - if t1 < b { - tslows[1] = Some((i1, t1)); - } else { - // nothing to do - } - } else { - tslows[1] = Some((i1, t1)); - } - } - } else { - tslows[0] = Some((i1, t1)); - } - } else { - match item { - ChannelEvents::Events(_) => { - trace!("events item without ts min discovered {item:?}"); - itemopt.take(); - return Ok(Continue(())); - } - ChannelEvents::Status(_) => { - return Err(format!("channel status without timestamp").into()); - } - } - } - } - } - if let Some((il0, _tl0)) = tslows[0] { - if let Some((_il1, tl1)) = tslows[1] { - let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(item) => { - if let Some(th0) = item.ts_max() { - if th0 < tl1 { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } else { - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 { - // TODO should never be here - self.items[il0] = None; - } - Ok(Break(ChannelEvents::Events(ritem))) - } - } else { - // TODO should never be here because ts-max should always exist here. - let ritem = item.take_new_events_until_ts(tl1); - if item.len() == 0 {} - Ok(Break(ChannelEvents::Events(ritem))) - } - } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - } - } else { - let item = self.items[il0].as_mut().unwrap(); - match item { - ChannelEvents::Events(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - ChannelEvents::Status(_) => { - let ret = self.items[il0].take().unwrap(); - Ok(Break(ret)) - } - } - } - } else { - Err(format!("after low ts search nothing found").into()) - } - } - - fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { - use ControlFlow::*; - use Poll::*; - let mut has_pending = false; - for i1 in 0..self.inps.len() { - let item = &self.items[i1]; - if item.is_none() { - while let Some(inp) = &mut self.inps[i1] { - match inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => { - trace!("--------------------- ChannelEvents::RangeComplete \n======================"); - // TODO track range complete for all inputs, it's only complete if all inputs are complete. - self.range_complete = true; - eprintln!("TODO inp RangeComplete which does not fill slot"); - } - RangeCompletableItem::Data(k) => { - match &k { - ChannelEvents::Events(events) => { - if events.len() == 0 { - warn!("empty events item {events:?}"); - } else { - trace!( - "\nrefilled with events {}\nREFILLED\n{:?}\n\n", - events.len(), - events - ); - } - } - ChannelEvents::Status(_) => { - eprintln!("TODO inp Status which does not fill slot"); - } - } - self.items[i1] = Some(k); - break; - } - }, - StreamItem::Log(_) => { - eprintln!("TODO inp Log which does not fill slot"); - } - StreamItem::Stats(_) => { - eprintln!("TODO inp Stats which does not fill slot"); - } - } - } - Ready(Some(Err(e))) => return Break(Ready(e.into())), - Ready(None) => { - self.inps[i1] = None; - } - Pending => { - has_pending = true; - } - } - } - } - } - if has_pending { - Break(Pending) - } else { - Continue(()) - } - } - - fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { - use ControlFlow::*; - use Poll::*; - let mut has_pending = false; - match Self::refill(Pin::new(&mut self), cx) { - Break(Ready(e)) => return Break(Ready(Some(Err(e)))), - Break(Pending) => { - has_pending = true; - } - Continue(()) => {} - } - let ninps = self.inps.iter().filter(|a| a.is_some()).count(); - let nitems = self.items.iter().filter(|a| a.is_some()).count(); - let nitemsmissing = self - .inps - .iter() - .zip(self.items.iter()) - .filter(|(a, b)| a.is_some() && b.is_none()) - .count(); - if ninps == 0 && nitems == 0 { - self.done = true; - Break(Ready(None)) - } else if nitemsmissing != 0 { - if !has_pending { - let e = Error::from(format!("missing but no pending")); - Break(Ready(Some(Err(e)))) - } else { - Break(Pending) - } - } else { - match Self::process(Pin::new(&mut self), cx) { - Ok(Break(item)) => Break(Ready(Some(Ok(item)))), - Ok(Continue(())) => Continue(()), - Err(e) => Break(Ready(Some(Err(e)))), - } - } - } -} - -impl Stream for ChannelEventsMerger { - type Item = Sitemty; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - const NAME: &str = "ChannelEventsMerger"; - let span = span!(Level::TRACE, NAME); - let _spanguard = span.enter(); - loop { - break if self.complete { - panic!("poll after complete"); - } else if self.done2 { - self.complete = true; - Ready(None) - } else if self.done { - self.done2 = true; - if self.range_complete { - trace!("MERGER EMITTING ChannelEvents::RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - continue; - } - } else { - match Self::poll2(self.as_mut(), cx) { - ControlFlow::Continue(()) => continue, - ControlFlow::Break(k) => match k { - Ready(Some(Ok(ChannelEvents::Events(item)))) => { - trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Events(item), - ))))) - } - Ready(Some(Ok(ChannelEvents::Status(item)))) => { - trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(item), - ))))) - } - Ready(Some(Err(e))) => { - self.done = true; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - self.done = true; - continue; - } - Pending => Pending, - }, - } - }; - } - } -} diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 601280f..cdcd94d 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -5,7 +5,6 @@ use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use crate::merger::Mergeable; use crate::merger::Merger; -use crate::merger_cev::ChannelEventsMerger; use crate::runfut; use crate::testgen::make_some_boxed_d0_f32; use crate::ChannelEvents; @@ -176,7 +175,7 @@ fn merge01() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 32); let item = merger.next().await; assert_eq!(item.as_ref(), events_vec2.get(0)); let item = merger.next().await; @@ -210,7 +209,7 @@ fn merge02() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10); let item = merger.next().await; assert_eq!(item.as_ref(), exp.get(0)); let item = merger.next().await; @@ -294,7 +293,7 @@ fn merge03() { let inp2: Vec> = inp2_events_a; let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10); let item = merger.next().await; assert_eq!(item.as_ref(), events_vec2.get(0)); let item = merger.next().await; @@ -327,7 +326,7 @@ fn bin01() { let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; - let mut stream = ChannelEventsMerger::new(vec![inp1, inp2]); + let mut stream = crate::merger::Merger::new(vec![inp1, inp2], 32); let mut coll = None; let mut binner = None; let range = NanoRange { @@ -443,7 +442,7 @@ fn bin02() { let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; - let stream = ChannelEventsMerger::new(vec![inp1, inp2]); + let stream = crate::merger::Merger::new(vec![inp1, inp2], 32); // covering_range result is subject to adjustments, instead, manually choose bin edges let range = NanoRange { beg: TSBASE + SEC * 1,