WIP better binning, simple rate limit

This commit is contained in:
Dominik Werder
2024-06-28 16:31:39 +02:00
parent e0d24b6258
commit e34fee60fd
8 changed files with 338 additions and 287 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.2-aa.1"
version = "0.2.2-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1124,12 +1124,15 @@ impl CaConn {
let conf_poll_conf = conf.poll_conf();
let chst = &mut conf.state;
if let ChannelState::MakingSeriesWriter(st2) = chst {
let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap();
let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64);
let binwriter = BinWriter::new(
beg,
RetentionTime::Short,
st2.channel.cssid,
writer.sid(),
st2.channel.scalar_type.clone(),
st2.channel.shape.clone(),
stnow,
)?;
self.stats.get_series_id_ok.inc();
{
@@ -1759,7 +1762,7 @@ impl CaConn {
let ts_ioc = TsNano::from_ns(ts);
let ts_local = TsNano::from_ns(ts_local);
let val: DataValue = value.data.into();
binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?;
if dwst {

View File

@@ -364,85 +364,36 @@ pub trait GetValHelp<T> {
fn get(&self) -> Result<&Self::ScalTy, Error>;
}
impl GetValHelp<i8> for DataValue {
type ScalTy = i8;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I8(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
macro_rules! impl_scalar_get_val_help {
($sty:ty, $varname:ident) => {
impl GetValHelp<$sty> for DataValue {
type ScalTy = $sty;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::$varname(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
};
}
impl GetValHelp<i16> for DataValue {
type ScalTy = i16;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I16(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<i32> for DataValue {
type ScalTy = i32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f32> for DataValue {
type ScalTy = f32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::F32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f64> for DataValue {
type ScalTy = f64;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::F64(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl_scalar_get_val_help!(u8, U8);
impl_scalar_get_val_help!(u16, U16);
impl_scalar_get_val_help!(u32, U32);
impl_scalar_get_val_help!(u64, U64);
impl_scalar_get_val_help!(i8, I8);
impl_scalar_get_val_help!(i16, I16);
impl_scalar_get_val_help!(i32, I32);
impl_scalar_get_val_help!(i64, I64);
impl_scalar_get_val_help!(f32, F32);
impl_scalar_get_val_help!(f64, F64);
#[derive(Debug, Clone)]
pub enum ConnectionStatus {

View File

@@ -4,6 +4,8 @@ use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
@@ -34,6 +36,7 @@ pub enum Error {
#[derive(Debug)]
pub struct BinWriter {
rt: RetentionTime,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
@@ -42,17 +45,28 @@ pub struct BinWriter {
impl BinWriter {
pub fn new(
beg: TsNano,
rt: RetentionTime,
// channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
type A = SeriesWriter;
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ms(1000 * 2));
binner.setup_for(&scalar_type, &shape, stnow)?;
// TODO select the desired bin width based on channel configuration:
// that's user knowledge, it really depends on what users want.
// For the moment, assume a fixed value.
let bin_len = DtNano::from_ms(1000 * 10);
let binner = ConnTimeBin::new(
rt.clone(),
sid.clone(),
beg,
bin_len,
scalar_type.clone(),
shape.clone(),
)?;
let ret = Self {
rt,
sid,
scalar_type,
shape,

View File

@@ -17,7 +17,7 @@ pub struct PatchCollect {
}
impl PatchCollect {
pub fn new(bin_len: TsNano, bin_count: u64) -> Self {
fn new(bin_len: TsNano, bin_count: u64) -> Self {
Self {
patch_len: TsNano::from_ns(bin_len.ns() * bin_count),
bin_len,
@@ -40,7 +40,7 @@ impl PatchCollect {
self.bin_count
}
pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> {
fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> {
let mut n1 = 0;
let mut item_len_exp = item.len();
loop {

View File

@@ -4,6 +4,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -182,6 +183,9 @@ impl RtWriter {
} else if ts_local.ms() - last.ts_local.ms() < 1000 * min_quiet.as_secs() {
trace_rt_decision!("{rt} {sid} ignore, because not min quiet");
false
} else if ts_local.delta(last.ts_local) < DtNano::from_ms(5) {
trace_rt_decision!("{rt} {sid} ignore, because store rate cap");
false
} else if val == last.val {
trace_rt_decision!("{rt} {sid} ignore, because value did not change");
false

View File

@@ -1,4 +1,4 @@
use crate::patchcollect::PatchCollect;
use any::Any;
use core::fmt;
use err::thiserror;
use err::ThisError;
@@ -12,11 +12,14 @@ use items_0::WithLen;
use items_2::binsdim0::BinsDim0;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0TimeBinner;
use netpod::f32_close;
use netpod::log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::DtMs;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
@@ -28,14 +31,47 @@ use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32;
use series::SeriesId;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::time::SystemTime;
#[allow(unused)]
macro_rules! trace2 {
macro_rules! todo_setup {
($($arg:tt)*) => {
if false {
if true {
debug!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_store_bin {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_setup {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_tick {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_push {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
@@ -43,59 +79,76 @@ macro_rules! trace2 {
#[derive(Debug, ThisError)]
pub enum Error {
UnexpectedContainer,
PatchWithoutBins,
PatchUnexpectedContainer,
GetValHelpMismatch,
HaveBinsButNoneReturned,
ErrError(#[from] err::Error),
UnsupportedType,
Unsupported,
}
struct TickParams<'a> {
rt: RetentionTime,
series: SeriesId,
acc: &'a mut Box<dyn Any + Send>,
tb: &'a mut Box<dyn TimeBinner>,
pc: &'a mut PatchCollect,
acc: &'a mut (dyn Any + Send),
tb: &'a mut dyn TimeBinner,
iqdqs: &'a mut InsertDeques,
next_coarse: Option<&'a mut EventsDim0TimeBinner<f32>>,
}
pub struct PushFnParams<'a> {
sid: SeriesId,
acc: &'a mut Box<dyn Any + Send>,
series: SeriesId,
acc: &'a mut (dyn Any + Send),
ts: TsNano,
val: &'a DataValue,
}
pub struct ConnTimeBin {
did_setup: bool,
series: SeriesId,
bin_len: TsNano,
next_coarse: Option<Box<EventsDim0TimeBinner<f32>>>,
patch_collect: PatchCollect,
events_binner: Option<Box<dyn TimeBinner>>,
struct Internal {
push_fn: Box<dyn Fn(PushFnParams) -> Result<(), Error> + Send>,
tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
}
impl fmt::Debug for Internal {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Internal")
.field("push_fn", &())
.field("tick_fn", &())
.finish()
}
}
struct SetupResult {
events_binner: Box<dyn TimeBinner>,
acc: Box<dyn Any + Send>,
push_fn: Box<dyn Fn(PushFnParams) -> Result<(), Error> + Send>,
tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
}
impl fmt::Debug for ConnTimeBin {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ConnTimeBin")
.field("did_setup", &self.did_setup)
.field("series", &self.series)
.field("acc", &self.acc)
// .field("push_fn", &self.push_fn)
// .field("tick_fn", &self.tick_fn)
.field("events_binner", &self.events_binner)
.field("patch_collect", &self.patch_collect)
.finish()
}
#[derive(Debug)]
pub struct ConnTimeBin {
rt: RetentionTime,
series: SeriesId,
#[allow(unused)]
bin_len: DtNano,
next_coarse: Option<Box<EventsDim0TimeBinner<f32>>>,
events_binner: Box<dyn TimeBinner>,
acc: Box<dyn Any + Send>,
internal: Internal,
unsup: bool,
}
impl ConnTimeBin {
pub fn empty(series: SeriesId, bin_len: TsNano) -> Self {
pub fn new(
rt: RetentionTime,
series: SeriesId,
beg: TsNano,
bin_len: DtNano,
scalar_type: ScalarType,
shape: Shape,
) -> Result<Self, Error> {
let do_time_weight = true;
#[cfg(DISABLED)]
#[cfg(target_abi = "12")]
let next_coarse = if bin_len.ns() < SEC * 60 {
type ST = f32;
let brange = BinnedRange {
@@ -114,122 +167,110 @@ impl ConnTimeBin {
None
}
.map(Box::new);
Self {
patch_collect: PatchCollect::new(bin_len.clone(), 1),
did_setup: false,
let mut unsup = false;
let k = Self::setup_for(beg, bin_len, &scalar_type, &shape, do_time_weight);
let k = if k.is_ok() {
k
} else {
unsup = true;
Self::setup_for(beg, bin_len, &ScalarType::F32, &Shape::Scalar, do_time_weight)
};
let k = k?;
let ret = Self {
rt,
series,
bin_len,
next_coarse: None,
events_binner: None,
acc: Box::new(()),
push_fn: Box::new(push::<i32>),
tick_fn: Box::new(tick::<i32>),
events_binner: k.events_binner,
acc: k.acc,
internal: Internal {
push_fn: k.push_fn,
tick_fn: k.tick_fn,
},
unsup,
};
Ok(ret)
}
fn setup_for(
beg: TsNano,
bin_len: DtNano,
scalar_type: &ScalarType,
shape: &Shape,
do_time_weight: bool,
) -> Result<SetupResult, Error> {
// TODO should not take a system time here:
let range1 = BinnedRange {
bin_off: beg.ns() / bin_len.ns(),
bin_cnt: u64::MAX / bin_len.ns() - 10,
// TODO fix trait requirements
bin_len: TsNano::from_ns(bin_len.ns()),
};
let binrange = BinnedRangeEnum::Time(range1);
match shape {
Shape::Scalar => {
use ScalarType::*;
match scalar_type {
U8 => Self::setup_scalar::<u8>(binrange, do_time_weight),
U16 => Self::setup_scalar::<u16>(binrange, do_time_weight),
U32 => Self::setup_scalar::<u32>(binrange, do_time_weight),
U64 => Self::setup_scalar::<u64>(binrange, do_time_weight),
I8 => Self::setup_scalar::<i8>(binrange, do_time_weight),
I16 => Self::setup_scalar::<i16>(binrange, do_time_weight),
I32 => Self::setup_scalar::<i32>(binrange, do_time_weight),
I64 => Self::setup_scalar::<i64>(binrange, do_time_weight),
F32 => Self::setup_scalar::<f32>(binrange, do_time_weight),
F64 => Self::setup_scalar::<f64>(binrange, do_time_weight),
STRING => {
todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
Err(Error::UnsupportedType)
}
_ => {
todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
Err(Error::UnsupportedType)
}
}
}
Shape::Wave(..) => match scalar_type {
_ => {
todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
Err(Error::UnsupportedType)
}
},
_ => {
todo_setup!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
Err(Error::UnsupportedType)
}
}
}
pub fn setup_for(&mut self, scalar_type: &ScalarType, shape: &Shape, tsnow: SystemTime) -> Result<(), Error> {
use ScalarType::*;
// TODO should not take a system time here:
let bin_len = &self.bin_len;
let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let range1 = BinnedRange {
bin_off: ts0 / bin_len.ns(),
bin_cnt: u64::MAX / bin_len.ns() - 10,
bin_len: bin_len.clone(),
fn setup_scalar<ST>(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<SetupResult, Error>
where
ST: ScalarOps,
DataValue: GetValHelp<ST, ScalTy = ST>,
{
trace_setup!("SCALAR {}", any::type_name::<ST>());
type Cont<T> = EventsDim0<T>;
let cont = Cont::<ST>::empty();
let emit_empty_bins = false;
let ret = SetupResult {
events_binner: cont
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, emit_empty_bins),
acc: Box::new(cont),
push_fn: Box::new(push::<ST>),
tick_fn: Box::new(tick::<ST>),
};
let binrange = BinnedRangeEnum::Time(range1);
//info!("binrange {binrange:?}");
let do_time_weight = true;
match shape {
Shape::Scalar => {
type Cont<T> = EventsDim0<T>;
match scalar_type {
I8 => {
type ST = i8;
trace2!("SCALAR {}", any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
self.acc = Box::new(cont);
self.push_fn = Box::new(push::<ST>);
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
I16 => {
type ST = i16;
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
self.acc = Box::new(cont);
self.push_fn = Box::new(push::<ST>);
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
I32 => {
type ST = i32;
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
self.acc = Box::new(cont);
self.push_fn = Box::new(push::<ST>);
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
F32 => {
type ST = f32;
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
self.acc = Box::new(cont);
self.push_fn = Box::new(push::<ST>);
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
F64 => {
type ST = f64;
trace2!("SCALAR {}", std::any::type_name::<ST>());
let cont = Cont::<ST>::empty();
self.events_binner =
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
self.acc = Box::new(cont);
self.push_fn = Box::new(push::<ST>);
self.tick_fn = Box::new(tick::<ST>);
self.did_setup = true;
}
STRING => {
trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
_ => {
trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
}
Shape::Wave(..) => {
//type Cont<T> = EventsDim1<T>;
match scalar_type {
_ => {
trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
}
_ => {
trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
}
}
Ok(())
Ok(ret)
}
pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> {
if !self.did_setup {
// TODO record as logic error
if self.unsup {
return Ok(());
}
let (f, acc) = (&self.push_fn, &mut self.acc);
let (f, acc) = (&self.internal.push_fn, self.acc.as_mut());
let params = PushFnParams {
sid: self.series.clone(),
series: self.series.clone(),
acc,
ts,
val,
@@ -238,20 +279,38 @@ impl ConnTimeBin {
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if !self.did_setup {
if self.unsup {
return Ok(());
}
let (f,) = (&self.tick_fn,);
let (f,) = (&self.internal.tick_fn,);
let params = TickParams {
rt: self.rt.clone(),
series: self.series.clone(),
acc: &mut self.acc,
tb: self.events_binner.as_mut().unwrap(),
pc: &mut self.patch_collect,
acc: self.acc.as_mut(),
tb: self.events_binner.as_mut(),
// pc: &mut self.patch_collect,
iqdqs,
next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()),
};
f(params)
}
pub fn finish(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if self.unsup {
return Ok(());
}
let series = self.series.clone();
let tb = self.events_binner.as_mut();
tb.push_in_progress(false);
let nbins = tb.bins_ready_count();
if nbins >= 1 {
trace_store_bin!("finish nbins {} {:?}", nbins, series);
let rt = self.rt.clone();
let next_coarse = self.next_coarse.as_mut().map(|x| x.as_mut());
store_bins(rt, series, tb, iqdqs, next_coarse)?;
}
Ok(())
}
}
fn push<STY>(params: PushFnParams) -> Result<(), Error>
@@ -259,7 +318,7 @@ where
STY: ScalarOps,
DataValue: GetValHelp<STY, ScalTy = STY>,
{
let sid = &params.sid;
let series = &params.series;
let ts = params.ts;
let v = match GetValHelp::<STY>::get(params.val) {
Ok(x) => x,
@@ -267,7 +326,7 @@ where
// TODO throttle the error
let msg = format!(
"GetValHelp mismatch: series {:?} STY {} data {:?} {e}",
sid,
series,
any::type_name::<STY>(),
params.val
);
@@ -276,13 +335,11 @@ where
}
};
if let Some(c) = params.acc.downcast_mut::<EventsDim0<STY>>() {
trace_push!("PUSHED");
c.push(ts.ns(), 0, v.clone());
Ok(())
} else {
// TODO report once and error out
error!("unexpected container");
//Err(Error::with_msg_no_trace("unexpected container"))
Ok(())
Err(Error::UnexpectedContainer)
}
}
@@ -290,19 +347,23 @@ fn tick<STY>(params: TickParams) -> Result<(), Error>
where
STY: ScalarOps,
{
let rt = params.rt;
let acc = params.acc;
let tb = params.tb;
// let pc = params.pc;
let iqdqs = params.iqdqs;
let next = params.next_coarse;
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
trace_tick!("TICK CONV");
if c.len() >= 1 {
trace_tick!("TICK EVENTS");
tb.ingest(c);
c.reset();
trace_tick!("TICK INGESTED");
let nbins = tb.bins_ready_count();
if nbins >= 1 {
trace!("store bins len {} {:?}", nbins, params.series);
store_bins(params.series.clone(), tb, iqdqs, next)?;
trace_tick!("TICK READY {nbins}");
trace_store_bin!("store bins len {} {:?}", nbins, params.series);
store_bins(rt, params.series.clone(), tb, iqdqs, next)?;
// if let Some(mut bins) = tb.bins_ready() {
// //info!("store bins {bins:?}");
// let mut bins = bins.to_simple_bins_f32();
@@ -326,21 +387,23 @@ where
Ok(())
} else {
trace_tick!("TICK NOT READY");
Ok(())
}
} else {
trace_tick!("TICK NO EVENTS TO PROCESS");
Ok(())
}
} else {
error!("unexpected container");
//Err(Error::with_msg_no_trace("unexpected container"))
Ok(())
trace_tick!("TICK UNEXPECTED CONTAINER");
Err(Error::UnexpectedContainer)
}
}
fn store_bins(
rt: RetentionTime,
series: SeriesId,
tb: &mut Box<dyn TimeBinner>,
tb: &mut dyn TimeBinner,
iqdqs: &mut InsertDeques,
next: Option<&mut EventsDim0TimeBinner<f32>>,
) -> Result<(), Error> {
@@ -360,23 +423,25 @@ fn store_bins(
.zip(k.avgs.iter())
{
// TODO the inner must be of BinsDim0<f32> type so we feed also count, min, max, etc.
if let Some(next) = &next {
if let Some(_next) = &next {
// next.ingest();
}
// TODO this must depend on the data type: waveforms need smaller batches
let bins_per_msp = 82000;
let ts1ms = ts1 / MS;
let ts2ms = ts2 / MS;
let bin_len_ms = ts2ms - ts1ms;
let h = bins_per_msp * bin_len_ms;
let ts_msp = ts1ms / h * h;
let off = (ts1ms - ts_msp) / bin_len_ms;
let ts1 = TsMs::from_ms_u64(ts1 / MS);
let ts2 = TsMs::from_ms_u64(ts2 / MS);
let bin_len = ts2 - ts1;
let h = if bin_len == DtMs::from_ms_u64(1000 * 10) {
DtMs::from_ms_u64(1000 * 60 * 60 * 2)
} else {
// TODO
return Err(Error::Unsupported);
};
let ts_msp = TsMs::from_ms_u64(ts1.ms() / h.ms() * h.ms());
let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms();
let item = TimeBinSimpleF32 {
series: series.clone(),
bin_len_ms: bin_len_ms as i32,
ts_msp: TsMs::from_ms_u64(ts_msp),
bin_len_ms: bin_len.ms() as i32,
ts_msp,
off: off as i32,
count: count as i64,
min,
@@ -384,12 +449,8 @@ fn store_bins(
avg,
};
let item = QueryItem::TimeBinSimpleF32(item);
trace!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}");
// TODO check which RT we want to push into
iqdqs.st_rf3_rx.push_back(item.clone());
// iqdqs.mt_rf3_rx.push_back(item.clone());
// iqdqs.lt_rf3_rx.push_back(item);
trace_store_bin!("push item B ts1ms {ts1:?} bin_len_ms {bin_len:?} ts_msp {ts_msp} off {off}");
iqdqs.deque(rt.clone()).push_back(item.clone());
}
}
Ok(())
@@ -414,38 +475,57 @@ fn store_bins(
}
}
fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
// TODO
// I probably still want to keep the "patchcollect" because I want to store also the next
// resolutions.
// But I need to emit each bin as they come.
#[test]
fn test_00() {
let mut ctb = init_scalar_f32_conn_time_bin().unwrap();
let mut iqdqs = InsertDeques::new();
ctb.tick(&mut iqdqs).unwrap();
assert_eq!(iqdqs.len(), 0);
}
for item in pc.take_outq() {
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
let ts0 = if let Some(x) = k.ts1s.front() {
*x
} else {
return Err(Error::PatchWithoutBins);
};
#[test]
fn test_01() {
use scywr::iteminsertqueue::ScalarValue;
let mut ctb = init_scalar_f32_conn_time_bin().unwrap();
ctb.push(TsNano::from_ms(1000), &DataValue::Scalar(ScalarValue::I32(10)))
.unwrap();
let mut iqdqs = InsertDeques::new();
ctb.tick(&mut iqdqs).unwrap();
assert_eq!(iqdqs.len(), 0);
}
// TODO insert each bin individually
let bin_len_sec = (pc.bin_len().ns() / MS);
let bin_count = pc.bin_count();
let off = ts0 / pc.patch_len().ns();
let off_msp = off / 1000;
let off_lsp = off % 1000;
// let item = TimeBinSimpleF32 {
// };
// let item = QueryItem::TimeBinSimpleF32(item);
// warn!(
// "push item B bin_len_sec {bin_len_sec} bin_count {bin_count} off_msp {off_msp} off_lsp {off_lsp}"
// );
// iiq.push_back(item);
#[test]
fn test_02() {
use scywr::iteminsertqueue::ScalarValue;
let mut ctb = init_scalar_f32_conn_time_bin().unwrap();
ctb.push(TsNano::from_ms(1000 * 10), &DataValue::Scalar(ScalarValue::I32(10)))
.unwrap();
ctb.push(TsNano::from_ms(1000 * 11), &DataValue::Scalar(ScalarValue::I32(12)))
.unwrap();
ctb.push(TsNano::from_ms(1000 * 12), &DataValue::Scalar(ScalarValue::I32(10)))
.unwrap();
let mut iqdqs = InsertDeques::new();
ctb.tick(&mut iqdqs).unwrap();
ctb.finish(&mut iqdqs).unwrap();
assert_eq!(iqdqs.len(), 1);
for e in iqdqs.st_rf3_rx {
eprintln!("{e:?}");
if let QueryItem::TimeBinSimpleF32(x) = e {
assert!(f32_close(x.avg, 10.2));
} else {
error!("unexpected container!");
return Err(Error::PatchUnexpectedContainer);
panic!();
}
}
Ok(())
}
#[cfg(test)]
fn init_scalar_f32_conn_time_bin() -> Result<ConnTimeBin, Error> {
let rt = RetentionTime::Short;
let series = SeriesId::new(1);
let beg = TsNano::from_ms(1000 * 10);
let bin_len = DtNano::from_ms(1000 * 10);
let scalar_type = ScalarType::I32;
let shape = Shape::Scalar;
let ctb = ConnTimeBin::new(rt, series, beg, bin_len, scalar_type, shape).unwrap();
Ok(ctb)
}

View File

@@ -28,7 +28,6 @@ pub enum Error {
Scy(#[from] scywr::session::Error),
ScySchema(#[from] scywr::schema::Error),
Series(#[from] dbpg::seriesbychannel::Error),
Timebin(#[from] crate::timebin::Error),
}
impl<T> From<async_channel::SendError<T>> for Error {