Add test for binned JSON output

This commit is contained in:
Dominik Werder
2023-04-14 15:50:10 +02:00
parent 6500310cf3
commit f20beafd96
13 changed files with 169 additions and 68 deletions

View File

@@ -1,7 +1,11 @@
use crate::ts_offs_from_abs;
use crate::ts_offs_from_abs_with_anchor;
use crate::IsoDateTime;
use crate::RangeOverlapInfo;
use crate::TimeBinnableType;
use crate::TimeBinnableTypeAggregator;
use chrono::TimeZone;
use chrono::Utc;
use err::Error;
use items_0::collect_s::Collectable;
use items_0::collect_s::CollectableType;
@@ -342,13 +346,15 @@ where
}
}
impl<NTY: ScalarOps> WithLen for BinsDim0CollectedResult<NTY> {
fn len(&self) -> usize {
self.mins.len()
}
}
impl<NTY: ScalarOps> Collected for BinsDim0CollectedResult<NTY> {}
impl<NTY> BinsDim0CollectedResult<NTY> {
pub fn len(&self) -> usize {
self.ts1_off_ms.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
@@ -450,24 +456,29 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
_range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
eprintln!("trying to make a result from {self:?}");
/*let bin_count_exp = if let Some(r) = &binrange {
trace!("trying to make a result from {self:?}");
let bin_count_exp = if let Some(r) = &binrange {
r.bin_count() as u32
} else {
eprintln!("no binrange given");
warn!("no binrange given");
0
};
let bin_count = self.vals.ts1s.len() as u32;
let mut vals = if let Some(x) = self.vals.take() {
x
} else {
return Err(Error::with_msg_no_trace("BinsDim0Collector without vals"));
};
let bin_count = vals.ts1s.len() as u32;
eprintln!(
"-------------- MAKE MISSING BINS bin_count_exp {} bin_count {}",
bin_count_exp, bin_count
);
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
match vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
@@ -479,24 +490,26 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
} else {
(0, None, None)
};
if self.vals.ts1s.as_slices().1.len() != 0 {
panic!();
if vals.ts1s.as_slices().1.len() != 0 {
warn!("ts1s non-contiguous");
}
if self.vals.ts2s.as_slices().1.len() != 0 {
panic!();
if vals.ts2s.as_slices().1.len() != 0 {
warn!("ts2s non-contiguous");
}
let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0);
let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0);
let counts = mem::replace(&mut self.vals.counts, VecDeque::new());
let mins = mem::replace(&mut self.vals.mins, VecDeque::new());
let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new());
let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new());
let ts1s = vals.ts1s.make_contiguous();
let ts2s = vals.ts2s.make_contiguous();
let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(ts1s);
let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, ts2s);
let counts = vals.counts;
let mins = vals.mins;
let maxs = vals.maxs;
let avgs = vals.avgs;
let ret = BinsDim0CollectedResult::<NTY> {
ts_anchor_sec: tst1.0,
ts1_off_ms: tst1.1,
ts1_off_ns: tst1.2,
ts2_off_ms: tst2.0,
ts2_off_ns: tst2.1,
ts_anchor_sec: ts_anch,
ts1_off_ms: ts1ms,
ts1_off_ns: ts1ns,
ts2_off_ms: ts2ms,
ts2_off_ns: ts2ns,
counts,
mins,
maxs,
@@ -507,8 +520,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
continue_at,
finished_at,
};
Ok(ret)*/
todo!()
Ok(ret)
}
}

View File

