WIP change container type

This commit is contained in:
Dominik Werder
2024-11-30 11:44:09 +01:00
parent 7075248225
commit 896a723a3e
14 changed files with 584 additions and 190 deletions

134
src/apitypes.rs Normal file
View File

@@ -0,0 +1,134 @@
use crate::binning::container::bins::BinAggedType;
use crate::binning::container_events::EventValueType;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
use items_0::apitypes::UserApiType;
use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use netpod::TsNano;
use serde::Serialize;
use std::collections::VecDeque;
use std::fmt;
/// User-facing API representation of a sequence of events for one channel.
///
/// Columns are parallel: entry `i` of `tss` is the timestamp of the value
/// at position `i` of `values`.
#[derive(Serialize)]
pub struct ContainerEventsApi<EVT>
where
    EVT: EventValueType,
{
    /// Absolute event timestamps in nanoseconds.
    pub tss: VecDeque<u64>,
    /// Event values, stored in the container type associated with `EVT`.
    pub values: EVT::Container,
    /// True when the requested range was fully covered; omitted from the
    /// serialized output when false.
    #[serde(skip_serializing_if = "netpod::is_false")]
    pub range_final: bool,
    /// True when collection stopped due to a timeout; omitted when false.
    #[serde(skip_serializing_if = "netpod::is_false")]
    pub timed_out: bool,
}
impl<EVT> fmt::Debug for ContainerEventsApi<EVT>
where
    EVT: EventValueType,
{
    /// Compact debug output: only the type name is printed, because the
    /// timestamp and value columns can be very large.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Equivalent to `debug_struct("ContainerEventsApi").finish()` with no
        // fields. Re-add `tss` / `values` here if a detailed dump is needed.
        f.write_str("ContainerEventsApi")
    }
}
impl<EVT> ToCborValue for ContainerEventsApi<EVT>
where
    EVT: EventValueType,
{
    /// Serialize this API object into a generic CBOR value tree.
    ///
    /// # Errors
    /// Propagates the `ciborium` serialization error. (Previously this
    /// `unwrap()`ed and would panic on a serialization failure even though
    /// the signature already allows returning exactly that error type.)
    fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
        ciborium::value::Value::serialized(self)
    }
}
impl<EVT> ToJsonValue for ContainerEventsApi<EVT>
where
    EVT: EventValueType,
{
    /// Serialize this API object into a `serde_json::Value`.
    ///
    /// # Errors
    /// Returns the underlying `serde_json` error on serialization failure.
    fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
        // Return the expression directly; binding it to a local adds nothing.
        serde_json::to_value(self)
    }
}
// Empty marker impl: opts ContainerEventsApi into the user-facing API type trait.
impl<EVT> UserApiType for ContainerEventsApi<EVT> where EVT: EventValueType {}
/// User-facing API representation of time-binned data.
///
/// Columns are parallel: entry `i` of each deque describes bin `i`.
#[derive(Serialize)]
pub struct ContainerBinsApi<EVT, BVT>
where
    EVT: EventValueType,
    BVT: BinAggedType,
{
    /// Bin edge timestamps — assumed ts1s = lower edge, ts2s = upper edge
    /// from the naming; TODO confirm against the bin producer.
    pub ts1s: VecDeque<TsNano>,
    pub ts2s: VecDeque<TsNano>,
    /// Event count per bin.
    pub cnts: VecDeque<u64>,
    /// Minimum event value per bin.
    pub mins: VecDeque<EVT>,
    /// Maximum event value per bin.
    pub maxs: VecDeque<EVT>,
    /// Aggregated value per bin; serialized under the key "avgs" in
    /// `to_json_value`.
    pub aggs: VecDeque<BVT>,
    /// Per-bin flags — NOTE(review): not included in `to_json_value`;
    /// confirm whether that omission is intentional.
    pub fnls: VecDeque<bool>,
}
impl<EVT, BVT> fmt::Debug for ContainerBinsApi<EVT, BVT>
where
    EVT: EventValueType,
    BVT: BinAggedType,
{
    /// Compact debug output: only the type name is printed, because the
    /// per-bin columns can be very large.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Equivalent to `debug_struct("ContainerBinsApi").finish()` with no
        // fields. Re-add individual columns here if a detailed dump is needed.
        f.write_str("ContainerBinsApi")
    }
}
impl<EVT, BVT> ToCborValue for ContainerBinsApi<EVT, BVT>
where
    EVT: EventValueType,
    BVT: BinAggedType,
{
    /// CBOR output for binned data is intentionally not implemented yet.
    ///
    /// # Errors
    /// Always returns a custom error stating that binned CBOR output is not
    /// yet available.
    fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
        // Straight serialization is disabled for now; the final wire format
        // for binned CBOR has not been decided.
        // let val = ciborium::value::Value::serialized(self).unwrap();
        // Ok(val)
        let e = ciborium::value::Error::Custom("binned data as cbor is not yet available".into());
        Err(e)
    }
}
impl<EVT, BVT> ToJsonValue for ContainerBinsApi<EVT, BVT>
where
    EVT: EventValueType,
    BVT: BinAggedType,
{
    /// Build the user-facing JSON object for binned data.
    ///
    /// Timestamps are encoded relative to a common anchor: `tsAnchor` is
    /// derived from `ts1s` (see `ts_offs_from_abs`), and the `*Ms` / `*Ns`
    /// arrays hold per-bin millisecond / residual-nanosecond offsets from it.
    ///
    /// NOTE(review): `self.fnls` is not part of the output — confirm whether
    /// the per-bin flags should be exposed here.
    fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
        use serde_json::json;
        use serde_json::Value;
        // Whole-struct serialization was replaced by an explicit object so
        // the timestamp columns can use the anchored-offset encoding:
        // let ret = serde_json::to_value(self);
        // ret
        // ts2s offsets reuse the anchor computed from ts1s, so both edge
        // columns share one time base.
        let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&self.ts1s);
        let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s);
        let ret = json!({
            "tsAnchor": ts_anch,
            "ts1Ms": ts1ms,
            "ts2Ms": ts2ms,
            "ts1Ns": ts1ns,
            "ts2Ns": ts2ns,
            "counts": self.cnts,
            "mins": self.mins,
            "maxs": self.maxs,
            // `aggs` is exposed under the key "avgs".
            "avgs": self.aggs,
        });
        Ok(ret)
    }
}
// Empty marker impl: opts ContainerBinsApi into the user-facing API type trait.
impl<EVT, BVT> UserApiType for ContainerBinsApi<EVT, BVT>
where
    EVT: EventValueType,
    BVT: BinAggedType,
{
}

