This commit is contained in:
Dominik Werder
2024-09-18 23:59:03 +02:00
parent e4f8ad1e91
commit 049266bfe5
14 changed files with 211 additions and 65 deletions

View File

@@ -0,0 +1,5 @@
pub mod container_events;
pub mod test;
pub mod timeweight;
use super::binning as ___;

View File

@@ -0,0 +1,42 @@
use super::___;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
// Compile-time tracing toggle: flip `true` to `false` to silence
// init-tracing at all call sites without removing them.
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
/// Marker trait for the backing storage used to hold event values.
/// Currently only requires `Clone`; further requirements are TBD.
pub trait Container: Clone {}
/// Any `VecDeque` of event values qualifies as a container.
impl<T> Container for VecDeque<T> where T: EventValueType {}
/// A type that can appear as an event value, together with the
/// container type used to store sequences of it.
pub trait EventValueType: Clone {
    /// Storage used to hold a sequence of values of this type.
    type Container: Container;
}
impl EventValueType for f32 {
    type Container = VecDeque<Self>;
}
/// Columnar event storage: timestamps plus one value container per event.
/// Using `EVT::Container` (rather than `EVT` directly) lets scalar and
/// array-valued channels share the same struct shape.
#[derive(Clone)]
pub struct ContainerEvents<EVT>
where
    EVT: EventValueType,
{
    // Event timestamps, parallel to `vals`.
    tss: VecDeque<TsNano>,
    // vals: VecDeque<EVT>,
    vals: VecDeque<<EVT as EventValueType>::Container>,
}
// TODO why does this already impl Serialize even though there is no bound for EVT?
// NOTE(review): serde's derive places `where EVT: Serialize` bounds on the
// *generated impl*, not on the struct definition, so the struct itself
// needs no bound — confirm against the serde derive documentation.
// TODO try to actually instantiate and serialize in a test.
#[derive(Clone, Serialize, Deserialize)]
pub struct ContainerEvents2<EVT>
where
    EVT: EventValueType,
{
    // Event timestamps, parallel to `vals`.
    tss: VecDeque<TsNano>,
    // One value per event (no nested container, unlike `ContainerEvents`).
    vals: VecDeque<EVT>,
}

View File

@@ -0,0 +1,2 @@
use super::___;
use netpod::log::*;

View File

@@ -0,0 +1,14 @@
pub mod timeweight_bins;
pub mod timeweight_events;
use super::___;
use netpod::log::*;
// Tracing toggles for this module: the `if true` guard makes each macro a
// compile-time switch; flip to `false` to silence without touching call sites.
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }

View File

@@ -0,0 +1,5 @@
use super::___;
use netpod::log::*;
// Compile-time tracing toggle: flip `true` to `false` to silence.
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }

View File

@@ -0,0 +1,31 @@
use super::super::container_events::EventValueType;
use super::___;
use futures_util::Stream;
use netpod::log::*;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// Compile-time tracing toggle: flip `true` to `false` to silence.
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
/// Time-weighted binner over container events (work in progress).
/// `PhantomData` keeps the value-type parameter without storing any data.
pub struct BinnedEventsTimeweight<EVT>
where
    EVT: EventValueType,
{
    _evt: PhantomData<EVT>,
}
// No methods yet; impl block reserved for the upcoming binning logic.
impl<EVT> BinnedEventsTimeweight<EVT> where EVT: EventValueType {}
/// Stream of time-weighted binned events (not yet implemented).
pub struct BinnedEventsTimeweightStream {}

impl Stream for BinnedEventsTimeweightStream {
    type Item = ();

    /// Placeholder: always panics via `todo!()` until binning is implemented.
    // `self` needs no `mut` binding and `cx` is unused while the body is a
    // stub; both previously triggered compiler warnings (`unused_mut`,
    // unused variable).
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        todo!()
    }
}

View File

