upport framing for container and output format

This commit is contained in:
Dominik Werder
2024-12-04 12:14:44 +01:00
parent 896a723a3e
commit 7ed9cf358f
14 changed files with 647 additions and 347 deletions

View File

@@ -1,4 +1,5 @@
use crate::binning::container::bins::BinAggedType;
use crate::binning::container_events::Container;
use crate::binning::container_events::EventValueType;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
@@ -7,6 +8,7 @@ use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use netpod::TsNano;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
@@ -15,7 +17,7 @@ pub struct ContainerEventsApi<EVT>
where
EVT: EventValueType,
{
pub tss: VecDeque<u64>,
pub tss: VecDeque<TsNano>,
pub values: EVT::Container,
#[serde(skip_serializing_if = "netpod::is_false")]
pub range_final: bool,
@@ -39,9 +41,21 @@ impl<EVT> ToCborValue for ContainerEventsApi<EVT>
where
EVT: EventValueType,
{
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
let val = ciborium::value::Value::serialized(self).unwrap();
Ok(val)
fn into_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
let tss: Vec<_> = self.tss.into_iter().map(|x| x.ns()).collect();
let mut ret = self.values.into_user_facing_fields();
ret.push(("tss".into(), Box::new(tss)));
if self.range_final {
ret.push(("rangeFinal".into(), Box::new(true)));
}
if self.timed_out {
ret.push(("timedOut".into(), Box::new(true)));
}
ret
}
fn into_fields_box(self: Box<Self>) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
ToCborValue::into_fields(*self)
}
}
@@ -49,13 +63,46 @@ impl<EVT> ToJsonValue for ContainerEventsApi<EVT>
where
EVT: EventValueType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let ret = serde_json::to_value(self);
fn into_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
let mut ret = self.values.into_user_facing_fields_json();
let (ts_anch, ts_ms, ts_ns) = ts_offs_from_abs(&self.tss);
ret.push(("tsAnchor".into(), Box::new(ts_anch)));
ret.push(("tsMs".into(), Box::new(ts_ms)));
ret.push(("tsNs".into(), Box::new(ts_ns)));
if self.range_final {
ret.push(("rangeFinal".into(), Box::new(true)));
}
if self.timed_out {
ret.push(("timedOut".into(), Box::new(true)));
}
ret
}
fn into_fields_box(self: Box<Self>) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
ToJsonValue::into_fields(*self)
}
}
impl<EVT> UserApiType for ContainerEventsApi<EVT> where EVT: EventValueType {}
impl<EVT> UserApiType for ContainerEventsApi<EVT>
where
EVT: EventValueType,
{
fn into_serializable(self: Box<Self>) -> Box<dyn erased_serde::Serialize> {
let mut map = BTreeMap::new();
for (k, v) in ToCborValue::into_fields_box(self) {
map.insert(k, v);
}
Box::new(map)
}
fn into_serializable_json(self: Box<Self>) -> Box<dyn erased_serde::Serialize> {
let mut map = BTreeMap::new();
for (k, v) in ToJsonValue::into_fields_box(self) {
map.insert(k, v);
}
Box::new(map)
}
}
#[derive(Serialize)]
pub struct ContainerBinsApi<EVT, BVT>
@@ -66,8 +113,8 @@ where
pub ts1s: VecDeque<TsNano>,
pub ts2s: VecDeque<TsNano>,
pub cnts: VecDeque<u64>,
pub mins: VecDeque<EVT>,
pub maxs: VecDeque<EVT>,
pub mins: <EVT as EventValueType>::Container,
pub maxs: <EVT as EventValueType>::Container,
pub aggs: VecDeque<BVT>,
pub fnls: VecDeque<bool>,
}
@@ -90,11 +137,40 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
// let val = ciborium::value::Value::serialized(self).unwrap();
// Ok(val)
let e = ciborium::value::Error::Custom("binned data as cbor is not yet available".into());
Err(e)
fn into_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
let mut ret = Vec::<(String, Box<dyn erased_serde::Serialize>)>::new();
// let mut ret = self.aggs.into_user_facing_fields_json();
ret.push(("ts1s".into(), Box::new(self.ts1s)));
ret.push(("ts2s".into(), Box::new(self.ts2s)));
ret.push(("counts".into(), Box::new(self.cnts)));
{
let fields = self.mins.into_user_facing_fields();
for (k, v) in fields {
let k = if k == "values" {
"mins".to_string()
} else {
format!("mins_{}", k)
};
ret.push((k, v));
}
}
{
let fields = self.maxs.into_user_facing_fields();
for (k, v) in fields {
let k = if k == "values" {
"maxs".to_string()
} else {
format!("maxs_{}", k)
};
ret.push((k, v));
}
}
ret.push(("avgs".into(), Box::new(self.aggs)));
ret
}
fn into_fields_box(self: Box<Self>) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
ToCborValue::into_fields(*self)
}
}
@@ -103,26 +179,25 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
use serde_json::json;
use serde_json::Value;
// let ret = serde_json::to_value(self);
// ret
let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&self.ts1s);
let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s);
fn into_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
let mut ret = Vec::<(String, Box<dyn erased_serde::Serialize>)>::new();
// let mut ret = self.aggs.into_user_facing_fields_json();
let (ts_anch, ts1_ms, ts1_ns) = ts_offs_from_abs(&self.ts1s);
let (ts2_ms, ts2_ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s);
ret.push(("tsAnchor".into(), Box::new(ts_anch)));
ret.push(("ts1Ms".into(), Box::new(ts1_ms)));
ret.push(("ts1Ns".into(), Box::new(ts1_ns)));
ret.push(("ts2Ms".into(), Box::new(ts2_ms)));
ret.push(("ts2Ns".into(), Box::new(ts2_ns)));
ret.push(("counts".into(), Box::new(self.cnts)));
ret.push(("mins".into(), Box::new(self.mins)));
ret.push(("maxs".into(), Box::new(self.maxs)));
ret.push(("avgs".into(), Box::new(self.aggs)));
ret
}
let ret = json!({
"tsAnchor": ts_anch,
"ts1Ms": ts1ms,
"ts2Ms": ts2ms,
"ts1Ns": ts1ns,
"ts2Ns": ts2ns,
"counts": self.cnts,
"mins": self.mins,
"maxs": self.maxs,
"avgs": self.aggs,
});
Ok(ret)
fn into_fields_box(self: Box<Self>) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
ToJsonValue::into_fields(*self)
}
}
@@ -131,4 +206,19 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
fn into_serializable(self: Box<Self>) -> Box<dyn erased_serde::Serialize> {
let mut map = BTreeMap::new();
for (k, v) in ToCborValue::into_fields_box(self) {
map.insert(k, v);
}
Box::new(map)
}
fn into_serializable_json(self: Box<Self>) -> Box<dyn erased_serde::Serialize> {
let mut map = BTreeMap::new();
for (k, v) in ToJsonValue::into_fields_box(self) {
map.insert(k, v);
}
Box::new(map)
}
}

