From b3f53c60d88198679bff3f649c7e9c1bf796a3cf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 Apr 2023 11:51:48 +0200 Subject: [PATCH] WIP --- items_0/src/collect_s.rs | 13 ++- items_0/src/items_0.rs | 88 ++++++++++++++++++++ items_0/src/timebin.rs | 11 ++- items_2/src/channelevents.rs | 4 +- items_2/src/streams.rs | 20 ++++- streams/src/collect.rs | 144 +++++++++++++++++++++++++++++++-- streams/src/plaineventsjson.rs | 3 +- streams/src/test/collect.rs | 43 +++++++++- 8 files changed, 306 insertions(+), 20 deletions(-) diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 95969fb..abcaaf4 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -135,12 +135,12 @@ where } // TODO rename to `Typed` -pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send { +pub trait CollectableType: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { type Collector: CollectorType; fn new_collector() -> Self::Collector; } -pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send { +pub trait Collectable: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { fn new_collector(&self) -> Box; } @@ -171,6 +171,13 @@ impl TypeName for Box { } } +// TODO do this with some blanket impl: +impl WithLen for Box { + fn len(&self) -> usize { + WithLen::len(self.as_ref()) + } +} + // TODO do this with some blanket impl: impl Collectable for Box { fn new_collector(&self) -> Box { @@ -180,7 +187,7 @@ impl Collectable for Box { impl WithLen for Box { fn len(&self) -> usize { - self.as_ref().len() + WithLen::len(self.as_ref()) } } diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 407321f..d835f81 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -141,3 +141,91 @@ impl PartialEq for Box { Events::partial_eq_dyn(self.as_ref(), other.as_ref()) } } + +impl EventsNonObj for Box { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + todo!() + } +} + +impl Events for Box { + fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable { + todo!() + } + + fn verify(&self) -> bool { + todo!() + } + + fn output_info(&self) { + todo!() + } + + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { + todo!() + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + todo!() + } + + fn new_empty_evs(&self) -> Box { + todo!() + } + + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + todo!() + } + + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn clone_dyn(&self) -> Box { + todo!() + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + todo!() + } + + fn serde_id(&self) -> &'static str { + todo!() + } + + fn nty_id(&self) -> u32 { + todo!() + } + + fn tss(&self) -> &VecDeque { + todo!() + } + + fn pulses(&self) -> &VecDeque { + todo!() + } +} diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index c59e743..986bfac 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -1,4 +1,5 @@ use crate::collect_s::Collectable; +use crate::collect_s::Collector; use crate::collect_s::ToJsonResult; use crate::AsAnyMut; use crate::AsAnyRef; @@ -84,7 +85,9 @@ pub trait TimeBinner: fmt::Debug + Send { /// Provides a time-binned representation of the implementing type. /// In contrast to `TimeBinnableType` this is meant for trait objects. -pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef + AsAnyMut + Send { +pub trait TimeBinnable: + fmt::Debug + WithLen + RangeOverlapInfo + Collectable + Any + AsAnyRef + AsAnyMut + Send +{ // TODO implementors may fail if edges contain not at least 2 entries. fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box; // TODO just a helper for the empty result. @@ -93,7 +96,7 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef impl WithLen for Box { fn len(&self) -> usize { - todo!() + WithLen::len(self.as_ref()) } } @@ -152,8 +155,8 @@ impl TypeName for Box { } impl Collectable for Box { - fn new_collector(&self) -> Box { - todo!() + fn new_collector(&self) -> Box { + self.as_ref().new_collector() } } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 430e91a..084e6c6 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -532,7 +532,7 @@ impl ByteEstimate for ChannelEvents { impl Mergeable for ChannelEvents { fn ts_min(&self) -> Option { match self { - ChannelEvents::Events(k) => k.ts_min(), + ChannelEvents::Events(k) => Mergeable::ts_min(k), ChannelEvents::Status(k) => match k { Some(k) => Some(k.ts), None => None, @@ -542,7 +542,7 @@ impl Mergeable for ChannelEvents { fn ts_max(&self) -> Option { match self { - ChannelEvents::Events(k) => k.ts_max(), + ChannelEvents::Events(k) => Mergeable::ts_max(k), ChannelEvents::Status(k) => match k { Some(k) => Some(k.ts), None => None, diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index 9864297..141c1b4 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -3,7 +3,9 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::Collectable; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_0::transform::CollectableStreamTrait; use items_0::transform::EventStreamTrait; use items_0::transform::EventTransform; @@ -249,9 +251,23 @@ where { type Item = Sitemty>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - todo!() + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => Ready(Some(match item { + Ok(item) => Ok(match item { + StreamItem::DataItem(item) => StreamItem::DataItem(match item { + RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, + RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)), + }), + StreamItem::Log(item) => StreamItem::Log(item), + StreamItem::Stats(item) => StreamItem::Stats(item), + }), + Err(e) => Err(e), + })), + Ready(None) => Ready(None), + Pending => Pending, + } } } diff --git a/streams/src/collect.rs b/streams/src/collect.rs index a59558e..3bc2af8 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -1,5 +1,6 @@ use err::Error; use futures_util::Future; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::Collectable; @@ -46,28 +47,161 @@ macro_rules! trace4 { pub struct Collect { inp: CollectableStreamBox, deadline: Instant, + events_max: u64, + range: Option, + binrange: Option, + collector: Option>, + range_complete: bool, + timeout: bool, + timer: Pin + Send>>, + done_input: bool, } impl Collect { - pub fn new(inp: INP, deadline: Instant) -> Self + pub fn new( + inp: INP, + deadline: Instant, + events_max: u64, + range: Option, + binrange: Option, + ) -> Self where INP: CollectableStreamTrait + 'static, { + let timer = tokio::time::sleep_until(deadline.into()); Self { inp: CollectableStreamBox(Box::pin(inp)), deadline, + events_max, + range, + binrange, + collector: None, + range_complete: false, + timeout: false, + timer: Box::pin(timer), + done_input: false, + } + } + + fn handle_item(&mut self, item: Sitemty>) -> Result<(), Error> { + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete = true; + if let Some(coll) = self.collector.as_mut() { + coll.set_range_complete(); + } else { + warn!("collect received RangeComplete but no collector yet"); + } + Ok(()) + } + RangeCompletableItem::Data(mut item) => { + info!("collect sees len {}", item.len()); + let coll = self.collector.get_or_insert_with(|| item.new_collector()); + coll.ingest(&mut item); + if coll.len() as u64 >= self.events_max { + warn!( + "TODO compute continue-at reached events_max {} abort", + self.events_max + ); + } + Ok(()) + } + }, + StreamItem::Log(item) => { + trace!("collect log {:?}", item); + Ok(()) + } + StreamItem::Stats(item) => { + trace!("collect stats {:?}", item); + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + //total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + //total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + //total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + //total_duration += k.duration; + } + }, + } + Ok(()) + } + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + Err(e) + } } } } impl Future for Collect { - type Output = Sitemty>; + type Output = Result, Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; let span = tracing::span!(Level::INFO, "Collect"); let _spg = span.enter(); - todo!() + loop { + break if self.done_input { + if self.timeout { + if let Some(coll) = self.collector.as_mut() { + coll.set_timed_out(); + } else { + warn!("collect timeout but no collector yet"); + } + } + // TODO use range_final and timeout in result. + match self.collector.take() { + Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) { + Ok(res) => { + //info!("collect stats total duration: {:?}", total_duration); + Ready(Ok(res)) + } + Err(e) => Ready(Err(e)), + }, + None => { + let e = Error::with_msg_no_trace(format!("no result because no collector was created")); + error!("{e}"); + Ready(Err(e)) + } + } + } else { + match self.timer.poll_unpin(cx) { + Ready(()) => { + self.timeout = true; + self.done_input = true; + continue; + } + Pending => match self.inp.0.poll_next_unpin(cx) { + Ready(Some(item)) => match self.handle_item(item) { + Ok(()) => { + continue; + } + Err(e) => { + error!("{e}"); + Ready(Err(e)) + } + }, + Ready(None) => { + self.done_input = true; + continue; + } + Pending => Pending, + }, + } + }; + } } } @@ -80,7 +214,7 @@ async fn collect_in_span( ) -> Result, Error> where S: Stream> + Unpin, - T: Collectable + WithLen + fmt::Debug, + T: Collectable, { info!("collect events_max {events_max} deadline {deadline:?}"); let mut collector: Option> = None; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index b8f7c1a..6d98778 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -105,10 +105,9 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: let stream = PlainEventStream::new(stream); let stream = EventsToTimeBinnable::new(stream); let stream = TimeBinnableToCollectable::new(stream); + let collected = Collect::new(stream, deadline, events_max, Some(evq.range().clone()), None).await; - // TODO allow Collect to respect events_max and give range to compute continue-at. //let collected = crate::collect::collect(stream, deadline, events_max, Some(evq.range().clone()), None).await?; - let collected = Collect::new(stream, deadline).await; let jsval = serde_json::to_value(&collected)?; Ok(jsval) } diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs index f756532..eb67142 100644 --- a/streams/src/test/collect.rs +++ b/streams/src/test/collect.rs @@ -1,20 +1,30 @@ +use crate::collect::Collect; use crate::test::runfut; +use crate::transform::EventsToTimeBinnable; +use crate::transform::TimeBinnableToCollectable; use err::Error; use futures_util::stream; use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::StreamItem; use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; +use items_2::streams::PlainEventStream; use items_2::testgen::make_some_boxed_d0_f32; use netpod::timeunits::SEC; use std::time::Duration; use std::time::Instant; #[test] -fn collect_channel_events() -> Result<(), Error> { +fn collect_channel_events_00() -> Result<(), Error> { let fut = async { let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487); let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583); - let stream = stream::iter(vec![sitem_data(evs0), sitem_data(evs1)]); + let stream = stream::iter(vec![ + sitem_data(evs0), + sitem_data(evs1), + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), + ]); let deadline = Instant::now() + Duration::from_millis(4000); let events_max = 10000; let res = crate::collect::collect(stream, deadline, events_max, None, None).await?; @@ -30,3 +40,32 @@ fn collect_channel_events() -> Result<(), Error> { }; runfut(fut) } + +#[test] +fn collect_channel_events_01() -> Result<(), Error> { + let fut = async { + let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487); + let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583); + let stream = stream::iter(vec![ + sitem_data(evs0), + sitem_data(evs1), + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), + ]); + // TODO build like in request code + let deadline = Instant::now() + Duration::from_millis(4000); + let events_max = 10000; + let stream = PlainEventStream::new(stream); + let stream = EventsToTimeBinnable::new(stream); + let stream = TimeBinnableToCollectable::new(stream); + let res = Collect::new(stream, deadline, events_max, None, None).await?; + if let Some(res) = res.as_any_ref().downcast_ref::>() { + eprintln!("Great, a match"); + eprintln!("{res:?}"); + assert_eq!(res.len(), 40); + } else { + return Err(Error::with_msg(format!("bad type of collected result"))); + } + Ok(()) + }; + runfut(fut) +}