View File

@@ -49,7 +49,7 @@ pub struct AggregatorNumeric {
sum: f64,
}
trait AggWithF64: EventValueType<AggTimeWeightOutputAvg = f64> {
pub trait AggWithF64: EventValueType<AggTimeWeightOutputAvg = f64> {
fn as_f64(&self) -> f64;
}
@@ -129,6 +129,10 @@ macro_rules! impl_agg_tw_for_agg_num {
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: $evt) {
let f = dt.ns() as f64 / bl.ns() as f64;
trace_event!("INGEST {} {}", f, val);
if true {
panic!();
}
let val = 42;
self.sum += f * val as f64;
}
@@ -158,6 +162,8 @@ impl_agg_tw_for_agg_num!(i16);
impl_agg_tw_for_agg_num!(i32);
impl_agg_tw_for_agg_num!(i64);
impl_agg_tw_for_agg_num!(super::container_events::PulsedVal<u8>);
impl AggregatorTimeWeight<u64> for AggregatorNumeric {
fn new() -> Self {
Self { sum: 0. }

View File

@@ -1,11 +1,13 @@
use super::container::bins::BinAggedType;
use super::container_events::EventValueType;
use crate::apitypes::ContainerBinsApi;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
use core::fmt;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::ToJsonValue;
@@ -16,9 +18,7 @@ use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use netpod::log::*;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::collections::VecDeque;
@@ -494,10 +494,8 @@ where
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let bins = &self.bins;
let ts1sns: Vec<_> = bins.ts1s.iter().map(|x| x.ns()).collect();
let ts2sns: Vec<_> = bins.ts2s.iter().map(|x| x.ns()).collect();
let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&ts1sns);
let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &ts2sns);
let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&bins.ts1s);
let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &bins.ts2s);
let counts = bins.cnts.clone();
let mins = bins.mins.clone();
let maxs = bins.maxs.clone();
@@ -517,6 +515,29 @@ where
}
}
impl<EVT, BVT> ToUserFacingApiType for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_user_facing_api_type(self) -> Box<dyn items_0::apitypes::UserApiType> {
let ret = ContainerBinsApi::<EVT, BVT> {
ts1s: self.bins.ts1s,
ts2s: self.bins.ts2s,
cnts: self.bins.cnts,
mins: self.bins.mins,
maxs: self.bins.maxs,
aggs: self.bins.aggs,
fnls: self.bins.fnls,
};
Box::new(ret)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
(*self).to_user_facing_api_type()
}
}
impl<EVT, BVT> CollectedDyn for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
@@ -585,15 +606,7 @@ where
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
debug!("TODO remember the continue at");
}
fn result(
&mut self,
_range: Option<netpod::range::evrange::SeriesRange>,
_binrange: Option<netpod::BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_s::CollectedDyn>, err::Error> {
fn result(&mut self) -> Result<Box<dyn items_0::collect_s::CollectedDyn>, err::Error> {
// TODO do we need to set timeout, continueAt or anything?
let bins = mem::replace(&mut self.bins, ContainerBins::new());
let ret = ContainerBinsCollectorOutput { bins };

View File

@@ -3,15 +3,19 @@ use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::aggregator::AggregatorVecNumeric;
use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox;
use crate::apitypes::ContainerEventsApi;
use crate::log::*;
use core::fmt;
use core::ops::Range;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use items_0::apitypes::ContainerEventsApi;
use items_0::apitypes::ToUserFacingApiType;
use items_0::apitypes::UserApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
@@ -28,10 +32,10 @@ use items_0::Appendable;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::TypeName;
use items_0::WithLen;
use netpod::BinnedRange;
use netpod::EnumVariant;
use netpod::TsMs;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -69,6 +73,7 @@ pub trait EventValueType:
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
const SERDE_ID: u32;
const BYTE_ESTIMATE_V00: u32;
}
impl<EVT> Container<EVT> for VecDeque<EVT>
@@ -135,6 +140,7 @@ macro_rules! impl_event_value_type {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = $evt;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
}
impl PartialOrdEvtA<$evt> for $evt {
@@ -186,6 +192,7 @@ impl EventValueType for f32 {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = f32;
const SERDE_ID: u32 = <f32 as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
impl EventValueType for f64 {
@@ -194,6 +201,7 @@ impl EventValueType for f64 {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = f64;
const SERDE_ID: u32 = <f64 as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
impl EventValueType for bool {
@@ -202,6 +210,7 @@ impl EventValueType for bool {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = bool;
const SERDE_ID: u32 = <bool as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
}
impl EventValueType for String {
@@ -210,6 +219,7 @@ impl EventValueType for String {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = &'a str;
const SERDE_ID: u32 = <String as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = 400;
}
macro_rules! impl_event_value_type_vec {
@@ -220,6 +230,8 @@ macro_rules! impl_event_value_type_vec {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<$evt>;
const SERDE_ID: u32 = <Vec<$evt> as SubFrId>::SUB;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
}
impl PartialOrdEvtA<Vec<$evt>> for Vec<$evt> {
@@ -244,6 +256,113 @@ impl_event_value_type_vec!(bool);
impl_event_value_type_vec!(String);
impl_event_value_type_vec!(EnumVariant);
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize)]
pub struct PulsedVal<EVT>(EVT)
where
EVT: EventValueType;
impl<EVT> fmt::Display for PulsedVal<EVT>
where
EVT: EventValueType,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self)
}
}
mod serde_pulsed_val {
use super::EventValueType;
use super::PulsedVal;
use serde::Deserialize;
use serde::Deserializer;
impl<'de, EVT> Deserialize<'de> for PulsedVal<EVT>
where
EVT: EventValueType,
{
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
todo!("TODO mod serde_pulsed_val")
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VecDequePulsed<EVT>
where
EVT: EventValueType,
{
pulses: VecDeque<u64>,
vals: EVT::Container,
}
impl<EVT> PreviewRange for VecDequePulsed<EVT>
where
EVT: EventValueType,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
todo!()
}
}
impl<EVT> Container<PulsedVal<EVT>> for VecDequePulsed<EVT>
where
EVT: EventValueType,
PulsedVal<EVT>: EventValueType,
{
fn new() -> Self {
Self {
pulses: VecDeque::new(),
vals: <<EVT as EventValueType>::Container as Container<EVT>>::new(),
}
}
fn push_back(&mut self, val: PulsedVal<EVT>) {
todo!()
}
fn pop_front(&mut self) -> Option<PulsedVal<EVT>> {
todo!()
}
fn get_iter_ty_1(&self, pos: usize) -> Option<<PulsedVal<EVT> as EventValueType>::IterTy1<'_>> {
todo!()
}
fn iter_ty_1(&self) -> impl Iterator<Item = <PulsedVal<EVT> as EventValueType>::IterTy1<'_>> {
todo!();
self.vals.iter_ty_1().map(|x| todo!())
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) {
dst.pulses.extend(self.pulses.drain(range.clone()));
self.vals.drain_into(&mut dst.vals, range.clone());
}
}
macro_rules! impl_pulse_evt {
($evt:ty) => {
impl EventValueType for PulsedVal<$evt> {
type Container = VecDequePulsed<$evt>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = PulsedVal<$evt>;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
}
impl PartialOrdEvtA<PulsedVal<$evt>> for PulsedVal<$evt> {
fn cmp_a(&self, other: &PulsedVal<$evt>) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
};
}
impl_pulse_evt!(u8);
#[derive(Debug, Clone)]
pub struct EventSingleRef<'a, EVT>
where
@@ -543,7 +662,6 @@ where
EVT: EventValueType,
{
evs: &'a ContainerEvents<EVT>,
beg: usize,
end: usize,
pos: usize,
}
@@ -555,7 +673,6 @@ where
pub fn new(evs: &'a ContainerEvents<EVT>) -> Self {
Self {
evs,
beg: 0,
end: evs.len(),
pos: 0,
}
@@ -636,15 +753,15 @@ where
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
let x = self.tss.partition_point(|&x| x < ts);
if x == 0 || x >= self.tss.len() {
if x == 0 {
None
} else {
Some(x - 1)
}
}
fn tss_for_testing(&self) -> Vec<TsMs> {
self.tss.iter().map(|&x| x.to_ts_ms()).collect()
fn tss_for_testing(&self) -> VecDeque<TsNano> {
self.tss.clone()
}
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult {
@@ -697,7 +814,7 @@ where
MergeableTy::find_highest_index_lt(self, ts)
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
fn tss_for_testing(&self) -> VecDeque<TsNano> {
MergeableTy::tss_for_testing(self)
}
@@ -726,6 +843,146 @@ where
}
}
impl<EVT> TypeName for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn type_name(&self) -> String {
std::any::type_name::<Self>().into()
}
}
#[derive(Debug, Serialize)]
pub struct ContainerEventsCollected<EVT>
where
EVT: EventValueType,
{
evs: ContainerEvents<EVT>,
range_final: bool,
timed_out: bool,
}
impl<EVT> WithLen for ContainerEventsCollected<EVT>
where
EVT: EventValueType,
{
fn len(&self) -> usize {
self.evs.len()
}
}
impl<EVT> TypeName for ContainerEventsCollected<EVT>
where
EVT: EventValueType,
{
fn type_name(&self) -> String {
std::any::type_name::<Self>().into()
}
}
impl<EVT> ToUserFacingApiType for ContainerEventsCollected<EVT>
where
EVT: EventValueType,
{
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
let evs = ContainerEventsApi::<EVT> {
tss: self.evs.tss.into_iter().map(|x| x.ns()).collect(),
values: self.evs.vals,
range_final: self.range_final,
timed_out: self.timed_out,
};
Box::new(evs)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
(*self).to_user_facing_api_type()
}
}
impl<EVT> CollectedDyn for ContainerEventsCollected<EVT> where EVT: EventValueType {}
impl<EVT> ByteEstimate for ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
fn byte_estimate(&self) -> u64 {
EVT::BYTE_ESTIMATE_V00 as _
}
}
#[derive(Debug)]
pub struct ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
evs: ContainerEvents<EVT>,
range_final: bool,
timed_out: bool,
}
impl<EVT> ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
pub fn new() -> Self {
Self {
evs: ContainerEvents::new(),
range_final: false,
timed_out: false,
}
}
}
impl<EVT> WithLen for ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
fn len(&self) -> usize {
self.evs.len()
}
}
impl<EVT> CollectorTy for ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
type Input = ContainerEvents<EVT>;
type Output = ContainerEventsCollected<EVT>;
fn ingest(&mut self, src: &mut Self::Input) {
let n = self.len();
info!("CollectorTy for ContainerEventsCollector n {}", n);
MergeableTy::drain_into(src, &mut self.evs, 0..n);
}
fn set_range_complete(&mut self) {
self.range_final = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, daqbuf_err::Error> {
let ret = Self::Output {
evs: self.evs.clone(),
range_final: self.range_final,
timed_out: self.timed_out,
};
Ok(ret)
}
}
impl<EVT> CollectableDyn for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn new_collector(&self) -> Box<dyn CollectorDyn> {
// crate::eventsdim0::EventsDim0;
Box::new(ContainerEventsCollector::<EVT>::new())
}
}
impl<EVT> ToCborValue for ContainerEvents<EVT>
where
EVT: EventValueType,
@@ -749,31 +1006,11 @@ where
EVT: EventValueType,
{
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
let this = self;
let tss: VecDeque<_> = this.tss.into_iter().map(|x| x.ms()).collect();
let ret = ContainerEventsApi {
tss: tss.clone(),
values: tss.clone(),
};
Box::new(ret)
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
let this = *self;
this.to_user_facing_api_type()
}
}
impl<EVT> ToUserFacingApiType for Box<ContainerEvents<EVT>>
where
EVT: EventValueType,
{
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
let this = *self;
let tss: VecDeque<_> = this.tss.into_iter().map(|x| x.ms()).collect();
let ret = ContainerEventsApi {
tss: tss.clone(),
values: tss.clone(),
let ret = ContainerEventsApi::<EVT> {
tss: self.tss.into_iter().map(|x| x.ns()).collect(),
values: self.vals,
range_final: false,
timed_out: false,
};
Box::new(ret)
}
@@ -788,10 +1025,6 @@ impl<EVT> BinningggContainerEventsDyn for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn type_name(&self) -> &'static str {
std::any::type_name::<Self>()
}
fn binned_events_timeweight_traitobj(
&self,
range: BinnedRange<TsNano>,
@@ -827,6 +1060,10 @@ where
fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn {
self
}
fn as_collectable_dyn_mut(&mut self) -> &mut dyn CollectableDyn {
self
}
}
#[cfg(test)]

