From 584d9776759946edd7d86b0e1d9cb13652f962bd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 3 Jul 2024 10:20:39 +0200 Subject: [PATCH] Customize the log out --- crates/dbconn/src/channelconfig.rs | 8 +-- crates/items_0/src/timebin.rs | 87 +++++++++++++++++++++------ crates/items_2/src/binnedcollected.rs | 11 +++- crates/items_2/src/binsdim0.rs | 36 ++++++++--- crates/items_2/src/binsxbindim0.rs | 8 ++- crates/items_2/src/channelevents.rs | 21 +++++-- crates/items_2/src/eventsdim0.rs | 34 +++++++---- crates/items_2/src/eventsdim1.rs | 8 ++- crates/items_2/src/eventsxbindim0.rs | 2 + crates/items_2/src/test.rs | 17 +++++- crates/items_2/src/timebin.rs | 53 +++++++++------- crates/netpod/src/netpod.rs | 60 ++++++++++++++++-- crates/streams/src/timebin.rs | 3 +- crates/taskrun/Cargo.toml | 13 +++- crates/taskrun/src/taskrun.rs | 71 +++++++++++++++------- 15 files changed, 329 insertions(+), 103 deletions(-) diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index c031bd3..09c8dc9 100644 --- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -108,15 +108,15 @@ fn decide_best_matching_index(range: (TsMs, TsMs), rows: &[TsMs]) -> Result= range.1 { - Duration::from_millis((range.1.clone() - range.0.clone()).to_u64()) + Duration::from_millis((range.1.clone() - range.0.clone()).ms()) } else { - Duration::from_millis((x.1.clone() - range.0.clone()).to_u64()) + Duration::from_millis((x.1.clone() - range.0.clone()).ms()) } } else { if x.1 >= range.1 { - Duration::from_millis((range.1.clone() - x.0.clone()).to_u64()) + Duration::from_millis((range.1.clone() - x.0.clone()).ms()) } else { - Duration::from_millis((x.1.clone() - x.0.clone()).to_u64()) + Duration::from_millis((x.1.clone() - x.0.clone()).ms()) } }; dur diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 40aa5a0..fee111f 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -54,7 +54,12 @@ pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { type TimeBinner: TimeBinnerTy; - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner; + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner; } /// Data in time-binned form. @@ -95,12 +100,17 @@ impl RangeOverlapInfo for Box { } impl TimeBinnable for Box { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - todo!() + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + self.as_ref().time_binner_new(binrange, do_time_weight, emit_empty_bins) } fn to_box_to_json_result(&self) -> Box { - todo!() + self.as_ref().to_box_to_json_result() } } @@ -135,7 +145,12 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Collectable + Any + AsAnyRef + AsAnyMut + Send { // TODO implementors may fail if edges contain not at least 2 entries. - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box; + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box; // TODO just a helper for the empty result. fn to_box_to_json_result(&self) -> Box; } @@ -161,8 +176,13 @@ impl RangeOverlapInfo for Box { } impl TimeBinnable for Box { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - todo!() + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + self.as_ref().time_binner_new(binrange, do_time_weight, emit_empty_bins) } fn to_box_to_json_result(&self) -> Box { @@ -185,8 +205,13 @@ impl RangeOverlapInfo for Box { } impl TimeBinnable for Box { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - TimeBinnable::time_binner_new(self.as_ref(), binrange, do_time_weight) + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + TimeBinnable::time_binner_new(self.as_ref(), binrange, do_time_weight, emit_empty_bins) } fn to_box_to_json_result(&self) -> Box { @@ -211,6 +236,7 @@ pub struct TimeBinnerDynStruct { binrange: BinnedRangeEnum, do_time_weight: bool, binner: Option>, + emit_empty_bins: bool, } impl TimeBinnerDynStruct { @@ -218,11 +244,17 @@ impl TimeBinnerDynStruct { std::any::type_name::() } - pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box) -> Self { + pub fn new( + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + binner: Box, + ) -> Self { Self { binrange, do_time_weight, binner: Some(binner), + emit_empty_bins, } } } @@ -238,6 +270,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct { item, self.binrange.clone(), self.do_time_weight, + self.emit_empty_bins, ))); } self.binner.as_mut().unwrap().as_mut().ingest(item.as_mut()) @@ -329,6 +362,7 @@ impl TimeBinner for TimeBinnerDynStruct { pub struct TimeBinnerDynStruct2 { binrange: BinnedRangeEnum, do_time_weight: bool, + emit_empty_bins: bool, binner: Option>, } @@ -337,10 +371,16 @@ impl TimeBinnerDynStruct2 { std::any::type_name::() } - pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box) -> Self { + pub fn new( + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + binner: Box, + ) -> Self { Self { binrange, do_time_weight, + emit_empty_bins, binner: Some(binner), } } @@ -357,6 +397,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct2 { item, self.binrange.clone(), self.do_time_weight, + self.emit_empty_bins, ))); } self.binner @@ -451,20 +492,32 @@ impl TimeBinner for TimeBinnerDynStruct2 { impl TimeBinnableTy for Box { type TimeBinner = TimeBinnerDynStruct; - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner { - let binner = self.as_ref().time_binner_new(binrange.clone(), do_time_weight); - TimeBinnerDynStruct::new(binrange, do_time_weight, binner) + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner { + let binner = self + .as_ref() + .time_binner_new(binrange.clone(), do_time_weight, emit_empty_bins); + TimeBinnerDynStruct::new(binrange, do_time_weight, emit_empty_bins, binner) } } impl TimeBinnableTy for Box { type TimeBinner = TimeBinnerDynStruct2; - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner { let binner = self .as_time_binnable_ref() - .time_binner_new(binrange.clone(), do_time_weight); - TimeBinnerDynStruct2::new(binrange, do_time_weight, binner) + .time_binner_new(binrange.clone(), do_time_weight, emit_empty_bins); + TimeBinnerDynStruct2::new(binrange, do_time_weight, emit_empty_bins, binner) } } diff --git a/crates/items_2/src/binnedcollected.rs b/crates/items_2/src/binnedcollected.rs index 900fa9d..55febc1 100644 --- a/crates/items_2/src/binnedcollected.rs +++ b/crates/items_2/src/binnedcollected.rs @@ -99,6 +99,7 @@ pub struct BinnedCollected { scalar_type: ScalarType, shape: Shape, do_time_weight: bool, + emit_empty_bins: bool, did_timeout: bool, range_final: bool, coll: Option>, @@ -116,6 +117,7 @@ impl BinnedCollected { scalar_type: ScalarType, shape: Shape, do_time_weight: bool, + emit_empty_bins: bool, //transformer: &dyn Transformer, deadline: Instant, inp: Pin>, @@ -126,6 +128,7 @@ impl BinnedCollected { scalar_type, shape, do_time_weight, + emit_empty_bins, did_timeout: false, range_final: false, coll: None, @@ -143,9 +146,11 @@ impl BinnedCollected { RangeCompletableItem::Data(k) => match k { ChannelEvents::Events(mut events) => { if self.binner.is_none() { - let bb = events - .as_time_binnable_mut() - .time_binner_new(self.binrange.clone(), self.do_time_weight); + let bb = events.as_time_binnable_mut().time_binner_new( + self.binrange.clone(), + self.do_time_weight, + self.emit_empty_bins, + ); self.binner = Some(bb); } let binner = self.binner.as_mut().unwrap(); diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index fe8ed32..73024a4 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -647,8 +647,14 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { } impl TimeBinnable for BinsDim0 { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { // TODO get rid of unwrap + // TODO respect emit_empty_bins let ret = BinsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } @@ -1016,7 +1022,9 @@ fn bins_timebin_fill_empty_00() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.append_empty_until_end(); let ready = binner.bins_ready(); @@ -1038,7 +1046,9 @@ fn bins_timebin_fill_empty_01() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); binner.append_empty_until_end(); @@ -1061,7 +1071,9 @@ fn bins_timebin_push_empty_00() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); @@ -1083,7 +1095,9 @@ fn bins_timebin_push_empty_01() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); binner.push_in_progress(true); @@ -1109,7 +1123,9 @@ fn bins_timebin_ingest_only_before() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); @@ -1132,7 +1148,9 @@ fn bins_timebin_ingest_00() { bin_cnt: 5, }); let do_time_weight = true; - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); @@ -1157,7 +1175,9 @@ fn bins_timebin_ingest_continuous_00() { bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.); //bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); //bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); //binner.push_in_progress(true); let ready = binner.bins_ready(); diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index c97bbbf..cefb254 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -607,7 +607,13 @@ impl TimeBinnableTypeAggregator for BinsXbinDim0Aggregator } impl TimeBinnable for BinsXbinDim0 { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + // TODO respect emit_empty_bins let ret = BinsXbinDim0TimeBinner::::new(binrange, do_time_weight); Box::new(ret) } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index fe16a6e..3870e8c 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -820,8 +820,13 @@ impl RangeOverlapInfo for ChannelEvents { } impl TimeBinnable for ChannelEvents { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - let ret = ::time_binner_new(&self, binrange, do_time_weight); + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + let ret = ::time_binner_new(&self, binrange, do_time_weight, emit_empty_bins); Box::new(ret) } @@ -997,6 +1002,7 @@ pub struct ChannelEventsTimeBinner { // here we would rather require a simplified current state for binning purpose. binrange: BinnedRangeEnum, do_time_weight: bool, + emit_empty_bins: bool, conn_state: ConnStatus, binner: Option>, } @@ -1012,6 +1018,7 @@ impl fmt::Debug for ChannelEventsTimeBinner { fmt.debug_struct(Self::type_name()) .field("binrange", &self.binrange) .field("do_time_weight", &self.do_time_weight) + .field("emit_empty_bins", &self.emit_empty_bins) .field("conn_state", &self.conn_state) .finish() } @@ -1028,7 +1035,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner { match item { ChannelEvents::Events(item) => { if self.binner.is_none() { - let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight); + let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins); self.binner = Some(binner); } match self.binner.as_mut() { @@ -1141,7 +1148,12 @@ impl TimeBinner for ChannelEventsTimeBinner { impl TimeBinnableTy for ChannelEvents { type TimeBinner = ChannelEventsTimeBinner; - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner { trace!("TimeBinnableTy for ChannelEvents make ChannelEventsTimeBinner"); // TODO probably wrong? let (binner, status) = match self { @@ -1151,6 +1163,7 @@ impl TimeBinnableTy for ChannelEvents { ChannelEventsTimeBinner { binrange, do_time_weight, + emit_empty_bins, conn_state: status, binner, } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 0b95187..0bc7b5a 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -545,7 +545,7 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { &self.range } - fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: std::ops::Range) { + fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: core::ops::Range) { for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) { self.apply_event_unweight(val.clone()); self.count += 1; @@ -560,7 +560,7 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { self.last_val = Some(item.values[j].clone()); } - fn common_ingest_range(&mut self, item: &Self::Input, r: std::ops::Range) { + fn common_ingest_range(&mut self, item: &Self::Input, r: core::ops::Range) { let beg = self.range.beg_u64(); for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) { if ts > beg { @@ -787,9 +787,14 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } impl TimeBinnable for EventsDim0 { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { // TODO get rid of unwrap - let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); + let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight, emit_empty_bins).unwrap(); Box::new(ret) } @@ -1027,6 +1032,7 @@ pub struct EventsDim0TimeBinner { agg: EventsDim0Aggregator, ready: Option< as TimeBinnableTypeAggregator>::Output>, range_final: bool, + emit_empty_bins: bool, } impl EventsDim0TimeBinner { @@ -1034,11 +1040,12 @@ impl EventsDim0TimeBinner { any::type_name::() } - pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { + pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, emit_empty_bins: bool) -> Result { trace!("{}::new binrange {:?}", Self::type_name(), binrange); let rng = binrange .range_at(0) .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; + trace!("{}::new rng {:?}", Self::type_name(), rng); let agg = EventsDim0Aggregator::new(rng, do_time_weight); let ret = Self { binrange, @@ -1047,6 +1054,7 @@ impl EventsDim0TimeBinner { agg, ready: None, range_final: false, + emit_empty_bins, }; Ok(ret) } @@ -1121,6 +1129,10 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { } impl TimeBinner for EventsDim0TimeBinner { + fn ingest(&mut self, item: &mut dyn TimeBinnable) { + TimeBinnerCommonV0Func::ingest(self, item) + } + fn bins_ready_count(&self) -> usize { TimeBinnerCommonV0Trait::common_bins_ready_count(self) } @@ -1132,10 +1144,6 @@ impl TimeBinner for EventsDim0TimeBinner { } } - fn ingest(&mut self, item: &mut dyn TimeBinnable) { - TimeBinnerCommonV0Func::ingest(self, item) - } - fn push_in_progress(&mut self, push_empty: bool) { TimeBinnerCommonV0Func::push_in_progress(self, push_empty) } @@ -1308,7 +1316,7 @@ fn binner_00() { ev1.push(MS * 1200, 3, 1.2f32); ev1.push(MS * 3200, 3, 3.2f32); let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10); - let mut binner = ev1.time_binner_new(binrange, true); + let mut binner = ev1.time_binner_new(binrange, true, false); binner.ingest(ev1.as_time_binnable_mut()); eprintln!("{:?}", binner); // TODO add actual asserts @@ -1322,7 +1330,7 @@ fn binner_01() { ev1.push(MS * 2100, 3, 2.1); ev1.push(MS * 2300, 3, 2.3); let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10); - let mut binner = ev1.time_binner_new(binrange, true); + let mut binner = ev1.time_binner_new(binrange, true, false); binner.ingest(ev1.as_time_binnable_mut()); eprintln!("{:?}", binner); // TODO add actual asserts @@ -1409,7 +1417,9 @@ fn events_timebin_ingest_continuous_00() { let mut bins = EventsDim0::::empty(); bins.push(SEC * 20, 1, 20); bins.push(SEC * 23, 2, 23); - let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight); + let mut binner = bins + .as_time_binnable_ref() + .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); //binner.push_in_progress(true); let ready = binner.bins_ready(); diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 9e959e7..ea7b1e4 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -742,7 +742,13 @@ impl TimeBinnableTypeAggregator for EventsDim1Aggregator { } impl TimeBinnable for EventsDim1 { - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Box { + // TODO respect emit_empty_bins let ret = EventsDim1TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index b8e004d..12f4aae 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -613,7 +613,9 @@ where &self, binrange: BinnedRangeEnum, do_time_weight: bool, + emit_empty_bins: bool, ) -> Box { + // TODO respect emit_empty_bins let ret = EventsXbinDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } diff --git a/crates/items_2/src/test.rs b/crates/items_2/src/test.rs index 85ae0d0..0d032e6 100644 --- a/crates/items_2/src/test.rs +++ b/crates/items_2/src/test.rs @@ -373,11 +373,13 @@ fn bin_00() { let binrange = BinnedRangeEnum::covering_range(range.into(), 10).unwrap(); let deadline = Instant::now() + Duration::from_millis(4000); let do_time_weight = true; + let emit_empty_bins = false; let res = BinnedCollected::new( binrange, ScalarType::F32, Shape::Scalar, do_time_weight, + emit_empty_bins, deadline, Box::pin(stream), ) @@ -421,11 +423,13 @@ fn bin_01() { let stream = Box::pin(stream); let deadline = Instant::now() + Duration::from_millis(4000); let do_time_weight = true; + let emit_empty_bins = false; let res = BinnedCollected::new( binrange, ScalarType::F32, Shape::Scalar, do_time_weight, + emit_empty_bins, deadline, Box::pin(stream), ) @@ -481,8 +485,17 @@ fn binned_timeout_00() { let timeout = Duration::from_millis(400); let deadline = Instant::now() + timeout; let do_time_weight = true; - let res = - BinnedCollected::new(binrange, ScalarType::F32, Shape::Scalar, do_time_weight, deadline, inp1).await?; + let emit_empty_bins = false; + let res = BinnedCollected::new( + binrange, + ScalarType::F32, + Shape::Scalar, + do_time_weight, + emit_empty_bins, + deadline, + inp1, + ) + .await?; let r2: &BinsDim0CollectedResult = res.result.as_any_ref().downcast_ref().expect("res seems wrong type"); eprintln!("rs: {r2:?}"); assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC); diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index 63bf142..010d416 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -12,14 +12,20 @@ use std::ops::Range; #[allow(unused)] macro_rules! trace_ingest { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; } #[allow(unused)] macro_rules! trace_ingest_item { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; + ($($arg:tt)*) => { + if true { + info!($($arg)*); + } + }; } #[allow(unused)] @@ -45,23 +51,15 @@ pub trait TimeBinnerCommonV0Trait { pub struct TimeBinnerCommonV0Func {} impl TimeBinnerCommonV0Func { - pub fn agg_ingest(binner: &mut B, item: &mut ::Input) - where - B: TimeBinnerCommonV0Trait, - { - //self.agg.ingest(item); - ::common_agg_ingest(binner, item) - } - pub fn ingest(binner: &mut B, item: &mut dyn TimeBinnable) where B: TimeBinnerCommonV0Trait, { let self_name = B::type_name(); trace_ingest_item!( - "TimeBinner for {} ingest agg.range {:?} item {:?}", - Self::type_name(), - self.agg.range(), + "TimeBinner for {} ingest common_range_current {:?} item {:?}", + self_name, + binner.common_range_current(), item ); if item.len() == 0 { @@ -120,6 +118,14 @@ impl TimeBinnerCommonV0Func { } } + fn agg_ingest(binner: &mut B, item: &mut ::Input) + where + B: TimeBinnerCommonV0Trait, + { + //self.agg.ingest(item); + ::common_agg_ingest(binner, item) + } + pub fn push_in_progress(binner: &mut B, push_empty: bool) where B: TimeBinnerCommonV0Trait, @@ -200,20 +206,21 @@ impl ChooseIndicesForTimeBinEvents { } pub fn choose_timeweight(beg: u64, end: u64, tss: &VecDeque) -> (Option, usize, usize) { + let self_name = "choose_timeweight"; // TODO improve via binary search. let mut one_before = None; let mut j = 0; let mut k = tss.len(); for (i1, &ts) in tss.iter().enumerate() { if ts >= end { - trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} AFTER", i1, ts); // TODO count all the ignored events for stats k = i1; break; } else if ts >= beg { - trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} INSIDE", i1, ts); } else { - trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} BEFORE", i1, ts); one_before = Some(i1); j = i1 + 1; } @@ -223,7 +230,7 @@ impl ChooseIndicesForTimeBinEvents { } pub trait TimeAggregatorCommonV0Trait { - type Input: RangeOverlapInfo + ChooseIndicesForTimeBin + 'static; + type Input: WithLen + RangeOverlapInfo + ChooseIndicesForTimeBin + 'static; type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static; fn type_name() -> &'static str; fn common_range_current(&self) -> &SeriesRange; @@ -240,10 +247,12 @@ impl TimeAggregatorCommonV0Func { B: TimeAggregatorCommonV0Trait, { let self_name = B::type_name(); + // TODO + let items_seen = 777; trace_ingest!( "{self_name}::ingest_unweight item len {} items_seen {}", item.len(), - self.items_seen + items_seen ); let rng = B::common_range_current(binner); if rng.is_time() { @@ -265,10 +274,12 @@ impl TimeAggregatorCommonV0Func { B: TimeAggregatorCommonV0Trait, { let self_name = B::type_name(); + // TODO + let items_seen = 777; trace_ingest!( "{self_name}::ingest_time_weight item len {} items_seen {}", item.len(), - self.items_seen + items_seen ); let rng = B::common_range_current(binner); if rng.is_time() { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 0a28ad1..26dddb1 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -6,8 +6,56 @@ pub mod status; pub mod streamext; pub mod ttl; +pub mod log_macros { + #[allow(unused)] + #[macro_export] + macro_rules! trace { + ($($arg:tt)*) => { + eprintln!($($arg)*); + }; + } + + #[allow(unused)] + #[macro_export] + macro_rules! debug { + ($($arg:tt)*) => { + eprintln!($($arg)*); + }; + } + + #[allow(unused)] + #[macro_export] + macro_rules! info { + ($($arg:tt)*) => { + eprintln!($($arg)*); + }; + } + + #[allow(unused)] + #[macro_export] + macro_rules! warn { + ($($arg:tt)*) => { + eprintln!($($arg)*); + }; + } + + #[allow(unused)] + #[macro_export] + macro_rules! error { + ($($arg:tt)*) => { + eprintln!($($arg)*); + }; + } +} + pub mod log { - pub use tracing::{self, debug, error, event, info, span, trace, warn, Level}; + pub use tracing::{self, event, span, Level}; + pub use tracing::{debug, error, info, trace, warn}; +} + +pub mod log2 { + pub use crate::{debug, error, info, trace, warn}; + pub use tracing::{self, event, span, Level}; } use crate::log::*; @@ -1500,8 +1548,8 @@ impl DtNano { Self(ns) } - pub const fn from_ms(ns: u64) -> Self { - Self(1000000 * ns) + pub const fn from_ms(ms: u64) -> Self { + Self(1000000 * ms) } pub const fn ns(&self) -> u64 { @@ -2544,11 +2592,11 @@ impl fmt::Display for TsMs { } } -impl std::ops::Sub for TsMs { - type Output = TsMs; +impl core::ops::Sub for TsMs { + type Output = DtMs; fn sub(self, rhs: Self) -> Self::Output { - Self(self.0.saturating_sub(rhs.0)) + DtMs(self.0.saturating_sub(rhs.0)) } } diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index 59ac179..ee0cb76 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -83,10 +83,11 @@ where } fn process_item(&mut self, mut item: T) -> () { + let emit_empty_bins = true; trace2!("process_item {item:?}"); if self.binner.is_none() { trace!("process_item call time_binner_new"); - let binner = item.time_binner_new(self.range.clone(), self.do_time_weight); + let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins); self.binner = Some(binner); } let binner = self.binner.as_mut().unwrap(); diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml index 890647e..db1c073 100644 --- a/crates/taskrun/Cargo.toml +++ b/crates/taskrun/Cargo.toml @@ -13,9 +13,18 @@ futures-util = "0.3.28" tracing = "0.1.40" tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["fmt", "time"] } -#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } -console-subscriber = { version = "0.2.0" } time = { version = "0.3", features = ["formatting"] } backtrace = "0.3.71" chrono = "0.4.38" err = { path = "../err" } + + +[features] +with-console = [] +#console-subscriber = { version = "0.3.0" } + +with-loki = [] +#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } + +# The tracing subscriber env filter would require its feature "env-filter" enabled +with-env-filter = [] diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index aebe991..41c267f 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -3,7 +3,7 @@ pub mod formatter; pub use tokio; use crate::log::*; -use console_subscriber::ConsoleLayer; +// use console_subscriber::ConsoleLayer; use err::Error; use std::fmt; use std::future::Future; @@ -122,6 +122,7 @@ where L: tracing_subscriber::Layer, S: tracing::Subscriber, { + #[allow(unused)] fn new(name: String, inner: L) -> Self { Self { name, @@ -138,6 +139,15 @@ where { } +fn collect_env_list(env: &str) -> Vec { + std::env::var(env) + .unwrap_or(String::new()) + .split(",") + .map(str::trim) + .map(ToString::to_string) + .collect() +} + fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -150,30 +160,42 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { // Only async console // let console_layer = console_subscriber::spawn(); // let console_layer = ConsoleLayer::builder().with_default_env().init(); - let console_layer = ConsoleLayer::builder().spawn(); - tracing_subscriber::registry() - .with(console_layer) - .with(tracing_subscriber::fmt::layer().with_ansi(false)) - // .with(other_layer) - .init(); - console_subscriber::init(); + + #[cfg(feature = "with-console")] + { + let console_layer = ConsoleLayer::builder().spawn(); + tracing_subscriber::registry() + .with(console_layer) + .with(tracing_subscriber::fmt::layer().with_ansi(false)) + // .with(other_layer) + .init(); + console_subscriber::init(); + } } else { // Logging setup + #[cfg(feature = "with-env-filter")] let filter_1 = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; + #[cfg(feature = "with-env-filter")] let filter_2 = tracing_subscriber::EnvFilter::builder() .with_env_var("RUST_LOG_2") .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; - /*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { - if true { - return false; - } - if *meta.level() <= tracing::Level::TRACE { - if ["httpret", "scyllaconn"].contains(&meta.target()) { + let tracing_debug = collect_env_list("TRACING_DEBUG"); + let tracing_trace = collect_env_list("TRACING_TRACE"); + let filter_3 = tracing_subscriber::filter::DynFilterFn::new(move |meta, ctx| { + if *meta.level() >= tracing::Level::TRACE { + let mut target_match = false; + for e in &tracing_trace { + if meta.target().starts_with(e) { + target_match = true; + break; + } + } + if target_match { let mut sr = ctx.lookup_current(); let mut allow = false; while let Some(g) = sr { @@ -188,8 +210,15 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { } else { false } - } else if *meta.level() <= tracing::Level::DEBUG { - if ["httpret", "scyllaconn", "items_0", "items_2", "streams"].contains(&meta.target()) { + } else if *meta.level() >= tracing::Level::DEBUG { + let mut target_match = false; + for e in &tracing_debug { + if meta.target().starts_with(e) { + target_match = true; + break; + } + } + if target_match { let mut sr = ctx.lookup_current(); let mut allow = false; while let Some(g) = sr { @@ -207,7 +236,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { } else { true } - });*/ + }); let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_writer(io::stderr) .with_timer(timer) @@ -215,9 +244,10 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_ansi(false) .with_thread_names(true) .event_format(formatter::FormatTxt) - // .with_filter(filter_3) - .with_filter(filter_2) - .with_filter(filter_1) + .with_filter(filter_3) + // .with_filter(filter_2) + // .with_filter(filter_1) + ; // let fmt_layer = fmt_layer.with_filter(filter_3); // let fmt_layer: Box> = if std::env::var("RUST_LOG_USE_2").is_ok() { // let a = fmt_layer.with_filter(filter_2); @@ -230,7 +260,6 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { // .and_then(LogFilterLayer::new("lay1".into())) // .and_then(LogFilterLayer::new("lay2".into())) // let layer_2 = LogFilterLayer::new("lay1".into(), fmt_layer); - ; let reg = tracing_subscriber::registry();