diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 86c7ea7..b4e1a8f 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.2-aa.1" +version = "0.2.2-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 256f08e..80f8b3c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1124,12 +1124,15 @@ impl CaConn { let conf_poll_conf = conf.poll_conf(); let chst = &mut conf.state; if let ChannelState::MakingSeriesWriter(st2) = chst { + let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64); let binwriter = BinWriter::new( + beg, + RetentionTime::Short, st2.channel.cssid, writer.sid(), st2.channel.scalar_type.clone(), st2.channel.shape.clone(), - stnow, )?; self.stats.get_series_id_ok.inc(); { @@ -1759,7 +1762,7 @@ impl CaConn { let ts_ioc = TsNano::from_ns(ts); let ts_local = TsNano::from_ns(ts_local); let val: DataValue = value.data.into(); - binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; + // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?; if dwst { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index a26a970..3fa0065 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -364,85 +364,36 @@ pub trait GetValHelp { fn get(&self) -> Result<&Self::ScalTy, Error>; } -impl GetValHelp for DataValue { - type ScalTy = i8; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - DataValue::Scalar(v) => match v { - ScalarValue::I8(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) +macro_rules! impl_scalar_get_val_help { + ($sty:ty, $varname:ident) => { + impl GetValHelp<$sty> for DataValue { + type ScalTy = $sty; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::$varname(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), } - }, - _ => Err(Error::GetValHelpTodoWaveform), + } } - } + }; } -impl GetValHelp for DataValue { - type ScalTy = i16; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - DataValue::Scalar(v) => match v { - ScalarValue::I16(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for DataValue { - type ScalTy = i32; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - DataValue::Scalar(v) => match v { - ScalarValue::I32(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for DataValue { - type ScalTy = f32; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - DataValue::Scalar(v) => match v { - ScalarValue::F32(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for DataValue { - type ScalTy = f64; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - DataValue::Scalar(v) => match v { - ScalarValue::F64(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} +impl_scalar_get_val_help!(u8, U8); +impl_scalar_get_val_help!(u16, U16); +impl_scalar_get_val_help!(u32, U32); +impl_scalar_get_val_help!(u64, U64); +impl_scalar_get_val_help!(i8, I8); +impl_scalar_get_val_help!(i16, I16); +impl_scalar_get_val_help!(i32, I32); +impl_scalar_get_val_help!(i64, I64); +impl_scalar_get_val_help!(f32, F32); +impl_scalar_get_val_help!(f64, F64); #[derive(Debug, Clone)] pub enum ConnectionStatus { diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 5133fc0..2b0ab59 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -4,6 +4,8 @@ use async_channel::Sender; use err::thiserror; use err::ThisError; use netpod::log::*; +use netpod::ttl::RetentionTime; +use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; @@ -34,6 +36,7 @@ pub enum Error { #[derive(Debug)] pub struct BinWriter { + rt: RetentionTime, sid: SeriesId, scalar_type: ScalarType, shape: Shape, @@ -42,17 +45,28 @@ pub struct BinWriter { impl BinWriter { pub fn new( + beg: TsNano, + rt: RetentionTime, // channel_info_tx: Sender, cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, shape: Shape, - stnow: SystemTime, ) -> Result { - type A = SeriesWriter; - let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ms(1000 * 2)); - binner.setup_for(&scalar_type, &shape, stnow)?; + // TODO select the desired bin width based on channel configuration: + // that's user knowledge, it really depends on what users want. + // For the moment, assume a fixed value. + let bin_len = DtNano::from_ms(1000 * 10); + let binner = ConnTimeBin::new( + rt.clone(), + sid.clone(), + beg, + bin_len, + scalar_type.clone(), + shape.clone(), + )?; let ret = Self { + rt, sid, scalar_type, shape, diff --git a/serieswriter/src/patchcollect.rs b/serieswriter/src/patchcollect.rs index f1633bf..8793747 100644 --- a/serieswriter/src/patchcollect.rs +++ b/serieswriter/src/patchcollect.rs @@ -17,7 +17,7 @@ pub struct PatchCollect { } impl PatchCollect { - pub fn new(bin_len: TsNano, bin_count: u64) -> Self { + fn new(bin_len: TsNano, bin_count: u64) -> Self { Self { patch_len: TsNano::from_ns(bin_len.ns() * bin_count), bin_len, @@ -40,7 +40,7 @@ impl PatchCollect { self.bin_count } - pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> { + fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> { let mut n1 = 0; let mut item_len_exp = item.len(); loop { diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 6c604b6..59d46e8 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -4,6 +4,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use netpod::log::*; +use netpod::DtNano; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; @@ -182,6 +183,9 @@ impl RtWriter { } else if ts_local.ms() - last.ts_local.ms() < 1000 * min_quiet.as_secs() { trace_rt_decision!("{rt} {sid} ignore, because not min quiet"); false + } else if ts_local.delta(last.ts_local) < DtNano::from_ms(5) { + trace_rt_decision!("{rt} {sid} ignore, because store rate cap"); + false } else if val == last.val { trace_rt_decision!("{rt} {sid} ignore, because value did not change"); false diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index fa834cc..96599f2 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -1,4 +1,4 @@ -use crate::patchcollect::PatchCollect; +use any::Any; use core::fmt; use err::thiserror; use err::ThisError; @@ -12,11 +12,14 @@ use items_0::WithLen; use items_2::binsdim0::BinsDim0; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim0::EventsDim0TimeBinner; +use netpod::f32_close; use netpod::log::*; use netpod::timeunits::MS; -use netpod::timeunits::SEC; +use netpod::ttl::RetentionTime; use netpod::BinnedRange; use netpod::BinnedRangeEnum; +use netpod::DtMs; +use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; @@ -28,14 +31,47 @@ use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32; use series::SeriesId; use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::time::SystemTime; #[allow(unused)] -macro_rules! trace2 { +macro_rules! todo_setup { ($($arg:tt)*) => { - if false { + if true { + debug!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_store_bin { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_setup { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_tick { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_push { + ($($arg:tt)*) => { + if true { trace!($($arg)*); } }; @@ -43,59 +79,76 @@ macro_rules! trace2 { #[derive(Debug, ThisError)] pub enum Error { + UnexpectedContainer, PatchWithoutBins, PatchUnexpectedContainer, GetValHelpMismatch, HaveBinsButNoneReturned, - ErrError(#[from] err::Error), + UnsupportedType, + Unsupported, } struct TickParams<'a> { + rt: RetentionTime, series: SeriesId, - acc: &'a mut Box, - tb: &'a mut Box, - pc: &'a mut PatchCollect, + acc: &'a mut (dyn Any + Send), + tb: &'a mut dyn TimeBinner, iqdqs: &'a mut InsertDeques, next_coarse: Option<&'a mut EventsDim0TimeBinner>, } pub struct PushFnParams<'a> { - sid: SeriesId, - acc: &'a mut Box, + series: SeriesId, + acc: &'a mut (dyn Any + Send), ts: TsNano, val: &'a DataValue, } -pub struct ConnTimeBin { - did_setup: bool, - series: SeriesId, - bin_len: TsNano, - next_coarse: Option>>, - patch_collect: PatchCollect, - events_binner: Option>, +struct Internal { + push_fn: Box Result<(), Error> + Send>, + tick_fn: Box Result<(), Error> + Send>, +} + +impl fmt::Debug for Internal { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Internal") + .field("push_fn", &()) + .field("tick_fn", &()) + .finish() + } +} + +struct SetupResult { + events_binner: Box, acc: Box, push_fn: Box Result<(), Error> + Send>, tick_fn: Box Result<(), Error> + Send>, } -impl fmt::Debug for ConnTimeBin { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("ConnTimeBin") - .field("did_setup", &self.did_setup) - .field("series", &self.series) - .field("acc", &self.acc) - // .field("push_fn", &self.push_fn) - // .field("tick_fn", &self.tick_fn) - .field("events_binner", &self.events_binner) - .field("patch_collect", &self.patch_collect) - .finish() - } +#[derive(Debug)] +pub struct ConnTimeBin { + rt: RetentionTime, + series: SeriesId, + #[allow(unused)] + bin_len: DtNano, + next_coarse: Option>>, + events_binner: Box, + acc: Box, + internal: Internal, + unsup: bool, } impl ConnTimeBin { - pub fn empty(series: SeriesId, bin_len: TsNano) -> Self { + pub fn new( + rt: RetentionTime, + series: SeriesId, + beg: TsNano, + bin_len: DtNano, + scalar_type: ScalarType, + shape: Shape, + ) -> Result { let do_time_weight = true; - #[cfg(DISABLED)] + #[cfg(target_abi = "12")] let next_coarse = if bin_len.ns() < SEC * 60 { type ST = f32; let brange = BinnedRange { @@ -114,122 +167,110 @@ impl ConnTimeBin { None } .map(Box::new); - Self { - patch_collect: PatchCollect::new(bin_len.clone(), 1), - did_setup: false, + let mut unsup = false; + let k = Self::setup_for(beg, bin_len, &scalar_type, &shape, do_time_weight); + let k = if k.is_ok() { + k + } else { + unsup = true; + Self::setup_for(beg, bin_len, &ScalarType::F32, &Shape::Scalar, do_time_weight) + }; + let k = k?; + let ret = Self { + rt, series, bin_len, next_coarse: None, - events_binner: None, - acc: Box::new(()), - push_fn: Box::new(push::), - tick_fn: Box::new(tick::), + events_binner: k.events_binner, + acc: k.acc, + internal: Internal { + push_fn: k.push_fn, + tick_fn: k.tick_fn, + }, + unsup, + }; + Ok(ret) + } + + fn setup_for( + beg: TsNano, + bin_len: DtNano, + scalar_type: &ScalarType, + shape: &Shape, + do_time_weight: bool, + ) -> Result { + // TODO should not take a system time here: + let range1 = BinnedRange { + bin_off: beg.ns() / bin_len.ns(), + bin_cnt: u64::MAX / bin_len.ns() - 10, + // TODO fix trait requirements + bin_len: TsNano::from_ns(bin_len.ns()), + }; + let binrange = BinnedRangeEnum::Time(range1); + match shape { + Shape::Scalar => { + use ScalarType::*; + match scalar_type { + U8 => Self::setup_scalar::(binrange, do_time_weight), + U16 => Self::setup_scalar::(binrange, do_time_weight), + U32 => Self::setup_scalar::(binrange, do_time_weight), + U64 => Self::setup_scalar::(binrange, do_time_weight), + I8 => Self::setup_scalar::(binrange, do_time_weight), + I16 => Self::setup_scalar::(binrange, do_time_weight), + I32 => Self::setup_scalar::(binrange, do_time_weight), + I64 => Self::setup_scalar::(binrange, do_time_weight), + F32 => Self::setup_scalar::(binrange, do_time_weight), + F64 => Self::setup_scalar::(binrange, do_time_weight), + STRING => { + todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + Err(Error::UnsupportedType) + } + _ => { + todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + Err(Error::UnsupportedType) + } + } + } + Shape::Wave(..) => match scalar_type { + _ => { + todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + Err(Error::UnsupportedType) + } + }, + _ => { + todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + Err(Error::UnsupportedType) + } } } - pub fn setup_for(&mut self, scalar_type: &ScalarType, shape: &Shape, tsnow: SystemTime) -> Result<(), Error> { - use ScalarType::*; - // TODO should not take a system time here: - let bin_len = &self.bin_len; - let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - let range1 = BinnedRange { - bin_off: ts0 / bin_len.ns(), - bin_cnt: u64::MAX / bin_len.ns() - 10, - bin_len: bin_len.clone(), + fn setup_scalar(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result + where + ST: ScalarOps, + DataValue: GetValHelp, + { + trace_setup!("SCALAR {}", any::type_name::()); + type Cont = EventsDim0; + let cont = Cont::::empty(); + let emit_empty_bins = false; + let ret = SetupResult { + events_binner: cont + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, emit_empty_bins), + acc: Box::new(cont), + push_fn: Box::new(push::), + tick_fn: Box::new(tick::), }; - let binrange = BinnedRangeEnum::Time(range1); - //info!("binrange {binrange:?}"); - let do_time_weight = true; - match shape { - Shape::Scalar => { - type Cont = EventsDim0; - match scalar_type { - I8 => { - type ST = i8; - trace2!("SCALAR {}", any::type_name::()); - let cont = Cont::::empty(); - self.events_binner = - Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); - self.acc = Box::new(cont); - self.push_fn = Box::new(push::); - self.tick_fn = Box::new(tick::); - self.did_setup = true; - } - I16 => { - type ST = i16; - trace2!("SCALAR {}", std::any::type_name::()); - let cont = Cont::::empty(); - self.events_binner = - Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); - self.acc = Box::new(cont); - self.push_fn = Box::new(push::); - self.tick_fn = Box::new(tick::); - self.did_setup = true; - } - I32 => { - type ST = i32; - trace2!("SCALAR {}", std::any::type_name::()); - let cont = Cont::::empty(); - self.events_binner = - Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); - self.acc = Box::new(cont); - self.push_fn = Box::new(push::); - self.tick_fn = Box::new(tick::); - self.did_setup = true; - } - F32 => { - type ST = f32; - trace2!("SCALAR {}", std::any::type_name::()); - let cont = Cont::::empty(); - self.events_binner = - Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); - self.acc = Box::new(cont); - self.push_fn = Box::new(push::); - self.tick_fn = Box::new(tick::); - self.did_setup = true; - } - F64 => { - type ST = f64; - trace2!("SCALAR {}", std::any::type_name::()); - let cont = Cont::::empty(); - self.events_binner = - Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); - self.acc = Box::new(cont); - self.push_fn = Box::new(push::); - self.tick_fn = Box::new(tick::); - self.did_setup = true; - } - STRING => { - trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - } - _ => { - trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - } - } - } - Shape::Wave(..) => { - //type Cont = EventsDim1; - match scalar_type { - _ => { - trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - } - } - } - _ => { - trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); - } - } - Ok(()) + Ok(ret) } pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> { - if !self.did_setup { - // TODO record as logic error + if self.unsup { return Ok(()); } - let (f, acc) = (&self.push_fn, &mut self.acc); + let (f, acc) = (&self.internal.push_fn, self.acc.as_mut()); let params = PushFnParams { - sid: self.series.clone(), + series: self.series.clone(), acc, ts, val, @@ -238,20 +279,38 @@ impl ConnTimeBin { } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - if !self.did_setup { + if self.unsup { return Ok(()); } - let (f,) = (&self.tick_fn,); + let (f,) = (&self.internal.tick_fn,); let params = TickParams { + rt: self.rt.clone(), series: self.series.clone(), - acc: &mut self.acc, - tb: self.events_binner.as_mut().unwrap(), - pc: &mut self.patch_collect, + acc: self.acc.as_mut(), + tb: self.events_binner.as_mut(), + // pc: &mut self.patch_collect, iqdqs, next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()), }; f(params) } + + pub fn finish(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + if self.unsup { + return Ok(()); + } + let series = self.series.clone(); + let tb = self.events_binner.as_mut(); + tb.push_in_progress(false); + let nbins = tb.bins_ready_count(); + if nbins >= 1 { + trace_store_bin!("finish nbins {} {:?}", nbins, series); + let rt = self.rt.clone(); + let next_coarse = self.next_coarse.as_mut().map(|x| x.as_mut()); + store_bins(rt, series, tb, iqdqs, next_coarse)?; + } + Ok(()) + } } fn push(params: PushFnParams) -> Result<(), Error> @@ -259,7 +318,7 @@ where STY: ScalarOps, DataValue: GetValHelp, { - let sid = ¶ms.sid; + let series = ¶ms.series; let ts = params.ts; let v = match GetValHelp::::get(params.val) { Ok(x) => x, @@ -267,7 +326,7 @@ where // TODO throttle the error let msg = format!( "GetValHelp mismatch: series {:?} STY {} data {:?} {e}", - sid, + series, any::type_name::(), params.val ); @@ -276,13 +335,11 @@ where } }; if let Some(c) = params.acc.downcast_mut::>() { + trace_push!("PUSHED"); c.push(ts.ns(), 0, v.clone()); Ok(()) } else { - // TODO report once and error out - error!("unexpected container"); - //Err(Error::with_msg_no_trace("unexpected container")) - Ok(()) + Err(Error::UnexpectedContainer) } } @@ -290,19 +347,23 @@ fn tick(params: TickParams) -> Result<(), Error> where STY: ScalarOps, { + let rt = params.rt; let acc = params.acc; let tb = params.tb; - // let pc = params.pc; let iqdqs = params.iqdqs; let next = params.next_coarse; if let Some(c) = acc.downcast_mut::>() { + trace_tick!("TICK CONV"); if c.len() >= 1 { + trace_tick!("TICK EVENTS"); tb.ingest(c); c.reset(); + trace_tick!("TICK INGESTED"); let nbins = tb.bins_ready_count(); if nbins >= 1 { - trace!("store bins len {} {:?}", nbins, params.series); - store_bins(params.series.clone(), tb, iqdqs, next)?; + trace_tick!("TICK READY {nbins}"); + trace_store_bin!("store bins len {} {:?}", nbins, params.series); + store_bins(rt, params.series.clone(), tb, iqdqs, next)?; // if let Some(mut bins) = tb.bins_ready() { // //info!("store bins {bins:?}"); // let mut bins = bins.to_simple_bins_f32(); @@ -326,21 +387,23 @@ where Ok(()) } else { + trace_tick!("TICK NOT READY"); Ok(()) } } else { + trace_tick!("TICK NO EVENTS TO PROCESS"); Ok(()) } } else { - error!("unexpected container"); - //Err(Error::with_msg_no_trace("unexpected container")) - Ok(()) + trace_tick!("TICK UNEXPECTED CONTAINER"); + Err(Error::UnexpectedContainer) } } fn store_bins( + rt: RetentionTime, series: SeriesId, - tb: &mut Box, + tb: &mut dyn TimeBinner, iqdqs: &mut InsertDeques, next: Option<&mut EventsDim0TimeBinner>, ) -> Result<(), Error> { @@ -360,23 +423,25 @@ fn store_bins( .zip(k.avgs.iter()) { // TODO the inner must be of BinsDim0 type so we feed also count, min, max, etc. - if let Some(next) = &next { + if let Some(_next) = &next { // next.ingest(); } - // TODO this must depend on the data type: waveforms need smaller batches - let bins_per_msp = 82000; - - let ts1ms = ts1 / MS; - let ts2ms = ts2 / MS; - let bin_len_ms = ts2ms - ts1ms; - let h = bins_per_msp * bin_len_ms; - let ts_msp = ts1ms / h * h; - let off = (ts1ms - ts_msp) / bin_len_ms; + let ts1 = TsMs::from_ms_u64(ts1 / MS); + let ts2 = TsMs::from_ms_u64(ts2 / MS); + let bin_len = ts2 - ts1; + let h = if bin_len == DtMs::from_ms_u64(1000 * 10) { + DtMs::from_ms_u64(1000 * 60 * 60 * 2) + } else { + // TODO + return Err(Error::Unsupported); + }; + let ts_msp = TsMs::from_ms_u64(ts1.ms() / h.ms() * h.ms()); + let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); let item = TimeBinSimpleF32 { series: series.clone(), - bin_len_ms: bin_len_ms as i32, - ts_msp: TsMs::from_ms_u64(ts_msp), + bin_len_ms: bin_len.ms() as i32, + ts_msp, off: off as i32, count: count as i64, min, @@ -384,12 +449,8 @@ fn store_bins( avg, }; let item = QueryItem::TimeBinSimpleF32(item); - trace!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}"); - - // TODO check which RT we want to push into - iqdqs.st_rf3_rx.push_back(item.clone()); - // iqdqs.mt_rf3_rx.push_back(item.clone()); - // iqdqs.lt_rf3_rx.push_back(item); + trace_store_bin!("push item B ts1ms {ts1:?} bin_len_ms {bin_len:?} ts_msp {ts_msp} off {off}"); + iqdqs.deque(rt.clone()).push_back(item.clone()); } } Ok(()) @@ -414,38 +475,57 @@ fn store_bins( } } -fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque) -> Result<(), Error> { - // TODO - // I probably still want to keep the "patchcollect" because I want to store also the next - // resolutions. - // But I need to emit each bin as they come. +#[test] +fn test_00() { + let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); + let mut iqdqs = InsertDeques::new(); + ctb.tick(&mut iqdqs).unwrap(); + assert_eq!(iqdqs.len(), 0); +} - for item in pc.take_outq() { - if let Some(k) = item.as_any_ref().downcast_ref::>() { - let ts0 = if let Some(x) = k.ts1s.front() { - *x - } else { - return Err(Error::PatchWithoutBins); - }; +#[test] +fn test_01() { + use scywr::iteminsertqueue::ScalarValue; + let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); + ctb.push(TsNano::from_ms(1000), &DataValue::Scalar(ScalarValue::I32(10))) + .unwrap(); + let mut iqdqs = InsertDeques::new(); + ctb.tick(&mut iqdqs).unwrap(); + assert_eq!(iqdqs.len(), 0); +} - // TODO insert each bin individually - - let bin_len_sec = (pc.bin_len().ns() / MS); - let bin_count = pc.bin_count(); - let off = ts0 / pc.patch_len().ns(); - let off_msp = off / 1000; - let off_lsp = off % 1000; - // let item = TimeBinSimpleF32 { - // }; - // let item = QueryItem::TimeBinSimpleF32(item); - // warn!( - // "push item B bin_len_sec {bin_len_sec} bin_count {bin_count} off_msp {off_msp} off_lsp {off_lsp}" - // ); - // iiq.push_back(item); +#[test] +fn test_02() { + use scywr::iteminsertqueue::ScalarValue; + let mut ctb = init_scalar_f32_conn_time_bin().unwrap(); + ctb.push(TsNano::from_ms(1000 * 10), &DataValue::Scalar(ScalarValue::I32(10))) + .unwrap(); + ctb.push(TsNano::from_ms(1000 * 11), &DataValue::Scalar(ScalarValue::I32(12))) + .unwrap(); + ctb.push(TsNano::from_ms(1000 * 12), &DataValue::Scalar(ScalarValue::I32(10))) + .unwrap(); + let mut iqdqs = InsertDeques::new(); + ctb.tick(&mut iqdqs).unwrap(); + ctb.finish(&mut iqdqs).unwrap(); + assert_eq!(iqdqs.len(), 1); + for e in iqdqs.st_rf3_rx { + eprintln!("{e:?}"); + if let QueryItem::TimeBinSimpleF32(x) = e { + assert!(f32_close(x.avg, 10.2)); } else { - error!("unexpected container!"); - return Err(Error::PatchUnexpectedContainer); + panic!(); } } - Ok(()) +} + +#[cfg(test)] +fn init_scalar_f32_conn_time_bin() -> Result { + let rt = RetentionTime::Short; + let series = SeriesId::new(1); + let beg = TsNano::from_ms(1000 * 10); + let bin_len = DtNano::from_ms(1000 * 10); + let scalar_type = ScalarType::I32; + let shape = Shape::Scalar; + let ctb = ConnTimeBin::new(rt, series, beg, bin_len, scalar_type, shape).unwrap(); + Ok(ctb) } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 5f67c5f..c123fb8 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -28,7 +28,6 @@ pub enum Error { Scy(#[from] scywr::session::Error), ScySchema(#[from] scywr::schema::Error), Series(#[from] dbpg::seriesbychannel::Error), - Timebin(#[from] crate::timebin::Error), } impl From> for Error {