Customize the log out

This commit is contained in:
Dominik Werder
2024-07-03 10:20:39 +02:00
parent 21259e6591
commit 584d977675
15 changed files with 329 additions and 103 deletions

View File

@@ -108,15 +108,15 @@ fn decide_best_matching_index(range: (TsMs, TsMs), rows: &[TsMs]) -> Result<usiz
Duration::from_millis(0)
} else if x.0 <= range.0 {
if x.1 >= range.1 {
Duration::from_millis((range.1.clone() - range.0.clone()).to_u64())
Duration::from_millis((range.1.clone() - range.0.clone()).ms())
} else {
Duration::from_millis((x.1.clone() - range.0.clone()).to_u64())
Duration::from_millis((x.1.clone() - range.0.clone()).ms())
}
} else {
if x.1 >= range.1 {
Duration::from_millis((range.1.clone() - x.0.clone()).to_u64())
Duration::from_millis((range.1.clone() - x.0.clone()).ms())
} else {
Duration::from_millis((x.1.clone() - x.0.clone()).to_u64())
Duration::from_millis((x.1.clone() - x.0.clone()).ms())
}
};
dur

View File

@@ -54,7 +54,12 @@ pub trait TimeBinnerTy: fmt::Debug + Send + Unpin {
pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized {
type TimeBinner: TimeBinnerTy<Input = Self>;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner;
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Self::TimeBinner;
}
/// Data in time-binned form.
@@ -95,12 +100,17 @@ impl RangeOverlapInfo for Box<dyn TimeBinned> {
}
impl TimeBinnable for Box<dyn TimeBinned> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
todo!()
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
self.as_ref().time_binner_new(binrange, do_time_weight, emit_empty_bins)
}
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
todo!()
self.as_ref().to_box_to_json_result()
}
}
@@ -135,7 +145,12 @@ pub trait TimeBinnable:
fmt::Debug + WithLen + RangeOverlapInfo + Collectable + Any + AsAnyRef + AsAnyMut + Send
{
// TODO implementors may fail if edges contain not at least 2 entries.
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner>;
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner>;
// TODO just a helper for the empty result.
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult>;
}
@@ -161,8 +176,13 @@ impl RangeOverlapInfo for Box<dyn TimeBinnable> {
}
impl TimeBinnable for Box<dyn TimeBinnable> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
todo!()
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
self.as_ref().time_binner_new(binrange, do_time_weight, emit_empty_bins)
}
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
@@ -185,8 +205,13 @@ impl RangeOverlapInfo for Box<dyn Events> {
}
impl TimeBinnable for Box<dyn Events> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
TimeBinnable::time_binner_new(self.as_ref(), binrange, do_time_weight)
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
TimeBinnable::time_binner_new(self.as_ref(), binrange, do_time_weight, emit_empty_bins)
}
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
@@ -211,6 +236,7 @@ pub struct TimeBinnerDynStruct {
binrange: BinnedRangeEnum,
do_time_weight: bool,
binner: Option<Box<dyn TimeBinner>>,
emit_empty_bins: bool,
}
impl TimeBinnerDynStruct {
@@ -218,11 +244,17 @@ impl TimeBinnerDynStruct {
std::any::type_name::<Self>()
}
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box<dyn TimeBinner>) -> Self {
pub fn new(
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
binner: Box<dyn TimeBinner>,
) -> Self {
Self {
binrange,
do_time_weight,
binner: Some(binner),
emit_empty_bins,
}
}
}
@@ -238,6 +270,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct {
item,
self.binrange.clone(),
self.do_time_weight,
self.emit_empty_bins,
)));
}
self.binner.as_mut().unwrap().as_mut().ingest(item.as_mut())
@@ -329,6 +362,7 @@ impl TimeBinner for TimeBinnerDynStruct {
pub struct TimeBinnerDynStruct2 {
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
binner: Option<Box<dyn TimeBinner>>,
}
@@ -337,10 +371,16 @@ impl TimeBinnerDynStruct2 {
std::any::type_name::<Self>()
}
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box<dyn TimeBinner>) -> Self {
pub fn new(
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
binner: Box<dyn TimeBinner>,
) -> Self {
Self {
binrange,
do_time_weight,
emit_empty_bins,
binner: Some(binner),
}
}
@@ -357,6 +397,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct2 {
item,
self.binrange.clone(),
self.do_time_weight,
self.emit_empty_bins,
)));
}
self.binner
@@ -451,20 +492,32 @@ impl TimeBinner for TimeBinnerDynStruct2 {
impl TimeBinnableTy for Box<dyn TimeBinnable> {
type TimeBinner = TimeBinnerDynStruct;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
let binner = self.as_ref().time_binner_new(binrange.clone(), do_time_weight);
TimeBinnerDynStruct::new(binrange, do_time_weight, binner)
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Self::TimeBinner {
let binner = self
.as_ref()
.time_binner_new(binrange.clone(), do_time_weight, emit_empty_bins);
TimeBinnerDynStruct::new(binrange, do_time_weight, emit_empty_bins, binner)
}
}
impl TimeBinnableTy for Box<dyn TimeBinned> {
type TimeBinner = TimeBinnerDynStruct2;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Self::TimeBinner {
let binner = self
.as_time_binnable_ref()
.time_binner_new(binrange.clone(), do_time_weight);
TimeBinnerDynStruct2::new(binrange, do_time_weight, binner)
.time_binner_new(binrange.clone(), do_time_weight, emit_empty_bins);
TimeBinnerDynStruct2::new(binrange, do_time_weight, emit_empty_bins, binner)
}
}

View File

@@ -99,6 +99,7 @@ pub struct BinnedCollected {
scalar_type: ScalarType,
shape: Shape,
do_time_weight: bool,
emit_empty_bins: bool,
did_timeout: bool,
range_final: bool,
coll: Option<Box<dyn Collector>>,
@@ -116,6 +117,7 @@ impl BinnedCollected {
scalar_type: ScalarType,
shape: Shape,
do_time_weight: bool,
emit_empty_bins: bool,
//transformer: &dyn Transformer,
deadline: Instant,
inp: Pin<Box<dyn ChannelEventsInput>>,
@@ -126,6 +128,7 @@ impl BinnedCollected {
scalar_type,
shape,
do_time_weight,
emit_empty_bins,
did_timeout: false,
range_final: false,
coll: None,
@@ -143,9 +146,11 @@ impl BinnedCollected {
RangeCompletableItem::Data(k) => match k {
ChannelEvents::Events(mut events) => {
if self.binner.is_none() {
let bb = events
.as_time_binnable_mut()
.time_binner_new(self.binrange.clone(), self.do_time_weight);
let bb = events.as_time_binnable_mut().time_binner_new(
self.binrange.clone(),
self.do_time_weight,
self.emit_empty_bins,
);
self.binner = Some(bb);
}
let binner = self.binner.as_mut().unwrap();

View File

@@ -647,8 +647,14 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsDim0Aggregator<NTY> {
}
impl<NTY: ScalarOps> TimeBinnable for BinsDim0<NTY> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
// TODO get rid of unwrap
// TODO respect emit_empty_bins
let ret = BinsDim0TimeBinner::<NTY>::new(binrange, do_time_weight).unwrap();
Box::new(ret)
}
@@ -1016,7 +1022,9 @@ fn bins_timebin_fill_empty_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.append_empty_until_end();
let ready = binner.bins_ready();
@@ -1038,7 +1046,9 @@ fn bins_timebin_fill_empty_01() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.push_in_progress(true);
binner.append_empty_until_end();
@@ -1061,7 +1071,9 @@ fn bins_timebin_push_empty_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -1083,7 +1095,9 @@ fn bins_timebin_push_empty_01() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.push_in_progress(true);
binner.push_in_progress(true);
@@ -1109,7 +1123,9 @@ fn bins_timebin_ingest_only_before() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -1132,7 +1148,9 @@ fn bins_timebin_ingest_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -1157,7 +1175,9 @@ fn bins_timebin_ingest_continuous_00() {
bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.);
//bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.);
//bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
//binner.push_in_progress(true);
let ready = binner.bins_ready();

View File

@@ -607,7 +607,13 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY>
}
impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
// TODO respect emit_empty_bins
let ret = BinsXbinDim0TimeBinner::<NTY>::new(binrange, do_time_weight);
Box::new(ret)
}

View File

@@ -820,8 +820,13 @@ impl RangeOverlapInfo for ChannelEvents {
}
impl TimeBinnable for ChannelEvents {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
let ret = <ChannelEvents as TimeBinnableTy>::time_binner_new(&self, binrange, do_time_weight);
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
let ret = <ChannelEvents as TimeBinnableTy>::time_binner_new(&self, binrange, do_time_weight, emit_empty_bins);
Box::new(ret)
}
@@ -997,6 +1002,7 @@ pub struct ChannelEventsTimeBinner {
// here we would rather require a simplified current state for binning purpose.
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
conn_state: ConnStatus,
binner: Option<Box<dyn TimeBinner>>,
}
@@ -1012,6 +1018,7 @@ impl fmt::Debug for ChannelEventsTimeBinner {
fmt.debug_struct(Self::type_name())
.field("binrange", &self.binrange)
.field("do_time_weight", &self.do_time_weight)
.field("emit_empty_bins", &self.emit_empty_bins)
.field("conn_state", &self.conn_state)
.finish()
}
@@ -1028,7 +1035,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner {
match item {
ChannelEvents::Events(item) => {
if self.binner.is_none() {
let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight);
let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins);
self.binner = Some(binner);
}
match self.binner.as_mut() {
@@ -1141,7 +1148,12 @@ impl TimeBinner for ChannelEventsTimeBinner {
impl TimeBinnableTy for ChannelEvents {
type TimeBinner = ChannelEventsTimeBinner;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Self::TimeBinner {
trace!("TimeBinnableTy for ChannelEvents make ChannelEventsTimeBinner");
// TODO probably wrong?
let (binner, status) = match self {
@@ -1151,6 +1163,7 @@ impl TimeBinnableTy for ChannelEvents {
ChannelEventsTimeBinner {
binrange,
do_time_weight,
emit_empty_bins,
conn_state: status,
binner,
}

View File

@@ -545,7 +545,7 @@ impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
&self.range
}
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: std::ops::Range<usize>) {
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: core::ops::Range<usize>) {
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
self.apply_event_unweight(val.clone());
self.count += 1;
@@ -560,7 +560,7 @@ impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
self.last_val = Some(item.values[j].clone());
}
fn common_ingest_range(&mut self, item: &Self::Input, r: std::ops::Range<usize>) {
fn common_ingest_range(&mut self, item: &Self::Input, r: core::ops::Range<usize>) {
let beg = self.range.beg_u64();
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
if ts > beg {
@@ -787,9 +787,14 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim0Aggregator<STY> {
}
impl<STY: ScalarOps> TimeBinnable for EventsDim0<STY> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
// TODO get rid of unwrap
let ret = EventsDim0TimeBinner::<STY>::new(binrange, do_time_weight).unwrap();
let ret = EventsDim0TimeBinner::<STY>::new(binrange, do_time_weight, emit_empty_bins).unwrap();
Box::new(ret)
}
@@ -1027,6 +1032,7 @@ pub struct EventsDim0TimeBinner<STY: ScalarOps> {
agg: EventsDim0Aggregator<STY>,
ready: Option<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Output>,
range_final: bool,
emit_empty_bins: bool,
}
impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
@@ -1034,11 +1040,12 @@ impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
any::type_name::<Self>()
}
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, emit_empty_bins: bool) -> Result<Self, Error> {
trace!("{}::new binrange {:?}", Self::type_name(), binrange);
let rng = binrange
.range_at(0)
.ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?;
trace!("{}::new rng {:?}", Self::type_name(), rng);
let agg = EventsDim0Aggregator::new(rng, do_time_weight);
let ret = Self {
binrange,
@@ -1047,6 +1054,7 @@ impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
agg,
ready: None,
range_final: false,
emit_empty_bins,
};
Ok(ret)
}
@@ -1121,6 +1129,10 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for EventsDim0TimeBinner<STY> {
}
impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
TimeBinnerCommonV0Func::ingest(self, item)
}
fn bins_ready_count(&self) -> usize {
TimeBinnerCommonV0Trait::common_bins_ready_count(self)
}
@@ -1132,10 +1144,6 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
}
}
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
TimeBinnerCommonV0Func::ingest(self, item)
}
fn push_in_progress(&mut self, push_empty: bool) {
TimeBinnerCommonV0Func::push_in_progress(self, push_empty)
}
@@ -1308,7 +1316,7 @@ fn binner_00() {
ev1.push(MS * 1200, 3, 1.2f32);
ev1.push(MS * 3200, 3, 3.2f32);
let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
let mut binner = ev1.time_binner_new(binrange, true, false);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
// TODO add actual asserts
@@ -1322,7 +1330,7 @@ fn binner_01() {
ev1.push(MS * 2100, 3, 2.1);
ev1.push(MS * 2300, 3, 2.3);
let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10);
let mut binner = ev1.time_binner_new(binrange, true);
let mut binner = ev1.time_binner_new(binrange, true, false);
binner.ingest(ev1.as_time_binnable_mut());
eprintln!("{:?}", binner);
// TODO add actual asserts
@@ -1409,7 +1417,9 @@ fn events_timebin_ingest_continuous_00() {
let mut bins = EventsDim0::<u32>::empty();
bins.push(SEC * 20, 1, 20);
bins.push(SEC * 23, 2, 23);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
let mut binner = bins
.as_time_binnable_ref()
.time_binner_new(binrange, do_time_weight, false);
binner.ingest(&mut bins);
//binner.push_in_progress(true);
let ready = binner.bins_ready();

View File

@@ -742,7 +742,13 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim1Aggregator<STY> {
}
impl<STY: ScalarOps> TimeBinnable for EventsDim1<STY> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn TimeBinner> {
// TODO respect emit_empty_bins
let ret = EventsDim1TimeBinner::<STY>::new(binrange, do_time_weight).unwrap();
Box::new(ret)
}

View File

@@ -613,7 +613,9 @@ where
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
) -> Box<dyn items_0::timebin::TimeBinner> {
// TODO respect emit_empty_bins
let ret = EventsXbinDim0TimeBinner::<STY>::new(binrange, do_time_weight).unwrap();
Box::new(ret)
}

View File

@@ -373,11 +373,13 @@ fn bin_00() {
let binrange = BinnedRangeEnum::covering_range(range.into(), 10).unwrap();
let deadline = Instant::now() + Duration::from_millis(4000);
let do_time_weight = true;
let emit_empty_bins = false;
let res = BinnedCollected::new(
binrange,
ScalarType::F32,
Shape::Scalar,
do_time_weight,
emit_empty_bins,
deadline,
Box::pin(stream),
)
@@ -421,11 +423,13 @@ fn bin_01() {
let stream = Box::pin(stream);
let deadline = Instant::now() + Duration::from_millis(4000);
let do_time_weight = true;
let emit_empty_bins = false;
let res = BinnedCollected::new(
binrange,
ScalarType::F32,
Shape::Scalar,
do_time_weight,
emit_empty_bins,
deadline,
Box::pin(stream),
)
@@ -481,8 +485,17 @@ fn binned_timeout_00() {
let timeout = Duration::from_millis(400);
let deadline = Instant::now() + timeout;
let do_time_weight = true;
let res =
BinnedCollected::new(binrange, ScalarType::F32, Shape::Scalar, do_time_weight, deadline, inp1).await?;
let emit_empty_bins = false;
let res = BinnedCollected::new(
binrange,
ScalarType::F32,
Shape::Scalar,
do_time_weight,
emit_empty_bins,
deadline,
inp1,
)
.await?;
let r2: &BinsDim0CollectedResult<f32> = res.result.as_any_ref().downcast_ref().expect("res seems wrong type");
eprintln!("rs: {r2:?}");
assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC);

View File

@@ -12,14 +12,20 @@ use std::ops::Range;
#[allow(unused)]
macro_rules! trace_ingest {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_ingest_item {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
($($arg:tt)*) => {
if true {
info!($($arg)*);
}
};
}
#[allow(unused)]
@@ -45,23 +51,15 @@ pub trait TimeBinnerCommonV0Trait {
pub struct TimeBinnerCommonV0Func {}
impl TimeBinnerCommonV0Func {
pub fn agg_ingest<B>(binner: &mut B, item: &mut <B as TimeBinnerCommonV0Trait>::Input)
where
B: TimeBinnerCommonV0Trait,
{
//self.agg.ingest(item);
<B as TimeBinnerCommonV0Trait>::common_agg_ingest(binner, item)
}
pub fn ingest<B>(binner: &mut B, item: &mut dyn TimeBinnable)
where
B: TimeBinnerCommonV0Trait,
{
let self_name = B::type_name();
trace_ingest_item!(
"TimeBinner for {} ingest agg.range {:?} item {:?}",
Self::type_name(),
self.agg.range(),
"TimeBinner for {} ingest common_range_current {:?} item {:?}",
self_name,
binner.common_range_current(),
item
);
if item.len() == 0 {
@@ -120,6 +118,14 @@ impl TimeBinnerCommonV0Func {
}
}
fn agg_ingest<B>(binner: &mut B, item: &mut <B as TimeBinnerCommonV0Trait>::Input)
where
B: TimeBinnerCommonV0Trait,
{
//self.agg.ingest(item);
<B as TimeBinnerCommonV0Trait>::common_agg_ingest(binner, item)
}
pub fn push_in_progress<B>(binner: &mut B, push_empty: bool)
where
B: TimeBinnerCommonV0Trait,
@@ -200,20 +206,21 @@ impl ChooseIndicesForTimeBinEvents {
}
pub fn choose_timeweight(beg: u64, end: u64, tss: &VecDeque<u64>) -> (Option<usize>, usize, usize) {
let self_name = "choose_timeweight";
// TODO improve via binary search.
let mut one_before = None;
let mut j = 0;
let mut k = tss.len();
for (i1, &ts) in tss.iter().enumerate() {
if ts >= end {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} AFTER", i1, ts);
// TODO count all the ignored events for stats
k = i1;
break;
} else if ts >= beg {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} INSIDE", i1, ts);
} else {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} BEFORE", i1, ts);
one_before = Some(i1);
j = i1 + 1;
}
@@ -223,7 +230,7 @@ impl ChooseIndicesForTimeBinEvents {
}
pub trait TimeAggregatorCommonV0Trait {
type Input: RangeOverlapInfo + ChooseIndicesForTimeBin + 'static;
type Input: WithLen + RangeOverlapInfo + ChooseIndicesForTimeBin + 'static;
type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static;
fn type_name() -> &'static str;
fn common_range_current(&self) -> &SeriesRange;
@@ -240,10 +247,12 @@ impl TimeAggregatorCommonV0Func {
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
// TODO
let items_seen = 777;
trace_ingest!(
"{self_name}::ingest_unweight item len {} items_seen {}",
item.len(),
self.items_seen
items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {
@@ -265,10 +274,12 @@ impl TimeAggregatorCommonV0Func {
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
// TODO
let items_seen = 777;
trace_ingest!(
"{self_name}::ingest_time_weight item len {} items_seen {}",
item.len(),
self.items_seen
items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {

View File

@@ -6,8 +6,56 @@ pub mod status;
pub mod streamext;
pub mod ttl;
pub mod log_macros {
#[allow(unused)]
#[macro_export]
macro_rules! trace {
($($arg:tt)*) => {
eprintln!($($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! debug {
($($arg:tt)*) => {
eprintln!($($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! info {
($($arg:tt)*) => {
eprintln!($($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! warn {
($($arg:tt)*) => {
eprintln!($($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! error {
($($arg:tt)*) => {
eprintln!($($arg)*);
};
}
}
pub mod log {
pub use tracing::{self, debug, error, event, info, span, trace, warn, Level};
pub use tracing::{self, event, span, Level};
pub use tracing::{debug, error, info, trace, warn};
}
pub mod log2 {
pub use crate::{debug, error, info, trace, warn};
pub use tracing::{self, event, span, Level};
}
use crate::log::*;
@@ -1500,8 +1548,8 @@ impl DtNano {
Self(ns)
}
pub const fn from_ms(ns: u64) -> Self {
Self(1000000 * ns)
pub const fn from_ms(ms: u64) -> Self {
Self(1000000 * ms)
}
pub const fn ns(&self) -> u64 {
@@ -2544,11 +2592,11 @@ impl fmt::Display for TsMs {
}
}
impl std::ops::Sub for TsMs {
type Output = TsMs;
impl core::ops::Sub for TsMs {
type Output = DtMs;
fn sub(self, rhs: Self) -> Self::Output {
Self(self.0.saturating_sub(rhs.0))
DtMs(self.0.saturating_sub(rhs.0))
}
}

View File

@@ -83,10 +83,11 @@ where
}
fn process_item(&mut self, mut item: T) -> () {
let emit_empty_bins = true;
trace2!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.range.clone(), self.do_time_weight);
let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins);
self.binner = Some(binner);
}
let binner = self.binner.as_mut().unwrap();

View File

@@ -13,9 +13,18 @@ futures-util = "0.3.28"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["fmt", "time"] }
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
console-subscriber = { version = "0.2.0" }
time = { version = "0.3", features = ["formatting"] }
backtrace = "0.3.71"
chrono = "0.4.38"
err = { path = "../err" }
[features]
with-console = []
#console-subscriber = { version = "0.3.0" }
with-loki = []
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
# The tracing subscriber env filter would require its feature "env-filter" enabled
with-env-filter = []

View File

@@ -3,7 +3,7 @@ pub mod formatter;
pub use tokio;
use crate::log::*;
use console_subscriber::ConsoleLayer;
// use console_subscriber::ConsoleLayer;
use err::Error;
use std::fmt;
use std::future::Future;
@@ -122,6 +122,7 @@ where
L: tracing_subscriber::Layer<S>,
S: tracing::Subscriber,
{
#[allow(unused)]
fn new(name: String, inner: L) -> Self {
Self {
name,
@@ -138,6 +139,15 @@ where
{
}
fn collect_env_list(env: &str) -> Vec<String> {
std::env::var(env)
.unwrap_or(String::new())
.split(",")
.map(str::trim)
.map(ToString::to_string)
.collect()
}
fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -150,30 +160,42 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
// Only async console
// let console_layer = console_subscriber::spawn();
// let console_layer = ConsoleLayer::builder().with_default_env().init();
let console_layer = ConsoleLayer::builder().spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
// .with(other_layer)
.init();
console_subscriber::init();
#[cfg(feature = "with-console")]
{
let console_layer = ConsoleLayer::builder().spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
// .with(other_layer)
.init();
console_subscriber::init();
}
} else {
// Logging setup
#[cfg(feature = "with-env-filter")]
let filter_1 = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
#[cfg(feature = "with-env-filter")]
let filter_2 = tracing_subscriber::EnvFilter::builder()
.with_env_var("RUST_LOG_2")
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
/*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
if true {
return false;
}
if *meta.level() <= tracing::Level::TRACE {
if ["httpret", "scyllaconn"].contains(&meta.target()) {
let tracing_debug = collect_env_list("TRACING_DEBUG");
let tracing_trace = collect_env_list("TRACING_TRACE");
let filter_3 = tracing_subscriber::filter::DynFilterFn::new(move |meta, ctx| {
if *meta.level() >= tracing::Level::TRACE {
let mut target_match = false;
for e in &tracing_trace {
if meta.target().starts_with(e) {
target_match = true;
break;
}
}
if target_match {
let mut sr = ctx.lookup_current();
let mut allow = false;
while let Some(g) = sr {
@@ -188,8 +210,15 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
} else {
false
}
} else if *meta.level() <= tracing::Level::DEBUG {
if ["httpret", "scyllaconn", "items_0", "items_2", "streams"].contains(&meta.target()) {
} else if *meta.level() >= tracing::Level::DEBUG {
let mut target_match = false;
for e in &tracing_debug {
if meta.target().starts_with(e) {
target_match = true;
break;
}
}
if target_match {
let mut sr = ctx.lookup_current();
let mut allow = false;
while let Some(g) = sr {
@@ -207,7 +236,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
} else {
true
}
});*/
});
let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_writer(io::stderr)
.with_timer(timer)
@@ -215,9 +244,10 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.with_ansi(false)
.with_thread_names(true)
.event_format(formatter::FormatTxt)
// .with_filter(filter_3)
.with_filter(filter_2)
.with_filter(filter_1)
.with_filter(filter_3)
// .with_filter(filter_2)
// .with_filter(filter_1)
;
// let fmt_layer = fmt_layer.with_filter(filter_3);
// let fmt_layer: Box<dyn Layer<tracing_subscriber::Registry>> = if std::env::var("RUST_LOG_USE_2").is_ok() {
// let a = fmt_layer.with_filter(filter_2);
@@ -230,7 +260,6 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
// .and_then(LogFilterLayer::new("lay1".into()))
// .and_then(LogFilterLayer::new("lay2".into()))
// let layer_2 = LogFilterLayer::new("lay1".into(), fmt_layer);
;
let reg = tracing_subscriber::registry();