View File

@@ -2,7 +2,9 @@ pub mod agg_bins;
use super::container::bins::BinAggedType;
use super::container_events::EventValueType;
use super::container_events::PulsedVal;
use core::fmt;
use items_0::subfr::SubFrId;
use netpod::log::*;
use netpod::DtNano;
use netpod::EnumVariant;
@@ -373,3 +375,38 @@ impl AggregatorTimeWeight<Vec<EnumVariant>> for AggregatorVecNumeric {
sum / filled_width_fraction
}
}
#[derive(Debug)]
pub struct AggregatorPulsedNumeric<EVT>
where
EVT: EventValueType,
{
evt_agg: EVT::AggregatorTimeWeight,
}
impl<EVT> AggregatorTimeWeight<PulsedVal<EVT>> for AggregatorPulsedNumeric<EVT>
where
EVT: EventValueType + SubFrId,
{
fn new() -> Self {
Self {
evt_agg: EVT::AggregatorTimeWeight::new(),
}
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: PulsedVal<EVT>) {
self.evt_agg.ingest(dt, bl, val.1);
}
fn reset_for_new_bin(&mut self) {
self.evt_agg.reset_for_new_bin();
}
fn result_and_reset_for_new_bin(
&mut self,
filled_width_fraction: f32,
) -> <PulsedVal<EVT> as EventValueType>::AggTimeWeightOutputAvg {
self.evt_agg
.result_and_reset_for_new_bin(filled_width_fraction)
}
}

View File

@@ -1,4 +1,5 @@
use super::container::bins::BinAggedType;
use super::container_events::Container;
use super::container_events::EventValueType;
use crate::apitypes::ContainerBinsApi;
use crate::offsets::ts_offs_from_abs;
@@ -42,10 +43,10 @@ where
pub ts1: TsNano,
pub ts2: TsNano,
pub cnt: u64,
pub min: &'a EVT,
pub max: &'a EVT,
pub min: EVT::IterTy1<'a>,
pub max: EVT::IterTy1<'a>,
pub agg: &'a BVT,
pub lst: &'a EVT,
pub lst: EVT::IterTy1<'a>,
pub fnl: bool,
}
@@ -67,6 +68,7 @@ where
type Item = BinRef<'a, EVT, BVT>;
fn next(&mut self) -> Option<Self::Item> {
use crate::binning::container_events::Container;
if self.ix < self.bins.len() && self.ix < self.len {
let b = &self.bins;
let i = self.ix;
@@ -75,10 +77,10 @@ where
ts1: b.ts1s[i],
ts2: b.ts2s[i],
cnt: b.cnts[i],
min: &b.mins[i],
max: &b.maxs[i],
min: b.mins.get_iter_ty_1(i).unwrap(),
max: b.maxs.get_iter_ty_1(i).unwrap(),
agg: &b.aggs[i],
lst: &b.lsts[i],
lst: b.lsts.get_iter_ty_1(i).unwrap(),
fnl: b.fnls[i],
};
Some(ret)
@@ -97,10 +99,10 @@ where
ts1s: VecDeque<TsNano>,
ts2s: VecDeque<TsNano>,
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
mins: <EVT as EventValueType>::Container,
maxs: <EVT as EventValueType>::Container,
aggs: VecDeque<BVT>,
lsts: VecDeque<EVT>,
lsts: <EVT as EventValueType>::Container,
fnls: VecDeque<bool>,
}
@@ -145,28 +147,6 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn from_constituents(
ts1s: VecDeque<TsNano>,
ts2s: VecDeque<TsNano>,
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
aggs: VecDeque<BVT>,
lsts: VecDeque<EVT>,
fnls: VecDeque<bool>,
) -> Self {
Self {
ts1s,
ts2s,
cnts,
mins,
maxs,
aggs,
lsts,
fnls,
}
}
pub fn type_name() -> &'static str {
any::type_name::<Self>()
}
@@ -176,10 +156,10 @@ where
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
cnts: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
mins: <<EVT as EventValueType>::Container as Container<EVT>>::new(),
maxs: <<EVT as EventValueType>::Container as Container<EVT>>::new(),
aggs: VecDeque::new(),
lsts: VecDeque::new(),
lsts: <<EVT as EventValueType>::Container as Container<EVT>>::new(),
fnls: VecDeque::new(),
}
}
@@ -228,20 +208,20 @@ where
self.cnts.iter()
}
pub fn mins_iter(&self) -> std::collections::vec_deque::Iter<EVT> {
self.mins.iter()
pub fn mins_iter(&self) -> impl Iterator<Item = EVT::IterTy1<'_>> {
self.mins.iter_ty_1()
}
pub fn maxs_iter(&self) -> std::collections::vec_deque::Iter<EVT> {
self.maxs.iter()
pub fn maxs_iter(&self) -> impl Iterator<Item = EVT::IterTy1<'_>> {
self.maxs.iter_ty_1()
}
pub fn aggs_iter(&self) -> std::collections::vec_deque::Iter<BVT> {
self.aggs.iter()
}
pub fn lsts_iter(&self) -> std::collections::vec_deque::Iter<EVT> {
self.lsts.iter()
pub fn lsts_iter(&self) -> impl Iterator<Item = EVT::IterTy1<'_>> {
self.lsts.iter_ty_1()
}
pub fn fnls_iter(&self) -> std::collections::vec_deque::Iter<bool> {
@@ -262,13 +242,13 @@ where
>,
std::collections::vec_deque::Iter<u64>,
>,
std::collections::vec_deque::Iter<EVT>,
impl Iterator<Item = EVT::IterTy1<'_>>,
>,
std::collections::vec_deque::Iter<EVT>,
impl Iterator<Item = EVT::IterTy1<'_>>,
>,
std::collections::vec_deque::Iter<BVT>,
>,
std::collections::vec_deque::Iter<EVT>,
impl Iterator<Item = EVT::IterTy1<'_>>,
>,
std::collections::vec_deque::Iter<bool>,
> {
@@ -487,40 +467,12 @@ where
// finished_at: Option<IsoDateTime>,
}
impl<EVT, BVT> ToJsonValue for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let bins = &self.bins;
let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&bins.ts1s);
let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &bins.ts2s);
let counts = bins.cnts.clone();
let mins = bins.mins.clone();
let maxs = bins.maxs.clone();
let aggs = bins.aggs.clone();
let val = ContainerBinsCollectorOutputUser::<EVT, BVT> {
ts_anchor_sec: ts_anch,
ts1_off_ms: ts1ms,
ts2_off_ms: ts2ms,
ts1_off_ns: ts1ns,
ts2_off_ns: ts2ns,
counts,
mins,
maxs,
aggs,
};
serde_json::to_value(&val)
}
}
impl<EVT, BVT> ToUserFacingApiType for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_user_facing_api_type(self) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type(self) -> Box<dyn items_0::apitypes::UserApiType> {
let ret = ContainerBinsApi::<EVT, BVT> {
ts1s: self.bins.ts1s,
ts2s: self.bins.ts2s,
@@ -533,8 +485,8 @@ where
Box::new(ret)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
(*self).to_user_facing_api_type()
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
(*self).into_user_facing_api_type()
}
}
@@ -661,10 +613,10 @@ where
dst.ts1s.extend(self.ts1s.drain(range.clone()));
dst.ts2s.extend(self.ts2s.drain(range.clone()));
dst.cnts.extend(self.cnts.drain(range.clone()));
dst.mins.extend(self.mins.drain(range.clone()));
dst.maxs.extend(self.maxs.drain(range.clone()));
self.mins.drain_into(&mut dst.mins, range.clone());
self.maxs.drain_into(&mut dst.maxs, range.clone());
dst.aggs.extend(self.aggs.drain(range.clone()));
dst.lsts.extend(self.lsts.drain(range.clone()));
self.lsts.drain_into(&mut dst.lsts, range.clone());
dst.fnls.extend(self.fnls.drain(range.clone()));
} else {
let styn = any::type_name::<EVT>();
@@ -683,14 +635,7 @@ where
Box::new(ret)
}
fn fix_numerics(&mut self) {
for ((_min, _max), _avg) in self
.mins
.iter_mut()
.zip(self.maxs.iter_mut())
.zip(self.aggs.iter_mut())
{}
}
fn fix_numerics(&mut self) {}
}
pub struct ContainerBinsTakeUpTo<'a, EVT, BVT>