@@ -52,10 +52,7 @@ use std::mem;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace44 {
($($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
}
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
// TODO make members private
#[derive(Clone, PartialEq, Serialize, Deserialize)]
@@ -82,6 +79,9 @@ where
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = any::type_name::<Self>();
// if true {
// return fmt::Display::fmt(self, fmt);
// }
if true {
write!(
fmt,
@@ -163,7 +163,7 @@ where
let self_name = any::type_name::<Self>();
write!(
fmt,
"{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {} }}",
"{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {}, lsts {} }}",
self.len(),
VecPreview::new(&self.ts1s),
VecPreview::new(&self.ts2s),
@@ -171,6 +171,7 @@ where
VecPreview::new(&self.mins),
VecPreview::new(&self.maxs),
VecPreview::new(&self.avgs),
VecPreview::new(&self.lsts),
)
}
}
@@ -296,6 +297,7 @@ impl<STY> Resettable for BinsDim0<STY> {
self.mins.clear();
self.maxs.clear();
self.avgs.clear();
self.lsts.clear();
}
}
@@ -327,23 +329,27 @@ items_0::impl_range_overlap_info_bins!(BinsDim0);
impl<NTY: ScalarOps> AppendEmptyBin for BinsDim0<NTY> {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
error!("AppendEmptyBin::append_empty_bin should not get used");
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.cnts.push_back(0);
self.mins.push_back(NTY::zero_b());
self.maxs.push_back(NTY::zero_b());
self.avgs.push_back(0.);
self.lsts.push_back(NTY::zero_b());
}
}
impl<NTY: ScalarOps> AppendAllFrom for BinsDim0<NTY> {
fn append_all_from(&mut self, src: &mut Self) {
error!("AppendAllFrom::append_all_from should not get used");
self.ts1s.extend(src.ts1s.drain(..));
self.ts2s.extend(src.ts2s.drain(..));
self.cnts.extend(src.cnts.drain(..));
self.mins.extend(src.mins.drain(..));
self.maxs.extend(src.maxs.drain(..));
self.avgs.extend(src.avgs.drain(..));
self.lsts.extend(src.lsts.drain(..));
}
}
@@ -396,6 +402,7 @@ where
min: STY,
max: STY,
avg: f64,
lst: STY,
filled_up_to: TsNano,
last_seen_avg: f32,
}
@@ -404,12 +411,15 @@ impl<STY> BinsDim0TimeBinnerTy<STY>
where
STY: ScalarOps,
{
pub fn type_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new(binrange: BinnedRange<TsNano>, do_time_weight: bool, emit_empty_bins: bool) -> Self {
// let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns());
// let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano());
let ts1now = TsNano::from_ns(binrange.full_range().beg());
let ts1now = TsNano::from_ns(binrange.nano_beg().ns());
let ts2now = ts1now.add_dt_nano(binrange.bin_len.to_dt_nano());
let buf = <Self as TimeBinnerTy>::Output::empty();
Self {
ts1now,
ts2now,
@@ -422,6 +432,7 @@ where
min: STY::zero_b(),
max: STY::zero_b(),
avg: 0.,
lst: STY::zero_b(),
filled_up_to: ts1now,
last_seen_avg: 0.,
}
@@ -444,8 +455,9 @@ where
type Output = BinsDim0<STY>;
fn ingest(&mut self, item: &mut Self::Input) {
trace_ingest!("<{} as TimeBinnerTy>::ingest {:?}", Self::type_name(), item);
let mut count_before = 0;
for (((((&ts1, &ts2), &cnt), min), max), &avg) in item
for ((((((&ts1, &ts2), &cnt), min), max), &avg), lst) in item
.ts1s
.iter()
.zip(&item.ts2s)
@@ -453,9 +465,18 @@ where
.zip(&item.mins)
.zip(&item.maxs)
.zip(&item.avgs)
.zip(&item.lsts)
{
if ts1 < self.ts1now.ns() {
if ts2 > self.ts1now.ns() {
error!("{} bad input grid mismatch", Self::type_name());
continue;
}
// warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns());
trace_ingest!("{} input bin before {}", Self::type_name(), TsNano::from_ns(ts1));
self.min = min.clone();
self.max = max.clone();
self.lst = lst.clone();
count_before += 1;
continue;
} else {
@@ -543,7 +564,7 @@ where
if self.do_time_weight {
let f = (self.ts2now.ns() - self.filled_up_to.ns()) as f64
/ (self.ts2now.ns() - self.ts1now.ns()) as f64;
self.avg += self.last_seen_avg as f64 * f;
self.avg += self.lst.as_prim_f32_b() as f64 * f;
self.filled_up_to = self.ts2now;
} else {
panic!("TODO non-time-weighted binning to be impl");
@@ -563,6 +584,7 @@ where
self.out.mins.push_back(self.min.clone());
self.out.maxs.push_back(self.max.clone());
self.out.avgs.push_back(self.avg as f32);
self.out.lsts.push_back(self.lst.clone());
self.reset_agg();
}
}
@@ -823,6 +845,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
vals.mins.append(&mut src.mins);
vals.maxs.append(&mut src.maxs);
vals.avgs.append(&mut src.avgs);
vals.lsts.append(&mut src.lsts);
}
fn set_range_complete(&mut self) {
@@ -1377,6 +1400,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
dst.mins.extend(self.mins.drain(range.clone()));
dst.maxs.extend(self.maxs.drain(range.clone()));
dst.avgs.extend(self.avgs.drain(range.clone()));
dst.lsts.extend(self.lsts.drain(range.clone()));
Ok(())
} else {
let type_name = any::type_name::<Self>();

View File

@@ -34,6 +34,9 @@ use std::fmt;
use std::time::Duration;
use std::time::SystemTime;
#[allow(unused)]
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
// TODO maybe rename to ChannelStatus?
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ConnStatus {
@@ -1062,26 +1065,21 @@ impl TimeBinnerTy for ChannelEventsTimeBinner {
type Output = Box<dyn TimeBinned>;
fn ingest(&mut self, item: &mut Self::Input) {
trace!("{} INGEST {:?}", Self::type_name(), item);
trace_ingest!("{} INGEST {:?}", Self::type_name(), item);
match item {
ChannelEvents::Events(item) => {
if self.binner.is_none() {
let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins);
self.binner = Some(binner);
}
match self.binner.as_mut() {
Some(binner) => binner.ingest(item.as_time_binnable_mut()),
None => {
error!("ingest without active binner item {item:?}");
()
}
}
let binner = self.binner.get_or_insert_with(|| {
item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins)
});
binner.ingest(item.as_time_binnable_mut())
}
ChannelEvents::Status(item) => {
warn!("TODO consider channel status in time binning {item:?}");
}
}
trace_ingest!("{} INGEST RETURN {:?}", Self::type_name(), item);
}
fn bins_ready_count(&self) -> usize {
match &self.binner {
Some(binner) => binner.bins_ready_count(),

View File

@@ -566,6 +566,7 @@ impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
}
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: core::ops::Range<usize>) {
panic!("TODO common_ingest_unweight_range");
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
self.apply_event_unweight(val.clone());
self.count += 1;
@@ -574,7 +575,7 @@ impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
}
fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) {
//trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
trace_ingest!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item,);
self.apply_min_max_lst(item.values[j].clone());
self.last_ts = item.tss[j];
}
@@ -676,7 +677,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
TimeAggregatorCommonV0Func::ingest_time_weight(self, item)
TimeAggregatorCommonV0Func::ingest_unweight(self, item)
}
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
@@ -1174,6 +1175,7 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for EventsDim0TimeBinner<STY> {
impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
trace_ingest!("{}::ingest {:?}", Self::type_name(), item);
TimeBinnerCommonV0Func::ingest(self, item)
}

View File

@@ -1,5 +1,6 @@
pub mod accounting;
pub mod binnedcollected;
pub mod binning;
pub mod binsdim0;
pub mod binsxbindim0;
pub mod channelevents;

View File

@@ -11,31 +11,13 @@ use std::collections::VecDeque;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace_ingest {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_item {
($($arg:tt)*) => {
if false {
info!($($arg)*);
}
};
}
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
pub trait TimeBinnerCommonV0Trait {
type Input: RangeOverlapInfo + 'static;
@@ -59,7 +41,7 @@ impl TimeBinnerCommonV0Func {
B: TimeBinnerCommonV0Trait,
{
let self_name = B::type_name();
trace_ingest_item!(
trace_ingest!(
"TimeBinner for {} ingest common_range_current {:?} item {:?}",
self_name,
binner.common_range_current(),
@@ -74,19 +56,20 @@ impl TimeBinnerCommonV0Func {
// Or consume the input data.
loop {
while item.starts_after(B::common_range_current(binner)) {
trace_ingest_item!("{self_name} ignore item and cycle starts_after");
trace_ingest!("{self_name} ignore item and cycle starts_after");
TimeBinnerCommonV0Func::cycle(binner);
if !B::common_has_more_range(binner) {
debug!("{self_name} no more bin in edges after starts_after");
return;
}
}
if item.ends_before(B::common_range_current(binner)) {
trace_ingest_item!("{self_name} ignore item ends_before");
return;
} else {
// if item.ends_before(B::common_range_current(binner)) {
// trace_ingest_item!("{self_name} ignore item ends_before");
// return;
// }
{
if !B::common_has_more_range(binner) {
trace_ingest_item!("{self_name} no more bin in edges");
trace_ingest!("{self_name} no more bin in edges");
return;
} else {
if let Some(item) = item
@@ -95,10 +78,10 @@ impl TimeBinnerCommonV0Func {
.downcast_mut::<B::Input>()
{
// TODO collect statistics associated with this request:
trace_ingest_item!("{self_name} FEED THE ITEM...");
trace_ingest!("{self_name} FEED THE ITEM...");
TimeBinnerCommonV0Func::agg_ingest(binner, item);
if item.ends_after(B::common_range_current(binner)) {
trace_ingest_item!(
trace_ingest!(
"{self_name} FED ITEM, ENDS AFTER agg-range {:?}",
B::common_range_current(binner)
);
@@ -107,14 +90,18 @@ impl TimeBinnerCommonV0Func {
warn!("{self_name} no more bin in edges after ingest and cycle");
return;
} else {
trace_ingest_item!("{self_name} item fed, cycled, continue");
trace_ingest!("{self_name} item fed, cycled, continue");
}
} else {
trace_ingest_item!("{self_name} item fed, break");
trace_ingest!("{self_name} item fed, break");
break;
}
} else {
error!("{self_name}::ingest unexpected item type");
error!(
"{self_name}::ingest unexpected item type {} expected {}",
item.type_name(),
any::type_name::<B::Input>()
);
};
}
}
@@ -134,7 +121,7 @@ impl TimeBinnerCommonV0Func {
B: TimeBinnerCommonV0Trait,
{
let self_name = B::type_name();
trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}");
trace_ingest!("{self_name}::push_in_progress push_empty {push_empty}");
// TODO expand should be derived from AggKind. Is it still required after all?
// TODO here, the expand means that agg will assume that the current value is kept constant during
// the rest of the time range.
@@ -158,7 +145,7 @@ impl TimeBinnerCommonV0Func {
B: TimeBinnerCommonV0Trait,
{
let self_name = any::type_name::<Self>();
trace_ingest_item!("{self_name}::cycle");
trace_ingest!("{self_name}::cycle");
// TODO refactor this logic.
let n = TimeBinnerCommonV0Trait::common_bins_ready_count(binner);
TimeBinnerCommonV0Func::push_in_progress(binner, true);

View File

@@ -2475,6 +2475,18 @@ impl BinnedRange<TsNano> {
pub fn nano_end(&self) -> TsNano {
self.bin_len.times(self.bin_off + self.bin_cnt)
}
/// Returns this range extended by one bin towards earlier time: the offset
/// moves back one bin and the count grows by one, so every original bin is
/// still covered.
/// NOTE(review): `bin_off - 1` underflows (panics in debug builds) when
/// `bin_off == 0` — confirm callers guarantee a positive offset.
pub fn one_before_bin(&self) -> Self {
    Self {
        bin_len: self.bin_len,
        bin_off: self.bin_off - 1,
        bin_cnt: self.bin_cnt + 1,
    }
}
/// The length of one bin, converted to milliseconds.
pub fn bin_len_dt_ms(&self) -> DtMs {
    self.bin_len.to_dt_ms()
}
}
impl<T> BinnedRange<T>
@@ -2501,7 +2513,7 @@ where
}*/
let beg = self.bin_len.times(self.bin_off).as_u64();
let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64();
debug!("TODO make generic for pulse");
panic!("TODO make generic for pulse");
NanoRange { beg, end }
}

View File

@@ -7,9 +7,17 @@ use scylla::Session as ScySession;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaSchema")]
pub enum Error {
Scylla,
Scylla(#[from] scylla::transport::errors::QueryError),
}
/// Schema migration: add the `lst` (last value) column to the binned
/// scalar-f32 table for the given retention time.
///
/// Removed a stale `todo!()` stub that preceded the real body and made it
/// unreachable dead code.
pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) -> Result<(), Error> {
    let table = "binned_scalar_f32";
    // `{}.{}{}` renders as keyspace.<prefix><table>.
    let cql = format!(
        concat!("alter table {}.{}{}", " add lst float"),
        &scyco.keyspace,
        rt.table_prefix(),
        table
    );
    // Best-effort: ALTER TABLE errors when the column already exists, so the
    // result is deliberately ignored to keep the migration idempotent
    // — TODO confirm this is the intent rather than a silently lost failure.
    let _ = scy.query(cql, ()).await;
    Ok(())
}

View File

@@ -46,7 +46,10 @@ macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[cstm(name = "BinCachedGapFill")]
pub enum Error {
CacheReader(#[from] super::cached::reader::Error),
GapFromFiner,
#[error("GapFromFiner({0}, {1}, {2})")]
GapFromFiner(TsNano, TsNano, DtMs),
#[error("MissingBegFromFiner({0}, {1}, {2})")]
MissingBegFromFiner(TsNano, TsNano, DtMs),
#[error("InputBeforeRange({0}, {1})")]
InputBeforeRange(NanoRange, BinnedRange<TsNano>),
SfDatabufferNotSupported,
@@ -153,13 +156,24 @@ impl GapFill {
for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) {
if let Some(last) = self.last_bin_ts2 {
if ts1 != last.ns() {
return Err(Error::GapFromFiner);
return Err(Error::GapFromFiner(
TsNano::from_ns(ts1),
last,
self.range.bin_len_dt_ms(),
));
}
} else if ts1 != self.range.nano_beg().ns() {
return Err(Error::MissingBegFromFiner(
TsNano::from_ns(ts1),
self.range.nano_beg(),
self.range.bin_len_dt_ms(),
));
}
self.last_bin_ts2 = Some(TsNano::from_ns(ts2));
}
if bins.len() != 0 {
bins.clone().drain_into(&mut self.bins_for_cache_write, 0..bins.len());
let mut bins2 = bins.clone();
bins2.drain_into(&mut self.bins_for_cache_write, 0..bins2.len());
}
if self.cache_usage.is_cache_write() {
self.cache_write_intermediate()?;
@@ -239,6 +253,7 @@ impl GapFill {
self.range.bin_len.to_dt_ms()
);
let range_finer = BinnedRange::from_nano_range(range, bin_len_finer);
let range_finer_one_before_bin = range_finer.one_before_bin();
let inp_finer = GapFill::new(
self.dbgname.clone(),
self.ch_conf.clone(),
@@ -248,7 +263,7 @@ impl GapFill {
self.log_level.clone(),
self.ctx.clone(),
self.series,
range_finer.clone(),
range_finer_one_before_bin,
self.do_time_weight,
self.bin_len_layers.clone(),
self.cache_read_provider.clone(),