Improve byte estimate

This commit is contained in:
Dominik Werder
2025-03-07 11:42:14 +01:00
parent 57c6561fa2
commit ed23e352f6
17 changed files with 253 additions and 231 deletions

View File

@@ -2,7 +2,7 @@
name = "daqbuf-items-2"
version = "0.0.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
edition = "2024"
[dependencies]
serde = { version = "1", features = ["derive"] }
@@ -20,7 +20,6 @@ futures-util = "0.3.24"
humantime-serde = "1.1.1"
itertools = "0.13.0"
autoerr = "0.0.3"
thiserror = "=0.0.1"
daqbuf-err = { path = "../daqbuf-err" }
items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" }
items_proc = { path = "../daqbuf-items-proc", package = "daqbuf-items-proc" }
@@ -28,9 +27,6 @@ netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }
parse = { path = "../daqbuf-parse", package = "daqbuf-parse" }
bitshuffle = { path = "../daqbuf-bitshuffle", package = "daqbuf-bitshuffle" }
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
[features]
heavy = []

View File

@@ -1,5 +1,4 @@
pub mod aggregator;
pub mod binnedvaluetype;
pub mod container;
pub mod container_bins;
pub mod container_events;

View File

@@ -11,9 +11,9 @@ use netpod::EnumVariant;
use serde::Deserialize;
use serde::Serialize;
macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! trace_event { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) }
macro_rules! trace_result { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
macro_rules! trace_result { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) }
pub trait AggTimeWeightOutputAvg: BinAggedType + Serialize + for<'a> Deserialize<'a> {}

View File

@@ -1,8 +0,0 @@
pub trait BinnedValueType {}
pub struct BinnedNumericValue<EVT> {
avg: f32,
t: Option<EVT>,
}
impl<EVT> BinnedValueType for BinnedNumericValue<EVT> {}

View File

@@ -96,7 +96,7 @@ impl AggBinValTw<f32> for AggBinValTwF32 {
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f32) {
fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f32) {
let f = dt.ns() as f32 / bl.ns() as f32;
self.sum += f * val;
}
@@ -122,7 +122,7 @@ impl AggBinValTw<f64> for AggBinValTwF64 {
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f64) {
fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f64) {
let f = dt.ns() as f32 / bl.ns() as f32;
self.sum += f as f64 * val;
}

View File

