diff --git a/disk/src/binned.rs b/disk/src/binned.rs index c5ef750..1fffdfb 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -75,10 +75,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { FrameType + Framable + DeserializeOwned, { let _ = event_value_shape; - let range = - BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( - format!("BinnedBinaryChannelExec BinnedRange::covering_range returned None"), - ))?; + let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { @@ -328,10 +325,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { FrameType + Framable + DeserializeOwned, { let _ = event_value_shape; - let range = - BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?.ok_or(Error::with_msg( - format!("BinnedJsonChannelExec BinnedRange::covering_range returned None"), - ))?; + let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?; let t_bin_count = range.count as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 540e59e..27dc012 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -123,8 +123,7 @@ where } // TODO do I need to set up more transformations or binning to deliver the requested data? let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? - .ok_or(Error::with_msg("covering_range returns None"))?; + let range = BinnedRange::covering_range(evq.range.clone(), count as u32)?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let ret = TBinnerStream::<_, ::Output>::new( diff --git a/err/src/lib.rs b/err/src/lib.rs index 74815a9..70d5bc6 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -10,6 +10,10 @@ use std::num::{ParseFloatError, ParseIntError}; use std::string::FromUtf8Error; use std::sync::PoisonError; +pub mod bt { + pub use backtrace::Backtrace; +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Reason { InternalError, @@ -409,9 +413,13 @@ impl From<&Error> for PublicError { } pub fn todo() { - todo!("TODO"); + let bt = backtrace::Backtrace::new(); + eprintln!("TODO\n{bt:?}"); + todo!("TODO\n{bt:?}"); } pub fn todoval() -> T { - todo!("TODO todoval") + let bt = backtrace::Backtrace::new(); + eprintln!("TODO\n{bt:?}"); + todo!("TODO todoval\n{bt:?}") } diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 95147d8..46d77e4 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -7,11 +7,11 @@ use hyper::Body; use items_2::{binned_collected, ChannelEvents, ChannelEventsMerger}; use netpod::log::*; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; -use netpod::{AggKind, FromUrl, NodeConfigCached}; +use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; use scyllaconn::create_scy_session; use scyllaconn::errconv::ErrConv; -use scyllaconn::events::{channel_state_events, make_scylla_stream}; +use scyllaconn::events::{channel_state_events, find_series, make_scylla_stream}; use std::pin::Pin; use std::sync::Arc; use url::Url; @@ -184,7 +184,8 @@ impl EventsHandlerScylla { }; let mut stream = if let Some(scyco) = &node_config.node_config.cluster.scylla { let scy = create_scy_session(scyco).await?; - let stream = make_scylla_stream(&evq, scy, &pgclient, false).await?; + let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; + let stream = make_scylla_stream(&evq, series, scalar_type.clone(), shape.clone(), scy, false).await?; stream } else { return Err(Error::with_public_msg(format!("no scylla configured"))); @@ -195,7 +196,7 @@ impl EventsHandlerScylla { Ok(k) => match k { ChannelEvents::Events(mut item) => { if coll.is_none() { - coll = Some(item.new_collector()); + coll = Some(item.new_collector(0)); } let cl = coll.as_mut().unwrap(); cl.ingest(item.as_collectable_mut()); @@ -244,7 +245,10 @@ impl BinnedHandlerScylla { } match self.fetch(req, node_config).await { Ok(ret) => Ok(ret), - Err(e) => Ok(e.to_public_response()), + Err(e) => { + eprintln!("error: {e}"); + Ok(e.to_public_response()) + } } } @@ -287,17 +291,41 @@ impl BinnedHandlerScylla { let mut query2 = PlainEventsQuery::new(evq.channel().clone(), evq.range().clone(), 0, None, false); query2.set_timeout(evq.timeout()); let query2 = query2; - let stream = make_scylla_stream(&query2, scy.clone(), &pgclient, false).await?; + let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; + let stream = + make_scylla_stream(&query2, series, scalar_type.clone(), shape.clone(), scy.clone(), false).await?; let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), evq.range().clone()); - let state_stream = channel_state_events(&query3, scy.clone()).await?; + let state_stream = channel_state_events(&query3, scy.clone()) + .await? + .map(|x| { + //eprintln!("state_stream {x:?}"); + x + }) + .map_err(|e| items_2::Error::from(format!("{e}"))); // TODO let the stream itself use the items_2 error, do not convert here. - let data_stream = Box::pin(stream.map_err(|e| items_2::Error::from(format!("{e}")))); - let state_stream = Box::pin(state_stream.map_err(|e| items_2::Error::from(format!("{e}")))); - let merged_stream = ChannelEventsMerger::new(data_stream, state_stream); + let data_stream = stream + .map(|x| { + //eprintln!("data_stream {x:?}"); + x + }) + .map_err(|e| items_2::Error::from(format!("{e}"))); + let data_stream = Box::pin(data_stream) as _; + let state_stream = Box::pin(state_stream) as _; + let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); let merged_stream = Box::pin(merged_stream) as Pin + Send>>; - let binned_collected = binned_collected(merged_stream) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; + let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?; + //eprintln!("edges {:?}", covering.edges()); + // TODO return partial result if timed out. + let binned_collected = binned_collected( + scalar_type.clone(), + shape.clone(), + evq.agg_kind().clone(), + covering.edges(), + evq.timeout(), + merged_stream, + ) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; let res = binned_collected.to_json_result()?; let res = res.to_json_bytes()?; let ret = response(StatusCode::OK).body(Body::from(res))?; diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index 7cbd5eb..ae88426 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -17,7 +17,7 @@ num-traits = "0.2.15" chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" -tokio = { version = "1.20", features = ["rt-multi-thread", "sync"] } +tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } err = { path = "../err" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 5fd9354..1b377b0 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -45,7 +45,7 @@ where } } -impl BinsDim0 { +impl BinsDim0 { pub fn empty() -> Self { Self { ts1s: VecDeque::new(), @@ -56,6 +56,24 @@ impl BinsDim0 { avgs: VecDeque::new(), } } + + pub fn append_zero(&mut self, beg: u64, end: u64) { + self.ts1s.push_back(beg); + self.ts2s.push_back(end); + self.counts.push_back(0); + self.mins.push_back(NTY::zero()); + self.maxs.push_back(NTY::zero()); + self.avgs.push_back(0.); + } + + pub fn append_all_from(&mut self, src: &mut Self) { + self.ts1s.extend(src.ts1s.drain(..)); + self.ts2s.extend(src.ts2s.drain(..)); + self.counts.extend(src.counts.drain(..)); + self.mins.extend(src.mins.drain(..)); + self.maxs.extend(src.maxs.drain(..)); + self.avgs.extend(src.avgs.drain(..)); + } } impl WithLen for BinsDim0 { @@ -170,25 +188,57 @@ pub struct BinsDim0CollectedResult { continue_at: Option, } +impl BinsDim0CollectedResult { + pub fn ts_anchor_sec(&self) -> u64 { + self.ts_anchor_sec + } + + pub fn counts(&self) -> &VecDeque { + &self.counts + } + + pub fn missing_bins(&self) -> u32 { + self.missing_bins + } + + pub fn continue_at(&self) -> Option { + self.continue_at.clone() + } + + pub fn mins(&self) -> &VecDeque { + &self.mins + } + + pub fn maxs(&self) -> &VecDeque { + &self.maxs + } +} + impl ToJsonResult for BinsDim0CollectedResult { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } + + fn as_any(&self) -> &dyn Any { + self + } } pub struct BinsDim0Collector { timed_out: bool, range_complete: bool, vals: BinsDim0, + bin_count_exp: u32, } impl BinsDim0Collector { - pub fn new() -> Self { + pub fn new(bin_count_exp: u32) -> Self { Self { timed_out: false, range_complete: false, vals: BinsDim0::::empty(), + bin_count_exp, } } } @@ -223,25 +273,21 @@ impl CollectorType for BinsDim0Collector { fn result(&mut self) -> Result { let bin_count = self.vals.ts1s.len() as u32; - // TODO save the clone: - let mut ts_all = self.vals.ts1s.clone(); - if self.vals.ts2s.len() > 0 { - ts_all.push_back(*self.vals.ts2s.back().unwrap()); - } - info!("TODO return proper continueAt"); - let bin_count_exp = 100 as u32; - let continue_at = if self.vals.ts1s.len() < bin_count_exp as usize { - match ts_all.back() { + let (missing_bins, continue_at) = if bin_count < self.bin_count_exp { + match self.vals.ts2s.back() { Some(&k) => { let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); - Some(iso) + (self.bin_count_exp - bin_count, Some(iso)) } None => Err(Error::with_msg("partial_content but no bin in result"))?, } } else { - None + (0, None) }; - if ts_all.as_slices().1.len() != 0 { + if self.vals.ts1s.as_slices().1.len() != 0 { + panic!(); + } + if self.vals.ts2s.as_slices().1.len() != 0 { panic!(); } let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0); @@ -261,7 +307,7 @@ impl CollectorType for BinsDim0Collector { maxs, avgs, finalised_range: self.range_complete, - missing_bins: bin_count_exp - bin_count, + missing_bins, continue_at, }; Ok(ret) @@ -271,8 +317,8 @@ impl CollectorType for BinsDim0Collector { impl CollectableType for BinsDim0 { type Collector = BinsDim0Collector; - fn new_collector() -> Self::Collector { - Self::Collector::new() + fn new_collector(bin_count_exp: u32) -> Self::Collector { + Self::Collector::new(bin_count_exp) } } @@ -362,6 +408,11 @@ impl TimeBinnable for BinsDim0 { fn as_any(&self) -> &dyn Any { self as &dyn Any } + + fn to_box_to_json_result(&self) -> Box { + let k = serde_json::to_value(self).unwrap(); + Box::new(k) as _ + } } pub struct BinsDim0TimeBinner { @@ -484,14 +535,13 @@ impl TimeBinner for BinsDim0TimeBinner { let expand = true; if let Some(agg) = self.agg.as_mut() { let dummy_range = NanoRange { beg: 4, end: 5 }; - let bins = agg.result_reset(dummy_range, expand); + let mut bins = agg.result_reset(dummy_range, expand); self.agg = None; assert_eq!(bins.len(), 1); if push_empty || bins.counts[0] != 0 { match self.ready.as_mut() { - Some(_ready) => { - err::todo(); - //ready.append(&mut bins); + Some(ready) => { + ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); @@ -506,14 +556,12 @@ impl TimeBinner for BinsDim0TimeBinner { let n = self.bins_ready_count(); self.push_in_progress(true); if self.bins_ready_count() == n { - if let Some(_range) = self.next_bin_range() { - let bins = BinsDim0::::empty(); - err::todo(); - //bins.append_zero(range.beg, range.end); + if let Some(range) = self.next_bin_range() { + let mut bins = BinsDim0::::empty(); + bins.append_zero(range.beg, range.end); match self.ready.as_mut() { - Some(_ready) => { - err::todo(); - //ready.append(&mut bins); + Some(ready) => { + ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index e4c80a3..6cf16a4 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -11,6 +11,13 @@ use std::any::Any; use std::collections::VecDeque; use std::{fmt, mem}; +#[allow(unused)] +macro_rules! trace { + ($($x:expr),*) => { + {let _ = format!($($x),*);} + }; +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0 { pub tss: VecDeque, @@ -88,7 +95,7 @@ impl RangeOverlapInfo for EventsDim0 { impl TimeBinnableType for EventsDim0 { type Output = BinsDim0; - type Aggregator = EventValuesAggregator; + type Aggregator = EventsDim0Aggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { let self_name = std::any::type_name::(); @@ -100,30 +107,33 @@ impl TimeBinnableType for EventsDim0 { } } -pub struct EventValuesCollector { +pub struct EventsDim0Collector { vals: EventsDim0, range_complete: bool, timed_out: bool, + #[allow(unused)] + bin_count_exp: u32, } -impl EventValuesCollector { - pub fn new() -> Self { +impl EventsDim0Collector { + pub fn new(bin_count_exp: u32) -> Self { Self { vals: EventsDim0::empty(), range_complete: false, timed_out: false, + bin_count_exp, } } } -impl WithLen for EventValuesCollector { +impl WithLen for EventsDim0Collector { fn len(&self) -> usize { self.vals.tss.len() } } #[derive(Debug, Serialize)] -pub struct EventValuesCollectorOutput { +pub struct EventsDim0CollectorOutput { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] @@ -141,16 +151,20 @@ pub struct EventValuesCollectorOutput { timed_out: bool, } -impl ToJsonResult for EventValuesCollectorOutput { +impl ToJsonResult for EventsDim0CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } + + fn as_any(&self) -> &dyn Any { + self + } } -impl CollectorType for EventValuesCollector { +impl CollectorType for EventsDim0Collector { type Input = EventsDim0; - type Output = EventValuesCollectorOutput; + type Output = EventsDim0CollectorOutput; fn ingest(&mut self, src: &mut Self::Input) { // TODO could be optimized by non-contiguous container. @@ -186,14 +200,14 @@ impl CollectorType for EventValuesCollector { } impl CollectableType for EventsDim0 { - type Collector = EventValuesCollector; + type Collector = EventsDim0Collector; - fn new_collector() -> Self::Collector { - Self::Collector::new() + fn new_collector(bin_count_exp: u32) -> Self::Collector { + Self::Collector::new(bin_count_exp) } } -pub struct EventValuesAggregator { +pub struct EventsDim0Aggregator { range: NanoRange, count: u64, min: NTY, @@ -201,14 +215,15 @@ pub struct EventValuesAggregator { sumc: u64, sum: f32, int_ts: u64, - last_ts: u64, - last_val: Option, + last_seen_ts: u64, + last_seen_val: Option, + did_min_max: bool, do_time_weight: bool, events_taken_count: u64, events_ignored_count: u64, } -impl Drop for EventValuesAggregator { +impl Drop for EventsDim0Aggregator { fn drop(&mut self) { // TODO collect as stats for the request context: trace!( @@ -219,7 +234,7 @@ impl Drop for EventValuesAggregator { } } -impl EventValuesAggregator { +impl EventsDim0Aggregator { pub fn new(range: NanoRange, do_time_weight: bool) -> Self { let int_ts = range.beg; Self { @@ -230,8 +245,9 @@ impl EventValuesAggregator { sum: 0., sumc: 0, int_ts, - last_ts: 0, - last_val: None, + last_seen_ts: 0, + last_seen_val: None, + did_min_max: false, do_time_weight, events_taken_count: 0, events_ignored_count: 0, @@ -240,7 +256,17 @@ impl EventValuesAggregator { // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max(&mut self, val: NTY) { - if self.count == 0 { + trace!( + "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", + val, + self.last_seen_val, + self.count, + self.sumc, + self.min, + self.max + ); + if self.did_min_max == false { + self.did_min_max = true; self.min = val.clone(); self.max = val.clone(); } else { @@ -254,6 +280,8 @@ impl EventValuesAggregator { } fn apply_event_unweight(&mut self, val: NTY) { + trace!("TODO check again result_reset_unweight"); + err::todo(); let vf = val.as_prim_f32(); self.apply_min_max(val); if vf.is_nan() { @@ -264,10 +292,12 @@ impl EventValuesAggregator { } fn apply_event_time_weight(&mut self, ts: u64) { - if let Some(v) = &self.last_val { + if let Some(v) = &self.last_seen_val { let vf = v.as_prim_f32(); let v2 = v.clone(); - self.apply_min_max(v2); + if ts > self.range.beg { + self.apply_min_max(v2); + } let w = if self.do_time_weight { (ts - self.int_ts) as f32 * 1e-9 } else { @@ -288,6 +318,8 @@ impl EventValuesAggregator { } fn ingest_unweight(&mut self, item: &::Input) { + trace!("TODO check again result_reset_unweight"); + err::todo(); for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1].clone(); @@ -305,45 +337,51 @@ impl EventValuesAggregator { } fn ingest_time_weight(&mut self, item: &::Input) { + let self_name = std::any::type_name::(); + trace!("{self_name}::ingest_time_weight item len {}", item.len()); for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1].clone(); + trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val); if ts < self.int_ts { - if self.last_val.is_none() { + if self.last_seen_val.is_none() { info!( "ingest_time_weight event before range, only set last ts {} val {:?}", ts, val ); } self.events_ignored_count += 1; - self.last_ts = ts; - self.last_val = Some(val); + self.last_seen_ts = ts; + self.last_seen_val = Some(val); } else if ts >= self.range.end { self.events_ignored_count += 1; return; } else { - self.apply_event_time_weight(ts); - if self.last_val.is_none() { + if false && self.last_seen_val.is_none() { + // TODO no longer needed or? info!( "call apply_min_max without last val, use current instead {} {:?}", ts, val ); self.apply_min_max(val.clone()); } + self.apply_event_time_weight(ts); self.count += 1; - self.last_ts = ts; - self.last_val = Some(val); + self.last_seen_ts = ts; + self.last_seen_val = Some(val); self.events_taken_count += 1; } } } fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsDim0 { + trace!("TODO check again result_reset_unweight"); + err::todo(); let (min, max, avg) = if self.sumc > 0 { let avg = self.sum / self.sumc as f32; (self.min.clone(), self.max.clone(), avg) } else { - let g = match &self.last_val { + let g = match &self.last_seen_val { Some(x) => x.clone(), None => NTY::zero(), }; @@ -362,6 +400,7 @@ impl EventValuesAggregator { self.count = 0; self.sum = 0f32; self.sumc = 0; + self.did_min_max = false; ret } @@ -377,7 +416,7 @@ impl EventValuesAggregator { let avg = self.sum / (self.range.delta() as f32 * 1e-9); (self.min.clone(), self.max.clone(), avg) } else { - let g = match &self.last_val { + let g = match &self.last_seen_val { Some(x) => x.clone(), None => NTY::zero(), }; @@ -394,13 +433,16 @@ impl EventValuesAggregator { self.int_ts = range.beg; self.range = range; self.count = 0; - self.sum = 0f32; + self.sum = 0.; self.sumc = 0; + self.did_min_max = false; + self.min = NTY::zero(); + self.max = NTY::zero(); ret } } -impl TimeBinnableTypeAggregator for EventValuesAggregator { +impl TimeBinnableTypeAggregator for EventsDim0Aggregator { type Input = EventsDim0; type Output = BinsDim0; @@ -409,8 +451,13 @@ impl TimeBinnableTypeAggregator for EventValuesAggregator { } fn ingest(&mut self, item: &Self::Input) { - for ts in &item.tss { - eprintln!("EventValuesAggregator ingest {ts:20}"); + if true { + trace!("{} ingest {} events", std::any::type_name::(), item.len()); + } + if false { + for (i, &ts) in item.tss.iter().enumerate() { + trace!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); + } } if self.do_time_weight { self.ingest_time_weight(item) @@ -420,7 +467,7 @@ impl TimeBinnableTypeAggregator for EventValuesAggregator { } fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { - debug!("Produce for {:?} next {:?}", self.range, range); + trace!("result_reset {} {}", range.beg, range.end); if self.do_time_weight { self.result_reset_time_weight(range, expand) } else { @@ -431,29 +478,37 @@ impl TimeBinnableTypeAggregator for EventValuesAggregator { impl TimeBinnable for EventsDim0 { fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - let ret = ScalarEventsTimeBinner::::new(edges.into(), do_time_weight); + let ret = EventsDim0TimeBinner::::new(edges.into(), do_time_weight).unwrap(); Box::new(ret) } fn as_any(&self) -> &dyn Any { self as &dyn Any } + + fn to_box_to_json_result(&self) -> Box { + let k = serde_json::to_value(self).unwrap(); + Box::new(k) as _ + } } impl Events for EventsDim0 { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable { + fn as_time_binnable(&self) -> &dyn TimeBinnable { self as &dyn TimeBinnable } - fn verify(&self) { + fn verify(&self) -> bool { + let mut good = true; let mut ts_max = 0; for ts in &self.tss { let ts = *ts; if ts < ts_max { + good = false; error!("unordered event data ts {} ts_max {}", ts, ts_max); } ts_max = ts_max.max(ts); } + good } fn output_info(&self) { @@ -495,6 +550,10 @@ impl Events for EventsDim0 { self.tss.front().map(|&x| x) } + fn ts_max(&self) -> Option { + self.tss.back().map(|&x| x) + } + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { if let Some(other) = other.as_any().downcast_ref::() { self == other @@ -504,41 +563,53 @@ impl Events for EventsDim0 { } } -pub struct ScalarEventsTimeBinner { - // The first two edges are used the next time that we create an aggregator, or push a zero bin. +pub struct EventsDim0TimeBinner { edges: VecDeque, - do_time_weight: bool, - agg: Option>, - ready: Option< as TimeBinnableTypeAggregator>::Output>, + agg: EventsDim0Aggregator, + ready: Option< as TimeBinnableTypeAggregator>::Output>, } -impl ScalarEventsTimeBinner { - fn new(edges: VecDeque, do_time_weight: bool) -> Self { - let self_name = std::any::type_name::(); - eprintln!("{self_name}::new edges {edges:?}"); - Self { - edges, - do_time_weight, - agg: None, - ready: None, +impl EventsDim0TimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Result { + if edges.len() < 2 { + return Err(Error::with_msg_no_trace(format!("need at least 2 edges"))); } + let self_name = std::any::type_name::(); + trace!("{self_name}::new edges {edges:?}"); + let agg = EventsDim0Aggregator::new( + NanoRange { + beg: edges[0], + end: edges[1], + }, + do_time_weight, + ); + let ret = Self { + edges, + agg, + ready: None, + }; + Ok(ret) } fn next_bin_range(&mut self) -> Option { - if self.edges.len() >= 2 { + let self_name = std::any::type_name::(); + if self.edges.len() >= 3 { + self.edges.pop_front(); let ret = NanoRange { beg: self.edges[0], end: self.edges[1], }; - self.edges.pop_front(); + trace!("{self_name} next_bin_range {} {}", ret.beg, ret.end); Some(ret) } else { + self.edges.clear(); + trace!("{self_name} next_bin_range None"); None } } } -impl TimeBinner for ScalarEventsTimeBinner { +impl TimeBinner for EventsDim0TimeBinner { fn bins_ready_count(&self) -> usize { match &self.ready { Some(k) => k.len(), @@ -554,103 +625,97 @@ impl TimeBinner for ScalarEventsTimeBinner { } fn ingest(&mut self, item: &dyn TimeBinnable) { - const SELF: &str = "ScalarEventsTimeBinner"; + let self_name = std::any::type_name::(); + if true { + trace!( + "TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------", + self.edges.iter().take(2).collect::>(), + item + ); + } if item.len() == 0 { // Return already here, RangeOverlapInfo would not give much sense. return; } if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {SELF} no more bin in edges A"); + warn!("{self_name} no more bin in edges A"); return; } // TODO optimize by remembering at which event array index we have arrived. // That needs modified interfaces which can take and yield the start and latest index. loop { - while item.starts_after(NanoRange { - beg: 0, - end: self.edges[1], - }) { + while item.starts_after(self.agg.range().clone()) { + trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); self.cycle(); if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {SELF} no more bin in edges B"); + warn!("{self_name} no more bin in edges B"); return; } } - if item.ends_before(NanoRange { - beg: self.edges[0], - end: u64::MAX, - }) { + if item.ends_before(self.agg.range().clone()) { + trace!("{self_name} IGNORE ITEM BECAUSE ends_before\n------------- -----------"); return; } else { if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {SELF} edge list exhausted"); + trace!("{self_name} edge list exhausted"); return; } else { - let agg = if let Some(agg) = self.agg.as_mut() { - agg - } else { - self.agg = Some(EventValuesAggregator::new( - // We know here that we have enough edges for another bin. - // and `next_bin_range` will pop the first edge. - self.next_bin_range().unwrap(), - self.do_time_weight, - )); - self.agg.as_mut().unwrap() - }; if let Some(item) = item .as_any() // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { // TODO collect statistics associated with this request: - agg.ingest(item); - } else { - error!("not correct item type"); - }; - if item.ends_after(agg.range().clone()) { - self.cycle(); - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {SELF} no more bin in edges C"); - return; + trace!("{self_name} FEED THE ITEM..."); + self.agg.ingest(item); + if item.ends_after(self.agg.range().clone()) { + trace!("{self_name} FED ITEM, ENDS AFTER."); + self.cycle(); + if self.edges.len() < 2 { + warn!("{self_name} no more bin in edges C"); + return; + } else { + trace!("{self_name} FED ITEM, CYCLED, CONTINUE."); + } + } else { + trace!("{self_name} FED ITEM."); + break; } } else { - break; - } + panic!("{self_name} not correct item type"); + }; } } } } fn push_in_progress(&mut self, push_empty: bool) { + let self_name = std::any::type_name::(); + trace!("{self_name}::push_in_progress"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. - let expand = true; - let range_next = if self.agg.is_some() { - if let Some(x) = self.next_bin_range() { + if self.edges.len() >= 2 { + let expand = true; + let range_next = if let Some(x) = self.next_bin_range() { Some(x) } else { None - } - } else { - None - }; - if let Some(agg) = self.agg.as_mut() { - let bins; - if let Some(range_next) = range_next { - bins = agg.result_reset(range_next, expand); + }; + let mut bins = if let Some(range_next) = range_next { + self.agg.result_reset(range_next, expand) } else { - let range_next = NanoRange { beg: 4, end: 5 }; - bins = agg.result_reset(range_next, expand); - self.agg = None; - } + let range_next = NanoRange { + beg: u64::MAX - 1, + end: u64::MAX, + }; + self.agg.result_reset(range_next, expand) + }; assert_eq!(bins.len(), 1); if push_empty || bins.counts[0] != 0 { match self.ready.as_mut() { - Some(_ready) => { - error!("TODO eventsdim0 time binner append"); - err::todo(); - //ready.append(&mut bins); + Some(ready) => { + ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); @@ -661,19 +726,18 @@ impl TimeBinner for ScalarEventsTimeBinner { } fn cycle(&mut self) { + let self_name = std::any::type_name::(); + trace!("{self_name}::cycle"); + // TODO refactor this logic. let n = self.bins_ready_count(); self.push_in_progress(true); if self.bins_ready_count() == n { - if let Some(_range) = self.next_bin_range() { - let bins = BinsDim0::::empty(); - error!("TODO eventsdim0 time binner append"); - err::todo(); - //bins.append_zero(range.beg, range.end); + if let Some(range) = self.next_bin_range() { + let mut bins = BinsDim0::::empty(); + bins.append_zero(range.beg, range.end); match self.ready.as_mut() { - Some(_ready) => { - error!("TODO eventsdim0 time binner append"); - err::todo(); - //ready.append(&mut bins); + Some(ready) => { + ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 1ee1dce..1000b46 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -5,6 +5,7 @@ pub mod streams; pub mod test; use chrono::{DateTime, TimeZone, Utc}; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use netpod::log::*; @@ -17,6 +18,8 @@ use std::fmt; use std::ops::ControlFlow; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; +use std::time::Instant; use streams::Collectable; use streams::ToJsonResult; @@ -201,7 +204,7 @@ pub trait AppendEmptyBin { fn append_empty_bin(&mut self, ts1: u64, ts2: u64); } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, PartialEq, Deserialize)] pub struct IsoDateTime(DateTime); impl Serialize for IsoDateTime { @@ -237,17 +240,22 @@ pub trait TimeBinner: Send { /// Provides a time-binned representation of the implementing type. /// In contrast to `TimeBinnableType` this is meant for trait objects. pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { + // TODO implementors may fail if edges contain not at least 2 entries. fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; fn as_any(&self) -> &dyn Any; + + // TODO just a helper for the empty result. + fn to_box_to_json_result(&self) -> Box; } /// Container of some form of events, for use as trait object. pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; - fn verify(&self); + fn as_time_binnable(&self) -> &dyn TimeBinnable; + fn verify(&self) -> bool; fn output_info(&self); fn as_collectable_mut(&mut self) -> &mut dyn Collectable; fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; } @@ -396,11 +404,13 @@ trait MergableEvents: Any { impl MergableEvents for Box { fn ts_min(&self) -> Option { - todo!() + eprintln!("TODO MergableEvents for Box"); + err::todoval() } fn ts_max(&self) -> Option { - todo!() + eprintln!("TODO MergableEvents for Box"); + err::todoval() } } @@ -421,282 +431,268 @@ impl PartialEq for ChannelEvents { } } -impl ChannelEvents { - fn forget_after_merge_process(&self) -> bool { - use ChannelEvents::*; - match self { - Events(k) => k.len() == 0, - Status(_) => true, - RangeComplete => true, - } - } -} - impl MergableEvents for ChannelEvents { fn ts_min(&self) -> Option { use ChannelEvents::*; match self { Events(k) => k.ts_min(), Status(k) => Some(k.ts), - RangeComplete => panic!(), + RangeComplete => None, } } fn ts_max(&self) -> Option { - error!("TODO MergableEvents"); - todo!() + error!("TODO impl MergableEvents for ChannelEvents"); + err::todoval() } } +type MergeInp = Pin> + Send>>; + pub struct ChannelEventsMerger { - inp1: Pin> + Send>>, - inp2: Pin> + Send>>, - inp1_done: bool, - inp2_done: bool, - inp1_item: Option, - inp2_item: Option, - out: Option, + inps: Vec>, + items: Vec>, range_complete: bool, done: bool, + done2: bool, complete: bool, } +impl fmt::Debug for ChannelEventsMerger { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); + fmt.debug_struct(std::any::type_name::()) + .field("inps", &inps) + .field("items", &self.items) + .field("range_complete", &self.range_complete) + .field("done", &self.done) + .field("done2", &self.done2) + .finish() + } +} + impl ChannelEventsMerger { - pub fn new( - inp1: Pin> + Send>>, - inp2: Pin> + Send>>, - ) -> Self { + pub fn new(inps: Vec) -> Self { + let n = inps.len(); Self { done: false, + done2: false, complete: false, - inp1, - inp2, - inp1_done: false, - inp2_done: false, - inp1_item: None, - inp2_item: None, - out: None, + inps: inps.into_iter().map(|x| Some(x)).collect(), + items: (0..n).into_iter().map(|_| None).collect(), range_complete: false, } } - fn handle_no_ts_event(item: &mut Option, range_complete: &mut bool) -> bool { - match item { - Some(k) => match k { - ChannelEvents::Events(_) => false, - ChannelEvents::Status(_) => false, - ChannelEvents::RangeComplete => { - *range_complete = true; - item.take(); - true - } - }, - None => false, - } - } - - fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> ControlFlow::Item>>> { - eprintln!("process {self:?}"); + fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { use ControlFlow::*; - use Poll::*; - // Some event types have no timestamp. - let gg: &mut ChannelEventsMerger = &mut self; - let &mut ChannelEventsMerger { - inp1_item: ref mut item, - range_complete: ref mut raco, - .. - } = gg; - if Self::handle_no_ts_event(item, raco) { - return Continue(()); - } - let &mut ChannelEventsMerger { - inp2_item: ref mut item, - range_complete: ref mut raco, - .. - } = gg; - if Self::handle_no_ts_event(item, raco) { - return Continue(()); - } - - // Find the two lowest ts. - let mut tsj = [None, None]; - for (i1, item) in [&self.inp1_item, &self.inp2_item].into_iter().enumerate() { - if let Some(a) = &item { - if let Some(tsmin) = a.ts_min() { - if let Some((_, k)) = tsj[0] { - if tsmin < k { - tsj[1] = tsj[0]; - tsj[0] = Some((i1, tsmin)); + let mut tslows = [None, None]; + for (i1, itemopt) in self.items.iter_mut().enumerate() { + if let Some(item) = itemopt { + let t1 = item.ts_min(); + if let Some(t1) = t1 { + if let Some((_, a)) = tslows[0] { + if t1 < a { + tslows[1] = tslows[0]; + tslows[0] = Some((i1, t1)); } else { - if let Some((_, k)) = tsj[1] { - if tsmin < k { - tsj[1] = Some((i1, tsmin)); + if let Some((_, b)) = tslows[1] { + if t1 < b { + tslows[1] = Some((i1, t1)); } else { + // nothing to do } } else { - tsj[1] = Some((i1, tsmin)); + tslows[1] = Some((i1, t1)); } } } else { - tsj[0] = Some((i1, tsmin)); + tslows[0] = Some((i1, t1)); } } else { - // TODO design such that this can't occur. - warn!("found input item without timestamp"); - //Break(Ready(Some(Err(Error::)))) + match item { + ChannelEvents::Events(_) => { + trace!("events item without ts min discovered {item:?}"); + itemopt.take(); + return Ok(Continue(())); + } + ChannelEvents::Status(_) => { + return Err(format!("channel status without timestamp").into()); + } + ChannelEvents::RangeComplete => { + trace!("--------------------- ChannelEvents::RangeComplete \n======================"); + *itemopt = None; + self.range_complete = true; + return Ok(Continue(())); + } + } } } } - eprintln!("---------- Found lowest: {tsj:?}"); - - if tsj[0].is_none() { - Continue(()) - } else if tsj[1].is_none() { - let (_ts_min, itemref) = if tsj[0].as_mut().unwrap().0 == 0 { - (tsj[0].as_ref().unwrap().1, self.inp1_item.as_mut()) + if let Some((il0, _tl0)) = tslows[0] { + if let Some((_il1, tl1)) = tslows[1] { + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(item) => { + if let Some(th0) = item.ts_max() { + if th0 < tl1 { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } else { + let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 { + // TODO should never be here + self.items[il0] = None; + } + Ok(Break(ChannelEvents::Events(ritem))) + } + } else { + // TODO should never be here because ts-max should always exist here. + let ritem = item.take_new_events_until_ts(tl1); + if item.len() == 0 {} + Ok(Break(ChannelEvents::Events(ritem))) + } + } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) + } + ChannelEvents::RangeComplete => Err(format!("RangeComplete considered in merge-lowest").into()), + } } else { - (tsj[0].as_ref().unwrap().1, self.inp2_item.as_mut()) - }; - if itemref.is_none() { - panic!("logic error"); - } - // Precondition: at least one event is before the requested range. - use ChannelEvents::*; - let itemout = match itemref { - Some(Events(k)) => Events(k.take_new_events_until_ts(u64::MAX)), - Some(Status(k)) => Status(k.clone()), - Some(RangeComplete) => RangeComplete, - None => panic!(), - }; - { - // TODO refactor - if tsj[0].as_mut().unwrap().0 == 0 { - if let Some(item) = self.inp1_item.as_ref() { - if item.forget_after_merge_process() { - self.inp1_item.take(); - } + let item = self.items[il0].as_mut().unwrap(); + match item { + ChannelEvents::Events(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) } - } - if tsj[0].as_mut().unwrap().0 == 1 { - if let Some(item) = self.inp2_item.as_ref() { - if item.forget_after_merge_process() { - self.inp2_item.take(); - } + ChannelEvents::Status(_) => { + let ret = self.items[il0].take().unwrap(); + Ok(Break(ret)) } + ChannelEvents::RangeComplete => Err(format!("RangeComplete considered in merge-lowest").into()), } } - Break(Ready(Some(Ok(itemout)))) } else { - let (ts_end, itemref) = if tsj[0].as_mut().unwrap().0 == 0 { - (tsj[1].as_ref().unwrap().1, self.inp1_item.as_mut()) - } else { - (tsj[1].as_ref().unwrap().1, self.inp2_item.as_mut()) - }; - if itemref.is_none() { - panic!("logic error"); - } - // Precondition: at least one event is before the requested range. - use ChannelEvents::*; - let itemout = match itemref { - Some(Events(k)) => Events(k.take_new_events_until_ts(ts_end)), - Some(Status(k)) => Status(k.clone()), - Some(RangeComplete) => RangeComplete, - None => panic!(), - }; - { - // TODO refactor - if tsj[0].as_mut().unwrap().0 == 0 { - if let Some(item) = self.inp1_item.as_ref() { - if item.forget_after_merge_process() { - self.inp1_item.take(); + Err(format!("after low ts search nothing found").into()) + } + } + + fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { + use ControlFlow::*; + use Poll::*; + let mut has_pending = false; + for i1 in 0..self.inps.len() { + let item = &self.items[i1]; + if item.is_none() { + if let Some(inp) = &mut self.inps[i1] { + match inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + if let ChannelEvents::Events(events) = &k { + if events.len() == 0 { + eprintln!("ERROR bad events item {events:?}"); + } else { + trace!("\nrefilled with events {}\nREFILLED\n{:?}\n\n", events.len(), events); + } + } + self.items[i1] = Some(k); } - } - } - if tsj[0].as_mut().unwrap().0 == 1 { - if let Some(item) = self.inp2_item.as_ref() { - if item.forget_after_merge_process() { - self.inp2_item.take(); + Ready(Some(Err(e))) => return Break(Ready(e)), + Ready(None) => { + self.inps[i1] = None; + } + Pending => { + has_pending = true; } } } } - Break(Ready(Some(Ok(itemout)))) + } + if has_pending { + Break(Pending) + } else { + Continue(()) } } fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow::Item>>> { use ControlFlow::*; use Poll::*; - if self.inp1_item.is_none() && !self.inp1_done { - match Pin::new(&mut self.inp1).poll_next(cx) { - Ready(Some(Ok(k))) => { - self.inp1_item = Some(k); - Continue(()) - } - Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))), - Ready(None) => { - self.inp1_done = true; - Continue(()) - } - Pending => Break(Pending), + let mut has_pending = false; + match Self::refill(Pin::new(&mut self), cx) { + Break(Ready(e)) => return Break(Ready(Some(Err(e)))), + Break(Pending) => { + has_pending = true; } - } else if self.inp2_item.is_none() && !self.inp2_done { - match Pin::new(&mut self.inp2).poll_next(cx) { - Ready(Some(Ok(k))) => { - self.inp2_item = Some(k); - Continue(()) - } - Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))), - Ready(None) => { - self.inp2_done = true; - Continue(()) - } - Pending => Break(Pending), - } - } else if self.inp1_item.is_some() || self.inp2_item.is_some() { - let process_res = Self::process(self.as_mut(), cx); - match process_res { - Continue(()) => Continue(()), - Break(k) => Break(k), + Continue(()) => {} + } + let ninps = self.inps.iter().filter(|a| a.is_some()).count(); + let nitems = self.items.iter().filter(|a| a.is_some()).count(); + let nitemsmissing = self + .inps + .iter() + .zip(self.items.iter()) + .filter(|(a, b)| a.is_some() && b.is_none()) + .count(); + if ninps == 0 && nitems == 0 { + self.done = true; + Break(Ready(None)) + } else if nitemsmissing != 0 { + if !has_pending { + let e = Error::from(format!("missing but no pending")); + Break(Ready(Some(Err(e)))) + } else { + Break(Pending) } } else { - self.done = true; - Continue(()) + match Self::process(Pin::new(&mut self), cx) { + Ok(Break(item)) => Break(Ready(Some(Ok(item)))), + Ok(Continue(())) => Continue(()), + Err(e) => Break(Ready(Some(Err(e)))), + } } } } -impl fmt::Debug for ChannelEventsMerger { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct(std::any::type_name::()) - .field("inp1_done", &self.inp1_done) - .field("inp2_done", &self.inp2_done) - .field("inp1_item", &self.inp1_item) - .field("inp2_item", &self.inp2_item) - .field("out", &self.out) - .field("range_complete", &self.range_complete) - .field("done", &self.done) - .field("complete", &self.complete) - .finish() - } -} - impl Stream for ChannelEventsMerger { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - eprintln!("ChannelEventsMerger poll_next"); + //let self_name = std::any::type_name::(); + //eprintln!("{self_name} poll_next"); loop { break if self.complete { panic!("poll after complete"); - } else if self.done { + } else if self.done2 { self.complete = true; Ready(None) + } else if self.done { + self.done2 = true; + if self.range_complete { + trace!("MERGER EMITTING ChannelEvents::RangeComplete"); + Ready(Some(Ok(ChannelEvents::RangeComplete))) + } else { + continue; + } } else { match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, - ControlFlow::Break(k) => break k, + ControlFlow::Break(k) => { + match &k { + Ready(Some(Ok(ChannelEvents::Events(item)))) => { + trace!("\n\nMERGER EMITTING\n{:?}\n\n", item); + } + Ready(Some(Ok(ChannelEvents::RangeComplete))) => { + trace!("\nLOGIC ERROR MERGER EMITTING PLAIN ChannelEvents::RangeComplete"); + } + Ready(Some(Err(_))) => { + self.done = true; + } + _ => {} + } + k + } } }; } @@ -705,8 +701,8 @@ impl Stream for ChannelEventsMerger { // TODO do this with some blanket impl: impl Collectable for Box { - fn new_collector(&self) -> Box { - Collectable::new_collector(self.as_ref()) + fn new_collector(&self, bin_count_exp: u32) -> Box { + Collectable::new_collector(self.as_ref(), bin_count_exp) } fn as_any_mut(&mut self) -> &mut dyn Any { @@ -715,33 +711,49 @@ impl Collectable for Box { } pub async fn binned_collected( + scalar_type: ScalarType, + shape: Shape, + agg_kind: AggKind, + edges: Vec, + timeout: Duration, inp: Pin> + Send>>, ) -> Result, Error> { + let bin_count_exp = edges.len().max(2) as u32 - 1; + let do_time_weight = agg_kind.do_time_weighted(); let mut coll = None; let mut binner = None; - let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect(); - let do_time_weight = true; let mut inp = inp; - while let Some(item) = inp.next().await { - let item = item?; + let deadline = Instant::now() + timeout; + loop { + let item = futures_util::select! { + k = inp.next().fuse() => { + if let Some(k) = k { + k? + }else { + break; + } + }, + _ = tokio::time::sleep_until(deadline.into()).fuse() => { + break; + } + }; match item { ChannelEvents::Events(events) => { + trace!("binned_collected sees\n{:?}", events); if binner.is_none() { - let bb = events - .as_time_binnable_dyn() - .time_binner_new(edges.clone(), do_time_weight); + let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); binner = Some(bb); } let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable_dyn()); - eprintln!("bins_ready_count: {}", binner.bins_ready_count()); + binner.ingest(events.as_time_binnable()); + trace!("bins_ready_count: {}", binner.bins_ready_count()); if binner.bins_ready_count() > 0 { let ready = binner.bins_ready(); match ready { Some(mut ready) => { - eprintln!("ready {ready:?}"); + trace!("binned_collected ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); + coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); @@ -753,10 +765,10 @@ pub async fn binned_collected( } } ChannelEvents::Status(_) => { - eprintln!("TODO Status"); + trace!("binned_collected TODO Status"); } ChannelEvents::RangeComplete => { - eprintln!("TODO RangeComplete"); + trace!("binned_collected TODO RangeComplete"); } } } @@ -767,15 +779,15 @@ pub async fn binned_collected( let ready = binner.bins_ready(); match ready { Some(mut ready) => { - eprintln!("ready {ready:?}"); + trace!("binned_collected ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); + coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); } None => { - return Err(format!("bins_ready_count but no result").into()); + return Err(format!("binned_collected bins_ready_count but no result").into()); } } } @@ -783,14 +795,12 @@ pub async fn binned_collected( match coll { Some(mut coll) => { let res = coll.result().map_err(|e| format!("{e}"))?; - //let res = res.to_json_result().map_err(|e| format!("{e}"))?; - //let res = res.to_json_bytes().map_err(|e| format!("{e}"))?; - eprintln!("res {res:?}"); Ok(res) } None => { - //empty_binned_dyn(scalar_type, shape, agg_kind) - Err(format!("TODO produce empty result"))? + let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1); + let ret = item.to_box_to_json_result(); + Ok(ret) } } } diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index 46771ef..73735d8 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -24,11 +24,11 @@ pub trait Collector: Send + Unpin + WithLen { pub trait CollectableType { type Collector: CollectorType; - fn new_collector() -> Self::Collector; + fn new_collector(bin_count_exp: u32) -> Self::Collector; } pub trait Collectable: Any { - fn new_collector(&self) -> Box; + fn new_collector(&self, bin_count_exp: u32) -> Box; fn as_any_mut(&mut self) -> &mut dyn Any; } @@ -53,8 +53,8 @@ impl Collector for T { } impl Collectable for T { - fn new_collector(&self) -> Box { - Box::new(T::new_collector()) as _ + fn new_collector(&self, bin_count_exp: u32) -> Box { + Box::new(T::new_collector(bin_count_exp)) as _ } fn as_any_mut(&mut self) -> &mut dyn Any { @@ -63,12 +63,25 @@ impl Collectable for T { } } +// TODO check usage of this trait pub trait ToJsonBytes { fn to_json_bytes(&self) -> Result, Error>; } +// TODO check usage of this trait pub trait ToJsonResult: fmt::Debug + Send { fn to_json_result(&self) -> Result, Error>; + fn as_any(&self) -> &dyn Any; +} + +impl ToJsonResult for serde_json::Value { + fn to_json_result(&self) -> Result, Error> { + Ok(Box::new(self.clone())) + } + + fn as_any(&self) -> &dyn Any { + self + } } impl ToJsonBytes for serde_json::Value { diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 2cd0004..7e405c0 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -1,8 +1,12 @@ +use crate::binsdim0::BinsDim0CollectedResult; use crate::eventsdim0::EventsDim0; -use crate::{ChannelEvents, ChannelEventsMerger, ConnStatus, Empty}; -use crate::{ConnStatusEvent, Error}; +use crate::{binned_collected, ChannelEvents, ChannelEventsMerger, Empty, IsoDateTime}; +use crate::{ConnStatus, ConnStatusEvent, Error}; +use chrono::{TimeZone, Utc}; use futures_util::StreamExt; -use netpod::timeunits::SEC; +use netpod::timeunits::*; +use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape}; +use std::time::Duration; #[test] fn merge01() { @@ -23,7 +27,7 @@ fn merge01() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(inp1, inp2); + let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); let item = merger.next().await; assert_eq!(item.as_ref(), events_vec2.get(0)); let item = merger.next().await; @@ -59,7 +63,7 @@ fn merge02() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(inp1, inp2); + let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); let item = merger.next().await; assert_eq!(item.as_ref(), events_vec2.get(0)); let item = merger.next().await; @@ -122,7 +126,7 @@ fn merge03() { let inp2: Vec> = inp2_events_a; let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = ChannelEventsMerger::new(inp1, inp2); + let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]); let item = merger.next().await; assert_eq!(item.as_ref(), events_vec2.get(0)); let item = merger.next().await; @@ -151,24 +155,23 @@ fn bin01() { let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); - let inp2 = Box::pin(futures_util::stream::empty()); - let mut stream = ChannelEventsMerger::new(inp1, inp2); + let inp2 = Box::pin(futures_util::stream::empty()) as _; + let mut stream = ChannelEventsMerger::new(vec![inp1, inp2]); let mut coll = None; let mut binner = None; let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect(); + let bin_count_exp = (edges.len() - 1) as u32; let do_time_weight = true; while let Some(item) = stream.next().await { let item = item?; match item { ChannelEvents::Events(events) => { if binner.is_none() { - let bb = events - .as_time_binnable_dyn() - .time_binner_new(edges.clone(), do_time_weight); + let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); binner = Some(bb); } let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable_dyn()); + binner.ingest(events.as_time_binnable()); eprintln!("bins_ready_count: {}", binner.bins_ready_count()); if binner.bins_ready_count() > 0 { let ready = binner.bins_ready(); @@ -176,7 +179,7 @@ fn bin01() { Some(mut ready) => { eprintln!("ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); + coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); @@ -204,7 +207,7 @@ fn bin01() { Some(mut ready) => { eprintln!("ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector()); + coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); @@ -230,3 +233,100 @@ fn bin01() { }; tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); } + +#[test] +fn bin02() { + const TSBASE: u64 = SEC * 1600000000; + fn val(ts: u64) -> f32 { + 2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32 + } + let fut = async { + let mut events_vec1 = Vec::new(); + let mut t = TSBASE; + for _ in 0..20 { + let mut events = EventsDim0::empty(); + for _ in 0..10 { + events.push(t, t, val(t)); + t += MS * 100; + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + } + events_vec1.push(Ok(ChannelEvents::RangeComplete)); + let inp1 = events_vec1; + let inp1 = futures_util::stream::iter(inp1); + let inp1 = Box::pin(inp1); + let inp2 = Box::pin(futures_util::stream::empty()) as _; + let stream = ChannelEventsMerger::new(vec![inp1, inp2]); + let range = NanoRange { + beg: TSBASE + SEC * 1, + end: TSBASE + SEC * 10, + }; + let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?; + assert_eq!(covering.edges().len(), 10); + let stream = Box::pin(stream); + let collected = binned_collected( + ScalarType::F32, + Shape::Scalar, + AggKind::TimeWeightedScalar, + covering.edges(), + Duration::from_millis(2000), + stream, + ) + .await?; + eprintln!("collected {:?}", collected); + Ok::<_, Error>(()) + }; + tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); +} + +#[test] +fn bin03() { + const TSBASE: u64 = SEC * 1600000000; + fn val(ts: u64) -> f32 { + 2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32 + } + let fut = async { + let mut events_vec1 = Vec::new(); + let mut t = TSBASE; + for _ in 0..20 { + let mut events = EventsDim0::empty(); + for _ in 0..10 { + events.push(t, t, val(t)); + t += MS * 100; + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + } + events_vec1.push(Ok(ChannelEvents::RangeComplete)); + let inp1 = events_vec1; + let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move { + if i == 4 { + let _ = tokio::time::sleep(Duration::from_millis(10000)).await; + } + k + }); + let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect(); + let inp1 = Box::pin(inp1) as _; + let timeout = Duration::from_millis(400); + let res = binned_collected( + ScalarType::F32, + Shape::Scalar, + AggKind::TimeWeightedScalar, + edges, + timeout, + inp1, + ) + .await?; + let r2: &BinsDim0CollectedResult = res.as_any().downcast_ref().expect("res seems wrong type"); + assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC); + assert_eq!(r2.counts(), &[10, 10, 10]); + assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]); + assert_eq!(r2.maxs(), &[3.2, 2.2, 3.2]); + assert_eq!(r2.missing_bins(), 6); + assert_eq!( + r2.continue_at(), + Some(IsoDateTime(Utc.timestamp_nanos((TSBASE + SEC * 4) as i64))) + ); + Ok::<_, Error>(()) + }; + tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 93b022f..e03c2a8 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1050,24 +1050,32 @@ const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [ DAY * 32, ]; -const BIN_THRESHOLDS: [u64; 31] = [ - 2, - 10, - 100, - 1000, - 10_000, - 100_000, +const BIN_THRESHOLDS: [u64; 39] = [ MU, + MU * 2, + MU * 5, MU * 10, + MU * 20, + MU * 50, MU * 100, + MU * 200, + MU * 500, MS, + MS * 2, + MS * 5, MS * 10, + MS * 20, + MS * 50, MS * 100, + MS * 200, + MS * 500, SEC, + SEC * 2, SEC * 5, SEC * 10, SEC * 20, MIN, + MIN * 2, MIN * 5, MIN * 10, MIN * 20, @@ -1386,7 +1394,7 @@ pub struct BinnedRange { } impl BinnedRange { - pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result, Error> { + pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result { let thresholds = &BIN_THRESHOLDS; if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; @@ -1402,7 +1410,7 @@ impl BinnedRange { let mut i1 = thresholds.len(); loop { if i1 <= 0 { - break Ok(None); + panic!(); } else { i1 -= 1; let t = thresholds[i1]; @@ -1418,7 +1426,7 @@ impl BinnedRange { count, offset, }; - break Ok(Some(ret)); + break Ok(ret); } } } @@ -1433,10 +1441,21 @@ impl BinnedRange { pub fn full_range(&self) -> NanoRange { NanoRange { - beg: (self.offset + 0) * self.grid_spec.bin_t_len, + beg: self.offset * self.grid_spec.bin_t_len, end: (self.offset + self.count) * self.grid_spec.bin_t_len, } } + + pub fn edges(&self) -> Vec { + let mut ret = Vec::new(); + let mut t = self.offset * self.grid_spec.bin_t_len; + let end = (self.offset + self.count) * self.grid_spec.bin_t_len; + while t <= end { + ret.push(t); + t += self.grid_spec.bin_t_len; + } + ret + } } #[derive(Clone, Serialize, Deserialize)] diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 0924e36..39fdc4c 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -376,7 +376,7 @@ pub async fn fetch_uncached_binned_events( }; match item { Ok(ChannelEvents::Events(item)) => { - time_binner.ingest(item.as_time_binnable_dyn()); + time_binner.ingest(item.as_time_binnable()); // TODO could also ask the binner here whether we are "complete" to stop sending useless data. } Ok(ChannelEvents::Status(_)) => { diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index abfcee4..dd0f64a 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -2,7 +2,7 @@ use crate::errconv::ErrConv; use err::Error; use futures_util::{Future, FutureExt, Stream, StreamExt}; use items_2::eventsdim0::EventsDim0; -use items_2::{ChannelEvents, ConnStatus, ConnStatusEvent, Empty, Events}; +use items_2::{ChannelEvents, ConnStatus, ConnStatusEvent, Empty, Events, WithLen}; use netpod::log::*; use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; use netpod::timeunits::*; @@ -17,16 +17,14 @@ use tokio_postgres::Client as PgClient; macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); - let fut = fut.map(|x| { - match x { - Ok(k) => { - // TODO why static needed? - //let b = Box::new(k) as Box; - let b = Box::new(k) as Box; - Ok(b) - } - Err(e) => Err(e), + let fut = fut.map(|x| match x { + Ok(k) => { + let self_name = std::any::type_name::(); + info!("{self_name} read values len {}", k.len()); + let b = Box::new(k) as Box; + Ok(b) } + Err(e) => Err(e), }); let fut = Box::pin(fut) as Pin, Error>> + Send>>; fut @@ -347,7 +345,6 @@ impl Stream for EventsStreamScylla { }, FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { - info!("read values"); item.verify(); item.output_info(); if !st.next() { @@ -365,7 +362,7 @@ impl Stream for EventsStreamScylla { } } -async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { +pub async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { info!("find_series channel {:?}", channel); let rows = if let Some(series) = channel.series() { let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; @@ -549,13 +546,12 @@ read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16") pub async fn make_scylla_stream( evq: &PlainEventsQuery, + series: u64, + scalar_type: ScalarType, + shape: Shape, scy: Arc, - pgclient: &Arc, do_test_stream_error: bool, ) -> Result> + Send>>, Error> { - // Need to know details about the series in order to fetch data: - // TODO should query already contain ScalarType and Shape? - let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; let res = Box::pin(EventsStreamScylla::new( series, evq,