diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 0d60bd6..a71a2f9 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3" +version = "0.5.4" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 72e97ed..0e846fc 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -23,6 +23,7 @@ pub trait TimeBins { fn ts_min_max(&self) -> Option<(u64, u64)>; } +// TODO remove pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { type Input: fmt::Debug; type Output: fmt::Debug; diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 45e38f8..5a08ff8 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -2,7 +2,6 @@ use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox; -use super::___; use core::fmt; use err::thiserror; use err::ThisError; @@ -123,20 +122,6 @@ where vals: ::Container, } -macro_rules! try_to_events_dim0 { - ($sty:ty, $this:expr) => { - let this = $this; - if let Some(evs) = this.as_any_ref().downcast_ref::>() { - use crate::eventsdim0::EventsDim0; - let tss: VecDeque<_> = this.tss.iter().map(|x| x.ns()).collect(); - let pulses = tss.iter().map(|_| 0).collect(); - let values = evs.vals.clone(); - let ret = EventsDim0::<$sty> { tss, pulses, values }; - return Box::new(ret); - } - }; -} - impl ContainerEvents where EVT: EventValueType, @@ -193,12 +178,6 @@ where self.tss.push_back(ts); self.vals.push_back(val); } - - pub fn to_events_dim0(&self) -> Box { - try_to_events_dim0!(f64, self); - let styn = any::type_name::(); - todo!("TODO to_container_events for {styn}") - } } impl fmt::Debug for ContainerEvents diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 9aef218..8aeadc6 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -1,8 +1,3 @@ -use crate::binsdim0::BinsDim0; -use crate::timebin::ChooseIndicesForTimeBin; -use crate::timebin::ChooseIndicesForTimeBinEvents; -use crate::timebin::TimeAggregatorCommonV0Func; -use crate::timebin::TimeAggregatorCommonV0Trait; use crate::IsoDateTime; use err::Error; use items_0::collect_s::CollectableDyn; @@ -12,7 +7,6 @@ use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::ScalarOps; -use items_0::AppendAllFrom; use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -220,16 +214,6 @@ impl HasTimestampDeque for EventsDim0 { } } -impl ChooseIndicesForTimeBin for EventsDim0 { - fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option, usize, usize) { - ChooseIndicesForTimeBinEvents::choose_unweight(beg, end, &self.tss) - } - - fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option, usize, usize) { - ChooseIndicesForTimeBinEvents::choose_timeweight(beg, end, &self.tss) - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct EventsDim0ChunkOutput { tss: VecDeque, @@ -855,33 +839,6 @@ mod test_frame { panic!() }; assert_eq!(item.tss(), &[123]); - #[cfg(DISABLED)] - { - eprintln!("NOW WE SEE: {:?}", item); - // type_name_of_val alloc::boxed::Box - eprintln!("0 {:22?}", item.as_any_mut().type_id()); - eprintln!("A {:22?}", std::any::TypeId::of::>()); - eprintln!("B {:22?}", std::any::TypeId::of::()); - eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>()); - eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>()); - eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box>()); - eprintln!("F {:22?}", std::any::TypeId::of::>>()); - eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0>()); - eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0>()); - eprintln!("I {:22?}", std::any::TypeId::of::>>>()); - //let item = item.as_mut(); - //eprintln!("1 {:22?}", item.type_id()); - /* - let item = if let Some(item) = - items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::>>() - { - item - } else { - panic!() - }; - */ - //eprintln!("Final value: {item:?}"); - } } } diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 9f709f0..00030c8 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -17,7 +17,6 @@ pub mod streams; #[cfg(test)] pub mod test; pub mod testgen; -pub mod timebin; pub mod transform; use channelevents::ChannelEvents; diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs deleted file mode 100644 index 290a1e0..0000000 --- a/crates/items_2/src/timebin.rs +++ /dev/null @@ -1,139 +0,0 @@ -use items_0::AppendEmptyBin; -use items_0::Empty; -use items_0::HasNonemptyFirstBin; -use items_0::WithLen; -use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use std::any; -use std::collections::VecDeque; -use std::ops::Range; - -#[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -pub trait ChooseIndicesForTimeBin { - fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option, usize, usize); - fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option, usize, usize); -} - -pub struct ChooseIndicesForTimeBinEvents {} - -impl ChooseIndicesForTimeBinEvents { - pub fn choose_unweight(beg: u64, end: u64, tss: &VecDeque) -> (Option, usize, usize) { - // TODO improve via binary search. - let mut one_before = None; - let mut j = 0; - let mut k = tss.len(); - for (i1, &ts) in tss.iter().enumerate() { - if ts >= end { - break; - } else if ts >= beg { - } else { - one_before = Some(i1); - j = i1 + 1; - } - } - (one_before, j, k) - } - - pub fn choose_timeweight(beg: u64, end: u64, tss: &VecDeque) -> (Option, usize, usize) { - let self_name = "choose_timeweight"; - // TODO improve via binary search. - let mut one_before = None; - let mut j = 0; - let mut k = tss.len(); - for (i1, &ts) in tss.iter().enumerate() { - if ts >= end { - trace_ingest_event!("{self_name} ingest {:6} {:20} AFTER", i1, ts); - // TODO count all the ignored events for stats - k = i1; - break; - } else if ts >= beg { - trace_ingest_event!("{self_name} ingest {:6} {:20} INSIDE", i1, ts); - } else { - trace_ingest_event!("{self_name} ingest {:6} {:20} BEFORE", i1, ts); - one_before = Some(i1); - j = i1 + 1; - } - } - trace_ingest_item!("{self_name} chosen {one_before:?} {j:?} {k:?}"); - (one_before, j, k) - } -} - -pub trait TimeAggregatorCommonV0Trait { - type Input: WithLen + ChooseIndicesForTimeBin + 'static; - type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static; - fn type_name() -> &'static str; - fn common_range_current(&self) -> &SeriesRange; - fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: Range); - fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize); - fn common_ingest_range(&mut self, item: &Self::Input, r: Range); -} - -pub struct TimeAggregatorCommonV0Func {} - -impl TimeAggregatorCommonV0Func { - pub fn ingest_unweight(binner: &mut B, item: &B::Input) - where - B: TimeAggregatorCommonV0Trait, - { - let self_name = B::type_name(); - // TODO - let items_seen = 777; - trace_ingest_item!( - "{self_name}::ingest_unweight item len {} items_seen {}", - item.len(), - items_seen - ); - let rng = B::common_range_current(binner); - if rng.is_time() { - let beg = rng.beg_u64(); - let end = rng.end_u64(); - let (one_before, j, k) = item.choose_indices_unweight(beg, end); - if let Some(j) = one_before { - //::common_ingest_one_before(binner, item, j); - } - ::common_ingest_unweight_range(binner, item, j..k); - } else { - error!("TODO ingest_unweight for pulse range"); - err::todo(); - } - } - - pub fn ingest_time_weight(binner: &mut B, item: &B::Input) - where - B: TimeAggregatorCommonV0Trait, - { - let self_name = B::type_name(); - // TODO - let items_seen = 777; - trace_ingest_item!( - "{self_name}::ingest_time_weight item len {} items_seen {}", - item.len(), - items_seen - ); - let rng = B::common_range_current(binner); - if rng.is_time() { - let beg = rng.beg_u64(); - let end = rng.end_u64(); - let (one_before, j, k) = item.choose_indices_timeweight(beg, end); - if let Some(j) = one_before { - ::common_ingest_one_before(binner, item, j); - } - ::common_ingest_range(binner, item, j..k); - } else { - error!("TODO ingest_time_weight for pulse range"); - err::todo(); - } - } -} diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 3d02961..5aa5e0c 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -44,6 +44,7 @@ pub async fn scylla_channel_event_stream( evq.settings().scylla_read_queue_len(), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { + info!("========= SOLO {rt:?} ====================="); let x = scyllaconn::events2::events::EventsStreamRt::new( rt, chconf.clone(), @@ -54,6 +55,7 @@ pub async fn scylla_channel_event_stream( .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); Box::pin(x) } else { + info!("========= MERGED ====================="); let x = scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone()); Box::pin(x) diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index c9d69f1..cc9bf6b 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -620,6 +620,7 @@ impl Stream for EventsStreamRt { }, State::ReadingFwd(st) => { let mut have_pending = false; + let mut dbg_have_new_msp_fut = false; if let Some(fut) = st.msp_fut.as_mut() { match fut.fut.poll_unpin(cx) { Ready(a) => { @@ -648,6 +649,7 @@ impl Stream for EventsStreamRt { trace_msp_fetch!("create msp read fut"); let fut = Self::make_msp_read_fut(&mut self2.msp_inp); st.msp_fut = Some(FetchMsp { fut }); + dbg_have_new_msp_fut = true; } if st.qu.has_space() { Self::redo_fwd_read(st, msp_buf); @@ -683,6 +685,8 @@ impl Stream for EventsStreamRt { continue; } else if self.out.len() != 0 { continue; + } else if dbg_have_new_msp_fut { + continue; } else { panic!("not pending, nothing to output") } diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index af59606..62cee23 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -23,7 +23,6 @@ pub async fn plain_events_cbor_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - trace!("build stream"); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = events_stream_to_cbor_stream(stream); let stream = non_empty(stream); diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index 8fc0140..21af217 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -57,18 +57,18 @@ pub async fn dyn_events_stream( // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); - #[cfg(DISABLED)] - let stream = stream.map(|item| { - info!("item after merge: {item:?}"); - item - }); - //#[cfg(DISABLED)] + + // let stream = stream.map(|item| { + // info!("item after merge: {item:?}"); + // item + // }); + let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range()); - #[cfg(DISABLED)] - let stream = stream.map(|item| { - info!("item after rangefilter: {item:?}"); - item - }); + + // let stream = stream.map(|item| { + // info!("item after rangefilter: {item:?}"); + // item + // }); let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { @@ -83,12 +83,11 @@ pub async fn dyn_events_stream( let stream = transform_wasm(stream, wasmname, ctx).await?; Ok(Box::pin(stream)) } else { - // let stream = stream.map(|x| x); Ok(Box::pin(stream)) } } -#[cfg(not(wasm_transform))] +#[cfg(not(feature = "wasm_transform"))] async fn transform_wasm( stream: INP, _wasmname: &str, @@ -101,8 +100,7 @@ where Ok(ret) } -#[cfg(DISABLED)] -#[cfg(wasm_transform)] +#[cfg(feature = "wasm_transform")] async fn transform_wasm( stream: INP, wasmname: &str,