From e8cbe1977019a74874be27430c3e9f48dd53ebcf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 31 Oct 2024 18:56:50 +0100 Subject: [PATCH] Remove unused code --- netfetch/src/ca/conn.rs | 1 + serieswriter/src/binwriter.rs | 70 ++-- serieswriter/src/lib.rs | 2 - serieswriter/src/patchcollect.rs | 124 ------- serieswriter/src/timebin.rs | 532 ------------------------------- 5 files changed, 39 insertions(+), 690 deletions(-) delete mode 100644 serieswriter/src/patchcollect.rs delete mode 100644 serieswriter/src/timebin.rs diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index ee91dd4..5d3aa00 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1423,6 +1423,7 @@ impl CaConn { if let ChannelState::MakingSeriesWriter(st2) = &mut conf.state { let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap(); let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64); + let a = &conf.conf.min_quiets(); let binwriter = BinWriter::new( beg, RetentionTime::Short, diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index e650a18..8ec85c4 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,5 +1,6 @@ use err::thiserror; use err::ThisError; +use items_2::binning::container_bins::ContainerBins; use items_2::binning::container_events::ContainerEvents; use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; use netpod::log::*; @@ -32,7 +33,6 @@ macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*) pub enum Error { SeriesLookupError, SeriesWriter(#[from] crate::writer::Error), - Timebin(#[from] crate::timebin::Error), Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), UnsupportedBinGrid(DtMs), } @@ -96,6 +96,42 @@ impl BinWriter { Ok(()) } + fn handle_output_ready(&mut self, out: ContainerBins, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let selfname = "handle_output_ready"; + trace_tick!("{selfname} bins ready len {}", out.len()); + for e in out.iter_debug() { + trace_tick_verbose!("{e:?}"); + } + for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { + if fnl == false { + debug!("non final bin"); + } else if cnt == 0 { + } else { + let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); + let div = if bin_len == DtMs::from_ms_u64(1000 * 10) { + DtMs::from_ms_u64(1000 * 60 * 60 * 2) + } else { + // TODO + return Err(Error::UnsupportedBinGrid(bin_len)); + }; + let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); + let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); + let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { + series: self.sid.clone(), + bin_len_ms: bin_len.ms() as i32, + ts_msp, + off: off as i32, + count: cnt as i64, + min, + max, + avg, + }); + iqdqs.lt_rf3_qu.push_back(item); + } + } + Ok(()) + } + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { if self.evbuf.len() != 0 { trace_tick!("tick evbuf len {}", self.evbuf.len()); @@ -106,37 +142,7 @@ impl BinWriter { } let out = self.binner.output(); if out.len() != 0 { - trace_tick!("bins ready len {}", out.len()); - for e in out.iter_debug() { - trace_tick_verbose!("{e:?}"); - } - for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { - if fnl == false { - debug!("non final bin"); - } else if cnt == 0 { - } else { - let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); - let div = if bin_len == DtMs::from_ms_u64(1000 * 10) { - DtMs::from_ms_u64(1000 * 60 * 60 * 2) - } else { - // TODO - return Err(Error::UnsupportedBinGrid(bin_len)); - }; - let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); - let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); - let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { - series: self.sid.clone(), - bin_len_ms: bin_len.ms() as i32, - ts_msp, - off: off as i32, - count: cnt as i64, - min, - max, - avg, - }); - iqdqs.lt_rf3_qu.push_back(item); - } - } + self.handle_output_ready(out, iqdqs)?; } else { trace_tick_verbose!("tick NO BINS YET"); } diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index 9c48b41..bc5becd 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -2,8 +2,6 @@ pub mod binwriter; pub mod changewriter; pub mod fixgridwriter; pub mod msptool; -pub mod patchcollect; pub mod ratelimitwriter; pub mod rtwriter; -pub mod timebin; pub mod writer; diff --git a/serieswriter/src/patchcollect.rs b/serieswriter/src/patchcollect.rs deleted file mode 100644 index 8793747..0000000 --- a/serieswriter/src/patchcollect.rs +++ /dev/null @@ -1,124 +0,0 @@ -use err::Error; -use items_0::timebin::TimeBinned; -use log::*; -use netpod::timeunits::SEC; -use netpod::TsNano; -use std::collections::VecDeque; -use std::mem; - -#[derive(Debug)] -pub struct PatchCollect { - patch_len: TsNano, - bin_len: TsNano, - bin_count: u64, - coll: Option>, - locked: bool, - outq: VecDeque>, -} - -impl PatchCollect { - fn new(bin_len: TsNano, bin_count: u64) -> Self { - Self { - patch_len: TsNano::from_ns(bin_len.ns() * bin_count), - bin_len, - bin_count, - coll: None, - locked: false, - outq: VecDeque::new(), - } - } - - pub fn patch_len(&self) -> TsNano { - self.patch_len.clone() - } - - pub fn bin_len(&self) -> TsNano { - self.bin_len.clone() - } - - pub fn bin_count(&self) -> u64 { - self.bin_count - } - - fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> { - let mut n1 = 0; - let mut item_len_exp = item.len(); - loop { - n1 += 1; - if n1 > 20 { - return Err(Error::with_msg_no_trace("patchcollect too many iterations")); - } - info!("ingest loop item len {}", item.len()); - if item.len() != item_len_exp { - return Err(Error::with_msg_no_trace(format!( - "patchcollect item_len_exp mismatch {} vs {}", - item.len(), - item_len_exp - ))); - } - if item.len() == 0 { - break; - } - let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned()); - let (ts1s, ts2s) = item.edges_slice(); - let mut discard = false; - let mut emit = false; - let i1 = 0; - let mut i3 = item.len(); - for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() { - info!("EDGE {}", ts1 / SEC); - if self.locked { - if ts2 % self.patch_len.ns() == 0 { - info!("FOUND PATCH EDGE-END at {}", ts2 / SEC); - i3 = i2 + 1; - emit = true; - } - } else { - if ts1 % self.patch_len.ns() == 0 { - info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC); - self.locked = true; - i3 = i2; - discard = true; - break; - } - } - } - if !self.locked { - info!("drain all"); - item_len_exp = 0; - item.reset(); - } else if discard { - let range = i1..i3; - info!("discard range-len {}", range.len()); - item_len_exp -= range.len(); - item.drain_into_tb(coll.as_mut(), range)?; - coll.reset(); - } else if emit { - let range = i1..i3; - info!("take and emit range-len {}", range.len()); - item_len_exp -= range.len(); - item.drain_into_tb(coll.as_mut(), range)?; - if coll.len() != self.bin_count as usize { - error!("PatchCollect bin count mismatch {} vs {}", coll.len(), self.bin_count); - } - //info!("Patch EMIT {coll:?}"); - let k = self.coll.take().unwrap(); - self.outq.push_back(k); - } else { - let range = i1..i3; - info!("take all range-len {}", range.len()); - item_len_exp = 0; - item.drain_into_tb(coll.as_mut(), range)?; - } - } - Ok(()) - } - - pub fn outq_len(&self) -> usize { - self.outq.len() - } - - pub fn take_outq(&mut self) -> VecDeque> { - mem::replace(&mut self.outq, VecDeque::new()) - } -} diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs deleted file mode 100644 index 0d1d0ed..0000000 --- a/serieswriter/src/timebin.rs +++ /dev/null @@ -1,532 +0,0 @@ -use any::Any; -use core::fmt; -use err::thiserror; -use err::ThisError; -use items_0::scalar_ops::ScalarOps; -use items_0::timebin::TimeBinner; -use items_0::Appendable; -use items_0::Empty; -use items_0::Events; -use items_0::Resettable; -use items_0::WithLen; -use items_2::binsdim0::BinsDim0; -use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim0::EventsDim0TimeBinner; -use netpod::f32_close; -use netpod::log::*; -use netpod::timeunits::MS; -use netpod::ttl::RetentionTime; -use netpod::BinnedRange; -use netpod::BinnedRangeEnum; -use netpod::DtMs; -use netpod::DtNano; -use netpod::ScalarType; -use netpod::Shape; -use netpod::TsMs; -use netpod::TsNano; -use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::DataValue; -use scywr::iteminsertqueue::GetValHelp; -use scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::TimeBinSimpleF32; -use series::SeriesId; -use std::any; - -#[allow(unused)] -macro_rules! todo_setup { - ($($arg:tt)*) => { - if true { - debug!($($arg)*); - } - }; -} - -#[allow(unused)] -macro_rules! trace_store_bin { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -#[allow(unused)] -macro_rules! trace_setup { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -#[allow(unused)] -macro_rules! trace_tick { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -#[allow(unused)] -macro_rules! trace_push { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -#[derive(Debug, ThisError)] -#[cstm(name = "SerieswriterTimebin")] -pub enum Error { - UnexpectedContainer, - PatchWithoutBins, - PatchUnexpectedContainer, - GetValHelpMismatch, - HaveBinsButNoneReturned, - UnsupportedType, - Unsupported, -} - -struct TickParams<'a> { - rt: RetentionTime, - series: SeriesId, - acc: &'a mut (dyn Any + Send), - tb: &'a mut dyn TimeBinner, - iqdqs: &'a mut InsertDeques, - next_coarse: Option<&'a mut EventsDim0TimeBinner>, -} - -pub struct PushFnParams<'a> { - series: SeriesId, - acc: &'a mut (dyn Any + Send), - ts: TsNano, - val: &'a DataValue, -} - -struct Internal { - push_fn: Box Result<(), Error> + Send>, - tick_fn: Box Result<(), Error> + Send>, -} - -impl fmt::Debug for Internal { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Internal") - .field("push_fn", &()) - .field("tick_fn", &()) - .finish() - } -} - -struct SetupResult { - events_binner: Box, - acc: Box, - push_fn: Box Result<(), Error> + Send>, - tick_fn: Box Result<(), Error> + Send>, -} - -#[derive(Debug)] -pub struct ConnTimeBin { - rt: RetentionTime, - series: SeriesId, - #[allow(unused)] - bin_len: DtNano, - next_coarse: Option>>, - events_binner: Box, - acc: Box, - internal: Internal, - unsup: bool, -} - -impl ConnTimeBin { - pub fn new( - rt: RetentionTime, - series: SeriesId, - beg: TsNano, - bin_len: DtNano, - scalar_type: ScalarType, - shape: Shape, - ) -> Result { - let do_time_weight = true; - #[cfg(target_abi = "12")] - let next_coarse = if bin_len.ns() < SEC * 60 { - type ST = f32; - let brange = BinnedRange { - bin_len: TsNano::from_ns(SEC * 60), - bin_off: todo!(), - bin_cnt: todo!(), - }; - let binned_range = BinnedRangeEnum::Time(brange); - let tb = EventsDim0TimeBinner::::new(binned_range, do_time_weight).unwrap(); - Some(tb) - } else if bin_len.ns() < SEC * 60 * 2 { - todo!() - } else if bin_len.ns() < SEC * 60 * 10 { - todo!() - } else { - None - } - .map(Box::new); - let mut unsup = false; - let k = Self::setup_for(beg, bin_len, &scalar_type, &shape, do_time_weight); - let k = if k.is_ok() { - k - } else { - unsup = true; - Self::setup_for(beg, bin_len, &ScalarType::F32, &Shape::Scalar, do_time_weight) - }; - let k = k?; - let ret = Self { - rt, - series, - bin_len, - next_coarse: None, - events_binner: k.events_binner, - acc: k.acc, - internal: Internal { - push_fn: k.push_fn, - tick_fn: k.tick_fn, - }, - unsup, - }; - Ok(ret) - } - - fn setup_for( - beg: TsNano, - bin_len: DtNano, - scalar_type: &ScalarType, - shape: &Shape, - do_time_weight: bool, - ) -> Result { - // TODO should not take a system time here: - let range1 = BinnedRange { - bin_off: beg.ns() / bin_len.ns(), - bin_cnt: u64::MAX / bin_len.ns() - 10, - // TODO fix trait requirements - bin_len: TsNano::from_ns(bin_len.ns()), - }; - let binrange = BinnedRangeEnum::Time(range1); - match shape { - Shape::Scalar => { - use ScalarType::*; - match scalar_type { - U8 => Self::setup_scalar::(binrange, do_time_weight), - U16 => Self::setup_scalar::(binrange, do_time_weight), - U32 => Self::setup_scalar::(binrange, do_time_weight), - U64 => Self::setup_scalar::(binrange, do_time_weight), - I8 => Self::setup_scalar::(binrange, do_time_weight), - I16 => Self::setup_scalar::(binrange, do_time_weight), - I32 => Self::setup_scalar::(binrange, do_time_weight), - I64 => Self::setup_scalar::(binrange, do_time_weight), - F32 => Self::setup_scalar::(binrange, do_time_weight), - F64 => Self::setup_scalar::(binrange, do_time_weight), - STRING => { - todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - Err(Error::UnsupportedType) - } - _ => { - todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - Err(Error::UnsupportedType) - } - } - } - Shape::Wave(..) => match scalar_type { - _ => { - todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - Err(Error::UnsupportedType) - } - }, - _ => { - todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - Err(Error::UnsupportedType) - } - } - } - - fn setup_scalar(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result - where - ST: ScalarOps, - DataValue: GetValHelp, - { - trace_setup!("SCALAR {}", any::type_name::()); - type Cont = EventsDim0; - let cont = Cont::::empty(); - let emit_empty_bins = false; - let ret = SetupResult { - events_binner: cont - .as_time_binnable_ref() - .time_binner_new(binrange, do_time_weight, emit_empty_bins), - acc: Box::new(cont), - push_fn: Box::new(push::), - tick_fn: Box::new(tick::), - }; - Ok(ret) - } - - pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> { - if self.unsup { - return Ok(()); - } - let (f, acc) = (&self.internal.push_fn, self.acc.as_mut()); - let params = PushFnParams { - series: self.series.clone(), - acc, - ts, - val, - }; - f(params) - } - - pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - if self.unsup { - return Ok(()); - } - let (f,) = (&self.internal.tick_fn,); - let params = TickParams { - rt: self.rt.clone(), - series: self.series.clone(), - acc: self.acc.as_mut(), - tb: self.events_binner.as_mut(), - // pc: &mut self.patch_collect, - iqdqs, - next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()), - }; - f(params) - } - - pub fn finish(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - if self.unsup { - return Ok(()); - } - let series = self.series.clone(); - let tb = self.events_binner.as_mut(); - tb.push_in_progress(false); - let nbins = tb.bins_ready_count(); - if nbins >= 1 { - trace_store_bin!("finish nbins {} {:?}", nbins, series); - let rt = self.rt.clone(); - let next_coarse = self.next_coarse.as_mut().map(|x| x.as_mut()); - store_bins(rt, series, tb, iqdqs, next_coarse)?; - } - Ok(()) - } -} - -fn push(params: PushFnParams) -> Result<(), Error> -where - STY: ScalarOps, - DataValue: GetValHelp, -{ - let series = ¶ms.series; - let ts = params.ts; - let v = match GetValHelp::::get(params.val) { - Ok(x) => x, - Err(e) => { - // TODO throttle the error - let msg = format!( - "GetValHelp mismatch: series {:?} STY {} data {:?} {e}", - series, - any::type_name::(), - params.val - ); - error!("{msg}"); - return Err(Error::GetValHelpMismatch); - } - }; - if let Some(c) = params.acc.downcast_mut::>() { - trace_push!("PUSHED"); - c.push(ts.ns(), 0, v.clone()); - Ok(()) - } else { - Err(Error::UnexpectedContainer) - } -} - -fn tick(params: TickParams) -> Result<(), Error> -where - STY: ScalarOps, -{ - let rt = params.rt; - let acc = params.acc; - let tb = params.tb; - let iqdqs = params.iqdqs; - let next = params.next_coarse; - if let Some(c) = acc.downcast_mut::>() { - trace_tick!("TICK CONV"); - if c.len() >= 1 { - trace_tick!("TICK EVENTS"); - tb.ingest(c); - c.reset(); - trace_tick!("TICK INGESTED"); - let nbins = tb.bins_ready_count(); - if nbins >= 1 { - trace_tick!("TICK READY {nbins}"); - trace_store_bin!("store bins len {} {:?}", nbins, params.series); - store_bins(rt, params.series.clone(), tb, iqdqs, next)?; - // if let Some(mut bins) = tb.bins_ready() { - // //info!("store bins {bins:?}"); - // let mut bins = bins.to_simple_bins_f32(); - - // TODO; - - // pc.ingest(bins.as_mut())?; - // let noutq = pc.outq_len(); - // info!("noutq {noutq}"); - // if noutq != 0 { - // store_patch(params.series.clone(), pc, iiq)?; - // Ok(()) - // } else { - // warn!("pc outq len zero"); - // Ok(()) - // } - // } else { - // error!("have bins but none returned"); - // Err(Error::HaveBinsButNoneReturned) - // } - - Ok(()) - } else { - trace_tick!("TICK NOT READY"); - Ok(()) - } - } else { - trace_tick!("TICK NO EVENTS TO PROCESS"); - Ok(()) - } - } else { - trace_tick!("TICK UNEXPECTED CONTAINER"); - Err(Error::UnexpectedContainer) - } -} - -fn store_bins( - rt: RetentionTime, - series: SeriesId, - tb: &mut dyn TimeBinner, - iqdqs: &mut InsertDeques, - next: Option<&mut EventsDim0TimeBinner>, -) -> Result<(), Error> { - if let Some(mut bins) = tb.bins_ready() { - let bins = bins.to_simple_bins_f32(); - if let Some(k) = bins.as_any_ref().downcast_ref::>() { - if k.len() == 0 { - return Err(Error::PatchWithoutBins); - } else { - for (((((&ts1, &ts2), &count), &min), &max), &avg) in k - .ts1s - .iter() - .zip(k.ts2s.iter()) - .zip(k.cnts.iter()) - .zip(k.mins.iter()) - .zip(k.maxs.iter()) - .zip(k.avgs.iter()) - { - // TODO the inner must be of BinsDim0 type so we feed also count, min, max, etc. - if let Some(_next) = &next { - // next.ingest(); - } - - let ts1 = TsMs::from_ms_u64(ts1 / MS); - let ts2 = TsMs::from_ms_u64(ts2 / MS); - let bin_len = ts2 - ts1; - let h = if bin_len == DtMs::from_ms_u64(1000 * 10) { - DtMs::from_ms_u64(1000 * 60 * 60 * 2) - } else { - // TODO - return Err(Error::Unsupported); - }; - let ts_msp = TsMs::from_ms_u64(ts1.ms() / h.ms() * h.ms()); - let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); - let item = TimeBinSimpleF32 { - series: series.clone(), - bin_len_ms: bin_len.ms() as i32, - ts_msp, - off: off as i32, - count: count as i64, - min, - max, - avg, - }; - let item = QueryItem::TimeBinSimpleF32(item); - trace_store_bin!("push item B ts1ms {ts1:?} bin_len_ms {bin_len:?} ts_msp {ts_msp} off {off}"); - iqdqs.deque(rt.clone()).push_back(item.clone()); - } - } - Ok(()) - } else { - error!("unexpected container!"); - Err(Error::PatchUnexpectedContainer) - } - // TODO feed also the next patch collector for the next coarse resolution. - // pc.ingest(bins.as_mut())?; - // let noutq = pc.outq_len(); - // info!("noutq {noutq}"); - // if noutq != 0 { - // store_patch(params.series.clone(), pc, iiq)?; - // Ok(()) - // } else { - // warn!("pc outq len zero"); - // Ok(()) - // } - } else { - error!("have bins but none returned"); - Err(Error::HaveBinsButNoneReturned) - } -} - -#[test] -fn test_00() { - let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); - let mut iqdqs = InsertDeques::new(); - ctb.tick(&mut iqdqs).unwrap(); - assert_eq!(iqdqs.len(), 0); -} - -#[test] -fn test_01() { - use scywr::iteminsertqueue::ScalarValue; - let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); - ctb.push(TsNano::from_ms(1000), &DataValue::Scalar(ScalarValue::I32(10))) - .unwrap(); - let mut iqdqs = InsertDeques::new(); - ctb.tick(&mut iqdqs).unwrap(); - assert_eq!(iqdqs.len(), 0); -} - -#[test] -fn test_02() { - use scywr::iteminsertqueue::ScalarValue; - let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); - ctb.push(TsNano::from_ms(1000 * 10), &DataValue::Scalar(ScalarValue::I32(10))) - .unwrap(); - ctb.push(TsNano::from_ms(1000 * 11), &DataValue::Scalar(ScalarValue::I32(12))) - .unwrap(); - ctb.push(TsNano::from_ms(1000 * 12), &DataValue::Scalar(ScalarValue::I32(10))) - .unwrap(); - let mut iqdqs = InsertDeques::new(); - ctb.tick(&mut iqdqs).unwrap(); - ctb.finish(&mut iqdqs).unwrap(); - assert_eq!(iqdqs.len(), 1); - for e in iqdqs.st_rf3_qu { - eprintln!("{e:?}"); - if let QueryItem::TimeBinSimpleF32(x) = e { - assert!(f32_close(x.avg, 10.2)); - } else { - panic!(); - } - } -} - -#[cfg(test)] -fn init_scalar_f32_conn_time_bin() -> Result { - let rt = RetentionTime::Short; - let series = SeriesId::new(1); - let beg = TsNano::from_ms(1000 * 10); - let bin_len = DtNano::from_ms(1000 * 10); - let scalar_type = ScalarType::I32; - let shape = Shape::Scalar; - let ctb = ConnTimeBin::new(rt, series, beg, bin_len, scalar_type, shape).unwrap(); - Ok(ctb) -}