@@ -6,6 +6,10 @@ use crate::binning::container::bins::BinAggedContainer;
use crate::log::*;
use core::fmt;
use daqbuf_err as err;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
@@ -16,18 +20,12 @@ use items_0::merge::MergeableTy;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
use items_0::vecpreview::VecPreview;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use netpod::f32_close;
use netpod::TsNano;
use netpod::f32_close;
use std::any;
use std::collections::VecDeque;
use std::mem;
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
autoerr::create_error_v1!(
name(ContainerBinsError, "ContainerBins"),
enum variants {
@@ -121,7 +119,7 @@ mod container_bins_serde {
EVT: EventValueType,
BVT: BinAggedType,
{
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
fn serialize<S>(&self, _ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
@@ -134,7 +132,7 @@ mod container_bins_serde {
EVT: EventValueType,
BVT: BinAggedType,
{
fn deserialize<D>(de: D) -> Result<Self, D::Error>
fn deserialize<D>(_de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -340,6 +338,17 @@ where
}
}
impl<EVT, BVT> ByteEstimate for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn byte_estimate(&self) -> u32 {
// TODO
self.len() as u32 * 800
}
}
pub fn compare_boxed_f32(lhs: &ContainerBins<f32, f32>, rhs: &ContainerBins<f32, f32>) -> bool {
if let Some(lhs) = lhs.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
if let Some(rhs) = rhs.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
@@ -457,17 +466,6 @@ where
}
}
impl<EVT, BVT> ByteEstimate for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn byte_estimate(&self) -> u64 {
// TODO ByteEstimate for ContainerBins
128 * self.len() as u64
}
}
#[derive(Debug)]
pub struct ContainerBinsCollectorOutput<EVT, BVT>
where
@@ -580,9 +578,9 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
fn byte_estimate(&self) -> u64 {
fn byte_estimate(&self) -> u32 {
// TODO need better estimate
self.bins.len() as u64 * 200
self.bins.len() as u32 * 400
}
}
@@ -720,29 +718,17 @@ where
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {
let x = self.ts1s.partition_point(|&x| x <= ts);
if x >= self.ts1s.len() {
None
} else {
Some(x)
}
if x >= self.ts1s.len() { None } else { Some(x) }
}
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize> {
let x = self.ts1s.partition_point(|&x| x < ts);
if x >= self.ts1s.len() {
None
} else {
Some(x)
}
if x >= self.ts1s.len() { None } else { Some(x) }
}
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
let x = self.ts1s.partition_point(|&x| x < ts);
if x == 0 {
None
} else {
Some(x - 1)
}
if x == 0 { None } else { Some(x - 1) }
}
fn tss_for_testing(&self) -> VecDeque<TsNano> {

View File

@@ -37,6 +37,7 @@ use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::cell::RefCell;
use std::collections::VecDeque;
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
@@ -70,6 +71,7 @@ where
fn truncate_front(&mut self, len: usize);
fn into_user_facing_fields(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)>;
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)>;
fn byte_estimate(&self) -> u32;
}
pub trait PartialOrdEvtA<EVT> {
@@ -82,9 +84,9 @@ pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + Unpin + 'stat
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
const SERDE_ID: u32;
const BYTE_ESTIMATE_V00: u32;
fn to_f32_for_binning_v01(&self) -> f32;
fn scalar_type_name_string() -> String;
fn byte_estimate(&self) -> u32;
}
impl<EVT> Container<EVT> for VecDeque<EVT>
@@ -134,6 +136,10 @@ where
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
fn byte_estimate(&self) -> u32 {
self.iter().fold(0, |a, x| a + x.byte_estimate())
}
}
impl Container<String> for VecDeque<String> {
@@ -179,6 +185,10 @@ impl Container<String> for VecDeque<String> {
fn into_user_facing_fields_json(self) -> Vec<(String, Box<dyn erased_serde::Serialize>)> {
vec![("values".into(), Box::new(self))]
}
fn byte_estimate(&self) -> u32 {
self.iter().fold(0, |a, x| a + x.bytes().len() as u32)
}
}
macro_rules! impl_event_value_type {
@@ -189,15 +199,16 @@ macro_rules! impl_event_value_type {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = $evt;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
fn scalar_type_name_string() -> String {
$sctname.to_string()
}
fn byte_estimate(&self) -> u32 {
std::mem::size_of::<$evt>() as u32
}
}
impl PartialOrdEvtA<$evt> for $evt {
fn cmp_a(&self, other: &$evt) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
@@ -247,13 +258,15 @@ impl EventValueType for f32 {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = f32;
const SERDE_ID: u32 = <f32 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
fn scalar_type_name_string() -> String {
"f32".to_string()
}
fn byte_estimate(&self) -> u32 {
std::mem::size_of::<Self>() as u32
}
}
impl EventValueType for f64 {
@@ -262,13 +275,15 @@ impl EventValueType for f64 {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = f64;
const SERDE_ID: u32 = <f64 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
fn scalar_type_name_string() -> String {
"f64".to_string()
}
fn byte_estimate(&self) -> u32 {
std::mem::size_of::<Self>() as u32
}
}
impl EventValueType for bool {
@@ -277,13 +292,15 @@ impl EventValueType for bool {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = bool;
const SERDE_ID: u32 = <bool as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
f32::from(*self)
}
fn scalar_type_name_string() -> String {
"bool".to_string()
}
fn byte_estimate(&self) -> u32 {
std::mem::size_of::<Self>() as u32
}
}
impl EventValueType for String {
@@ -292,13 +309,15 @@ impl EventValueType for String {
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = &'a str;
const SERDE_ID: u32 = <String as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = 400;
fn to_f32_for_binning_v01(&self) -> f32 {
self.len() as _
}
fn scalar_type_name_string() -> String {
"string".to_string()
}
fn byte_estimate(&self) -> u32 {
self.bytes().len() as u32
}
}
macro_rules! impl_event_value_type_vec {
@@ -310,13 +329,16 @@ macro_rules! impl_event_value_type_vec {
type IterTy1<'a> = Vec<$evt>;
const SERDE_ID: u32 = <Vec<$evt> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + *x as f32)
}
fn scalar_type_name_string() -> String {
$sctname.to_string()
}
fn byte_estimate(&self) -> u32 {
self.iter()
.fold(0, |a, x| a + EventValueType::byte_estimate(x))
}
}
impl PartialOrdEvtA<Vec<$evt>> for Vec<$evt> {
@@ -346,14 +368,15 @@ impl EventValueType for Vec<bool> {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<bool>;
const SERDE_ID: u32 = <Vec<bool> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + f32::from(*x))
}
fn scalar_type_name_string() -> String {
"bool".to_string()
}
fn byte_estimate(&self) -> u32 {
self.len() as u32 * std::mem::size_of::<Self>() as u32
}
}
impl PartialOrdEvtA<Vec<bool>> for Vec<bool> {
@@ -368,14 +391,16 @@ impl EventValueType for Vec<String> {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<String>;
const SERDE_ID: u32 = <Vec<String> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + x.len() as f32)
}
fn scalar_type_name_string() -> String {
"string".to_string()
}
fn byte_estimate(&self) -> u32 {
self.iter()
.fold(0, |a, x| a + EventValueType::byte_estimate(x))
}
}
impl PartialOrdEvtA<Vec<String>> for Vec<String> {
@@ -390,14 +415,16 @@ impl EventValueType for Vec<EnumVariant> {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<EnumVariant>;
const SERDE_ID: u32 = <Vec<EnumVariant> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + x.ix() as f32)
}
fn scalar_type_name_string() -> String {
"enum".to_string()
}
fn byte_estimate(&self) -> u32 {
self.iter()
.fold(0, |a, x| a + EventValueType::byte_estimate(x))
}
}
impl PartialOrdEvtA<Vec<EnumVariant>> for Vec<EnumVariant> {
@@ -484,7 +511,7 @@ mod serde_pulsed_val {
where
EVT: EventValueType,
{
fn deserialize<D>(de: D) -> Result<Self, D::Error>
fn deserialize<D>(_de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -586,6 +613,10 @@ where
("values".into(), Box::new(self.vals)),
]
}
fn byte_estimate(&self) -> u32 {
self.len() as u32 * 8 + self.vals.byte_estimate()
}
}
impl<EVT> PartialOrdEvtA<PulsedVal<EVT>> for PulsedVal<EVT>
@@ -606,13 +637,15 @@ where
type AggTimeWeightOutputAvg = EVT::AggTimeWeightOutputAvg;
type IterTy1<'a> = PulsedValIterTy<'a, EVT>;
const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(<EVT as SubFrId>::SUB) as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<EVT>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.1.to_f32_for_binning_v01()
}
fn scalar_type_name_string() -> String {
EVT::scalar_type_name_string()
}
fn byte_estimate(&self) -> u32 {
8 + self.1.byte_estimate()
}
}
#[derive(Debug, Clone)]
@@ -663,7 +696,7 @@ where
{
tss: VecDeque<TsNano>,
vals: <EVT as EventValueType>::Container,
byte_estimate: u64,
byte_estimate: RefCell<Option<u32>>,
}
mod container_events_serde {
@@ -677,10 +710,11 @@ mod container_events_serde {
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
macro_rules! trace_serde { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
macro_rules! trace_serde { ($($arg:expr),*) => ( if false { eprintln!($($arg),*); }) }
impl<EVT> Serialize for ContainerEvents<EVT>
where
@@ -726,8 +760,7 @@ mod container_events_serde {
let ret = Self::Value {
tss,
vals,
// TODO make container recompute byte_estimate
byte_estimate: 0,
byte_estimate: RefCell::new(None),
};
Ok(ret)
}
@@ -756,7 +789,7 @@ mod container_events_serde {
let ret = Self::Value {
tss: tss.unwrap(),
vals: vals.unwrap(),
byte_estimate: 0,
byte_estimate: RefCell::new(None),
};
Ok(ret)
}
@@ -787,7 +820,7 @@ where
Self {
tss,
vals,
byte_estimate: 0,
byte_estimate: RefCell::new(None),
}
}
@@ -799,7 +832,7 @@ where
Self {
tss: VecDeque::new(),
vals: Container::new(),
byte_estimate: 0,
byte_estimate: RefCell::new(None),
}
}
@@ -822,6 +855,7 @@ where
pub fn push_back(&mut self, ts: TsNano, val: EVT) {
self.tss.push_back(ts);
self.vals.push_back(val);
self.byte_estimate = RefCell::new(None);
}
pub fn iter_zip<'a>(&'a self) -> impl Iterator<Item = (TsNano, EVT::IterTy1<'a>)> {
@@ -835,7 +869,7 @@ where
pub fn clear(&mut self) {
self.tss.clear();
self.vals.clear();
self.byte_estimate = 0;
*self.byte_estimate.borrow_mut() = Some(0);
}
pub fn truncate_front(&mut self, len: usize) {
@@ -845,6 +879,16 @@ where
self.vals.truncate_front(len);
}
}
fn byte_estimate_mut(&self) -> u32 {
if let Some(x) = self.byte_estimate.borrow().clone() {
x
} else {
let byte_est = 8 * self.len() as u32 + self.vals.byte_estimate();
*self.byte_estimate.borrow_mut() = Some(byte_est);
byte_est
}
}
}
impl<EVT> fmt::Debug for ContainerEvents<EVT>
@@ -890,15 +934,6 @@ where
}
}
impl<EVT> ByteEstimate for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn byte_estimate(&self) -> u64 {
self.byte_estimate
}
}
impl<EVT> Empty for ContainerEvents<EVT>
where
EVT: EventValueType,
@@ -917,6 +952,15 @@ where
}
}
impl<EVT> ByteEstimate for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn byte_estimate(&self) -> u32 {
self.byte_estimate_mut()
}
}
pub struct ContainerEventsTakeUpTo<'a, EVT>
where
EVT: EventValueType,
@@ -1169,8 +1213,8 @@ impl<EVT> ByteEstimate for ContainerEventsCollector<EVT>
where
EVT: EventValueType,
{
fn byte_estimate(&self) -> u64 {
EVT::BYTE_ESTIMATE_V00 as _
fn byte_estimate(&self) -> u32 {
self.evs.byte_estimate()
}
}

View File

@@ -1,30 +1,28 @@
use super::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
use crate::log::*;
use crate::log;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::streamitem::sitem_err2_from_string;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
use netpod::BinnedRange;
use netpod::TsNano;
use std::fmt;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinnedEventsTimeweightDyn")]
pub enum Error {
InnerDynMissing,
}
autoerr::create_error_v1!(
name(Error, "BinnedEventsTimeweightDyn"),
enum variants {
InnerDynMissing,
},
);
type ItemA = Box<dyn BinningggContainerBinsDyn>;
type ItemB = Sitemty<ItemA>;
@@ -56,10 +54,10 @@ impl BinnedBinsTimeweightStream {
item: ItemB,
_cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use ControlFlow::*;
use Poll::*;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
match item {
Ok(x) => match x {
DataItem(x) => match x {
@@ -104,9 +102,9 @@ impl BinnedBinsTimeweightStream {
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
trace_input_container!("handle_eos");
use Poll::*;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
self.state = StreamState::Done;
if self.range_complete {
self.binned
@@ -123,7 +121,8 @@ impl BinnedBinsTimeweightStream {
Ready(Some(Ok(DataItem(Data(x)))))
}
None => {
let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos"));
let item =
LogItem::from_node(888, log::Level::INFO, format!("no bins ready on eos"));
Ready(Some(Ok(Log(item))))
}
}

View File

@@ -6,9 +6,11 @@ use crate::log;
use daqbuf_err as err;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::streamitem::sitem_data;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinningggContainerEventsDyn;
@@ -27,7 +29,7 @@ macro_rules! debug_input_container { ($($arg:expr),*) => ( if true { log::debug!
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) }
macro_rules! trace_input_container { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) }
macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) }
@@ -287,10 +289,10 @@ impl BinnedEventsTimeweightStream {
item: Sitemty<ChannelEvents>,
_cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use ControlFlow::*;
use Poll::*;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
match item {
Ok(x) => match x {
DataItem(x) => match x {
@@ -322,8 +324,8 @@ impl BinnedEventsTimeweightStream {
}
}
fn test1(
mut self: Pin<&mut Self>,
fn _test1(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Result<ControlFlow<Poll<Option<<Self as Stream>::Item>>>, Error> {
use ControlFlow::*;
@@ -339,8 +341,8 @@ impl BinnedEventsTimeweightStream {
}
}
fn test2(
mut self: Pin<&mut Self>,
fn _test2(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> ControlFlow<Result<Poll<Option<<Self as Stream>::Item>>, Error>> {
use ControlFlow::*;
@@ -350,19 +352,15 @@ impl BinnedEventsTimeweightStream {
} else if false {
Continue(())
} else {
let e = Error::Dummy;
let _e = Error::Dummy;
// unfortunately can not use the `?` operator here:
// let _ = Err(e)?;
Break(Ok(Pending))
}
}
fn handle_eos(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), err::Error> {
debug_input_container!("handle_eos range final {}", self.range_final);
use Poll::*;
self.state = StreamState::Remains;
if true || self.range_final {
self.binned_events
@@ -373,7 +371,7 @@ impl BinnedEventsTimeweightStream {
.input_done_range_open()
.map_err(err::Error::from_string)?;
}
Ready(None)
Ok(())
}
fn handle_remains(
@@ -381,9 +379,9 @@ impl BinnedEventsTimeweightStream {
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
debug_input_container!("handle_remains");
use Poll::*;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
debug!("handle_remains binner {:?}", self.binned_events);
match self
.binned_events
@@ -409,7 +407,7 @@ impl BinnedEventsTimeweightStream {
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
debug_input_container!("handle_main");
trace_input_container!("handle_main");
use ControlFlow::*;
use Poll::*;
let ret = match &self.state {
@@ -418,7 +416,10 @@ impl BinnedEventsTimeweightStream {
StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) {
Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx),
Ready(None) => {
self.as_mut().handle_eos(cx);
match self.as_mut().handle_eos(cx) {
Ok(()) => {}
Err(e) => return Break(Ready(Some(Err(e)))),
}
Continue(())
}
Pending => Break(Pending),
@@ -436,6 +437,10 @@ impl BinnedEventsTimeweightStream {
if let Break(Ready(Some(Err(_)))) = ret {
self.state = StreamState::Done;
}
if let Break(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))) = &ret
{
trace_emit!("emit item len {}", item.len());
}
ret
}
}

View File

@@ -102,6 +102,10 @@ impl Container<EnumVariant> for EnumVariantContainer {
("valuestrings".into(), Box::new(self.names)),
]
}
fn byte_estimate(&self) -> u32 {
self.len() as u32 * 24
}
}
#[derive(Debug)]
@@ -140,11 +144,7 @@ impl<'a> PartialOrdEvtA<EnumVariant> for EnumVariantRef<'a> {
let x = self.ix.partial_cmp(&other.ix());
if let Some(Equal) = x {
let x = self.name.partial_cmp(other.name());
if let Some(Equal) = x {
Some(Equal)
} else {
x
}
if let Some(Equal) = x { Some(Equal) } else { x }
} else {
x
}
@@ -157,23 +157,25 @@ impl EventValueType for EnumVariant {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = EnumVariantRef<'a>;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 40;
fn to_f32_for_binning_v01(&self) -> f32 {
self.ix() as _
}
fn scalar_type_name_string() -> String {
"enum".to_string()
}
fn byte_estimate(&self) -> u32 {
60
}
}
impl PartialOrdEvtA<netpod::UnsupEvt> for netpod::UnsupEvt {
fn cmp_a(&self, other: &netpod::UnsupEvt) -> Option<std::cmp::Ordering> {
fn cmp_a(&self, _other: &netpod::UnsupEvt) -> Option<std::cmp::Ordering> {
todo!()
}
}
impl PartialOrdEvtA<Vec<netpod::UnsupEvt>> for Vec<netpod::UnsupEvt> {
fn cmp_a(&self, other: &Vec<netpod::UnsupEvt>) -> Option<std::cmp::Ordering> {
fn cmp_a(&self, _other: &Vec<netpod::UnsupEvt>) -> Option<std::cmp::Ordering> {
todo!()
}
}
@@ -186,7 +188,7 @@ impl AggregatorTimeWeight<netpod::UnsupEvt> for UnsupEvtAgg {
todo!()
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: netpod::UnsupEvt) {
fn ingest(&mut self, _dt: DtNano, _bl: DtNano, _val: netpod::UnsupEvt) {
todo!()
}
@@ -196,7 +198,7 @@ impl AggregatorTimeWeight<netpod::UnsupEvt> for UnsupEvtAgg {
fn result_and_reset_for_new_bin(
&mut self,
filled_width_fraction: f32,
_filled_width_fraction: f32,
) -> <netpod::UnsupEvt as EventValueType>::AggTimeWeightOutputAvg {
todo!()
}
@@ -207,7 +209,7 @@ impl AggregatorTimeWeight<Vec<netpod::UnsupEvt>> for UnsupEvtAgg {
todo!()
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec<netpod::UnsupEvt>) {
fn ingest(&mut self, _dt: DtNano, _bl: DtNano, _val: Vec<netpod::UnsupEvt>) {
todo!()
}
@@ -217,7 +219,7 @@ impl AggregatorTimeWeight<Vec<netpod::UnsupEvt>> for UnsupEvtAgg {
fn result_and_reset_for_new_bin(
&mut self,
filled_width_fraction: f32,
_filled_width_fraction: f32,
) -> <Vec<netpod::UnsupEvt> as EventValueType>::AggTimeWeightOutputAvg {
todo!()
}
@@ -229,13 +231,15 @@ impl EventValueType for netpod::UnsupEvt {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = netpod::UnsupEvt;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 4;
fn to_f32_for_binning_v01(&self) -> f32 {
0.
}
fn scalar_type_name_string() -> String {
"unsupevt".to_string()
}
fn byte_estimate(&self) -> u32 {
345
}
}
impl EventValueType for Vec<netpod::UnsupEvt> {
@@ -244,11 +248,13 @@ impl EventValueType for Vec<netpod::UnsupEvt> {
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<netpod::UnsupEvt>;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 4;
fn to_f32_for_binning_v01(&self) -> f32 {
0.
}
fn scalar_type_name_string() -> String {
"unsupevt".to_string()
}
fn byte_estimate(&self) -> u32 {
self.len() as u32 * 345
}
}

View File

@@ -4,6 +4,12 @@ use crate::framable::FrameType;
use crate::log::*;
use core::ops::Range;
use daqbuf_err as err;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::Extendable;
use items_0::TypeName;
use items_0::WithLen;
use items_0::apitypes::ToUserFacingApiType;
use items_0::apitypes::UserApiType;
use items_0::collect_s::CollectableDyn;
@@ -18,12 +24,6 @@ use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::Extendable;
use items_0::TypeName;
use items_0::WithLen;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -70,7 +70,7 @@ impl ConnStatusEvent {
}
impl ByteEstimate for ConnStatusEvent {
fn byte_estimate(&self) -> u64 {
fn byte_estimate(&self) -> u32 {
// TODO magic number, but maybe good enough
32
}
@@ -147,7 +147,7 @@ impl ChannelStatusEvent {
}
impl ByteEstimate for ChannelStatusEvent {
fn byte_estimate(&self) -> u64 {
fn byte_estimate(&self) -> u32 {
// TODO magic number, but maybe good enough
32
}
@@ -221,26 +221,26 @@ mod serde_channel_events {
use crate::binning::container_events::PulsedVal;
use crate::channelevents::ConnStatusEvent;
use crate::log::*;
use items_0::subfr::SubFrId;
use items_0::subfr::is_container_events;
use items_0::subfr::is_pulsed_subfr;
use items_0::subfr::is_vec_subfr;
use items_0::subfr::subfr_scalar_type;
use items_0::subfr::SubFrId;
use items_0::timebin::BinningggContainerEventsDyn;
use netpod::EnumVariant;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use serde::de;
use serde::de::EnumAccess;
use serde::de::VariantAccess;
use serde::de::Visitor;
use serde::ser::SerializeSeq;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::cell::RefCell;
use std::fmt;
macro_rules! trace_serde { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) }
macro_rules! trace_serde { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) }
type C01<T> = ContainerEvents<T>;
type C02<T> = ContainerEvents<Vec<T>>;
@@ -440,7 +440,7 @@ mod serde_channel_events {
ret
}
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
fn visit_map<A>(self, _map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
@@ -595,19 +595,19 @@ mod test_channel_events_serde {
use crate::framable::Framable;
use crate::inmem::InMemoryFrame;
use crate::log::*;
use bincode::DefaultOptions;
use bincode::config::FixintEncoding;
use bincode::config::LittleEndian;
use bincode::config::RejectTrailing;
use bincode::config::WithOtherEndian;
use bincode::config::WithOtherIntEncoding;
use bincode::config::WithOtherTrailing;
use bincode::DefaultOptions;
use items_0::bincode;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::Appendable;
use items_0::Empty;
use items_0::bincode;
use items_0::streamitem::Sitemty;
use items_0::streamitem::sitem_data;
use items_0::timebin::BinningggContainerEventsDyn;
use netpod::TsNano;
use netpod::UnsupEvt;
use serde::Deserialize;
@@ -758,7 +758,7 @@ impl WithLen for ChannelEvents {
}
impl ByteEstimate for ChannelEvents {
fn byte_estimate(&self) -> u64 {
fn byte_estimate(&self) -> u32 {
match self {
ChannelEvents::Events(k) => k.byte_estimate(),
ChannelEvents::Status(k) => match k {
@@ -838,11 +838,7 @@ impl MergeableTy for ChannelEvents {
ChannelEvents::Events(k) => k.find_lowest_index_gt(ts),
ChannelEvents::Status(k) => {
if let Some(k) = k {
if k.ts > ts {
Some(0)
} else {
None
}
if k.ts > ts { Some(0) } else { None }
} else {
None
}
@@ -855,11 +851,7 @@ impl MergeableTy for ChannelEvents {
ChannelEvents::Events(k) => k.find_lowest_index_ge(ts),
ChannelEvents::Status(k) => {
if let Some(k) = k {
if k.ts >= ts {
Some(0)
} else {
None
}
if k.ts >= ts { Some(0) } else { None }
} else {
None
}
@@ -872,11 +864,7 @@ impl MergeableTy for ChannelEvents {
ChannelEvents::Events(k) => k.find_highest_index_lt(ts),
ChannelEvents::Status(k) => {
if let Some(k) = k {
if k.ts < ts {
Some(0)
} else {
None
}
if k.ts < ts { Some(0) } else { None }
} else {
None
}
@@ -981,7 +969,7 @@ impl WithLen for ChannelEventsCollector {
}
impl ByteEstimate for ChannelEventsCollector {
fn byte_estimate(&self) -> u64 {
fn byte_estimate(&self) -> u32 {
self.coll.as_ref().map_or(0, |x| x.byte_estimate())
}
}
@@ -1044,7 +1032,10 @@ impl ToUserFacingApiType for ChannelEvents {
fn into_user_facing_api_type(self) -> Box<dyn UserApiType> {
match self {
ChannelEvents::Events(x) => x.into_user_facing_api_type_box(),
ChannelEvents::Status(x) => Box::new(items_0::apitypes::EmptyStruct::new()),
ChannelEvents::Status(_) => {
// TODO
Box::new(items_0::apitypes::EmptyStruct::new())
}
}
}

View File

@@ -183,8 +183,8 @@ impl WithLen for EventFull {
}
impl ByteEstimate for EventFull {
fn byte_estimate(&self) -> u64 {
self.len() as u64 * (64 + self.entry_payload_max)
fn byte_estimate(&self) -> u32 {
self.len() as u32 * (64 + self.entry_payload_max as u32)
}
}

View File

@@ -25,13 +25,14 @@ pub const INMEM_FRAME_HEAD: usize = 20;
pub const INMEM_FRAME_FOOT: usize = 4;
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ItemFramable")]
pub enum Error {
Msg(String),
DummyError,
Frame(#[from] crate::frame::Error),
}
autoerr::create_error_v1!(
name(Error, "ItemFramable"),
enum variants {
Msg(String),
DummyError,
Frame(#[from] crate::frame::Error),
},
);
struct ErrMsg<E>(E)
where

View File

@@ -5,25 +5,26 @@ use crate::framable::INMEM_FRAME_HEAD;
use crate::framable::INMEM_FRAME_MAGIC;
use crate::inmem::InMemoryFrame;
use crate::log::*;
use bincode::DefaultOptions;
use bincode::config::FixintEncoding;
use bincode::config::LittleEndian;
use bincode::config::RejectTrailing;
use bincode::config::WithOtherEndian;
use bincode::config::WithOtherIntEncoding;
use bincode::config::WithOtherTrailing;
use bincode::DefaultOptions;
use bytes::BufMut;
use bytes::BytesMut;
use core::fmt;
use daqbuf_err as err;
use items_0::bincode;
use items_0::streamitem::LogItem;
use items_0::streamitem::StatsItem;
use items_0::streamitem::ERROR_FRAME_TYPE_ID;
use items_0::streamitem::LOG_FRAME_TYPE_ID;
use items_0::streamitem::LogItem;
use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID;
use items_0::streamitem::STATS_FRAME_TYPE_ID;
use items_0::streamitem::StatsItem;
use items_0::streamitem::TERM_FRAME_TYPE_ID;
use netpod::log;
use serde::Serialize;
use std::any;
use std::io;
@@ -115,6 +116,7 @@ where
rmp_serde::to_vec_named(&item).map_err(Error::from)
}
#[allow(unused)]
fn msgpack_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize,
@@ -149,6 +151,7 @@ where
})
}
#[allow(unused)]
fn postcard_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize + fmt::Debug,
@@ -210,6 +213,7 @@ where
})
}
#[allow(unused)]
fn json_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize + fmt::Debug,
@@ -260,7 +264,9 @@ where
}
}
pub fn encode_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
// TODO check if better option
#[allow(unused)]
fn encode_erased_to_vec_old<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize + fmt::Debug,
{
@@ -296,6 +302,9 @@ where
if enc.len() > u32::MAX as usize {
return Err(Error::TooLongPayload(enc.len()));
}
if enc.len() > 1024 * 1024 * 20 {
log::debug!("make_frame_2 over weight {} kB", enc.len() / 1024);
}
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();

View File

@@ -1,19 +1,24 @@
use crate::framable::INMEM_FRAME_FOOT;
use crate::framable::INMEM_FRAME_HEAD;
use crate::framable::INMEM_FRAME_MAGIC;
use crate::log::*;
use crate::log;
use bytes::Bytes;
use std::fmt;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "InMemoryFrameError")]
pub enum Error {
LessThanHeader,
TryFromSlice(#[from] std::array::TryFromSliceError),
BadMagic(u32),
HugeFrame(u32),
BadCrc,
}
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); }) }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) }
autoerr::create_error_v1!(
name(Error, "InMemoryFrameError"),
enum variants {
LessThanHeader,
TryFromSlice(#[from] std::array::TryFromSliceError),
BadMagic(u32),
HugeFrame(u32),
BadCrc,
},
);
pub enum ParseResult<T> {
NotEnoughData(usize),
@@ -56,6 +61,7 @@ impl InMemoryFrame {
if magic != INMEM_FRAME_MAGIC {
return Err(Error::BadMagic(magic));
}
debug!("frame len {:10}", len);
if len > 1024 * 1024 * 50 {
return Err(Error::HugeFrame(len));
}

View File

@@ -4,18 +4,17 @@ mod test;
use crate::log::*;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::SitemErrTy;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
use netpod::TsNano;
use std::collections::VecDeque;
use std::fmt;
@@ -24,26 +23,27 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
const OUT_MAX_BYTES: u64 = 1024 * 200;
const OUT_MAX_BYTES: u32 = 1024 * 1024 * 20;
const DO_DETECT_NON_MONO: bool = true;
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace2 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace4 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "MergerError")]
pub enum Error {
NoPendingButMissing,
Input(SitemErrTy),
ShouldFindTsMin,
ItemShouldHaveTsMax,
PartialPathDrainedAllItems,
}
autoerr::create_error_v1!(
name(Error, "MergerError"),
enum variants {
NoPendingButMissing,
Input(SitemErrTy),
ShouldFindTsMin,
ItemShouldHaveTsMax,
PartialPathDrainedAllItems,
},
);
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
@@ -382,15 +382,11 @@ where
|| last_emit
{
if o.len() > 2 * self.out_max_len {
debug!(
"MERGER OVERLENGTH ITEM {} vs {}",
o.len(),
self.out_max_len
);
debug!("over length item {} vs {}", o.len(), self.out_max_len);
}
if o.byte_estimate() > 2 * OUT_MAX_BYTES {
debug!(
"MERGER OVERWEIGHT ITEM {} vs {}",
"over weight item {} vs {}",
o.byte_estimate(),
OUT_MAX_BYTES
);

View File

@@ -1,14 +1,6 @@
use crate::binning::container_events::ContainerEvents;
use netpod::range::evrange::NanoRange;
use netpod::TsNano;
use std::pin::Pin;
fn boxed_conts<S>(inp: S) -> Pin<Box<dyn Iterator<Item = <S as Iterator>::Item> + Send>>
where
S: Iterator + Send + 'static,
{
Box::pin(inp)
}
use netpod::range::evrange::NanoRange;
pub fn new_events_gen_dim0_f32_v00(range: NanoRange) -> impl Iterator<Item = ContainerEvents<f32>> {
let dt = 1000 * 1000 * 10;