View File

@@ -131,4 +131,5 @@ impl EventValueType for EnumVariant {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = EnumVariantRef<'a>;
const SERDE_ID: u32 = Self::SUB;
const BYTE_ESTIMATE_V00: u32 = 40;
}

View File

@@ -29,7 +29,6 @@ use items_0::TypeName;
use items_0::WithLen;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use netpod::TsMs;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -765,12 +764,12 @@ impl MergeableTy for ChannelEvents {
}
}
fn tss_for_testing(&self) -> Vec<TsMs> {
fn tss_for_testing(&self) -> VecDeque<TsNano> {
match self {
ChannelEvents::Events(x) => x.tss_for_testing(),
ChannelEvents::Status(x) => match x {
Some(x) => vec![x.ts.to_ts_ms()],
None => Vec::new(),
Some(x) => [x.ts].into_iter().collect(),
None => VecDeque::new(),
},
}
}
@@ -824,9 +823,13 @@ impl WithLen for ChannelEventsCollectorOutput {
}
}
impl items_0::collect_s::ToJsonValue for ChannelEventsCollectorOutput {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
impl ToUserFacingApiType for ChannelEventsCollectorOutput {
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType> {
todo!()
}
}
@@ -876,13 +879,8 @@ impl CollectorDyn for ChannelEventsCollector {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {
match item {
ChannelEvents::Events(item) => {
// let coll = self.coll.get_or_insert_with(|| {
// item.as_ref()
// .as_collectable_with_default_ref()
// .new_collector()
// });
// coll.ingest(item.as_collectable_with_default_mut());
todo!()
let coll = self.coll.get_or_insert_with(|| item.new_collector());
coll.ingest(item.as_collectable_dyn_mut());
}
ChannelEvents::Status(_) => {
// TODO decide on output format to collect also the connection status events
@@ -908,21 +906,9 @@ impl CollectorDyn for ChannelEventsCollector {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, err::Error> {
fn result(&mut self) -> Result<Box<dyn CollectedDyn>, err::Error> {
match self.coll.as_mut() {
Some(coll) => {
if self.needs_continue_at {
debug!("ChannelEventsCollector set_continue_at_here");
coll.set_continue_at_here();
}
if self.range_complete {
coll.set_range_complete();
}
@@ -930,7 +916,7 @@ impl CollectorDyn for ChannelEventsCollector {
debug!("ChannelEventsCollector set_timed_out");
coll.set_timed_out();
}
let res = coll.result(range, binrange)?;
let res = coll.result()?;
Ok(res)
}
None => {

View File

@@ -251,11 +251,8 @@ impl MergeableTy for EventFull {
None
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
self.tss
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
fn tss_for_testing(&self) -> VecDeque<TsNano> {
self.tss.iter().map(|&x| TsNano::from_ns(x)).collect()
}
fn is_consistent(&self) -> bool {

View File

@@ -1,6 +1,7 @@
use crate::IsoDateTime;
use daqbuf_err as err;
use err::Error;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorTy;
@@ -380,6 +381,16 @@ impl<STY: ScalarOps> ToJsonValue for EventsDim0CollectorOutput<STY> {
}
}
impl<STY: ScalarOps> ToUserFacingApiType for EventsDim0CollectorOutput<STY> {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}
impl<STY: ScalarOps> CollectedDyn for EventsDim0CollectorOutput<STY> {}
impl<STY: ScalarOps> CollectorTy for EventsDim0Collector<STY> {
@@ -401,15 +412,8 @@ impl<STY: ScalarOps> CollectorTy for EventsDim0Collector<STY> {
self.needs_continue_at = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
fn result(&mut self) -> Result<Self::Output, Error> {
let range: Option<SeriesRange> = None;
debug!(
"{} result() needs_continue_at {}",
Self::self_name(),
@@ -441,10 +445,8 @@ impl<STY: ScalarOps> CollectorTy for EventsDim0Collector<STY> {
} else {
None
};
let tss_sl = vals.tss.make_contiguous();
let pulses_sl = vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!());
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.vals.pulses);
let values = mem::replace(&mut vals.values, VecDeque::new());
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
@@ -677,14 +679,10 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn to_json_string(&self) -> String {
// TODO redesign with mut access, rename to `into_` and take the values out.
let mut tss = self.tss.clone();
let mut pulses = self.pulses.clone();
let mut values = self.values.clone();
let tss_sl = tss.make_contiguous();
let pulses_sl = pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let values = mem::replace(&mut values, VecDeque::new());
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!());
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&self.pulses);
// let values = mem::replace(&mut values, VecDeque::new());
let ret = EventsDim0CollectorOutput {
ts_anchor_sec,
ts_off_ms,

View File

@@ -1,5 +1,6 @@
use daqbuf_err as err;
use err::Error;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
@@ -127,8 +128,12 @@ impl TypeName for EventsDim0EnumCollectorOutput {
}
}
impl ToJsonValue for EventsDim0EnumCollectorOutput {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
impl ToUserFacingApiType for EventsDim0EnumCollectorOutput {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}
@@ -154,20 +159,13 @@ impl CollectorTy for EventsDim0EnumCollector {
self.needs_continue_at = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<EventsDim0EnumCollectorOutput, Error> {
fn result(&mut self) -> Result<EventsDim0EnumCollectorOutput, Error> {
trace_collect_result!(
"{} result() needs_continue_at {}",
self.type_name(),
self.needs_continue_at
);
let range: Option<SeriesRange> = None;
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
@@ -194,8 +192,7 @@ impl CollectorTy for EventsDim0EnumCollector {
} else {
None
};
let tss_sl = vals.tss.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!());
let valixs = mem::replace(&mut vals.values, VecDeque::new());
let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new());
let vals = valixs;

View File

@@ -1,6 +1,7 @@
use crate::IsoDateTime;
use daqbuf_err as err;
use err::Error;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectableType;
use items_0::collect_s::CollectedDyn;
@@ -335,9 +336,13 @@ impl<STY: ScalarOps> WithLen for EventsDim1CollectorOutput<STY> {
}
}
impl<STY: ScalarOps> ToJsonValue for EventsDim1CollectorOutput<STY> {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
impl<STY: ScalarOps> ToUserFacingApiType for EventsDim1CollectorOutput<STY> {
fn to_user_facing_api_type(self: Self) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn items_0::apitypes::UserApiType> {
todo!()
}
}
@@ -361,22 +366,14 @@ impl<STY: ScalarOps> CollectorTy for EventsDim1Collector<STY> {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
debug!("{}::set_continue_at_here", Self::self_name());
self.needs_continue_at = true;
}
// TODO unify with dim0 case
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
fn result(&mut self) -> Result<Self::Output, Error> {
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let range: Option<SeriesRange> = None;
let vals = &mut self.vals;
let continue_at = if self.timed_out {
if let Some(ts) = vals.tss.back() {
@@ -398,10 +395,8 @@ impl<STY: ScalarOps> CollectorTy for EventsDim1Collector<STY> {
} else {
None
};
let tss_sl = vals.tss.make_contiguous();
let pulses_sl = vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(todo!());
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(&vals.pulses);
let values = mem::replace(&mut vals.values, VecDeque::new());
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));

View File

@@ -1,4 +1,5 @@
pub mod accounting;
pub mod apitypes;
pub mod binning;
pub mod channelevents;
pub mod empty;
@@ -27,7 +28,7 @@ mod log {
#[cfg(not(test))]
pub use netpod::log::*;
#[cfg(test)]
pub use netpod::log::*;
pub use netpod::log_direct::*;
}
#[derive(Debug, PartialEq)]

View File

@@ -1,3 +1,6 @@
#[cfg(test)]
mod test;
use crate::log::*;
use core::ops::Range;
use futures_util::Stream;
@@ -34,6 +37,8 @@ macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "MergerError")]
pub enum Error {
@@ -56,8 +61,7 @@ pub struct Merger<T> {
out_of_band_queue: VecDeque<Sitemty<T>>,
log_queue: VecDeque<LogItem>,
dim0ix_max: TsNano,
done_data: bool,
done_buffered: bool,
done_inp: bool,
done_range_complete: bool,
complete: bool,
poll_count: usize,
@@ -75,8 +79,7 @@ where
.field("out_max_len", &self.out_max_len)
.field("range_complete", &self.range_complete)
.field("out_of_band_queue", &self.out_of_band_queue.len())
.field("done_data", &self.done_data)
.field("done_buffered", &self.done_buffered)
.field("done_data", &self.done_inp)
.field("done_range_complete", &self.done_range_complete)
.finish()
}
@@ -98,8 +101,7 @@ where
out_of_band_queue: VecDeque::new(),
log_queue: VecDeque::new(),
dim0ix_max: TsNano::from_ns(0),
done_data: false,
done_buffered: false,
done_inp: false,
done_range_complete: false,
complete: false,
poll_count: 0,
@@ -380,16 +382,22 @@ where
|| self.do_clear_out
|| last_emit
{
if o.len() > self.out_max_len {
if o.len() > 2 * self.out_max_len {
debug!(
"MERGER OVERWEIGHT ITEM {} vs {}",
"MERGER OVERLENGTH ITEM {} vs {}",
o.len(),
self.out_max_len
);
}
if o.byte_estimate() > 2 * OUT_MAX_BYTES {
debug!(
"MERGER OVERWEIGHT ITEM {} vs {}",
o.byte_estimate(),
OUT_MAX_BYTES
);
}
trace3!("decide to output");
self.do_clear_out = false;
//Break(Ready(Some(Ok(self.out.take().unwrap()))))
let item = sitem_data(self.out.take().unwrap());
self.out_of_band_queue.push_back(item);
Continue(())
@@ -431,9 +439,7 @@ where
let _spg = span1.enter();
loop {
trace3!("poll");
break if let Some(item) = self.log_queue.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if self.poll_count == usize::MAX {
break if self.poll_count == usize::MAX {
self.done_range_complete = true;
continue;
} else if self.complete {
@@ -441,46 +447,42 @@ where
} else if self.done_range_complete {
self.complete = true;
Ready(None)
} else if self.done_buffered {
self.done_range_complete = true;
if self.range_complete.iter().all(|x| *x) {
trace!("emit RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
continue;
}
} else if self.done_data {
trace!("done_data");
self.done_buffered = true;
if let Some(out) = self.out.take() {
trace!("done_data emit buffered len {}", out.len());
Ready(Some(sitem_data(out)))
} else {
continue;
}
} else if let Some(item) = self.log_queue.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(item) = self.out_of_band_queue.pop_front() {
trace_emit!("emit item");
let item = on_sitemty_data!(item, |k: T| {
trace3!("emit out-of-band data len {}", k.len());
trace_emit!("emit item len {}", k.len());
sitem_data(k)
});
Ready(Some(item))
} else {
} else if self.done_inp == false {
match Self::poll2(self.as_mut(), cx) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(k) => match k {
Ready(Some(e)) => {
self.done_data = true;
self.done_inp = true;
Ready(Some(Err(sitem_err2_from_string(e))))
}
Ready(None) => {
self.done_data = true;
self.done_inp = true;
if let Some(out) = self.out.take() {
trace!("done_data emit buffered len {}", out.len());
self.out_of_band_queue.push_back(sitem_data(out));
}
continue;
}
Pending => Pending,
},
}
} else {
self.done_range_complete = true;
if self.range_complete.iter().all(|x| *x) {
trace!("emit RangeComplete");
let item = Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete));
self.out_of_band_queue.push_back(item);
}
continue;
};
}
}

32
src/merger/test.rs Normal file
View File

@@ -0,0 +1,32 @@
use super::MergeInp;
use super::Merger;
use crate::binning::container_events::ContainerEvents;
use crate::log::*;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
use netpod::TsNano;
// Smoke test body: drive a Merger over a single input stream that yields two
// one-event containers (ts 9 and ts 11). Items are only logged, nothing is
// asserted, so this only verifies the merger runs to completion without
// panicking or hanging.
async fn merger_00_inner() {
    let mut evs0 = ContainerEvents::<f32>::new();
    evs0.push_back(TsNano::from_ns(9), 9.0);
    let mut evs1 = ContainerEvents::<f32>::new();
    evs1.push_back(TsNano::from_ns(11), 11.0);
    let inp0: MergeInp<_> = Box::pin(futures_util::stream::iter([
        sitem_data(evs0),
        sitem_data(evs1),
    ]));
    let inps = vec![inp0];
    // NOTE(review): second argument is passed as None here — presumably an
    // optional output-size limit; confirm against Merger::new.
    let mut merger = Merger::new(inps, None);
    while let Some(x) = merger.next().await {
        trace!("{:?}", x);
    }
    trace!("DONE");
}
// Entry point: run the async smoke test on a single-threaded tokio runtime.
#[test]
fn merger_00() {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
        .block_on(merger_00_inner());
}

View File

@@ -1,35 +1,30 @@
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::TsNano;
use std::collections::VecDeque;
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
.iter()
.zip(ts_off_ms.iter().map(|&k| k * MS))
.map(|(&j, k)| (j - ts_anchor_ns - k))
.collect();
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}
pub fn ts_offs_from_abs_with_anchor(
ts_anchor_sec: u64,
tss: &[u64],
tss: &VecDeque<TsNano>,
) -> (VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k.ns() - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
.iter()
.zip(ts_off_ms.iter().map(|&k| k * MS))
.map(|(&j, k)| (j - ts_anchor_ns - k))
.map(|(&j, k)| (j.ns() - ts_anchor_ns - k))
.collect();
(ts_off_ms, ts_off_ns)
}
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000;
let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect();
pub fn ts_offs_from_abs(tss: &VecDeque<TsNano>) -> (u64, VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_sec = tss.front().map_or(TsNano::from_ns(0), |&k| k).ns() / SEC;
let (ts_off_ms, ts_off_ns) = ts_offs_from_abs_with_anchor(ts_anchor_sec, tss);
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}
pub fn pulse_offs_from_abs(pulses: &VecDeque<u64>) -> (u64, VecDeque<u64>) {
let pulse_anchor = pulses.front().map_or(0, |&k| k) / 10000 * 10000;
let pulse_off = pulses.iter().map(|&k| k - pulse_anchor).collect();
(pulse_anchor, pulse_off)
}