View File

@@ -1,10 +1,12 @@
use super::aggregator::AggTimeWeightOutputAvg;
use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorPulsedNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::aggregator::AggregatorVecNumeric;
use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox;
use crate::apitypes::ContainerEventsApi;
use crate::log::*;
use crate::offsets::pulse_offs_from_abs;
use core::fmt;
use core::ops::Range;
use daqbuf_err as err;
@@ -16,8 +18,6 @@ use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewDynResult;
@@ -55,10 +55,11 @@ where
{
fn new() -> Self;
fn push_back(&mut self, val: EVT);
fn pop_front(&mut self) -> Option<EVT>;
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>>;
fn iter_ty_1(&self) -> impl Iterator<Item = EVT::IterTy1<'_>>;
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>);
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)>;
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)>;
}
pub trait PartialOrdEvtA<EVT> {
@@ -89,10 +90,6 @@ where
self.push_back(val);
}
fn pop_front(&mut self) -> Option<EVT> {
self.pop_front()
}
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>> {
self.get(pos).map(|x| x.clone())
}
@@ -104,6 +101,14 @@ where
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.extend(self.drain(range));
}
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
}
impl Container<String> for VecDeque<String> {
@@ -115,10 +120,6 @@ impl Container<String> for VecDeque<String> {
self.push_back(val);
}
fn pop_front(&mut self) -> Option<String> {
self.pop_front()
}
fn get_iter_ty_1(&self, pos: usize) -> Option<&str> {
self.get(pos).map(|x| x.as_str())
}
@@ -130,6 +131,14 @@ impl Container<String> for VecDeque<String> {
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.extend(self.drain(range))
}
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
}
macro_rules! impl_event_value_type {
@@ -139,7 +148,7 @@ macro_rules! impl_event_value_type {
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = $evt;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
}
@@ -191,7 +200,7 @@ impl EventValueType for f32 {
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = f32;
const SERDE_ID: u32 = <f32 as SubFrId>::SUB;
const SERDE_ID: u32 = <f32 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
@@ -200,7 +209,7 @@ impl EventValueType for f64 {
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = f64;
const SERDE_ID: u32 = <f64 as SubFrId>::SUB;
const SERDE_ID: u32 = <f64 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
@@ -209,7 +218,7 @@ impl EventValueType for bool {
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = bool;
const SERDE_ID: u32 = <bool as SubFrId>::SUB;
const SERDE_ID: u32 = <bool as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
@@ -218,7 +227,7 @@ impl EventValueType for String {
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = &'a str;
const SERDE_ID: u32 = <String as SubFrId>::SUB;
const SERDE_ID: u32 = <String as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = 400;
}
@@ -229,7 +238,7 @@ macro_rules! impl_event_value_type_vec {
type AggregatorTimeWeight = AggregatorVecNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<$evt>;
const SERDE_ID: u32 = <Vec<$evt> as SubFrId>::SUB;
const SERDE_ID: u32 = <Vec<$evt> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
}
@@ -256,8 +265,57 @@ impl_event_value_type_vec!(bool);
impl_event_value_type_vec!(String);
impl_event_value_type_vec!(EnumVariant);
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize)]
pub struct PulsedVal<EVT>(EVT)
#[derive(Debug)]
pub struct PulsedValIterTy<'a, EVT>
where
EVT: EventValueType,
{
pulse: u64,
evt: EVT::IterTy1<'a>,
}
impl<'a, EVT> Clone for PulsedValIterTy<'a, EVT>
where
EVT: EventValueType + SubFrId,
{
fn clone(&self) -> Self {
Self {
pulse: self.pulse,
evt: self.evt.clone(),
}
}
}
impl<'a, EVT> PartialOrdEvtA<PulsedVal<EVT>> for PulsedValIterTy<'a, EVT>
where
EVT: EventValueType + SubFrId,
{
fn cmp_a(&self, other: &PulsedVal<EVT>) -> Option<std::cmp::Ordering> {
use std::cmp::Ordering;
match self.pulse.cmp(&other.0) {
Ordering::Less => Some(Ordering::Less),
Ordering::Greater => Some(Ordering::Greater),
Ordering::Equal => match self.evt.cmp_a(&other.1) {
Some(Ordering::Less) => Some(Ordering::Less),
Some(Ordering::Greater) => Some(Ordering::Greater),
Some(Ordering::Equal) => Some(Ordering::Equal),
None => None,
},
}
}
}
impl<'a, EVT> From<PulsedValIterTy<'a, EVT>> for PulsedVal<EVT>
where
EVT: EventValueType,
{
fn from(value: PulsedValIterTy<'a, EVT>) -> Self {
Self(value.pulse, value.evt.into())
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct PulsedVal<EVT>(pub u64, pub EVT)
where
EVT: EventValueType;
@@ -270,6 +328,15 @@ where
}
}
impl<EVT> PartialOrd for PulsedVal<EVT>
where
EVT: EventValueType,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.1.partial_cmp(&other.1)
}
}
mod serde_pulsed_val {
use super::EventValueType;
use super::PulsedVal;
@@ -303,14 +370,18 @@ where
EVT: EventValueType,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
todo!()
let ret = items_0::vecpreview::PreviewCell {
a: self.pulses.front(),
b: self.pulses.back(),
};
Box::new(ret)
}
}
impl<EVT> Container<PulsedVal<EVT>> for VecDequePulsed<EVT>
where
EVT: EventValueType,
PulsedVal<EVT>: EventValueType,
for<'a> PulsedVal<EVT>: EventValueType<IterTy1<'a> = PulsedValIterTy<'a, EVT>>,
{
fn new() -> Self {
Self {
@@ -320,48 +391,69 @@ where
}
fn push_back(&mut self, val: PulsedVal<EVT>) {
todo!()
}
fn pop_front(&mut self) -> Option<PulsedVal<EVT>> {
todo!()
self.pulses.push_back(val.0);
self.vals.push_back(val.1);
}
fn get_iter_ty_1(&self, pos: usize) -> Option<<PulsedVal<EVT> as EventValueType>::IterTy1<'_>> {
todo!()
if let (Some(&pulse), Some(val)) = (self.pulses.get(pos), self.vals.get_iter_ty_1(pos)) {
let x = PulsedValIterTy { pulse, evt: val };
Some(x)
} else {
None
}
}
fn iter_ty_1(&self) -> impl Iterator<Item = <PulsedVal<EVT> as EventValueType>::IterTy1<'_>> {
todo!();
self.vals.iter_ty_1().map(|x| todo!())
self.pulses
.iter()
.map(|&x| x)
.zip(self.vals.iter_ty_1())
.map(|(pulse, evt)| PulsedValIterTy { pulse, evt })
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.pulses.extend(self.pulses.drain(range.clone()));
self.vals.drain_into(&mut dst.vals, range.clone());
}
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![
("pulses".into(), Box::new(self.pulses)),
("values".into(), Box::new(self.vals)),
]
}
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
let (pulses_anch, pulses_offs) = pulse_offs_from_abs(&self.pulses);
vec![
("pulseAnchor".into(), Box::new(pulses_anch)),
("pulseOff".into(), Box::new(pulses_offs)),
("values".into(), Box::new(self.vals)),
]
}
}
macro_rules! impl_pulse_evt {
($evt:ty) => {
impl EventValueType for PulsedVal<$evt> {
type Container = VecDequePulsed<$evt>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = PulsedVal<$evt>;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
}
impl PartialOrdEvtA<PulsedVal<$evt>> for PulsedVal<$evt> {
fn cmp_a(&self, other: &PulsedVal<$evt>) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
};
impl<EVT> PartialOrdEvtA<PulsedVal<EVT>> for PulsedVal<EVT>
where
EVT: EventValueType,
{
fn cmp_a(&self, other: &PulsedVal<EVT>) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl_pulse_evt!(u8);
impl<EVT> EventValueType for PulsedVal<EVT>
where
EVT: EventValueType + SubFrId,
{
type Container = VecDequePulsed<EVT>;
type AggregatorTimeWeight = AggregatorPulsedNumeric<EVT>;
type AggTimeWeightOutputAvg = EVT::AggTimeWeightOutputAvg;
type IterTy1<'a> = PulsedValIterTy<'a, EVT>;
const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(<EVT as SubFrId>::SUB) as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<EVT>() as u32;
}
#[derive(Debug, Clone)]
pub struct EventSingleRef<'a, EVT>
@@ -578,8 +670,8 @@ where
self.vals.push_back(val);
}
pub fn iter_zip<'a>(&'a self) -> impl Iterator<Item = (&TsNano, EVT::IterTy1<'a>)> {
self.tss.iter().zip(self.vals.iter_ty_1())
pub fn iter_zip<'a>(&'a self) -> impl Iterator<Item = (TsNano, EVT::IterTy1<'a>)> {
self.tss.iter().map(|&x| x).zip(self.vals.iter_ty_1())
}
pub fn serde_id() -> u32 {
@@ -664,6 +756,7 @@ where
evs: &'a ContainerEvents<EVT>,
end: usize,
pos: usize,
// it: Box<dyn Iterator<Item = (TsNano, EVT::IterTy1<'static>)>>,
}
impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT>
@@ -671,10 +764,13 @@ where
EVT: EventValueType,
{
pub fn new(evs: &'a ContainerEvents<EVT>) -> Self {
// let it = unsafe { netpod::extltref(evs) }.iter_zip();
// let it = Box::new(it);
Self {
evs,
end: evs.len(),
pos: 0,
// it,
}
}
@@ -884,9 +980,9 @@ impl<EVT> ToUserFacingApiType for ContainerEventsCollected<EVT>
where
EVT: EventValueType,
{
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
let evs = ContainerEventsApi::<EVT> {
tss: self.evs.tss.into_iter().map(|x| x.ns()).collect(),
tss: self.evs.tss,
values: self.evs.vals,
range_final: self.range_final,
timed_out: self.timed_out,
@@ -894,8 +990,8 @@ where
Box::new(evs)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
(*self).to_user_facing_api_type()
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
(*self).into_user_facing_api_type()
}
}
@@ -925,6 +1021,7 @@ where
EVT: EventValueType,
{
pub fn new() -> Self {
debug!("ContainerEventsCollector::new");
Self {
evs: ContainerEvents::new(),
range_final: false,
@@ -950,9 +1047,7 @@ where
type Output = ContainerEventsCollected<EVT>;
fn ingest(&mut self, src: &mut Self::Input) {
let n = self.len();
info!("CollectorTy for ContainerEventsCollector n {}", n);
MergeableTy::drain_into(src, &mut self.evs, 0..n);
MergeableTy::drain_into(src, &mut self.evs, 0..src.len());
}
fn set_range_complete(&mut self) {
@@ -983,31 +1078,13 @@ where
}
}
impl<EVT> ToCborValue for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
ciborium::value::Value::serialized(self)
}
}
impl<EVT> ToJsonValue for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}
}
impl<EVT> ToUserFacingApiType for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
let ret = ContainerEventsApi::<EVT> {
tss: self.tss.into_iter().map(|x| x.ns()).collect(),
tss: self.tss,
values: self.vals,
range_final: false,
timed_out: false,
@@ -1015,9 +1092,9 @@ where
Box::new(ret)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
let this = *self;
this.to_user_facing_api_type()
this.into_user_facing_api_type()
}
}
@@ -1071,29 +1148,26 @@ mod test_frame {
use super::*;
use crate::channelevents::ChannelEvents;
use crate::framable::Framable;
use crate::framable::INMEM_FRAME_ENCID;
use crate::frame::decode_frame;
use crate::inmem::InMemoryFrame;
use crate::inmem::ParseResult;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use netpod::TsMs;
#[test]
fn events_serialize() {
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ns(123), 55f32);
let mut evs = ContainerEvents::<f32>::new();
evs.push_back(TsNano::from_ns(123), 55.);
evs.push_back(TsNano::from_ns(124), 56.);
let item = ChannelEvents::from(evs);
let item: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let mut buf = item.make_frame_dyn().unwrap();
let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]);
eprintln!("[[{s}]]");
let buflen = buf.len();
let frame = InMemoryFrame {
encid: INMEM_FRAME_ENCID,
tyid: 0x2500,
len: (buflen - 24) as _,
buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(),
let item: Sitemty<_> = sitem_data(item);
let buf = item.make_frame_dyn().unwrap();
let frame = match InMemoryFrame::parse(&buf) {
Ok(ParseResult::Parsed(n, val)) => val,
Ok(ParseResult::NotEnoughData(n)) => panic!(),
Err(e) => panic!("{}", e),
};
let item: Sitemty<ChannelEvents> = decode_frame(&frame).unwrap();
let item = if let Ok(x) = item { x } else { panic!() };
@@ -1119,7 +1193,7 @@ mod test_frame {
};
assert_eq!(
MergeableTy::tss_for_testing(item),
&[TsMs::from_ms_u64(123)]
&[TsNano::from_ns(123), TsNano::from_ns(124)]
);
}
}

