This commit is contained in:
Dominik Werder
2023-03-09 15:53:14 +01:00
parent f262f7e9df
commit 431d98ffea
19 changed files with 362 additions and 286 deletions

View File

@@ -6,7 +6,7 @@ use crate::Events;
use crate::WithLen;
use err::Error;
use netpod::BinnedRangeEnum;
use netpod::NanoRange;
use netpod::SeriesRange;
use std::fmt;
pub trait Collector: fmt::Debug + Send {
@@ -16,7 +16,7 @@ pub trait Collector: fmt::Debug + Send {
fn set_timed_out(&mut self);
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn Collected>, Error>;
}
@@ -52,7 +52,7 @@ pub trait CollectorDyn: fmt::Debug + Send {
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn Collected>, Error>;
}
@@ -95,7 +95,7 @@ impl Collector for TimeBinnedCollector {
fn result(
&mut self,
_range: Option<NanoRange>,
_range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn Collected>, Error> {
todo!()

View File

@@ -4,7 +4,7 @@ use crate::AsAnyRef;
use crate::WithLen;
use err::Error;
use netpod::BinnedRangeEnum;
use netpod::NanoRange;
use netpod::SeriesRange;
use serde::Serialize;
use std::any::Any;
use std::fmt;
@@ -19,7 +19,7 @@ pub trait CollectorType: Send + Unpin + WithLen {
fn set_timed_out(&mut self);
// TODO use this crate's Error instead:
fn result(&mut self, range: Option<NanoRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error>;
fn result(&mut self, range: Option<SeriesRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error>;
}
pub trait Collector: Send + Unpin + WithLen {
@@ -29,7 +29,7 @@ pub trait Collector: Send + Unpin + WithLen {
// TODO factor the required parameters into new struct? Generic over events or binned?
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn ToJsonResult>, Error>;
}
@@ -60,7 +60,7 @@ impl<T: CollectorType + 'static> Collector for T {
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn ToJsonResult>, Error> {
let ret = T::result(self, range, binrange)?;

View File

@@ -60,7 +60,7 @@ pub trait EmptyForShape {
}
pub trait Empty {
fn empty(dim0kind: Dim0Kind) -> Self;
fn empty() -> Self;
}
pub trait Appendable<STY>: Empty + WithLen {

View File

@@ -107,7 +107,7 @@ pub struct BinnedCollected {
}
impl BinnedCollected {
fn self_name() -> &'static str {
const fn self_name() -> &'static str {
"BinnedCollected"
}
@@ -217,7 +217,7 @@ impl BinnedCollected {
impl Future for BinnedCollected {
type Output = Result<BinnedCollectedResult, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let span = span!(Level::INFO, BinnedCollected::self_name());
let _spg = span.enter();
use Poll::*;

View File

@@ -26,11 +26,10 @@ use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use netpod::NanoRange;
use netpod::SeriesRange;
use num_traits::Zero;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
@@ -51,7 +50,7 @@ pub struct BinsDim0<NTY> {
pub mins: VecDeque<NTY>,
pub maxs: VecDeque<NTY>,
pub avgs: VecDeque<f32>,
pub dim0kind: Dim0Kind,
pub dim0kind: Option<Dim0Kind>,
}
impl<NTY> fmt::Debug for BinsDim0<NTY>
@@ -59,7 +58,7 @@ where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = std::any::type_name::<Self>();
let self_name = any::type_name::<Self>();
if true {
write!(
fmt,
@@ -165,7 +164,7 @@ where
}
impl<NTY> Empty for BinsDim0<NTY> {
fn empty(dim0kind: Dim0Kind) -> Self {
fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
@@ -173,7 +172,7 @@ impl<NTY> Empty for BinsDim0<NTY> {
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
dim0kind,
dim0kind: None,
}
}
}
@@ -277,13 +276,14 @@ impl<NTY: ScalarOps> TimeBinnableType for BinsDim0<NTY> {
type Output = BinsDim0<NTY>;
type Aggregator = BinsDim0Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
//Self::Aggregator::new(range, do_time_weight)
todo!()
}
}
@@ -312,7 +312,7 @@ pub struct BinsDim0CollectedResult<NTY> {
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
#[serde(rename = "missingBins", default, skip_serializing_if = "crate::is_zero_u32")]
missing_bins: u32,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
@@ -412,7 +412,7 @@ impl<NTY> BinsDim0Collector<NTY> {
impl<NTY> WithLen for BinsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.ts1s.len()
self.vals.as_ref().map_or(0, |x| x.ts1s.len())
}
}
@@ -421,14 +421,14 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
type Output = BinsDim0CollectedResult<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
trace!("\n\n----------- BinsDim0Collector ingest\n{:?}\n\n", src);
// TODO could be optimized by non-contiguous container.
self.vals.ts1s.append(&mut src.ts1s);
/*self.vals.ts1s.append(&mut src.ts1s);
self.vals.ts2s.append(&mut src.ts2s);
self.vals.counts.append(&mut src.counts);
self.vals.mins.append(&mut src.mins);
self.vals.maxs.append(&mut src.maxs);
self.vals.avgs.append(&mut src.avgs);
self.vals.avgs.append(&mut src.avgs);*/
todo!()
}
fn set_range_complete(&mut self) {
@@ -439,8 +439,12 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
self.timed_out = true;
}
fn result(&mut self, _range: Option<NanoRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
let bin_count_exp = if let Some(r) = &binrange {
fn result(
&mut self,
_range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
/*let bin_count_exp = if let Some(r) = &binrange {
r.bin_count() as u32
} else {
eprintln!("no binrange given");
@@ -496,7 +500,8 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
continue_at,
finished_at,
};
Ok(ret)
Ok(ret)*/
todo!()
}
}
@@ -513,7 +518,7 @@ where
NTY: ScalarOps,
{
fn len(&self) -> usize {
self.vals.len()
self.vals.as_ref().map_or(0, |x| x.len())
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
@@ -569,7 +574,7 @@ where
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self, range, binrange) {
@@ -580,7 +585,7 @@ where
}
pub struct BinsDim0Aggregator<NTY> {
range: NanoRange,
range: SeriesRange,
count: u64,
min: NTY,
max: NTY,
@@ -591,7 +596,7 @@ pub struct BinsDim0Aggregator<NTY> {
}
impl<NTY: ScalarOps> BinsDim0Aggregator<NTY> {
pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self {
Self {
range,
count: 0,
@@ -608,12 +613,12 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsDim0Aggregator<NTY> {
type Input = BinsDim0<NTY>;
type Output = BinsDim0<NTY>;
fn range(&self) -> &NanoRange {
fn range(&self) -> &SeriesRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
for i1 in 0..item.ts1s.len() {
/*for i1 in 0..item.ts1s.len() {
if item.counts[i1] == 0 {
} else if item.ts2s[i1] <= self.range.beg {
} else if item.ts1s[i1] >= self.range.end {
@@ -633,11 +638,12 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsDim0Aggregator<NTY> {
self.sum += item.avgs[i1];
self.sumc += 1;
}
}
}*/
todo!()
}
fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
if self.sumc > 0 {
fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output {
/*if self.sumc > 0 {
self.avg = self.sum / self.sumc as f32;
}
let ret = Self::Output {
@@ -652,7 +658,8 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsDim0Aggregator<NTY> {
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
ret*/
todo!()
}
}
@@ -669,24 +676,26 @@ impl<NTY: ScalarOps> TimeBinnable for BinsDim0<NTY> {
}
pub struct BinsDim0TimeBinner<NTY: ScalarOps> {
edges: VecDeque<u64>,
binrange: BinnedRangeEnum,
do_time_weight: bool,
agg: Option<BinsDim0Aggregator<NTY>>,
ready: Option<<BinsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
range_final: bool,
}
impl<NTY: ScalarOps> BinsDim0TimeBinner<NTY> {
fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Self {
Self {
edges,
binrange,
do_time_weight,
agg: None,
ready: None,
range_final: false,
}
}
fn next_bin_range(&mut self) -> Option<NanoRange> {
if self.edges.len() >= 2 {
fn next_bin_range(&mut self) -> Option<SeriesRange> {
/*if self.edges.len() >= 2 {
let ret = NanoRange {
beg: self.edges[0],
end: self.edges[1],
@@ -695,13 +704,14 @@ impl<NTY: ScalarOps> BinsDim0TimeBinner<NTY> {
Some(ret)
} else {
None
}
}*/
todo!()
}
}
impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
/*let self_name = any::type_name::<Self>();
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.
return;
@@ -765,7 +775,8 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
}
}
}
}
}*/
todo!()
}
fn bins_ready_count(&self) -> usize {
@@ -785,7 +796,7 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
// TODO there is too much common code between implementors:
fn push_in_progress(&mut self, push_empty: bool) {
// TODO expand should be derived from AggKind. Is it still required after all?
let expand = true;
/*let expand = true;
if let Some(agg) = self.agg.as_mut() {
let dummy_range = NanoRange { beg: 4, end: 5 };
let mut bins = agg.result_reset(dummy_range, expand);
@@ -801,12 +812,13 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
}
}
}
}
}*/
todo!()
}
// TODO there is too much common code between implementors:
fn cycle(&mut self) {
let n = self.bins_ready_count();
/*let n = self.bins_ready_count();
self.push_in_progress(true);
if self.bins_ready_count() == n {
if let Some(range) = self.next_bin_range() {
@@ -826,10 +838,13 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
} else {
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
}
}
}*/
todo!()
}
fn set_range_complete(&mut self) {}
fn set_range_complete(&mut self) {
self.range_final = true;
}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <BinsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();

View File

@@ -24,10 +24,13 @@ use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use netpod::NanoRange;
use netpod::SeriesRange;
use num_traits::Zero;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
@@ -49,6 +52,7 @@ pub struct BinsXbinDim0<NTY> {
avgs: VecDeque<f32>,
// TODO could consider more variables:
// ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc...
dim0kind: Option<Dim0Kind>,
}
impl<NTY> fmt::Debug for BinsXbinDim0<NTY>
@@ -56,7 +60,7 @@ where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = std::any::type_name::<Self>();
let self_name = any::type_name::<Self>();
write!(
fmt,
"{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
@@ -87,6 +91,7 @@ impl<NTY: ScalarOps> BinsXbinDim0<NTY> {
mins,
maxs,
avgs,
dim0kind: None,
}
}
@@ -174,6 +179,7 @@ impl<NTY> Empty for BinsXbinDim0<NTY> {
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
dim0kind: None,
}
}
}
@@ -185,28 +191,16 @@ impl<NTY> WithLen for BinsXbinDim0<NTY> {
}
impl<NTY> RangeOverlapInfo for BinsXbinDim0<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
if let Some(&max) = self.ts2s.back() {
max <= range.beg
} else {
true
}
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
}
fn ends_after(&self, range: NanoRange) -> bool {
if let Some(&max) = self.ts2s.back() {
max > range.end
} else {
true
}
fn ends_after(&self, range: &SeriesRange) -> bool {
todo!()
}
fn starts_after(&self, range: NanoRange) -> bool {
if let Some(&min) = self.ts1s.front() {
min >= range.end
} else {
true
}
fn starts_after(&self, range: &SeriesRange) -> bool {
todo!()
}
}
@@ -243,13 +237,14 @@ impl<NTY: ScalarOps> TimeBinnableType for BinsXbinDim0<NTY> {
type Output = BinsXbinDim0<NTY>;
type Aggregator = BinsXbinDim0Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
/*let self_name = any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
Self::Aggregator::new(range, do_time_weight)*/
todo!()
}
}
@@ -357,17 +352,17 @@ impl<NTY: ScalarOps> ToJsonResult for BinsXbinDim0CollectedResult<NTY> {
#[derive(Debug)]
pub struct BinsXbinDim0Collector<NTY> {
vals: BinsXbinDim0<NTY>,
timed_out: bool,
range_final: bool,
vals: BinsXbinDim0<NTY>,
}
impl<NTY> BinsXbinDim0Collector<NTY> {
pub fn new() -> Self {
Self {
vals: BinsXbinDim0::empty(),
timed_out: false,
range_final: false,
vals: BinsXbinDim0::<NTY>::empty(),
}
}
}
@@ -401,7 +396,11 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
self.timed_out = true;
}
fn result(&mut self, _range: Option<NanoRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
fn result(
&mut self,
_range: std::option::Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
let bin_count_exp = if let Some(r) = &binrange {
r.bin_count() as u32
} else {
@@ -478,7 +477,7 @@ where
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
/*trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
{
{
//let tyid = item.type_id();
@@ -517,7 +516,8 @@ where
} else {
warn!("BinsDim0Collector::ingest unexpected item");
trace!("BinsDim0Collector::ingest unexpected item {:?}", item);
}
}*/
todo!()
}
fn set_range_complete(&mut self) {
@@ -530,7 +530,7 @@ where
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self, range, binrange) {
@@ -541,7 +541,7 @@ where
}
pub struct BinsXbinDim0Aggregator<NTY> {
range: NanoRange,
range: SeriesRange,
count: u64,
min: NTY,
max: NTY,
@@ -552,7 +552,7 @@ pub struct BinsXbinDim0Aggregator<NTY> {
}
impl<NTY: ScalarOps> BinsXbinDim0Aggregator<NTY> {
pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self {
Self {
range,
count: 0,
@@ -569,12 +569,12 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY>
type Input = BinsXbinDim0<NTY>;
type Output = BinsXbinDim0<NTY>;
fn range(&self) -> &NanoRange {
fn range(&self) -> &SeriesRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
for i1 in 0..item.ts1s.len() {
/*for i1 in 0..item.ts1s.len() {
if item.counts[i1] == 0 {
} else if item.ts2s[i1] <= self.range.beg {
} else if item.ts1s[i1] >= self.range.end {
@@ -594,11 +594,12 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY>
self.sum += item.avgs[i1];
self.sumc += 1;
}
}
}*/
todo!()
}
fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
if self.sumc > 0 {
fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output {
/*if self.sumc > 0 {
self.avg = self.sum / self.sumc as f32;
}
let ret = Self::Output {
@@ -613,7 +614,8 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY>
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
ret*/
todo!()
}
}
@@ -625,12 +627,12 @@ impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
Box::new(k)
}
}
pub struct BinsXbinDim0TimeBinner<NTY: ScalarOps> {
edges: VecDeque<u64>,
binrange: BinnedRangeEnum,
do_time_weight: bool,
agg: Option<BinsXbinDim0Aggregator<NTY>>,
ready: Option<<BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
@@ -639,7 +641,7 @@ pub struct BinsXbinDim0TimeBinner<NTY: ScalarOps> {
impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Self {
Self {
edges,
binrange,
do_time_weight,
agg: None,
ready: None,
@@ -647,7 +649,7 @@ impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
}
fn next_bin_range(&mut self) -> Option<NanoRange> {
if self.edges.len() >= 2 {
/*if self.edges.len() >= 2 {
let ret = NanoRange {
beg: self.edges[0],
end: self.edges[1],
@@ -657,12 +659,14 @@ impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
} else {
None
}
*/
todo!()
}
}
impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
/*let self_name = std::any::type_name::<Self>();
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.
return;
@@ -726,7 +730,8 @@ impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
}
}
}
}
}*/
todo!()
}
fn bins_ready_count(&self) -> usize {
@@ -746,7 +751,7 @@ impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
// TODO there is too much common code between implementors:
fn push_in_progress(&mut self, push_empty: bool) {
// TODO expand should be derived from AggKind. Is it still required after all?
let expand = true;
/*let expand = true;
if let Some(agg) = self.agg.as_mut() {
let dummy_range = NanoRange { beg: 4, end: 5 };
let mut bins = agg.result_reset(dummy_range, expand);
@@ -762,7 +767,8 @@ impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
}
}
}
}
}*/
todo!()
}
// TODO there is too much common code between implementors:

View File

@@ -11,6 +11,7 @@ use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::NanoRange;
use netpod::SeriesRange;
use serde::Deserialize;
use serde::Serialize;
use std::any::Any;
@@ -605,7 +606,7 @@ impl Collectable for ChannelEvents {
pub struct ChannelEventsTimeBinner {
// TODO `ConnStatus` contains all the changes that can happen to a connection, but
// here we would rather require a simplified current state for binning purpose.
edges: Vec<u64>,
binrange: BinnedRangeEnum,
do_time_weight: bool,
conn_state: ConnStatus,
binner: Option<Box<dyn items_0::TimeBinner>>,
@@ -627,7 +628,7 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner {
match item {
ChannelEvents::Events(item) => {
if self.binner.is_none() {
let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight);
let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight);
self.binner = Some(binner);
}
match self.binner.as_mut() {
@@ -690,14 +691,14 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner {
impl crate::timebin::TimeBinnable for ChannelEvents {
type TimeBinner = ChannelEventsTimeBinner;
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Self::TimeBinner {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
// TODO probably wrong?
let (binner, status) = match self {
ChannelEvents::Events(_events) => (None, ConnStatus::Connect),
ChannelEvents::Status(_status) => (None, ConnStatus::Connect),
};
ChannelEventsTimeBinner {
edges,
binrange,
do_time_weight,
conn_state: status,
binner,
@@ -783,7 +784,7 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match self.coll.as_mut() {

View File

@@ -39,7 +39,6 @@ pub struct EventsDim0<NTY> {
pub tss: VecDeque<u64>,
pub pulses: VecDeque<u64>,
pub values: VecDeque<NTY>,
pub dim0kind: Dim0Kind,
}
impl<NTY> EventsDim0<NTY> {
@@ -85,12 +84,11 @@ where
}
impl<NTY> Empty for EventsDim0<NTY> {
fn empty(dim0kind: Dim0Kind) -> Self {
fn empty() -> Self {
Self {
tss: VecDeque::new(),
pulses: VecDeque::new(),
values: VecDeque::new(),
dim0kind,
}
}
}
@@ -223,7 +221,7 @@ impl<NTY> EventsDim0Collector<NTY> {
impl<NTY> WithLen for EventsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.map_or(0, |x| x.tss.len())
self.vals.as_ref().map_or(0, |x| x.tss.len())
}
}
@@ -344,7 +342,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
fn ingest(&mut self, src: &mut Self::Input) {
if self.vals.is_none() {
self.vals = Some(EventsDim0::empty(src.dim0kind.clone()));
self.vals = Some(EventsDim0::empty());
}
let vals = self.vals.as_mut().unwrap();
vals.tss.append(&mut src.tss);
@@ -360,7 +358,11 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
self.timed_out = true;
}
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
let self_name = any::type_name::<Self>();
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
@@ -377,7 +379,13 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
} else {
if let Some(range) = &range {
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + netpod::timeunits::SEC)),
SeriesRange::PulseRange(x) => {
error!("TODO emit create continueAt for pulse range");
None
}
}
} else {
warn!("can not determine continue-at parameters");
None
@@ -428,7 +436,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<NTY> {
impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.map_or(0, |x| x.len())
self.vals.as_ref().map_or(0, |x| x.len())
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
@@ -449,7 +457,7 @@ impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY>
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match items_0::collect_s::CollectorType::result(self, range, binrange) {
@@ -658,7 +666,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
mins: [min].into(),
maxs: [max].into(),
avgs: [avg].into(),
dim0kind: self.range.dim0kind(),
dim0kind: Some(self.range.dim0kind()),
}
} else {
error!("TODO result_reset_unweight");
@@ -700,7 +708,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
mins: [min].into(),
maxs: [max].into(),
avgs: [avg].into(),
dim0kind: self.range.dim0kind(),
dim0kind: Some(self.range.dim0kind()),
}
} else {
error!("TODO result_reset_time_weight");
@@ -841,17 +849,12 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
let tss = self.tss.drain(..n1).collect();
let pulses = self.pulses.drain(..n1).collect();
let values = self.values.drain(..n1).collect();
let ret = Self {
tss,
pulses,
values,
dim0kind: self.dim0kind.clone(),
};
let ret = Self { tss, pulses, values };
Box::new(ret)
}
fn new_empty(&self) -> Box<dyn Events> {
Box::new(Self::empty(self.dim0kind.clone()))
Box::new(Self::empty())
}
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), items_0::MergeError> {
@@ -939,7 +942,7 @@ pub struct EventsDim0TimeBinner<NTY: ScalarOps> {
rng: Option<SeriesRange>,
agg: EventsDim0Aggregator<NTY>,
ready: Option<<EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
range_complete: bool,
range_final: bool,
}
impl<NTY: ScalarOps> EventsDim0TimeBinner<NTY> {
@@ -959,7 +962,7 @@ impl<NTY: ScalarOps> EventsDim0TimeBinner<NTY> {
rng: Some(agg.range.clone()),
agg,
ready: None,
range_complete: false,
range_final: false,
};
Ok(ret)
}
@@ -1096,7 +1099,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
let range_next = self.next_bin_range();
self.rng = range_next.clone();
if let Some(range) = range_next {
let mut bins = BinsDim0::empty(self.binrange.dim0kind());
let mut bins = BinsDim0::empty();
if range.is_time() {
bins.append_zero(range.beg_u64(), range.end_u64());
} else {
@@ -1120,11 +1123,11 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
fn set_range_complete(&mut self) {
self.range_complete = true;
self.range_final = true;
}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty(self.binrange.dim0kind());
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
Box::new(ret)
}
}
@@ -1158,7 +1161,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn {
fn result(
&mut self,
_range: Option<NanoRange>,
_range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
todo!()
@@ -1190,7 +1193,7 @@ impl<NTY: ScalarOps> items_0::collect_c::CollectorDyn for EventsDim0Collector<NT
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
items_0::collect_s::CollectorType::result(self, range, binrange)
@@ -1236,8 +1239,7 @@ mod test_frame {
#[test]
fn events_bincode() {
taskrun::tracing_init().unwrap();
// core::result::Result<items::StreamItem<items::RangeCompletableItem<items_2::channelevents::ChannelEvents>>, err::Error>
let mut events = EventsDim0::empty(Dim0Kind::Time);
let mut events = EventsDim0::empty();
events.push(123, 234, 55f32);
let events = events;
let events: Box<dyn Events> = Box::new(events);

View File

@@ -18,7 +18,7 @@ use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::NanoRange;
use netpod::SeriesRange;
use serde::Deserialize;
use serde::Serialize;
use std::any::Any;
@@ -125,28 +125,16 @@ impl<NTY> WithLen for EventsDim1<NTY> {
}
impl<NTY: ScalarOps> RangeOverlapInfo for EventsDim1<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
if let Some(&max) = self.tss.back() {
max < range.beg
} else {
true
}
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
}
fn ends_after(&self, range: NanoRange) -> bool {
if let Some(&max) = self.tss.back() {
max >= range.end
} else {
true
}
fn ends_after(&self, range: &SeriesRange) -> bool {
todo!()
}
fn starts_after(&self, range: NanoRange) -> bool {
if let Some(&min) = self.tss.front() {
min >= range.end
} else {
true
}
fn starts_after(&self, range: &SeriesRange) -> bool {
todo!()
}
}
@@ -158,7 +146,7 @@ where
type Output = BinsDim0<NTY>;
type Aggregator = EventsDim1Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
@@ -296,13 +284,17 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim1Collector<N
self.timed_out = true;
}
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let continue_at = if self.timed_out {
/*let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
} else {
@@ -331,7 +323,8 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim1Collector<N
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
Ok(ret)*/
todo!()
}
}
@@ -366,7 +359,7 @@ impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim1Collector<NTY>
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match items_0::collect_s::CollectorType::result(self, range, binrange) {
@@ -377,7 +370,7 @@ impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim1Collector<NTY>
}
pub struct EventsDim1Aggregator<NTY> {
range: NanoRange,
range: SeriesRange,
count: u64,
min: NTY,
max: NTY,
@@ -404,8 +397,8 @@ impl<NTY> Drop for EventsDim1Aggregator<NTY> {
}
impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
let int_ts = range.beg;
pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
/*let int_ts = range.beg;
Self {
range,
count: 0,
@@ -420,7 +413,8 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
do_time_weight,
events_taken_count: 0,
events_ignored_count: 0,
}
}*/
todo!()
}
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
@@ -461,7 +455,7 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
}
fn apply_event_time_weight(&mut self, ts: u64) {
if let Some(v) = &self.last_seen_val {
/*if let Some(v) = &self.last_seen_val {
let vf = v.as_prim_f32_b();
let v2 = v.clone();
if ts > self.range.beg {
@@ -483,11 +477,12 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
"apply_event_time_weight NO VALUE {}",
ts as i64 - self.range.beg as i64
);
}
}*/
todo!()
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
trace!("TODO check again result_reset_unweight");
/*trace!("TODO check again result_reset_unweight");
err::todo();
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
@@ -504,11 +499,12 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
self.count += 1;
self.events_taken_count += 1;
}
}
}*/
todo!()
}
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
let self_name = std::any::type_name::<Self>();
/*let self_name = std::any::type_name::<Self>();
trace!("{self_name}::ingest_time_weight item len {}", item.len());
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
@@ -546,11 +542,12 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
//self.last_seen_val = Some(val);
self.events_taken_count += 1;
}
}
}*/
todo!()
}
fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsDim0<NTY> {
trace!("TODO check again result_reset_unweight");
fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsDim0<NTY> {
/*trace!("TODO check again result_reset_unweight");
err::todo();
let (min, max, avg) = if self.sumc > 0 {
let avg = self.sum / self.sumc as f32;
@@ -576,12 +573,13 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
self.sum = 0f32;
self.sumc = 0;
self.did_min_max = false;
ret
ret*/
todo!()
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsDim0<NTY> {
fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsDim0<NTY> {
// TODO check callsite for correct expand status.
if expand {
/*if expand {
debug!("result_reset_time_weight calls apply_event_time_weight");
self.apply_event_time_weight(self.range.end);
} else {
@@ -613,7 +611,8 @@ impl<NTY: ScalarOps> EventsDim1Aggregator<NTY> {
self.did_min_max = false;
self.min = NTY::zero_b();
self.max = NTY::zero_b();
ret
ret*/
todo!()
}
}
@@ -621,7 +620,7 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for EventsDim1Aggregator<NTY> {
type Input = EventsDim1<NTY>;
type Output = BinsDim0<NTY>;
fn range(&self) -> &NanoRange {
fn range(&self) -> &SeriesRange {
&self.range
}
@@ -641,13 +640,14 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for EventsDim1Aggregator<NTY> {
}
}
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
trace!("result_reset {} {}", range.beg, range.end);
fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output {
/*trace!("result_reset {} {}", range.beg, range.end);
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
} else {
self.result_reset_unweight(range, expand)
}
}*/
todo!()
}
}
@@ -836,7 +836,7 @@ pub struct EventsDim1TimeBinner<NTY: ScalarOps> {
impl<NTY: ScalarOps> EventsDim1TimeBinner<NTY> {
fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
if edges.len() < 2 {
/*if edges.len() < 2 {
return Err(Error::with_msg_no_trace(format!("need at least 2 edges")));
}
let self_name = std::any::type_name::<Self>();
@@ -854,11 +854,12 @@ impl<NTY: ScalarOps> EventsDim1TimeBinner<NTY> {
ready: None,
range_complete: false,
};
Ok(ret)
Ok(ret)*/
todo!()
}
fn next_bin_range(&mut self) -> Option<NanoRange> {
let self_name = std::any::type_name::<Self>();
fn next_bin_range(&mut self) -> Option<SeriesRange> {
/*let self_name = std::any::type_name::<Self>();
if self.edges.len() >= 3 {
self.edges.pop_front();
let ret = NanoRange {
@@ -871,7 +872,8 @@ impl<NTY: ScalarOps> EventsDim1TimeBinner<NTY> {
self.edges.clear();
trace!("{self_name} next_bin_range None");
None
}
}*/
todo!()
}
}
@@ -891,7 +893,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim1TimeBinner<NTY> {
}
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
/*let self_name = std::any::type_name::<Self>();
trace2!(
"TimeBinner for EventsDim1TimeBinner {:?}\n{:?}\n------------------------------------",
self.edges.iter().take(2).collect::<Vec<_>>(),
@@ -950,11 +952,12 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim1TimeBinner<NTY> {
};
}
}
}
}*/
todo!()
}
fn push_in_progress(&mut self, push_empty: bool) {
let self_name = std::any::type_name::<Self>();
/*let self_name = std::any::type_name::<Self>();
trace!("{self_name}::push_in_progress");
// TODO expand should be derived from AggKind. Is it still required after all?
// TODO here, the expand means that agg will assume that the current value is kept constant during
@@ -986,11 +989,12 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim1TimeBinner<NTY> {
}
}
}
}
}*/
todo!()
}
fn cycle(&mut self) {
let self_name = std::any::type_name::<Self>();
/*let self_name = std::any::type_name::<Self>();
trace!("{self_name}::cycle");
// TODO refactor this logic.
let n = self.bins_ready_count();
@@ -1013,7 +1017,8 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim1TimeBinner<NTY> {
} else {
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
}
}
}*/
todo!()
}
fn set_range_complete(&mut self) {
@@ -1055,7 +1060,7 @@ impl items_0::collect_c::CollectorDyn for EventsDim1CollectorDyn {
fn result(
&mut self,
_range: Option<NanoRange>,
_range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
todo!()
@@ -1087,7 +1092,7 @@ impl<NTY: ScalarOps> items_0::collect_c::CollectorDyn for EventsDim1Collector<NT
fn result(
&mut self,
range: Option<NanoRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
items_0::collect_s::CollectorType::result(self, range, binrange)

View File

@@ -12,9 +12,10 @@ use items_0::WithLen;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::NanoRange;
use netpod::SeriesRange;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
@@ -107,25 +108,16 @@ impl<NTY> WithLen for EventsXbinDim0<NTY> {
}
impl<NTY> RangeOverlapInfo for EventsXbinDim0<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
match self.tss.back() {
Some(&ts) => ts < range.beg,
None => true,
}
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
}
fn ends_after(&self, range: NanoRange) -> bool {
match self.tss.back() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
fn ends_after(&self, range: &SeriesRange) -> bool {
todo!()
}
fn starts_after(&self, range: NanoRange) -> bool {
match self.tss.front() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
fn starts_after(&self, range: &SeriesRange) -> bool {
todo!()
}
}
@@ -136,8 +128,8 @@ where
type Output = BinsXbinDim0<NTY>;
type Aggregator = EventsXbinDim0Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let name = std::any::type_name::<Self>();
fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let name = any::type_name::<Self>();
debug!(
"TimeBinnableType for {} aggregator() range {:?} x_bin_count {} do_time_weight {}",
name, range, x_bin_count, do_time_weight
@@ -150,7 +142,7 @@ pub struct EventsXbinDim0Aggregator<NTY>
where
NTY: ScalarOps,
{
range: NanoRange,
range: SeriesRange,
count: u64,
min: NTY,
max: NTY,
@@ -168,9 +160,9 @@ impl<NTY> EventsXbinDim0Aggregator<NTY>
where
NTY: ScalarOps,
{
pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
Self {
int_ts: range.beg,
int_ts: todo!(),
range,
count: 0,
min: NTY::zero_b(),
@@ -212,7 +204,7 @@ where
fn apply_event_time_weight(&mut self, ts: u64) {
//debug!("apply_event_time_weight");
if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) {
/*if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) {
let min2 = min.clone();
let max2 = max.clone();
self.apply_min_max(min2, max2);
@@ -223,11 +215,12 @@ where
}
self.sumc += 1;
self.int_ts = ts;
}
}*/
todo!()
}
fn ingest_unweight(&mut self, item: &EventsXbinDim0<NTY>) {
for i1 in 0..item.tss.len() {
/*for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1].clone();
@@ -238,11 +231,12 @@ where
self.apply_event_unweight(avg, min, max);
self.count += 1;
}
}
}*/
todo!()
}
fn ingest_time_weight(&mut self, item: &EventsXbinDim0<NTY>) {
for i1 in 0..item.tss.len() {
/*for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1].clone();
@@ -262,11 +256,12 @@ where
self.last_min = Some(min);
self.last_max = Some(max);
}
}
}*/
todo!()
}
fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsXbinDim0<NTY> {
let avg = if self.sumc == 0 {
fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsXbinDim0<NTY> {
/*let avg = if self.sumc == 0 {
0f32
} else {
self.sum / self.sumc as f32
@@ -286,12 +281,13 @@ where
self.max = NTY::zero_b();
self.sum = 0f32;
self.sumc = 0;
ret
ret*/
todo!()
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsXbinDim0<NTY> {
fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsXbinDim0<NTY> {
// TODO check callsite for correct expand status.
if true || expand {
/*if true || expand {
self.apply_event_time_weight(self.range.end);
}
let avg = {
@@ -313,7 +309,8 @@ where
self.max = NTY::zero_b();
self.sum = 0f32;
self.sumc = 0;
ret
ret*/
todo!()
}
}
@@ -324,7 +321,7 @@ where
type Input = EventsXbinDim0<NTY>;
type Output = BinsXbinDim0<NTY>;
fn range(&self) -> &NanoRange {
fn range(&self) -> &SeriesRange {
&self.range
}
@@ -337,7 +334,7 @@ where
}
}
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output {
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
} else {
@@ -448,8 +445,12 @@ where
self.timed_out = true;
}
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
use std::mem::replace;
fn result(
&mut self,
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
/*use std::mem::replace;
let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
@@ -484,7 +485,8 @@ where
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
Ok(ret)*/
todo!()
}
}
@@ -521,7 +523,7 @@ where
fn result(
&mut self,
_range: Option<NanoRange>,
_range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
todo!()

View File

@@ -45,6 +45,10 @@ pub fn bool_is_false(x: &bool) -> bool {
*x == false
}
pub fn is_zero_u32(x: &u32) -> bool {
*x == 0
}
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
let ts_anchor_ns = ts_anchor_sec * SEC;

View File

@@ -50,19 +50,22 @@ impl<T> Transformer for Enumerate2<T> {
}
pub struct Then2<T, F, Fut> {
inp: T,
f: F,
inp: Pin<Box<T>>,
f: Pin<Box<F>>,
fut: Option<Pin<Box<Fut>>>,
}
impl<T, F, Fut> Then2<T, F, Fut>
where
T: Stream,
F: FnMut(<T as Stream>::Item) -> Fut,
Fut: Future,
F: Fn(<T as Stream>::Item) -> Fut,
{
pub fn new(inp: T, f: F) -> Self {
Self { inp, f, fut: None }
Self {
inp: Box::pin(inp),
f: Box::pin(f),
fut: None,
}
}
fn prepare_fut(&mut self, item: <T as Stream>::Item) {
@@ -70,9 +73,18 @@ where
}
}
/*impl<T, F, Fut> Unpin for Then2<T, F, Fut>
where
T: Unpin,
F: Unpin,
Fut: Unpin,
{
}*/
impl<T, F, Fut> Stream for Then2<T, F, Fut>
where
T: Stream + Unpin,
T: Stream,
F: Fn(<T as Stream>::Item) -> Fut,
Fut: Future,
{
type Item = <Fut as Future>::Output;
@@ -80,7 +92,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if let Some(fut) = &mut self.fut {
break if let Some(fut) = self.fut.as_mut() {
match fut.poll_unpin(cx) {
Ready(item) => {
self.fut = None;
@@ -91,6 +103,7 @@ where
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => {
self.prepare_fut(item);
continue;
}
Ready(None) => Ready(None),
@@ -115,7 +128,7 @@ pub trait TransformerExt {
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: Transformer + Stream + Sized,
F: FnMut(<Self as Stream>::Item) -> Fut,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future;
}
@@ -130,7 +143,7 @@ impl<T> TransformerExt for T {
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: Transformer + Stream + Sized,
F: FnMut(<Self as Stream>::Item) -> Fut,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future,
{
Then2::new(self, f)
@@ -147,10 +160,15 @@ impl<T> VecStream<T> {
}
}
impl<T> Stream for VecStream<T> {
/*impl<T> Unpin for VecStream<T> where T: Unpin {}*/
impl<T> Stream for VecStream<T>
where
T: Unpin,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if let Some(item) = self.inp.pop_front() {
Ready(Some(item))

View File

@@ -1,3 +1,4 @@
use netpod::BinnedRangeEnum;
use std::fmt;
pub trait TimeBinner: fmt::Debug + Unpin {
@@ -27,5 +28,5 @@ pub trait TimeBinner: fmt::Debug + Unpin {
pub trait TimeBinnable: fmt::Debug + Sized {
type TimeBinner: TimeBinner<Input = Self>;
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Self::TimeBinner;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner;
}

View File

@@ -1490,6 +1490,12 @@ pub enum PreBinnedPatchCoordEnum {
Pulse(PreBinnedPatchCoord<PulseId>),
}
impl PreBinnedPatchCoordEnum {
pub fn bin_count(&self) -> u64 {
todo!()
}
}
impl FromUrl for PreBinnedPatchCoordEnum {
fn from_url(url: &Url) -> Result<Self, Error> {
todo!()

View File

@@ -7,8 +7,6 @@ use futures_util::StreamExt;
use items_0::TimeBinned;
use items_2::binsdim0::BinsDim0;
use items_2::channelevents::ChannelEvents;
use items_2::empty_binned_dyn_tb;
use items_2::empty_events_dyn_ev;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::query::PlainEventsQuery;
@@ -16,9 +14,11 @@ use netpod::timeunits::*;
use netpod::transform::Transform;
use netpod::AggKind;
use netpod::ChannelTyped;
use netpod::Dim0Kind;
use netpod::PreBinnedPatchCoord;
use netpod::PreBinnedPatchIterator;
use netpod::PreBinnedPatchCoordEnum;
use netpod::PreBinnedPatchRange;
use netpod::PreBinnedPatchRangeEnum;
use netpod::ScalarType;
use netpod::Shape;
use scylla::Session as ScySession;
@@ -33,15 +33,17 @@ use std::time::Instant;
pub async fn read_cached_scylla(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
coord: &PreBinnedPatchCoordEnum,
scy: &ScySession,
) -> Result<Option<Box<dyn TimeBinned>>, Error> {
let vals = (
/*let vals = (
series as i64,
(coord.bin_t_len() / SEC) as i32,
(coord.patch_t_len() / SEC) as i32,
coord.ix() as i64,
);
);*/
todo!();
let vals: (i64, i32, i32, i64) = todo!();
let res = scy
.query_iter(
"select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? and agg_kind = 'dummy-agg-kind' and offset = ?",
@@ -54,7 +56,8 @@ pub async fn read_cached_scylla(
})?;
while let Some(item) = res.next().await {
let row = item.err_conv()?;
let edges = coord.edges();
// let edges = coord.edges();
let edges: Vec<u64> = todo!();
let (counts, avgs, mins, maxs): (Vec<i64>, Vec<f32>, Vec<f32>, Vec<f32>) = row.into_typed().err_conv()?;
let mut counts_mismatch = false;
if edges.len() != counts.len() + 1 {
@@ -102,6 +105,8 @@ pub async fn read_cached_scylla(
avgs,
mins,
maxs,
// TODO:
dim0kind: Some(Dim0Kind::Time),
};
return Ok(Some(Box::new(ret)));
}
@@ -122,7 +127,7 @@ pub async fn read_cached_scylla(
#[allow(unused)]
struct WriteFut<'a> {
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
coord: &'a PreBinnedPatchCoordEnum,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
}
@@ -131,7 +136,7 @@ impl<'a> WriteFut<'a> {
#[allow(unused)]
fn new(
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
coord: &'a PreBinnedPatchCoordEnum,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
) -> Self {
@@ -151,7 +156,7 @@ impl<'a> Future for WriteFut<'a> {
pub fn write_cached_scylla<'a>(
series: u64,
_chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
coord: &'a PreBinnedPatchCoordEnum,
//data: &'a dyn TimeBinned,
data: Box<dyn TimeBinned>,
//scy: &'a ScySession,
@@ -163,9 +168,13 @@ pub fn write_cached_scylla<'a>(
let fut = async move {
//let data = unsafe { &*(data_ptr as *const dyn TimeBinned) };
//let scy = unsafe { &*(scy_ptr as *const ScySession) };
let bin_len_sec = (coord.bin_t_len() / SEC) as i32;
todo!();
/*let bin_len_sec = (coord.bin_t_len() / SEC) as i32;
let patch_len_sec = (coord.patch_t_len() / SEC) as i32;
let offset = coord.ix();
let offset = coord.ix();*/
let bin_len_sec = 0i32;
let patch_len_sec = 0i32;
let offset = 0i32;
warn!(
"write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}",
data.counts().len(),
@@ -205,13 +214,13 @@ pub fn write_cached_scylla<'a>(
pub async fn fetch_uncached_data(
series: u64,
chn: ChannelTyped,
coord: PreBinnedPatchCoord,
coord: PreBinnedPatchCoordEnum,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<Option<(Box<dyn TimeBinned>, bool)>, Error> {
info!("fetch_uncached_data {coord:?}");
/*info!("fetch_uncached_data {coord:?}");
// Try to find a higher resolution pre-binned grid which covers the requested patch.
let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) {
Ok(Some(range)) => {
@@ -262,13 +271,14 @@ pub async fn fetch_uncached_data(
//write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?;
}
}
Ok(Some((bin, complete)))
Ok(Some((bin, complete)))*/
todo!()
}
pub fn fetch_uncached_data_box(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
coord: &PreBinnedPatchCoordEnum,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
@@ -288,21 +298,24 @@ pub fn fetch_uncached_data_box(
pub async fn fetch_uncached_higher_res_prebinned(
series: u64,
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
range: PreBinnedPatchRange,
coord: PreBinnedPatchCoordEnum,
range: PreBinnedPatchRangeEnum,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
let edges = coord.edges();
/*let edges = coord.edges();
// TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there.
let do_time_weight = true;
// We must produce some result with correct types even if upstream delivers nothing at all.
let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &transform);
//let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &transform);
let bin0 = err::todoval();
let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let mut complete = true;
let patch_it = PreBinnedPatchIterator::from_range(range.clone());
//let patch_it = PreBinnedPatchIterator::from_range(range.clone());
let patches_dummy: Vec<PreBinnedPatchCoordEnum> = Vec::new();
let mut patch_it = patches_dummy.into_iter();
for patch_coord in patch_it {
// We request data here for a Coord, meaning that we expect to receive multiple bins.
// The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord.
@@ -359,24 +372,25 @@ pub async fn fetch_uncached_higher_res_prebinned(
if let Err(msg) = ready.validate() {
error!("pre-binned final issue {} coord {:?}", msg, coord);
}
Ok((ready, complete))
Ok((ready, complete))*/
todo!()
}
pub async fn fetch_uncached_binned_events(
series: u64,
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
coord: PreBinnedPatchCoordEnum,
one_before_range: bool,
transform: Transform,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
let edges = coord.edges();
/*let edges = coord.edges();
// TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there.
let do_time_weight = true;
// We must produce some result with correct types even if upstream delivers nothing at all.
//let bin0 = empty_events_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind);
//let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let mut time_binner = empty_events_dyn_ev(&chn.scalar_type, &chn.shape, &transform)?
let mut time_binner = items_2::empty::empty_events_dyn_ev(&chn.scalar_type, &chn.shape)?
.as_time_binnable()
.time_binner_new(edges.clone(), do_time_weight);
// TODO handle deadline better
@@ -460,13 +474,14 @@ pub async fn fetch_uncached_binned_events(
if let Err(msg) = ready.validate() {
error!("time binned invalid {} coord {:?}", msg, coord);
}
Ok((ready, complete))
Ok((ready, complete))*/
todo!()
}
pub async fn pre_binned_value_stream_with_scy(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
coord: &PreBinnedPatchCoordEnum,
one_before_range: bool,
transform: Transform,
cache_usage: CacheUsage,
@@ -490,7 +505,9 @@ pub async fn pre_binned_value_stream_with_scy(
pub async fn pre_binned_value_stream(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
coord: &PreBinnedPatchCoordEnum,
one_before_range: bool,
transform: Transform,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,

View File

@@ -7,9 +7,9 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::DiskStats;
use netpod::NanoRange;
use netpod::SeriesRange;
use std::fmt;
use std::time::Duration;
use std::time::Instant;
@@ -37,8 +37,8 @@ async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
@@ -136,8 +136,8 @@ pub async fn collect<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,

View File

@@ -35,9 +35,10 @@ pub async fn plain_events_json(
let events_max = query.events_max();
let evquery = query.clone();
info!("plain_events_json evquery {:?}", evquery);
let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone());
info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind);
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &ev_agg_kind)?;
//let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone());
//info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind);
error!("TODO feed through transform chain");
let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?;
info!("plain_events_json with empty item {}", empty.type_name());
let empty = ChannelEvents::Events(empty);
let empty = sitem_data(empty);
@@ -50,7 +51,7 @@ pub async fn plain_events_json(
info!("item after merge: {item:?}");
item
});
let stream = RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
let stream = RangeFilter2::new(stream, todo!(), evquery.one_before_range());
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");
item

View File

@@ -91,7 +91,7 @@ where
trace!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight);
let binner = item.time_binner_new(todo!(), self.do_time_weight);
self.binner = Some(binner);
}
let binner = self.binner.as_mut().unwrap();

View File

@@ -12,35 +12,33 @@ use items_2::merger::Merger;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::query::PlainEventsQuery;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::ChConf;
use netpod::Cluster;
use serde_json::Value as JsonValue;
use std::time::Instant;
pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result<JsonValue, Error> {
let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?;
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let bins_max = 10000;
let do_time_weight = query.agg_kind().do_time_weighted();
//let do_time_weight = query.agg_kind().do_time_weighted();
let deadline = Instant::now() + query.timeout_value();
let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &query.agg_kind())?;
let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?;
error!("TODO feed through transform chain");
let empty = ChannelEvents::Events(empty);
let empty = sitem_data(empty);
let evquery = PlainEventsQuery::new(
query.channel().clone(),
query.range().clone(),
Some(query.agg_kind()),
query.timeout(),
None,
);
error!("TODO add with_deadline to PlainEventsQuery");
todo!();
let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone());
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
info!("timebinned_json with empty item {empty:?}");
let stream = Merger::new(inps, 128);
let stream = stream::iter([empty]).chain(stream);
let stream = RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
let stream = RangeFilter2::new(stream, todo!(), evquery.one_before_range());
let stream = Box::pin(stream);
let stream = TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
let do_time_weight = todo!();
let stream = TimeBinnedStream::new(stream, todo!(), do_time_weight, deadline);
if false {
let mut stream = stream;
let _: Option<Sitemty<Box<dyn items_0::TimeBinned>>> = stream.next().await;