@@ -307,13 +307,15 @@ where
}
}
impl<NTY: ScalarOps> WithLen for BinsXbinDim0CollectedResult<NTY> {
fn len(&self) -> usize {
self.mins.len()
}
}
impl<NTY: ScalarOps> Collected for BinsXbinDim0CollectedResult<NTY> {}
impl<NTY> BinsXbinDim0CollectedResult<NTY> {
pub fn len(&self) -> usize {
self.ts1_off_ms.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}

View File

@@ -915,6 +915,12 @@ impl AsAnyMut for ChannelEventsCollectorOutput {
}
}
impl WithLen for ChannelEventsCollectorOutput {
fn len(&self) -> usize {
todo!()
}
}
impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput {
fn to_json_result(&self) -> Result<Box<dyn items_0::collect_s::ToJsonBytes>, err::Error> {
todo!()

View File

@@ -39,10 +39,20 @@ use std::collections::VecDeque;
use std::fmt;
use std::mem;
#[allow(unused)]
macro_rules! trace_ingest {
($($arg:tt)*) => {
//let _ = ($($arg)*);
//trace!($($arg)*);
};
}
#[allow(unused)]
macro_rules! trace2 {
(EN$($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
($($arg:tt)*) => {
//let _ = ($($arg)*,);
//trace!($($arg)*);
};
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
@@ -258,10 +268,6 @@ pub struct EventsDim0CollectorOutput<NTY> {
}
impl<NTY: ScalarOps> EventsDim0CollectorOutput<NTY> {
pub fn len(&self) -> usize {
self.values.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
@@ -337,6 +343,12 @@ where
}
}
impl<NTY: ScalarOps> WithLen for EventsDim0CollectorOutput<NTY> {
fn len(&self) -> usize {
self.values.len()
}
}
impl<NTY: ScalarOps> ToJsonResult for EventsDim0CollectorOutput<NTY> {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
@@ -498,7 +510,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
fn apply_min_max(&mut self, val: NTY) {
trace!(
trace_ingest!(
"apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}",
val,
self.last_seen_val,
@@ -522,7 +534,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
}
fn apply_event_unweight(&mut self, val: NTY) {
trace!("TODO check again result_reset_unweight");
error!("TODO check again result_reset_unweight");
err::todo();
let vf = val.as_prim_f32_b();
self.apply_min_max(val);
@@ -535,7 +547,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) {
if let Some(v) = &self.last_seen_val {
trace!("apply_event_time_weight with v {v:?}");
trace_ingest!("apply_event_time_weight with v {v:?}");
let vf = v.as_prim_f32_b();
let v2 = v.clone();
if px > pxbeg {
@@ -558,7 +570,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
trace!("TODO check again result_reset_unweight");
error!("TODO check again result_reset_unweight");
err::todo();
if self.range.is_time() {
for i1 in 0..item.tss.len() {
@@ -583,25 +595,25 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
let self_name = any::type_name::<Self>();
trace!("{self_name}::ingest_time_weight item len {}", item.len());
trace_ingest!("{self_name}::ingest_time_weight item len {}", item.len());
if self.range.is_time() {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
if ts < self.int_ts {
trace!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.events_ignored_count += 1;
self.last_seen_ts = ts;
self.last_seen_val = Some(val);
} else if ts >= self.range.end_u64() {
trace!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
self.events_ignored_count += 1;
return;
} else {
trace!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val);
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val);
if false && self.last_seen_val.is_none() {
// TODO no longer needed or?
trace!(
trace_ingest!(
"call apply_min_max without last val, use current instead {} {:?}",
ts,
val
@@ -712,11 +724,11 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for EventsDim0Aggregator<NTY> {
fn ingest(&mut self, item: &Self::Input) {
if true {
trace!("{} ingest {} events", std::any::type_name::<Self>(), item.len());
trace_ingest!("{} ingest {} events", std::any::type_name::<Self>(), item.len());
}
if false {
for (i, &ts) in item.tss.iter().enumerate() {
trace!("{} ingest {:6} {:20}", std::any::type_name::<Self>(), i, ts);
trace_ingest!("{} ingest {:6} {:20}", std::any::type_name::<Self>(), i, ts);
}
}
if self.do_time_weight {
@@ -1008,19 +1020,19 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
.downcast_ref::<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Input>()
{
// TODO collect statistics associated with this request:
trace!("{self_name} FEED THE ITEM...");
trace_ingest!("{self_name} FEED THE ITEM...");
self.agg.ingest(item);
if item.ends_after(self.agg.range()) {
trace!("{self_name} FED ITEM, ENDS AFTER.");
trace_ingest!("{self_name} FED ITEM, ENDS AFTER.");
self.cycle();
if self.rng.is_none() {
warn!("{self_name} no more bin in edges C");
return;
} else {
trace!("{self_name} FED ITEM, CYCLED, CONTINUE.");
trace_ingest!("{self_name} FED ITEM, CYCLED, CONTINUE.");
}
} else {
trace!("{self_name} FED ITEM.");
trace_ingest!("{self_name} FED ITEM.");
break;
}
} else {

View File

@@ -220,10 +220,6 @@ pub struct EventsDim1CollectorOutput<NTY> {
}
impl<NTY: ScalarOps> EventsDim1CollectorOutput<NTY> {
pub fn len(&self) -> usize {
self.values.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
@@ -275,6 +271,12 @@ where
}
}
impl<NTY: ScalarOps> WithLen for EventsDim1CollectorOutput<NTY> {
fn len(&self) -> usize {
self.values.len()
}
}
impl<NTY: ScalarOps> ToJsonResult for EventsDim1CollectorOutput<NTY> {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;

View File

@@ -400,6 +400,12 @@ where
}
}
impl<NTY: ScalarOps> WithLen for EventsXbinDim0CollectorOutput<NTY> {
fn len(&self) -> usize {
self.mins.len()
}
}
impl<NTY> ToJsonResult for EventsXbinDim0CollectorOutput<NTY>
where
NTY: ScalarOps,