release 0.5.4

This commit is contained in:
Dominik Werder
2024-10-31 23:11:02 +01:00
parent 4e1874381c
commit 3b31c244ee
10 changed files with 21 additions and 221 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.3"
version = "0.5.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -23,6 +23,7 @@ pub trait TimeBins {
fn ts_min_max(&self) -> Option<(u64, u64)>;
}
// TODO remove
pub trait TimeBinnerTy: fmt::Debug + Send + Unpin {
type Input: fmt::Debug;
type Output: fmt::Debug;

View File

@@ -2,7 +2,6 @@ use super::aggregator::AggTimeWeightOutputAvg;
use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox;
use super::___;
use core::fmt;
use err::thiserror;
use err::ThisError;
@@ -123,20 +122,6 @@ where
vals: <EVT as EventValueType>::Container,
}
macro_rules! try_to_events_dim0 {
($sty:ty, $this:expr) => {
let this = $this;
if let Some(evs) = this.as_any_ref().downcast_ref::<ContainerEvents<$sty>>() {
use crate::eventsdim0::EventsDim0;
let tss: VecDeque<_> = this.tss.iter().map(|x| x.ns()).collect();
let pulses = tss.iter().map(|_| 0).collect();
let values = evs.vals.clone();
let ret = EventsDim0::<$sty> { tss, pulses, values };
return Box::new(ret);
}
};
}
impl<EVT> ContainerEvents<EVT>
where
EVT: EventValueType,
@@ -193,12 +178,6 @@ where
self.tss.push_back(ts);
self.vals.push_back(val);
}
pub fn to_events_dim0(&self) -> Box<dyn items_0::Events> {
try_to_events_dim0!(f64, self);
let styn = any::type_name::<EVT>();
todo!("TODO to_container_events for {styn}")
}
}
impl<EVT> fmt::Debug for ContainerEvents<EVT>

View File

@@ -1,8 +1,3 @@
use crate::binsdim0::BinsDim0;
use crate::timebin::ChooseIndicesForTimeBin;
use crate::timebin::ChooseIndicesForTimeBinEvents;
use crate::timebin::TimeAggregatorCommonV0Func;
use crate::timebin::TimeAggregatorCommonV0Trait;
use crate::IsoDateTime;
use err::Error;
use items_0::collect_s::CollectableDyn;
@@ -12,7 +7,6 @@ use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::ScalarOps;
use items_0::AppendAllFrom;
use items_0::Appendable;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
@@ -220,16 +214,6 @@ impl<STY: ScalarOps> HasTimestampDeque for EventsDim0<STY> {
}
}
impl<STY> ChooseIndicesForTimeBin for EventsDim0<STY> {
fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize) {
ChooseIndicesForTimeBinEvents::choose_unweight(beg, end, &self.tss)
}
fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize) {
ChooseIndicesForTimeBinEvents::choose_timeweight(beg, end, &self.tss)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsDim0ChunkOutput<STY> {
tss: VecDeque<u64>,
@@ -855,33 +839,6 @@ mod test_frame {
panic!()
};
assert_eq!(item.tss(), &[123]);
#[cfg(DISABLED)]
{
eprintln!("NOW WE SEE: {:?}", item);
// type_name_of_val alloc::boxed::Box<dyn items_0::Events>
eprintln!("0 {:22?}", item.as_any_mut().type_id());
eprintln!("A {:22?}", std::any::TypeId::of::<Box<dyn items_0::Events>>());
eprintln!("B {:22?}", std::any::TypeId::of::<dyn items_0::Events>());
eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>());
eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>());
eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box<dyn items_0::Events>>());
eprintln!("F {:22?}", std::any::TypeId::of::<Box<EventsDim0<f32>>>());
eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0<f32>>());
eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0<f32>>());
eprintln!("I {:22?}", std::any::TypeId::of::<Box<Box<EventsDim0<f32>>>>());
//let item = item.as_mut();
//eprintln!("1 {:22?}", item.type_id());
/*
let item = if let Some(item) =
items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::<Box<EventsDim0<f32>>>()
{
item
} else {
panic!()
};
*/
//eprintln!("Final value: {item:?}");
}
}
}

View File

@@ -17,7 +17,6 @@ pub mod streams;
#[cfg(test)]
pub mod test;
pub mod testgen;
pub mod timebin;
pub mod transform;
use channelevents::ChannelEvents;

View File

