diff --git a/src/apitypes.rs b/src/apitypes.rs index f7e72e5..b73ca9e 100644 --- a/src/apitypes.rs +++ b/src/apitypes.rs @@ -87,7 +87,7 @@ impl UserApiType for ContainerEventsApi where EVT: EventValueType, { - fn into_serializable(self: Box) -> Box { + fn into_serializable_normal(self: Box) -> Box { let mut map = BTreeMap::new(); for (k, v) in ToCborValue::into_fields_box(self) { map.insert(k, v); @@ -115,7 +115,7 @@ where pub cnts: VecDeque, pub mins: ::Container, pub maxs: ::Container, - pub aggs: VecDeque, + pub aggs: ::Container, pub fnls: VecDeque, } @@ -190,8 +190,28 @@ where ret.push(("ts2Ms".into(), Box::new(ts2_ms))); ret.push(("ts2Ns".into(), Box::new(ts2_ns))); ret.push(("counts".into(), Box::new(self.cnts))); - ret.push(("mins".into(), Box::new(self.mins))); - ret.push(("maxs".into(), Box::new(self.maxs))); + { + let fields = self.mins.into_user_facing_fields(); + for (k, v) in fields { + let k = if k == "values" { + "mins".to_string() + } else { + format!("mins_{}", k) + }; + ret.push((k, v)); + } + } + { + let fields = self.maxs.into_user_facing_fields(); + for (k, v) in fields { + let k = if k == "values" { + "maxs".to_string() + } else { + format!("maxs_{}", k) + }; + ret.push((k, v)); + } + } ret.push(("avgs".into(), Box::new(self.aggs))); ret } @@ -206,7 +226,7 @@ where EVT: EventValueType, BVT: BinAggedType, { - fn into_serializable(self: Box) -> Box { + fn into_serializable_normal(self: Box) -> Box { let mut map = BTreeMap::new(); for (k, v) in ToCborValue::into_fields_box(self) { map.insert(k, v); diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs index 1cac8fb..429ae85 100644 --- a/src/binning/container/bins.rs +++ b/src/binning/container/bins.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use serde::Serialize; use std::collections::VecDeque; use std::fmt; +use std::ops::Range; pub trait AggBinValTw: fmt::Debug + Send where @@ -24,7 +25,9 @@ where fn new() -> Self; fn push_back(&mut self, val: BVT); fn pop_front(&mut self) -> Option; + fn iter_ty_1(&self) -> impl Iterator>; fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option>; + fn drain_into(&mut self, dst: &mut Self, range: Range); } pub trait BinAggedType: @@ -35,47 +38,41 @@ pub trait BinAggedType: type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; } -impl BinAggedContainer for VecDeque -where - BVT: BinAggedType, -{ - fn new() -> Self { - todo!() - } +macro_rules! impl_bin_agged_cont_simple_copyable { + ($evt:ty) => { + impl BinAggedContainer<$evt> for VecDeque<$evt> { + fn new() -> Self { + Self::new() + } - fn push_back(&mut self, val: BVT) { - todo!() - } + fn push_back(&mut self, val: $evt) { + self.push_back(val); + } - fn pop_front(&mut self) -> Option { - todo!() - } + fn pop_front(&mut self) -> Option<$evt> { + self.pop_front() + } - fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<::IterTy1<'a>> { - todo!() - } + fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { + self.iter().map(|&x| x) + } + + fn get_iter_ty_1<'a>( + &'a self, + pos: usize, + ) -> Option<<$evt as BinAggedType>::IterTy1<'a>> { + self.get(pos).map(|&x| x) + } + + fn drain_into(&mut self, dst: &mut Self, range: Range) { + dst.extend(self.drain(range)); + } + } + }; } -impl BinAggedContainer for VecDeque -where - BVT: BinAggedType, -{ - fn new() -> Self { - todo!() - } - - fn push_back(&mut self, val: BVT) { - todo!() - } - - fn pop_front(&mut self) -> Option { - todo!() - } - - fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<::IterTy1<'a>> { - todo!() - } -} +impl_bin_agged_cont_simple_copyable!(f32); +impl_bin_agged_cont_simple_copyable!(f64); impl BinAggedType for f32 { type Container = VecDeque; diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index c3d9669..b0d75a3 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -2,6 +2,7 @@ use super::container::bins::BinAggedType; use super::container_events::Container; use super::container_events::EventValueType; use crate::apitypes::ContainerBinsApi; +use crate::binning::container::bins::BinAggedContainer; use crate::offsets::ts_offs_from_abs; use crate::offsets::ts_offs_from_abs_with_anchor; use core::fmt; @@ -45,7 +46,7 @@ where pub cnt: u64, pub min: EVT::IterTy1<'a>, pub max: EVT::IterTy1<'a>, - pub agg: &'a BVT, + pub agg: BVT::IterTy1<'a>, pub lst: EVT::IterTy1<'a>, pub fnl: bool, } @@ -79,7 +80,7 @@ where cnt: b.cnts[i], min: b.mins.get_iter_ty_1(i).unwrap(), max: b.maxs.get_iter_ty_1(i).unwrap(), - agg: &b.aggs[i], + agg: BinAggedContainer::get_iter_ty_1(&b.aggs, i).unwrap(), lst: b.lsts.get_iter_ty_1(i).unwrap(), fnl: b.fnls[i], }; @@ -101,7 +102,7 @@ where cnts: VecDeque, mins: ::Container, maxs: ::Container, - aggs: VecDeque, + aggs: ::Container, lsts: ::Container, fnls: VecDeque, } @@ -158,7 +159,7 @@ where cnts: VecDeque::new(), mins: <::Container as Container>::new(), maxs: <::Container as Container>::new(), - aggs: VecDeque::new(), + aggs: <::Container as BinAggedContainer>::new(), lsts: <::Container as Container>::new(), fnls: VecDeque::new(), } @@ -216,8 +217,8 @@ where self.maxs.iter_ty_1() } - pub fn aggs_iter(&self) -> std::collections::vec_deque::Iter { - self.aggs.iter() + pub fn aggs_iter(&self) -> impl Iterator> { + self.aggs.iter_ty_1() } pub fn lsts_iter(&self) -> impl Iterator> { @@ -246,7 +247,7 @@ where >, impl Iterator>, >, - std::collections::vec_deque::Iter, + impl Iterator>, >, impl Iterator>, >, @@ -615,7 +616,7 @@ where dst.cnts.extend(self.cnts.drain(range.clone())); self.mins.drain_into(&mut dst.mins, range.clone()); self.maxs.drain_into(&mut dst.maxs, range.clone()); - dst.aggs.extend(self.aggs.drain(range.clone())); + self.aggs.drain_into(&mut dst.aggs, range.clone()); self.lsts.drain_into(&mut dst.lsts, range.clone()); dst.fnls.extend(self.fnls.drain(range.clone())); } else { @@ -635,7 +636,9 @@ where Box::new(ret) } - fn fix_numerics(&mut self) {} + fn boxed_into_collectable_box(self: Box) -> Box { + Box::new(*self) + } } pub struct ContainerBinsTakeUpTo<'a, EVT, BVT> diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index ab67f53..087bae4 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -42,7 +42,7 @@ use serde::Serialize; use std::any; use std::collections::VecDeque; -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[derive(Debug, ThisError)] #[cstm(name = "ValueContainerError")] @@ -292,15 +292,11 @@ where { fn cmp_a(&self, other: &PulsedVal) -> Option { use std::cmp::Ordering; - match self.pulse.cmp(&other.0) { - Ordering::Less => Some(Ordering::Less), - Ordering::Greater => Some(Ordering::Greater), - Ordering::Equal => match self.evt.cmp_a(&other.1) { - Some(Ordering::Less) => Some(Ordering::Less), - Some(Ordering::Greater) => Some(Ordering::Greater), - Some(Ordering::Equal) => Some(Ordering::Equal), - None => None, - }, + match self.evt.cmp_a(&other.1) { + Some(Ordering::Less) => Some(Ordering::Less), + Some(Ordering::Greater) => Some(Ordering::Greater), + Some(Ordering::Equal) => Some(Ordering::Equal), + None => None, } } } diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index 9582dfa..e56462d 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -116,7 +116,7 @@ where Self::bound(&mut self.max, max, std::cmp::Ordering::Greater); let dt = ts2.delta(ts1); let bl = self.range.bin_len_dt_ns(); - self.agg.ingest(dt, bl, cnt, agg.clone()); + self.agg.ingest(dt, bl, cnt, agg.into()); self.non_fnl |= !fnl; self.lst = Some(lst.into()); } diff --git a/src/channelevents.rs b/src/channelevents.rs index 1b76bf5..ff1b3d4 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -557,7 +557,6 @@ mod test_channel_events_serde { use super::ChannelEvents; use crate::binning::container_events::ContainerEvents; use crate::channelevents::ConnStatusEvent; - use crate::eventsdim0::EventsDim0; use bincode::config::FixintEncoding; use bincode::config::LittleEndian; use bincode::config::RejectTrailing; @@ -601,9 +600,9 @@ mod test_channel_events_serde { #[test] fn channel_events_bincode() { - let mut evs = ContainerEvents::new(); - evs.push_back(TsNano::from_ns(8), 3.0f32); - evs.push_back(TsNano::from_ns(12), 3.2f32); + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ns(8), 3.0); + evs.push_back(TsNano::from_ns(12), 3.2); let item = ChannelEvents::from(evs); let opts = bincode_opts(); let mut out = Vec::new(); @@ -617,7 +616,7 @@ mod test_channel_events_serde { } else { panic!() }; - let item: &EventsDim0 = item.as_any_ref().downcast_ref().unwrap(); + let item: &ContainerEvents = item.as_any_ref().downcast_ref().unwrap(); assert_eq!(item.tss().len(), 2); assert_eq!(item.tss()[1], 12); } diff --git a/src/merger.rs b/src/merger.rs index 1aa0c13..897f5f7 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -2,7 +2,6 @@ mod test; use crate::log::*; -use core::ops::Range; use futures_util::Stream; use futures_util::StreamExt; use items_0::container::ByteEstimate; @@ -17,9 +16,6 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; -use items_0::WithLen; -use netpod::TsMs; use netpod::TsNano; use std::collections::VecDeque; use std::fmt; @@ -37,7 +33,7 @@ macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[derive(Debug, thiserror::Error)] #[cstm(name = "MergerError")] @@ -64,7 +60,6 @@ pub struct Merger { done_inp: bool, done_range_complete: bool, complete: bool, - poll_count: usize, } impl fmt::Debug for Merger @@ -104,7 +99,6 @@ where done_inp: false, done_range_complete: false, complete: false, - poll_count: 0, } } @@ -411,7 +405,7 @@ where Continue(()) } } else { - trace!("no output candidate"); + trace4!("no output candidate"); if last_emit { Break(Ready(None)) } else { @@ -439,15 +433,12 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - self.poll_count += 1; - let span1 = span!(Level::INFO, "Merger", pc = self.poll_count); + // let span1 = span!(Level::INFO, "Merger", pc = self.poll_count); + let span1 = span!(Level::INFO, "Merger"); let _spg = span1.enter(); loop { trace3!("poll"); - break if self.poll_count == usize::MAX { - self.done_range_complete = true; - continue; - } else if self.complete { + break if self.complete { panic!("poll after complete"); } else if self.done_range_complete { self.complete = true; diff --git a/src/testgen.rs b/src/testgen.rs index 48a6d2e..a7b875c 100644 --- a/src/testgen.rs +++ b/src/testgen.rs @@ -1,7 +1,6 @@ pub mod events_gen; -use crate::eventsdim0::EventsDim0; -use crate::Events; +use crate::binning::container_events::ContainerEvents; use items_0::Appendable; use items_0::Empty; use netpod::TsNano; @@ -21,14 +20,14 @@ pub fn make_some_boxed_d0_f32( tstep: u64, tmask: u64, seed: u32, -) -> Box { +) -> ContainerEvents { let mut vstate = seed; - let mut events = EventsDim0::empty(); + let mut events = ContainerEvents::empty(); for i in 0..n { vstate = xorshift32(vstate); let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask); let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.; events.push(TsNano::from_ns(ts), value); } - Box::new(events) + events } diff --git a/src/testgen/events_gen.rs b/src/testgen/events_gen.rs index 6cb427e..3158d93 100644 --- a/src/testgen/events_gen.rs +++ b/src/testgen/events_gen.rs @@ -1,7 +1,4 @@ use crate::binning::container_events::ContainerEvents; -use crate::eventsdim0::EventsDim0; -use items_0::Empty; -use items_0::WithLen; use netpod::range::evrange::NanoRange; use netpod::TsNano; use std::pin::Pin; @@ -13,32 +10,6 @@ where Box::pin(inp) } -pub fn old_events_gen_dim0_f32_v00(range: NanoRange) -> impl Iterator> { - let dt = 1000 * 1000 * 10; - let beg = range.beg(); - let end = range.end(); - let mut ts = beg - dt; - std::iter::repeat(0) - .map(move |_| { - type T = f32; - let mut c = EventsDim0::empty(); - loop { - let ts1 = TsNano::from_ns(ts); - ts += dt; - if ts1.ns() >= end { - break; - } - let val = (ts / 1_000_000) as T; - c.push_back(ts1.ns(), 0, val); - if c.len() >= 8 { - break; - } - } - c - }) - .take_while(|c| c.len() != 0) -} - pub fn new_events_gen_dim1_f32_v00( range: NanoRange, ) -> impl Iterator>> {