View File

@@ -2,6 +2,7 @@ use crate::binning::container::bins::AggBinValTw;
use crate::binning::container::bins::BinAggedType;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use crate::log::*;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinningggError;
@@ -86,13 +87,16 @@ where
self.non_fnl = false;
}
fn bound(a: &mut Option<EVT>, b: &EVT, f: impl Fn(&EVT, &EVT) -> bool) {
fn bound(a: &mut Option<EVT>, b: <EVT as EventValueType>::IterTy1<'_>, d: std::cmp::Ordering) {
if let Some(x) = a.as_mut() {
if f(b, x) {
*x = b.clone();
match b.cmp_a(x) {
Some(x) if x == d => {
*a = Some(b.into());
}
Some(_) | None => {}
}
} else {
*a = Some(b.clone());
*a = Some(b.into());
}
}
@@ -101,20 +105,20 @@ where
let grid = self.range.bin_len_dt_ns();
trace_ingest_bin!("grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg);
if ts1 < self.active_beg {
self.lst = Some(lst.clone());
self.lst = Some(lst.into());
} else {
if ts1 >= self.active_end {
self.maybe_emit_active();
self.active_forward(ts1);
}
self.cnt += cnt;
Self::bound(&mut self.min, min, PartialOrd::lt);
Self::bound(&mut self.max, max, PartialOrd::gt);
Self::bound(&mut self.min, min, std::cmp::Ordering::Less);
Self::bound(&mut self.max, max, std::cmp::Ordering::Greater);
let dt = ts2.delta(ts1);
let bl = self.range.bin_len_dt_ns();
self.agg.ingest(dt, bl, cnt, agg.clone());
self.non_fnl |= !fnl;
self.lst = Some(lst.clone());
self.lst = Some(lst.into());
}
}
Ok(())

View File

@@ -42,14 +42,6 @@ impl Container<EnumVariant> for EnumVariantContainer {
self.names.push_back(name);
}
fn pop_front(&mut self) -> Option<EnumVariant> {
if let (Some(a), Some(b)) = (self.ixs.pop_front(), self.names.pop_front()) {
Some(EnumVariant::new(a, b))
} else {
None
}
}
fn get_iter_ty_1(&self, pos: usize) -> Option<<EnumVariant as EventValueType>::IterTy1<'_>> {
if let (Some(&ix), Some(name)) = (self.ixs.get(pos), self.names.get(pos)) {
let ret = EnumVariantRef {
@@ -76,6 +68,20 @@ impl Container<EnumVariant> for EnumVariantContainer {
dst.ixs.extend(self.ixs.drain(range.clone()));
dst.names.extend(self.names.drain(range));
}
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![
("values".into(), Box::new(self.ixs)),
("valuestrings".into(), Box::new(self.names)),
]
}
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![
("values".into(), Box::new(self.ixs)),
("valuestrings".into(), Box::new(self.names)),
]
}
}
#[derive(Debug)]
@@ -130,6 +136,6 @@ impl EventValueType for EnumVariant {
type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = EnumVariantRef<'a>;
const SERDE_ID: u32 = Self::SUB;
const SERDE_ID: u32 = Self::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 40;
}

View File

@@ -223,8 +223,13 @@ impl AsAnyMut for ChannelEvents {
mod serde_channel_events {
use super::ChannelEvents;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::PulsedVal;
use crate::channelevents::ConnStatusEvent;
use crate::log::*;
use items_0::subfr::is_container_events;
use items_0::subfr::is_pulsed_subfr;
use items_0::subfr::is_vec_subfr;
use items_0::subfr::subfr_scalar_type;
use items_0::subfr::SubFrId;
use items_0::timebin::BinningggContainerEventsDyn;
use netpod::EnumVariant;
@@ -241,6 +246,11 @@ mod serde_channel_events {
macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
type C01<T> = ContainerEvents<T>;
type C02<T> = ContainerEvents<Vec<T>>;
type C03<T> = ContainerEvents<PulsedVal<T>>;
type C04<T> = ContainerEvents<PulsedVal<Vec<T>>>;
fn try_serialize<S, T>(
v: &dyn BinningggContainerEventsDyn,
ser: &mut <S as Serializer>::SerializeSeq,
@@ -258,6 +268,34 @@ mod serde_channel_events {
}
}
macro_rules! ser_inner_nty {
($ser:expr, $cont1:ident, $nty:expr, $val:expr) => {{
let ser = $ser;
let nty_id = subfr_scalar_type($nty);
let v = $val;
type C<T> = $cont1<T>;
match nty_id {
u8::SUB => try_serialize::<S, C<u8>>(v, ser),
// u16::SUB => try_serialize::<S, C1<u16>>(v, ser)?,
// u32::SUB => try_serialize::<S, C1<u32>>(v, ser)?,
// u64::SUB => try_serialize::<S, C1<u64>>(v, ser)?,
// i8::SUB => try_serialize::<S, C1<i8>>(v, ser)?,
// i16::SUB => try_serialize::<S, C1<i16>>(v, ser)?,
// i32::SUB => try_serialize::<S, C1<i32>>(v, ser)?,
// i64::SUB => try_serialize::<S, C1<i64>>(v, ser)?,
f32::SUB => try_serialize::<S, C<f32>>(v, ser),
// f64::SUB => try_serialize::<S, C1<f64>>(v, ser)?,
// bool::SUB => try_serialize::<S, C1<bool>>(v, ser)?,
// String::SUB => try_serialize::<S, C1<String>>(v, ser)?,
// EnumVariant::SUB => try_serialize::<S, C1<EnumVariant>>(v, ser)?,
_ => {
let msg = format!("serde ser not supported evt id 0x{:x}", nty_id);
return Err(serde::ser::Error::custom(msg));
}
}
}};
}
struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn);
struct EvBox(Box<dyn BinningggContainerEventsDyn>);
@@ -270,35 +308,25 @@ mod serde_channel_events {
let mut ser = ser.serialize_seq(Some(3))?;
ser.serialize_element(&self.0.serde_id())?;
ser.serialize_element(&self.0.nty_id())?;
use items_0::streamitem::CONTAINER_EVENTS_TYPE_ID;
type C1<T> = ContainerEvents<T>;
match self.0.serde_id() {
CONTAINER_EVENTS_TYPE_ID => match self.0.nty_id() {
u8::SUB => try_serialize::<S, C1<u8>>(self.0, &mut ser)?,
u16::SUB => try_serialize::<S, C1<u16>>(self.0, &mut ser)?,
u32::SUB => try_serialize::<S, C1<u32>>(self.0, &mut ser)?,
u64::SUB => try_serialize::<S, C1<u64>>(self.0, &mut ser)?,
i8::SUB => try_serialize::<S, C1<i8>>(self.0, &mut ser)?,
i16::SUB => try_serialize::<S, C1<i16>>(self.0, &mut ser)?,
i32::SUB => try_serialize::<S, C1<i32>>(self.0, &mut ser)?,
i64::SUB => try_serialize::<S, C1<i64>>(self.0, &mut ser)?,
f32::SUB => try_serialize::<S, C1<f32>>(self.0, &mut ser)?,
f64::SUB => try_serialize::<S, C1<f64>>(self.0, &mut ser)?,
bool::SUB => try_serialize::<S, C1<bool>>(self.0, &mut ser)?,
String::SUB => try_serialize::<S, C1<String>>(self.0, &mut ser)?,
EnumVariant::SUB => try_serialize::<S, C1<EnumVariant>>(self.0, &mut ser)?,
//
Vec::<f32>::SUB => try_serialize::<S, C1<Vec<f32>>>(self.0, &mut ser)?,
_ => {
let msg = format!("not supported evt id {}", self.0.nty_id());
return Err(serde::ser::Error::custom(msg));
let nty_id = self.0.nty_id() as u16;
if is_container_events(self.0.serde_id()) {
if is_pulsed_subfr(nty_id) {
if is_vec_subfr(nty_id) {
ser_inner_nty!(&mut ser, C04, nty_id, self.0)
} else {
ser_inner_nty!(&mut ser, C03, nty_id, self.0)
}
} else {
if is_vec_subfr(nty_id) {
ser_inner_nty!(&mut ser, C02, nty_id, self.0)
} else {
ser_inner_nty!(&mut ser, C01, nty_id, self.0)
}
},
_ => {
let msg = format!("not supported obj id {}", self.0.serde_id());
return Err(serde::ser::Error::custom(msg));
}
}
} else {
let msg = format!("not supported obj id {}", self.0.serde_id());
return Err(serde::ser::Error::custom(msg));
}?;
ser.end()
}
}
@@ -316,12 +344,40 @@ mod serde_channel_events {
A: de::SeqAccess<'de>,
T: Deserialize<'de> + BinningggContainerEventsDyn + 'static,
{
let s = std::any::type_name::<T>();
trace_serde!("get_2nd_or_err {}", s);
let obj: T = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
macro_rules! de_inner_nty {
($seq:expr, $cont1:ident, $nty:expr) => {{
let seq = $seq;
let nty = subfr_scalar_type($nty);
match nty {
u8::SUB => get_2nd_or_err::<$cont1<u8>, _>(seq),
u16::SUB => get_2nd_or_err::<$cont1<u16>, _>(seq),
u32::SUB => get_2nd_or_err::<$cont1<u32>, _>(seq),
u64::SUB => get_2nd_or_err::<$cont1<u64>, _>(seq),
i8::SUB => get_2nd_or_err::<$cont1<i8>, _>(seq),
i16::SUB => get_2nd_or_err::<$cont1<i16>, _>(seq),
i32::SUB => get_2nd_or_err::<$cont1<i32>, _>(seq),
i64::SUB => get_2nd_or_err::<$cont1<i64>, _>(seq),
f32::SUB => get_2nd_or_err::<$cont1<f32>, _>(seq),
f64::SUB => get_2nd_or_err::<$cont1<f64>, _>(seq),
bool::SUB => get_2nd_or_err::<$cont1<bool>, _>(seq),
String::SUB => get_2nd_or_err::<$cont1<String>, _>(seq),
EnumVariant::SUB => get_2nd_or_err::<$cont1<EnumVariant>, _>(seq),
_ => {
error!("TODO serde::de nty 0x{:x}", nty);
Err(de::Error::custom(&format!("unknown nty 0x{:x}", nty)))
}
}
}};
}
impl<'de> Visitor<'de> for EvBoxVis {
type Value = EvBox;
@@ -334,49 +390,31 @@ mod serde_channel_events {
A: de::SeqAccess<'de>,
{
trace_serde!("EvBoxVis::visit_seq");
type C1<EVT> = ContainerEvents<EVT>;
let cty: u32 = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
let nty: u32 = seq
let nty: u16 = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
if cty == C1::<u8>::serde_id() {
match nty {
u8::SUB => get_2nd_or_err::<C1<u8>, _>(&mut seq),
u16::SUB => get_2nd_or_err::<C1<u16>, _>(&mut seq),
u32::SUB => get_2nd_or_err::<C1<u32>, _>(&mut seq),
u64::SUB => get_2nd_or_err::<C1<u64>, _>(&mut seq),
i8::SUB => get_2nd_or_err::<C1<i8>, _>(&mut seq),
i16::SUB => get_2nd_or_err::<C1<i16>, _>(&mut seq),
i32::SUB => get_2nd_or_err::<C1<i32>, _>(&mut seq),
i64::SUB => get_2nd_or_err::<C1<i64>, _>(&mut seq),
f32::SUB => get_2nd_or_err::<C1<f32>, _>(&mut seq),
f64::SUB => get_2nd_or_err::<C1<f64>, _>(&mut seq),
bool::SUB => get_2nd_or_err::<C1<bool>, _>(&mut seq),
String::SUB => get_2nd_or_err::<C1<String>, _>(&mut seq),
EnumVariant::SUB => get_2nd_or_err::<C1<EnumVariant>, _>(&mut seq),
Vec::<u8>::SUB => get_2nd_or_err::<C1<Vec<u8>>, _>(&mut seq),
Vec::<u16>::SUB => get_2nd_or_err::<C1<Vec<u16>>, _>(&mut seq),
Vec::<u32>::SUB => get_2nd_or_err::<C1<Vec<u32>>, _>(&mut seq),
Vec::<u64>::SUB => get_2nd_or_err::<C1<Vec<u64>>, _>(&mut seq),
Vec::<i8>::SUB => get_2nd_or_err::<C1<Vec<i8>>, _>(&mut seq),
Vec::<i16>::SUB => get_2nd_or_err::<C1<Vec<i16>>, _>(&mut seq),
Vec::<i32>::SUB => get_2nd_or_err::<C1<Vec<i32>>, _>(&mut seq),
Vec::<i64>::SUB => get_2nd_or_err::<C1<Vec<i64>>, _>(&mut seq),
Vec::<f32>::SUB => get_2nd_or_err::<C1<Vec<f32>>, _>(&mut seq),
Vec::<f64>::SUB => get_2nd_or_err::<C1<Vec<f64>>, _>(&mut seq),
Vec::<bool>::SUB => get_2nd_or_err::<C1<Vec<bool>>, _>(&mut seq),
Vec::<String>::SUB => get_2nd_or_err::<C1<Vec<String>>, _>(&mut seq),
Vec::<EnumVariant>::SUB => get_2nd_or_err::<C1<Vec<EnumVariant>>, _>(&mut seq),
_ => {
error!("TODO serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown nty {nty}")))
let seq = &mut seq;
trace_serde!("EvBoxVis cty 0x{:x} nty 0x{:X}", cty, nty);
if is_container_events(cty) {
if is_pulsed_subfr(nty) {
if is_vec_subfr(nty) {
de_inner_nty!(seq, C04, nty)
} else {
de_inner_nty!(seq, C03, nty)
}
} else {
if is_vec_subfr(nty) {
de_inner_nty!(seq, C02, nty)
} else {
de_inner_nty!(seq, C01, nty)
}
}
} else {
error!("unsupported serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown cty {cty}")))
error!("unsupported serde cty 0x{:x} nty 0x{:x}", cty, nty);
Err(de::Error::custom(&format!("unknown cty 0x{:x}", cty)))
}
}
}
@@ -682,13 +720,13 @@ impl MergeableTy for ChannelEvents {
Some(_) => DrainIntoDstResult::Partial,
None => {
if range.len() != 1 {
trace!("try to add empty range to status container {range:?}");
trace!("try to add empty range to status container {:?}", range);
}
if range.start != 0 {
trace!("weird range {range:?}");
trace!("weird range {:?}", range);
}
if range.end > 1 {
trace!("weird range {range:?}");
trace!("weird range {:?}", range);
}
*j = k.take();
DrainIntoDstResult::Done
@@ -824,11 +862,11 @@ impl WithLen for ChannelEventsCollectorOutput {
}
impl ToUserFacingApiType for ChannelEventsCollectorOutput {
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
todo!()
}
}
@@ -921,51 +959,23 @@ impl CollectorDyn for ChannelEventsCollector {
}
None => {
let e = err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]");
error!("{e}");
error!("{}", e);
Err(e)
}
}
}
}
impl ToJsonValue for ChannelEvents {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let ret = match self {
ChannelEvents::Events(x) => x.to_json_value().unwrap(),
ChannelEvents::Status(x) => serde_json::json!({
"_private_channel_status": x,
}),
};
Ok(ret)
}
}
impl ToCborValue for ChannelEvents {
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
let ret = match self {
ChannelEvents::Events(x) => x.to_cbor_value()?,
ChannelEvents::Status(x) => {
use ciborium::cbor;
cbor!({
"_private_channel_status" => x,
})
.unwrap()
}
};
Ok(ret)
}
}
impl ToUserFacingApiType for ChannelEvents {
fn to_user_facing_api_type(self) -> Box<dyn UserApiType> {
fn into_user_facing_api_type(self) -> Box<dyn UserApiType> {
match self {
ChannelEvents::Events(x) => x.to_user_facing_api_type_box(),
ChannelEvents::Events(x) => x.into_user_facing_api_type_box(),
ChannelEvents::Status(x) => Box::new(items_0::apitypes::EmptyStruct::new()),
}
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
let this = *self;
this.to_user_facing_api_type()
this.into_user_facing_api_type()
}
}

View File

@@ -1,4 +1,5 @@
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::PulsedVal;
use crate::Error;
use daqbuf_err as err;
use items_0::timebin::BinningggContainerEventsDyn;
@@ -57,3 +58,54 @@ pub fn empty_events_dyn_ev(
};
Ok(ret)
}
pub fn empty_events_pulsed_dyn_ev(
scalar_type: &ScalarType,
shape: &Shape,
) -> Result<Box<dyn BinningggContainerEventsDyn>, Error> {
let ret: Box<dyn BinningggContainerEventsDyn> = match shape {
Shape::Scalar => {
use ScalarType::*;
type K<T> = ContainerEvents<PulsedVal<T>>;
match scalar_type {
U8 => Box::new(K::<u8>::new()),
U16 => Box::new(K::<u16>::new()),
U32 => Box::new(K::<u32>::new()),
U64 => Box::new(K::<u64>::new()),
I8 => Box::new(K::<i8>::new()),
I16 => Box::new(K::<i16>::new()),
I32 => Box::new(K::<i32>::new()),
I64 => Box::new(K::<i64>::new()),
F32 => Box::new(K::<f32>::new()),
F64 => Box::new(K::<f64>::new()),
BOOL => Box::new(K::<bool>::new()),
STRING => Box::new(K::<String>::new()),
Enum => Box::new(K::<EnumVariant>::new()),
}
}
Shape::Wave(..) => {
use ScalarType::*;
type K<T> = ContainerEvents<Vec<T>>;
match scalar_type {
U8 => Box::new(K::<u8>::new()),
U16 => Box::new(K::<u16>::new()),
U32 => Box::new(K::<u32>::new()),
U64 => Box::new(K::<u64>::new()),
I8 => Box::new(K::<i8>::new()),
I16 => Box::new(K::<i16>::new()),
I32 => Box::new(K::<i32>::new()),
I64 => Box::new(K::<i64>::new()),
F32 => Box::new(K::<f32>::new()),
F64 => Box::new(K::<f64>::new()),
BOOL => Box::new(K::<bool>::new()),
STRING => Box::new(K::<String>::new()),
Enum => Box::new(K::<EnumVariant>::new()),
}
}
Shape::Image(..) => {
error!("TODO empty_events_dyn_ev {scalar_type:?} {shape:?}");
err::todoval()
}
};
Ok(ret)
}

View File

@@ -375,18 +375,12 @@ impl<STY: ScalarOps> WithLen for EventsDim0CollectorOutput<STY> {
}
}
impl<STY: ScalarOps> ToJsonValue for EventsDim0CollectorOutput<STY> {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}
}
impl<STY: ScalarOps> ToUserFacingApiType for EventsDim0CollectorOutput<STY> {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}
@@ -647,7 +641,7 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
}
fn nty_id(&self) -> u32 {
STY::SUB
STY::SUB as u32
}
fn clone_dyn(&self) -> Box<dyn Events> {

View File

@@ -129,11 +129,11 @@ impl TypeName for EventsDim0EnumCollectorOutput {
}
impl ToUserFacingApiType for EventsDim0EnumCollectorOutput {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}

View File

@@ -337,11 +337,11 @@ impl<STY: ScalarOps> WithLen for EventsDim1CollectorOutput<STY> {
}
impl<STY: ScalarOps> ToUserFacingApiType for EventsDim1CollectorOutput<STY> {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
fn into_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}
@@ -589,7 +589,7 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
}
fn nty_id(&self) -> u32 {
STY::SUB
STY::SUB as u32
}
fn clone_dyn(&self) -> Box<dyn Events> {

View File

@@ -1,6 +1,25 @@
use crate::framable::INMEM_FRAME_FOOT;
use crate::framable::INMEM_FRAME_HEAD;
use crate::framable::INMEM_FRAME_MAGIC;
use crate::log::*;
use bytes::Bytes;
use std::fmt;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "InMemoryFrameError")]
pub enum Error {
LessThanHeader,
TryFromSlice(#[from] std::array::TryFromSliceError),
BadMagic(u32),
HugeFrame(u32),
BadCrc,
}
pub enum ParseResult<T> {
NotEnoughData(usize),
Parsed(usize, T),
}
pub struct InMemoryFrame {
pub encid: u32,
pub tyid: u32,
@@ -12,15 +31,66 @@ impl InMemoryFrame {
pub fn encid(&self) -> u32 {
self.encid
}
pub fn tyid(&self) -> u32 {
self.tyid
}
pub fn len(&self) -> u32 {
self.len
}
pub fn buf(&self) -> &Bytes {
&self.buf
}
pub fn parse(buf: &[u8]) -> Result<ParseResult<Self>, Error> {
if buf.len() < INMEM_FRAME_HEAD {
return Err(Error::LessThanHeader);
}
let magic = u32::from_le_bytes(buf[0..4].try_into()?);
let encid = u32::from_le_bytes(buf[4..8].try_into()?);
let tyid = u32::from_le_bytes(buf[8..12].try_into()?);
let len = u32::from_le_bytes(buf[12..16].try_into()?);
let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?);
if magic != INMEM_FRAME_MAGIC {
return Err(Error::BadMagic(magic));
}
if len > 1024 * 1024 * 50 {
return Err(Error::HugeFrame(len));
}
let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize;
if buf.len() < lentot {
return Ok(ParseResult::NotEnoughData(lentot));
}
let p1 = INMEM_FRAME_HEAD + len as usize;
let mut h = crc32fast::Hasher::new();
h.update(&buf[..p1]);
let frame_crc = h.finalize();
let mut h = crc32fast::Hasher::new();
h.update(&buf[INMEM_FRAME_HEAD..p1]);
let payload_crc = h.finalize();
let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?);
let payload_crc_match = payload_crc_exp == payload_crc;
let frame_crc_match = frame_crc_ind == frame_crc;
if !frame_crc_match || !payload_crc_match {
let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
payload_crc_match, frame_crc_match,
);
error!("{}", msg);
let e = Error::BadCrc;
return Err(e);
}
let ret = InMemoryFrame {
len,
tyid,
encid,
buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()),
};
Ok(ParseResult::Parsed(lentot, ret))
}
}
impl fmt::Debug for InMemoryFrame {

View File

@@ -1,4 +1,5 @@
use bytes::Bytes;
use core::fmt;
use items_0::WithLen;
pub struct JsonBytes(String);
@@ -15,6 +16,18 @@ impl JsonBytes {
pub fn len(&self) -> u32 {
self.0.len() as _
}
pub fn into_bytes(self) -> Vec<u8> {
self.0.as_bytes().to_vec()
}
}
impl fmt::Debug for JsonBytes {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("JsonBytes")
.field(&self.0.chars().take(40).collect::<String>())
.finish()
}
}
impl WithLen for JsonBytes {

View File

@@ -208,7 +208,7 @@ where
}
}
}
trace4!("tslows {tslows:?}");
trace4!("tslows {:?}", tslows);
if let Some((il0, _tl0)) = tslows[0] {
if let Some((_il1, tl1)) = tslows[1] {
// There is a second input, take only up to the second highest timestamp
@@ -218,7 +218,7 @@ where
// Can take the whole item
// TODO gather stats about this case. Should be never for databuffer, and often for scylla.
let mut item = self.items[il0].take().unwrap();
trace3!("Take all from item {item:?}");
trace3!("Take all from item {:?}", item);
match self.take_into_output_all(&mut item) {
DrainIntoDstResult::Done => Ok(Break(())),
DrainIntoDstResult::Partial => {
@@ -239,7 +239,7 @@ where
} else {
// Take only up to the lowest ts of the second-lowest input
let mut item = self.items[il0].take().unwrap();
trace3!("Take up to {tl1} from item {item:?}");
trace3!("Take up to {} from item {:?}", tl1, item);
match self.take_into_output_upto(&mut item, tl1) {
DrainIntoDstResult::Done => {
if item.len() == 0 {
@@ -272,7 +272,7 @@ where
} else {
// No other input, take the whole item
let mut item = self.items[il0].take().unwrap();
trace3!("Take all from item (no other input) {item:?}");
trace3!("Take all from item (no other input) {:?}", item);
match self.take_into_output_all(&mut item) {
DrainIntoDstResult::Done => Ok(Break(())),
DrainIntoDstResult::Partial => {
@@ -363,7 +363,12 @@ where
.zip(self.items.iter())
.filter(|(a, b)| a.is_some() && b.is_none())
.count();
trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}");
trace3!(
"ninps {} nitems {} nitemsmissing {}",
ninps,
nitems,
nitemsmissing
);
if nitemsmissing != 0 {
let e = Error::NoPendingButMissing;
return Break(Ready(Some(e)));