From 3795a5782620d26a19d3e2ba4c11ebc489d1c7f1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 6 Sep 2022 12:54:21 +0200 Subject: [PATCH] Merger for events and channel status --- .gitignore | 1 + items_2/Cargo.toml | 1 + items_2/src/binsdim0.rs | 149 +++++++++----- items_2/src/eventsdim0.rs | 134 +++++++----- items_2/src/items_2.rs | 410 +++++++++++++++++++++++++++++-------- items_2/src/test.rs | 137 +++++++++++++ scyllaconn/src/bincache.rs | 13 +- 7 files changed, 640 insertions(+), 205 deletions(-) create mode 100644 items_2/src/test.rs diff --git a/.gitignore b/.gitignore index 9018c9f..055b30d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ /.idea /.vscode /tmpdata +/tmpdoc /docs diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index eb8c34a..7cbd5eb 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -17,6 +17,7 @@ num-traits = "0.2.15" chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" +tokio = { version = "1.20", features = ["rt-multi-thread", "sync"] } err = { path = "../err" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 888f1fd..72f5732 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,8 +1,6 @@ use crate::streams::{CollectableType, CollectorType, ToJsonResult}; -use crate::{ - ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, ScalarOps, TimeBinnable, TimeBinnableType, - TimeBinnableTypeAggregator, TimeBinned, TimeBinner, TimeSeries, WithLen, -}; +use crate::{ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, RangeOverlapInfo, ScalarOps, TimeBins, WithLen}; +use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBinner}; use chrono::{TimeZone, Utc}; use err::Error; use netpod::log::*; @@ -17,12 +15,12 @@ use std::{fmt, mem}; #[derive(Clone, Serialize, Deserialize)] pub struct MinMaxAvgDim0Bins { - pub ts1s: Vec, - pub ts2s: Vec, - pub counts: Vec, - pub mins: Vec, - pub maxs: Vec, - pub avgs: Vec, + pub ts1s: VecDeque, + pub ts2s: VecDeque, + pub counts: VecDeque, + pub mins: VecDeque, + pub maxs: VecDeque, + pub avgs: VecDeque, } impl fmt::Debug for MinMaxAvgDim0Bins @@ -48,12 +46,12 @@ where impl MinMaxAvgDim0Bins { pub fn empty() -> Self { Self { - ts1s: vec![], - ts2s: vec![], - counts: vec![], - mins: vec![], - maxs: vec![], - avgs: vec![], + ts1s: VecDeque::new(), + ts2s: VecDeque::new(), + counts: VecDeque::new(), + mins: VecDeque::new(), + maxs: VecDeque::new(), + avgs: VecDeque::new(), } } } @@ -64,41 +62,71 @@ impl WithLen for MinMaxAvgDim0Bins { } } +impl RangeOverlapInfo for MinMaxAvgDim0Bins { + fn ends_before(&self, range: NanoRange) -> bool { + if let Some(&max) = self.ts2s.back() { + max <= range.beg + } else { + true + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + if let Some(&max) = self.ts2s.back() { + max > range.end + } else { + true + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + if let Some(&min) = self.ts1s.front() { + min >= range.end + } else { + true + } + } +} + impl Empty for MinMaxAvgDim0Bins { fn empty() -> Self { Self { - ts1s: Vec::new(), - ts2s: Vec::new(), - counts: Vec::new(), - mins: Vec::new(), - maxs: Vec::new(), - avgs: Vec::new(), + ts1s: Default::default(), + ts2s: Default::default(), + counts: Default::default(), + mins: Default::default(), + maxs: Default::default(), + avgs: Default::default(), } } } impl AppendEmptyBin for MinMaxAvgDim0Bins { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { - self.ts1s.push(ts1); - self.ts2s.push(ts2); - self.counts.push(0); - self.mins.push(NTY::zero()); - self.maxs.push(NTY::zero()); - self.avgs.push(0.); + self.ts1s.push_back(ts1); + self.ts2s.push_back(ts2); + self.counts.push_back(0); + self.mins.push_back(NTY::zero()); + self.maxs.push_back(NTY::zero()); + self.avgs.push_back(0.); } } -impl TimeSeries for MinMaxAvgDim0Bins { +impl TimeBins for MinMaxAvgDim0Bins { fn ts_min(&self) -> Option { - todo!("collection of bins can not be TimeSeries") + self.ts1s.front().map(Clone::clone) } fn ts_max(&self) -> Option { - todo!("collection of bins can not be TimeSeries") + self.ts2s.back().map(Clone::clone) } fn ts_min_max(&self) -> Option<(u64, u64)> { - todo!("collection of bins can not be TimeSeries") + if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) { + Some((min, max)) + } else { + None + } } } @@ -131,13 +159,13 @@ pub struct MinMaxAvgBinsCollectedResult { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] - ts_off_ms: Vec, + ts_off_ms: VecDeque, #[serde(rename = "tsNs")] - ts_off_ns: Vec, - counts: Vec, - mins: Vec, - maxs: Vec, - avgs: Vec, + ts_off_ns: VecDeque, + counts: VecDeque, + mins: VecDeque, + maxs: VecDeque, + avgs: VecDeque, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] @@ -196,12 +224,12 @@ impl CollectorType for MinMaxAvgBinsCollector { // TODO could save the copy: let mut ts_all = self.vals.ts1s.clone(); if self.vals.ts2s.len() > 0 { - ts_all.push(*self.vals.ts2s.last().unwrap()); + ts_all.push_back(*self.vals.ts2s.back().unwrap()); } info!("TODO return proper continueAt"); let bin_count_exp = 100 as u32; let continue_at = if self.vals.ts1s.len() < bin_count_exp as usize { - match ts_all.last() { + match ts_all.back() { Some(&k) => { let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); Some(iso) @@ -211,11 +239,14 @@ impl CollectorType for MinMaxAvgBinsCollector { } else { None }; - let tst = ts_offs_from_abs(&ts_all); - let counts = mem::replace(&mut self.vals.counts, Vec::new()); - let mins = mem::replace(&mut self.vals.mins, Vec::new()); - let maxs = mem::replace(&mut self.vals.maxs, Vec::new()); - let avgs = mem::replace(&mut self.vals.avgs, Vec::new()); + if ts_all.as_slices().1.len() != 0 { + panic!(); + } + let tst = ts_offs_from_abs(ts_all.as_slices().0); + let counts = mem::replace(&mut self.vals.counts, VecDeque::new()); + let mins = mem::replace(&mut self.vals.mins, VecDeque::new()); + let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new()); + let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new()); let ret = MinMaxAvgBinsCollectedResult:: { ts_anchor_sec: tst.0, ts_off_ms: tst.1, @@ -302,12 +333,12 @@ impl TimeBinnableTypeAggregator for MinMaxAvgDim0BinsAggregator< self.avg = self.sum / self.sumc as f32; } let ret = Self::Output { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![self.min.clone()], - maxs: vec![self.max.clone()], - avgs: vec![self.avg], + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [self.min.clone()].into(), + maxs: [self.max.clone()].into(), + avgs: [self.avg].into(), }; self.range = range; self.count = 0; @@ -499,23 +530,33 @@ impl TimeBinned for MinMaxAvgDim0Bins { } fn edges_slice(&self) -> (&[u64], &[u64]) { - (&self.ts1s[..], &self.ts2s[..]) + if self.ts1s.as_slices().1.len() != 0 { + panic!(); + } + if self.ts2s.as_slices().1.len() != 0 { + panic!(); + } + (&self.ts1s.as_slices().0, &self.ts2s.as_slices().0) } fn counts(&self) -> &[u64] { - &self.counts[..] + // TODO check for contiguous + self.counts.as_slices().0 } + // TODO is Vec needed? fn mins(&self) -> Vec { self.mins.iter().map(|x| x.clone().as_prim_f32()).collect() } + // TODO is Vec needed? fn maxs(&self) -> Vec { self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect() } + // TODO is Vec needed? fn avgs(&self) -> Vec { - self.avgs.clone() + self.avgs.iter().map(Clone::clone).collect() } fn validate(&self) -> Result<(), String> { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 09ceb45..d48a25f 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,10 +1,8 @@ use crate::binsdim0::MinMaxAvgDim0Bins; use crate::streams::{CollectableType, CollectorType, ToJsonResult}; -use crate::{pulse_offs_from_abs, ts_offs_from_abs}; -use crate::{ - Empty, Events, ScalarOps, TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner, TimeSeries, - WithLen, -}; +use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo}; +use crate::{Empty, Events, ScalarOps, WithLen}; +use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use err::Error; use netpod::log::*; use netpod::NanoRange; @@ -13,30 +11,28 @@ use std::any::Any; use std::collections::VecDeque; use std::{fmt, mem}; -// TODO in this module reduce clones. - -#[derive(Serialize, Deserialize)] +#[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0 { - pub tss: Vec, - pub pulses: Vec, - pub values: Vec, + pub tss: VecDeque, + pub pulses: VecDeque, + pub values: VecDeque, } impl EventsDim0 { #[inline(always)] pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) { - self.tss.push(ts); - self.pulses.push(pulse); - self.values.push(value); + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.values.push_back(value); } } impl Empty for EventsDim0 { fn empty() -> Self { Self { - tss: vec![], - pulses: vec![], - values: vec![], + tss: VecDeque::new(), + pulses: VecDeque::new(), + values: VecDeque::new(), } } } @@ -50,10 +46,10 @@ where fmt, "count {} ts {:?} .. {:?} vals {:?} .. {:?}", self.tss.len(), - self.tss.first(), - self.tss.last(), - self.values.first(), - self.values.last(), + self.tss.front(), + self.tss.back(), + self.values.front(), + self.values.back(), ) } } @@ -64,20 +60,28 @@ impl WithLen for EventsDim0 { } } -impl TimeSeries for EventsDim0 { - fn ts_min(&self) -> Option { - self.tss.first().map(Clone::clone) - } - - fn ts_max(&self) -> Option { - self.tss.last().map(Clone::clone) - } - - fn ts_min_max(&self) -> Option<(u64, u64)> { - if self.tss.len() == 0 { - None +impl RangeOverlapInfo for EventsDim0 { + fn ends_before(&self, range: NanoRange) -> bool { + if let Some(&max) = self.tss.back() { + max < range.beg } else { - Some((self.tss.first().unwrap().clone(), self.tss.last().unwrap().clone())) + true + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + if let Some(&max) = self.tss.back() { + max >= range.end + } else { + true + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + if let Some(&min) = self.tss.front() { + min >= range.end + } else { + true } } } @@ -123,14 +127,14 @@ pub struct EventValuesCollectorOutput { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] - ts_off_ms: Vec, + ts_off_ms: VecDeque, #[serde(rename = "tsNs")] - ts_off_ns: Vec, + ts_off_ns: VecDeque, #[serde(rename = "pulseAnchor")] pulse_anchor: u64, #[serde(rename = "pulseOff")] - pulse_off: Vec, - values: Vec, + pulse_off: VecDeque, + values: VecDeque, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] range_complete: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] @@ -164,15 +168,16 @@ impl CollectorType for EventValuesCollector { } fn result(&mut self) -> Result { - let tst = ts_offs_from_abs(&self.vals.tss); - let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses); + // TODO require contiguous slices + let tst = ts_offs_from_abs(&self.vals.tss.as_slices().0); + let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0); let ret = Self::Output { ts_anchor_sec: tst.0, ts_off_ms: tst.1, ts_off_ns: tst.2, pulse_anchor, - pulse_off, - values: mem::replace(&mut self.vals.values, Vec::new()), + pulse_off: pulse_off, + values: mem::replace(&mut self.vals.values, VecDeque::new()), range_complete: self.range_complete, timed_out: self.timed_out, }; @@ -345,12 +350,12 @@ impl EventValuesAggregator { (g.clone(), g.clone(), g.as_prim_f32()) }; let ret = MinMaxAvgDim0Bins { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![min], - maxs: vec![max], - avgs: vec![avg], + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), }; self.int_ts = range.beg; self.range = range; @@ -379,12 +384,12 @@ impl EventValuesAggregator { (g.clone(), g.clone(), g.as_prim_f32()) }; let ret = MinMaxAvgDim0Bins { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![min], - maxs: vec![max], - avgs: vec![avg], + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [min].into(), + maxs: [max].into(), + avgs: [avg].into(), }; self.int_ts = range.beg; self.range = range; @@ -474,6 +479,27 @@ impl Events for EventsDim0 { fn as_collectable_mut(&mut self) -> &mut dyn crate::streams::Collectable { self } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + let n1 = self.tss.iter().take_while(|&&x| x < ts_end).count(); + let tss = self.tss.drain(..n1).collect(); + let pulses = self.pulses.drain(..n1).collect(); + let values = self.values.drain(..n1).collect(); + let ret = Self { tss, pulses, values }; + Box::new(ret) + } + + fn ts_min(&self) -> Option { + self.tss.front().map(|&x| x) + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + if let Some(other) = other.as_any().downcast_ref::() { + self == other + } else { + false + } + } } pub struct ScalarEventsTimeBinner { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 5185cd5..0de6b2b 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,15 +1,19 @@ pub mod binsdim0; pub mod eventsdim0; pub mod streams; +#[cfg(test)] +pub mod test; use chrono::{DateTime, TimeZone, Utc}; use futures_util::Stream; -use netpod::log::error; +use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, NanoRange, ScalarType, Shape}; use serde::{Deserialize, Serialize, Serializer}; use std::any::Any; +use std::collections::VecDeque; use std::fmt; +use std::ops::ControlFlow; use std::pin::Pin; use std::task::{Context, Poll}; use streams::Collectable; @@ -18,10 +22,11 @@ pub fn bool_is_false(x: &bool) -> bool { *x == false } -pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec, Vec) { +// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general. +pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); + let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); let ts_off_ns = tss .iter() .zip(ts_off_ms.iter().map(|&k| k * MS)) @@ -30,9 +35,10 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec, Vec) { (ts_anchor_sec, ts_off_ms, ts_off_ns) } -pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec) { +// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general. +pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { let pulse_anchor = pulse.first().map_or(0, |k| *k); - let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect(); + let pulse_off = pulse.iter().map(|k| *k - pulse_anchor).collect(); (pulse_anchor, pulse_off) } @@ -104,7 +110,14 @@ impl_num_ops!(f64, 0.); #[allow(unused)] struct Ts(u64); -struct Error { +#[derive(Debug, PartialEq)] +pub enum ErrorKind { + #[allow(unused)] + MismatchedType, +} + +#[derive(Debug, PartialEq)] +pub struct Error { #[allow(unused)] kind: ErrorKind, } @@ -115,16 +128,12 @@ impl From for Error { } } -enum ErrorKind { - #[allow(unused)] - MismatchedType, -} - pub trait WithLen { fn len(&self) -> usize; } -pub trait TimeSeries { +// TODO can probably be removed. +pub trait TimeBins { fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn ts_min_max(&self) -> Option<(u64, u64)>; @@ -140,71 +149,12 @@ pub enum Fits { PartlyLowerAndGreater, } -// TODO can this be removed? -pub trait FitsInside { - fn fits_inside(&self, range: NanoRange) -> Fits; -} - -impl FitsInside for T -where - T: TimeSeries, -{ - fn fits_inside(&self, range: NanoRange) -> Fits { - if let Some((min, max)) = self.ts_min_max() { - if max <= range.beg { - Fits::Lower - } else if min >= range.end { - Fits::Greater - } else if min < range.beg && max > range.end { - Fits::PartlyLowerAndGreater - } else if min < range.beg { - Fits::PartlyLower - } else if max > range.end { - Fits::PartlyGreater - } else { - Fits::Inside - } - } else { - Fits::Empty - } - } -} - pub trait RangeOverlapInfo { fn ends_before(&self, range: NanoRange) -> bool; fn ends_after(&self, range: NanoRange) -> bool; fn starts_after(&self, range: NanoRange) -> bool; } -impl RangeOverlapInfo for T -where - T: TimeSeries, -{ - fn ends_before(&self, range: NanoRange) -> bool { - if let Some(max) = self.ts_max() { - max <= range.beg - } else { - true - } - } - - fn ends_after(&self, range: NanoRange) -> bool { - if let Some(max) = self.ts_max() { - max > range.end - } else { - true - } - } - - fn starts_after(&self, range: NanoRange) -> bool { - if let Some(min) = self.ts_min() { - min >= range.end - } else { - true - } - } -} - pub trait EmptyForScalarTypeShape { fn empty(scalar_type: ScalarType, shape: Shape) -> Self; } @@ -262,11 +212,20 @@ pub trait TimeBinnable: WithLen + RangeOverlapInfo + Any + Send { } /// Container of some form of events, for use as trait object. -pub trait Events: Collectable + TimeBinnable { +pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; fn verify(&self); fn output_info(&self); fn as_collectable_mut(&mut self) -> &mut dyn Collectable; + fn ts_min(&self) -> Option; + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; + fn partial_eq_dyn(&self, other: &dyn Events) -> bool; +} + +impl PartialEq for Box { + fn eq(&self, other: &Self) -> bool { + Events::partial_eq_dyn(self.as_ref(), other.as_ref()) + } } /// Data in time-binned form. @@ -387,60 +346,329 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK } } -pub enum ConnStatus {} +#[derive(Clone, Debug, PartialEq)] +pub enum ConnStatus { + Connect, + Disconnect, +} +#[derive(Clone, Debug, PartialEq)] pub struct ConnStatusEvent { pub ts: u64, pub status: ConnStatus, } trait MergableEvents: Any { - fn len(&self) -> usize; fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; - fn take_from(&mut self, src: &mut dyn MergableEvents, ts_end: u64) -> Result<(), Error>; } +impl MergableEvents for Box { + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } +} + +#[derive(Debug)] pub enum ChannelEvents { Events(Box), Status(ConnStatusEvent), RangeComplete, } -impl MergableEvents for ChannelEvents { - fn len(&self) -> usize { - error!("TODO MergableEvents"); - todo!() +impl PartialEq for ChannelEvents { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Events(l0), Self::Events(r0)) => l0 == r0, + (Self::Status(l0), Self::Status(r0)) => l0 == r0, + _ => core::mem::discriminant(self) == core::mem::discriminant(other), + } } +} +impl ChannelEvents { + fn forget_after_merge_process(&self) -> bool { + use ChannelEvents::*; + match self { + Events(k) => k.len() == 0, + Status(_) => true, + RangeComplete => true, + } + } +} + +impl MergableEvents for ChannelEvents { fn ts_min(&self) -> Option { - error!("TODO MergableEvents"); - todo!() + use ChannelEvents::*; + match self { + Events(k) => k.ts_min(), + Status(k) => Some(k.ts), + RangeComplete => panic!(), + } } fn ts_max(&self) -> Option { error!("TODO MergableEvents"); todo!() } +} - fn take_from(&mut self, _src: &mut dyn MergableEvents, _ts_end: u64) -> Result<(), Error> { - error!("TODO MergableEvents"); - todo!() +pub struct ChannelEventsMerger { + inp1: Pin>>>, + inp2: Pin>>>, + inp1_done: bool, + inp2_done: bool, + inp1_item: Option, + inp2_item: Option, + out: Option, + range_complete: bool, + done: bool, + complete: bool, +} + +impl ChannelEventsMerger { + pub fn new( + inp1: Pin>>>, + inp2: Pin>>>, + ) -> Self { + Self { + done: false, + complete: false, + inp1, + inp2, + inp1_done: false, + inp2_done: false, + inp1_item: None, + inp2_item: None, + out: None, + range_complete: false, + } + } + + fn handle_no_ts_event(item: &mut Option, range_complete: &mut bool) -> bool { + match item { + Some(k) => match k { + ChannelEvents::Events(_) => false, + ChannelEvents::Status(_) => false, + ChannelEvents::RangeComplete => { + *range_complete = true; + item.take(); + true + } + }, + None => false, + } + } + + fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> ControlFlow::Item>>> { + eprintln!("process {self:?}"); + use ControlFlow::*; + use Poll::*; + // Some event types have no timestamp. + let gg: &mut ChannelEventsMerger = &mut self; + let &mut ChannelEventsMerger { + inp1_item: ref mut item, + range_complete: ref mut raco, + .. + } = gg; + if Self::handle_no_ts_event(item, raco) { + return Continue(()); + } + let &mut ChannelEventsMerger { + inp2_item: ref mut item, + range_complete: ref mut raco, + .. + } = gg; + if Self::handle_no_ts_event(item, raco) { + return Continue(()); + } + + // Find the two lowest ts. + let mut tsj = [None, None]; + for (i1, item) in [&self.inp1_item, &self.inp2_item].into_iter().enumerate() { + if let Some(a) = &item { + if let Some(tsmin) = a.ts_min() { + if let Some((_, k)) = tsj[0] { + if tsmin < k { + tsj[1] = tsj[0]; + tsj[0] = Some((i1, tsmin)); + } else { + if let Some((_, k)) = tsj[1] { + if tsmin < k { + tsj[1] = Some((i1, tsmin)); + } else { + } + } else { + tsj[1] = Some((i1, tsmin)); + } + } + } else { + tsj[0] = Some((i1, tsmin)); + } + } else { + // TODO design such that this can't occur. + warn!("found input item without timestamp"); + //Break(Ready(Some(Err(Error::)))) + } + } + } + eprintln!("---------- Found lowest: {tsj:?}"); + + if tsj[0].is_none() { + Continue(()) + } else if tsj[1].is_none() { + let (_ts_min, itemref) = if tsj[0].as_mut().unwrap().0 == 0 { + (tsj[0].as_ref().unwrap().1, self.inp1_item.as_mut()) + } else { + (tsj[0].as_ref().unwrap().1, self.inp2_item.as_mut()) + }; + if itemref.is_none() { + panic!("logic error"); + } + // Precondition: at least one event is before the requested range. + use ChannelEvents::*; + let itemout = match itemref { + Some(Events(k)) => Events(k.take_new_events_until_ts(u64::MAX)), + Some(Status(k)) => Status(k.clone()), + Some(RangeComplete) => RangeComplete, + None => panic!(), + }; + { + // TODO refactor + if tsj[0].as_mut().unwrap().0 == 0 { + if let Some(item) = self.inp1_item.as_ref() { + if item.forget_after_merge_process() { + self.inp1_item.take(); + } + } + } + if tsj[0].as_mut().unwrap().0 == 1 { + if let Some(item) = self.inp2_item.as_ref() { + if item.forget_after_merge_process() { + self.inp2_item.take(); + } + } + } + } + Break(Ready(Some(Ok(itemout)))) + } else { + let (ts_end, itemref) = if tsj[0].as_mut().unwrap().0 == 0 { + (tsj[1].as_ref().unwrap().1, self.inp1_item.as_mut()) + } else { + (tsj[1].as_ref().unwrap().1, self.inp2_item.as_mut()) + }; + if itemref.is_none() { + panic!("logic error"); + } + // Precondition: at least one event is before the requested range. + use ChannelEvents::*; + let itemout = match itemref { + Some(Events(k)) => Events(k.take_new_events_until_ts(ts_end)), + Some(Status(k)) => Status(k.clone()), + Some(RangeComplete) => RangeComplete, + None => panic!(), + }; + { + // TODO refactor + if tsj[0].as_mut().unwrap().0 == 0 { + if let Some(item) = self.inp1_item.as_ref() { + if item.forget_after_merge_process() { + self.inp1_item.take(); + } + } + } + if tsj[0].as_mut().unwrap().0 == 1 { + if let Some(item) = self.inp2_item.as_ref() { + if item.forget_after_merge_process() { + self.inp2_item.take(); + } + } + } + } + Break(Ready(Some(Ok(itemout)))) + } + } + + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow::Item>>> { + use ControlFlow::*; + use Poll::*; + if self.inp1_item.is_none() && !self.inp1_done { + match Pin::new(&mut self.inp1).poll_next(cx) { + Ready(Some(Ok(k))) => { + self.inp1_item = Some(k); + Continue(()) + } + Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))), + Ready(None) => { + self.inp1_done = true; + Continue(()) + } + Pending => Break(Pending), + } + } else if self.inp2_item.is_none() && !self.inp2_done { + match Pin::new(&mut self.inp2).poll_next(cx) { + Ready(Some(Ok(k))) => { + self.inp2_item = Some(k); + Continue(()) + } + Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))), + Ready(None) => { + self.inp2_done = true; + Continue(()) + } + Pending => Break(Pending), + } + } else if self.inp1_item.is_some() || self.inp2_item.is_some() { + let process_res = Self::process(self.as_mut(), cx); + match process_res { + Continue(()) => Continue(()), + Break(k) => Break(k), + } + } else { + self.done = true; + Continue(()) + } } } -struct ChannelEventsMerger { - _inp_1: Pin, Error>>>>, - _inp_2: Pin, Error>>>>, +impl fmt::Debug for ChannelEventsMerger { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct(std::any::type_name::()) + .field("inp1_done", &self.inp1_done) + .field("inp2_done", &self.inp2_done) + .field("inp1_item", &self.inp1_item) + .field("inp2_item", &self.inp2_item) + .field("out", &self.out) + .field("range_complete", &self.range_complete) + .field("done", &self.done) + .field("complete", &self.complete) + .finish() + } } impl Stream for ChannelEventsMerger { type Item = Result; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - //use Poll::*; - error!("TODO ChannelEventsMerger"); - err::todoval() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + eprintln!("ChannelEventsMerger poll_next"); + loop { + break if self.complete { + panic!("poll after complete"); + } else if self.done { + self.complete = true; + Ready(None) + } else { + match Self::poll2(self.as_mut(), cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(k) => break k, + } + }; + } } } diff --git a/items_2/src/test.rs b/items_2/src/test.rs new file mode 100644 index 0000000..2a1c6f4 --- /dev/null +++ b/items_2/src/test.rs @@ -0,0 +1,137 @@ +use crate::eventsdim0::EventsDim0; +use crate::{ChannelEvents, ChannelEventsMerger, ConnStatus, Empty}; +use crate::{ConnStatusEvent, Error}; +use futures_util::StreamExt; + +#[test] +fn merge01() { + let fut = async { + let mut events_vec1 = Vec::new(); + let mut events_vec2 = Vec::new(); + { + let mut events = EventsDim0::empty(); + for i in 0..10 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + } + let inp1 = events_vec1; + let inp1 = futures_util::stream::iter(inp1); + let inp1 = Box::pin(inp1); + let inp2: Vec> = Vec::new(); + let inp2 = futures_util::stream::iter(inp2); + let inp2 = Box::pin(inp2); + let mut merger = ChannelEventsMerger::new(inp1, inp2); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(0)); + let item = merger.next().await; + assert_eq!(item.as_ref(), None); + }; + tokio::runtime::Runtime::new().unwrap().block_on(fut); +} + +#[test] +fn merge02() { + let fut = async { + let mut events_vec1 = Vec::new(); + let mut events_vec2 = Vec::new(); + { + let mut events = EventsDim0::empty(); + for i in 0..10 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + } + { + let mut events = EventsDim0::empty(); + for i in 10..20 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + } + let inp1 = events_vec1; + let inp1 = futures_util::stream::iter(inp1); + let inp1 = Box::pin(inp1); + let inp2: Vec> = Vec::new(); + let inp2 = futures_util::stream::iter(inp2); + let inp2 = Box::pin(inp2); + let mut merger = ChannelEventsMerger::new(inp1, inp2); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(0)); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(1)); + let item = merger.next().await; + assert_eq!(item.as_ref(), None); + }; + tokio::runtime::Runtime::new().unwrap().block_on(fut); +} + +#[test] +fn merge03() { + let fut = async { + let mut events_vec1 = Vec::new(); + { + let mut events = EventsDim0::empty(); + for i in 0..10 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events)))); + let mut events = EventsDim0::empty(); + for i in 10..20 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + } + let events_vec1 = events_vec1; + let mut events_vec2 = Vec::new(); + { + let mut events = EventsDim0::empty(); + for i in 0..10 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + let mut events = EventsDim0::empty(); + for i in 10..12 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + let mut events = EventsDim0::empty(); + for i in 12..20 { + events.push(i * 100, i, i as f32 * 100.); + } + events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone())))); + } + let events_vec2 = events_vec2; + + let inp2_events_a: Vec> = vec![Ok(ChannelEvents::Status(ConnStatusEvent { + ts: 1199, + status: ConnStatus::Disconnect, + }))]; + let inp2_events_b: Vec> = vec![Ok(ChannelEvents::Status(ConnStatusEvent { + ts: 1199, + status: ConnStatus::Disconnect, + }))]; + + let inp1 = events_vec1; + let inp1 = futures_util::stream::iter(inp1); + let inp1 = Box::pin(inp1); + let inp2: Vec> = inp2_events_a; + let inp2 = futures_util::stream::iter(inp2); + let inp2 = Box::pin(inp2); + let mut merger = ChannelEventsMerger::new(inp1, inp2); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(0)); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(1)); + let item = merger.next().await; + assert_eq!(item.as_ref(), inp2_events_b.get(0)); + let item = merger.next().await; + assert_eq!(item.as_ref(), events_vec2.get(2)); + let item = merger.next().await; + assert_eq!(item.as_ref(), None); + }; + tokio::runtime::Runtime::new().unwrap().block_on(fut); +} diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 3c2cd10..3291e40 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -10,6 +10,7 @@ use netpod::timeunits::*; use netpod::{AggKind, ChannelTyped, ScalarType, Shape}; use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange}; use scylla::Session as ScySession; +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -49,8 +50,8 @@ pub async fn read_cached_scylla( if counts.len() != avgs.len() { counts_mismatch = true; } - let ts1s = edges[..(edges.len() - 1).min(edges.len())].to_vec(); - let ts2s = edges[1.min(edges.len())..].to_vec(); + let ts1s: VecDeque<_> = edges[..(edges.len() - 1).min(edges.len())].iter().map(|&x| x).collect(); + let ts2s: VecDeque<_> = edges[1.min(edges.len())..].iter().map(|&x| x).collect(); if ts1s.len() != ts2s.len() { error!("ts1s vs ts2s mismatch"); counts_mismatch = true; @@ -58,9 +59,9 @@ pub async fn read_cached_scylla( if ts1s.len() != counts.len() { counts_mismatch = true; } - let avgs = avgs.into_iter().map(|x| x).collect::>(); - let mins = mins.into_iter().map(|x| x as _).collect::>(); - let maxs = maxs.into_iter().map(|x| x as _).collect::>(); + let avgs: VecDeque<_> = avgs.into_iter().map(|x| x).collect(); + let mins: VecDeque<_> = mins.into_iter().map(|x| x as _).collect(); + let maxs: VecDeque<_> = maxs.into_iter().map(|x| x as _).collect(); if counts_mismatch { error!( "mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}", @@ -73,7 +74,7 @@ pub async fn read_cached_scylla( maxs.len(), ); } - let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect(); + let counts: VecDeque<_> = counts.into_iter().map(|x| x as u64).collect(); // TODO construct a dyn TimeBinned using the scalar type and shape information. // TODO place the values with little copying into the TimeBinned. use ScalarType::*;