@@ -1,139 +0,0 @@
use items_0::AppendEmptyBin;
use items_0::Empty;
use items_0::HasNonemptyFirstBin;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use std::any;
use std::collections::VecDeque;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
pub trait ChooseIndicesForTimeBin {
fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize);
fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize);
}
pub struct ChooseIndicesForTimeBinEvents {}
impl ChooseIndicesForTimeBinEvents {
pub fn choose_unweight(beg: u64, end: u64, tss: &VecDeque<u64>) -> (Option<usize>, usize, usize) {
// TODO improve via binary search.
let mut one_before = None;
let mut j = 0;
let mut k = tss.len();
for (i1, &ts) in tss.iter().enumerate() {
if ts >= end {
break;
} else if ts >= beg {
} else {
one_before = Some(i1);
j = i1 + 1;
}
}
(one_before, j, k)
}
pub fn choose_timeweight(beg: u64, end: u64, tss: &VecDeque<u64>) -> (Option<usize>, usize, usize) {
let self_name = "choose_timeweight";
// TODO improve via binary search.
let mut one_before = None;
let mut j = 0;
let mut k = tss.len();
for (i1, &ts) in tss.iter().enumerate() {
if ts >= end {
trace_ingest_event!("{self_name} ingest {:6} {:20} AFTER", i1, ts);
// TODO count all the ignored events for stats
k = i1;
break;
} else if ts >= beg {
trace_ingest_event!("{self_name} ingest {:6} {:20} INSIDE", i1, ts);
} else {
trace_ingest_event!("{self_name} ingest {:6} {:20} BEFORE", i1, ts);
one_before = Some(i1);
j = i1 + 1;
}
}
trace_ingest_item!("{self_name} chosen {one_before:?} {j:?} {k:?}");
(one_before, j, k)
}
}
pub trait TimeAggregatorCommonV0Trait {
type Input: WithLen + ChooseIndicesForTimeBin + 'static;
type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static;
fn type_name() -> &'static str;
fn common_range_current(&self) -> &SeriesRange;
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: Range<usize>);
fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize);
fn common_ingest_range(&mut self, item: &Self::Input, r: Range<usize>);
}
pub struct TimeAggregatorCommonV0Func {}
impl TimeAggregatorCommonV0Func {
pub fn ingest_unweight<B>(binner: &mut B, item: &B::Input)
where
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
// TODO
let items_seen = 777;
trace_ingest_item!(
"{self_name}::ingest_unweight item len {} items_seen {}",
item.len(),
items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {
let beg = rng.beg_u64();
let end = rng.end_u64();
let (one_before, j, k) = item.choose_indices_unweight(beg, end);
if let Some(j) = one_before {
//<B as TimeAggregatorCommonV0Trait>::common_ingest_one_before(binner, item, j);
}
<B as TimeAggregatorCommonV0Trait>::common_ingest_unweight_range(binner, item, j..k);
} else {
error!("TODO ingest_unweight for pulse range");
err::todo();
}
}
pub fn ingest_time_weight<B>(binner: &mut B, item: &B::Input)
where
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
// TODO
let items_seen = 777;
trace_ingest_item!(
"{self_name}::ingest_time_weight item len {} items_seen {}",
item.len(),
items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {
let beg = rng.beg_u64();
let end = rng.end_u64();
let (one_before, j, k) = item.choose_indices_timeweight(beg, end);
if let Some(j) = one_before {
<B as TimeAggregatorCommonV0Trait>::common_ingest_one_before(binner, item, j);
}
<B as TimeAggregatorCommonV0Trait>::common_ingest_range(binner, item, j..k);
} else {
error!("TODO ingest_time_weight for pulse range");
err::todo();
}
}
}

View File

@@ -44,6 +44,7 @@ pub async fn scylla_channel_event_stream(
evq.settings().scylla_read_queue_len(),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
info!("========= SOLO {rt:?} =====================");
let x = scyllaconn::events2::events::EventsStreamRt::new(
rt,
chconf.clone(),
@@ -54,6 +55,7 @@ pub async fn scylla_channel_event_stream(
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
Box::pin(x)
} else {
info!("========= MERGED =====================");
let x =
scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone());
Box::pin(x)

View File

@@ -620,6 +620,7 @@ impl Stream for EventsStreamRt {
},
State::ReadingFwd(st) => {
let mut have_pending = false;
let mut dbg_have_new_msp_fut = false;
if let Some(fut) = st.msp_fut.as_mut() {
match fut.fut.poll_unpin(cx) {
Ready(a) => {
@@ -648,6 +649,7 @@ impl Stream for EventsStreamRt {
trace_msp_fetch!("create msp read fut");
let fut = Self::make_msp_read_fut(&mut self2.msp_inp);
st.msp_fut = Some(FetchMsp { fut });
dbg_have_new_msp_fut = true;
}
if st.qu.has_space() {
Self::redo_fwd_read(st, msp_buf);
@@ -683,6 +685,8 @@ impl Stream for EventsStreamRt {
continue;
} else if self.out.len() != 0 {
continue;
} else if dbg_have_new_msp_fut {
continue;
} else {
panic!("not pending, nothing to output")
}

View File

@@ -23,7 +23,6 @@ pub async fn plain_events_cbor_stream(
ctx: &ReqCtx,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<CborStream, Error> {
trace!("build stream");
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
let stream = events_stream_to_cbor_stream(stream);
let stream = non_empty(stream);

View File

@@ -57,18 +57,18 @@ pub async fn dyn_events_stream(
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
#[cfg(DISABLED)]
let stream = stream.map(|item| {
info!("item after merge: {item:?}");
item
});
//#[cfg(DISABLED)]
// let stream = stream.map(|item| {
// info!("item after merge: {item:?}");
// item
// });
let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range());
#[cfg(DISABLED)]
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");
item
});
// let stream = stream.map(|item| {
// info!("item after rangefilter: {item:?}");
// item
// });
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {
@@ -83,12 +83,11 @@ pub async fn dyn_events_stream(
let stream = transform_wasm(stream, wasmname, ctx).await?;
Ok(Box::pin(stream))
} else {
// let stream = stream.map(|x| x);
Ok(Box::pin(stream))
}
}
#[cfg(not(wasm_transform))]
#[cfg(not(feature = "wasm_transform"))]
async fn transform_wasm<INP>(
stream: INP,
_wasmname: &str,
@@ -101,8 +100,7 @@ where
Ok(ret)
}
#[cfg(DISABLED)]
#[cfg(wasm_transform)]
#[cfg(feature = "wasm_transform")]
async fn transform_wasm<INP>(
stream: INP,
wasmname: &str,