Remove traits EventsTimeBinner and BinsTimeBinner
This commit is contained in:
@@ -17,7 +17,6 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub mod binnedt;
|
||||
pub mod binnedt2;
|
||||
pub mod binnedt3;
|
||||
pub mod binnedt4;
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -3,8 +3,8 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
use crate::agg::streams::{Appendable, StreamItem};
|
||||
use crate::binned::{
|
||||
BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, FilterFittingInside, MinMaxAvgAggregator,
|
||||
MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, SingleXBinAggregator,
|
||||
FilterFittingInside, MinMaxAvgAggregator, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv,
|
||||
ReadableFromFile, SingleXBinAggregator,
|
||||
};
|
||||
use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen};
|
||||
use crate::frame::makeframe::Framable;
|
||||
@@ -21,131 +21,15 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::fs::File;
|
||||
|
||||
// TODO no longer needed
|
||||
pub struct DefaultScalarEventsTimeBinner<VT> {
|
||||
_m1: PhantomData<VT>,
|
||||
}
|
||||
|
||||
impl<NTY> EventsTimeBinner for DefaultScalarEventsTimeBinner<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
type Input = EventValues<NTY>;
|
||||
type Output = MinMaxAvgBins<NTY>;
|
||||
type Aggregator = MinMaxAvgAggregator<NTY>;
|
||||
|
||||
fn aggregator(range: NanoRange) -> Self::Aggregator {
|
||||
Self::Aggregator::new(range)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO no longer needed
|
||||
pub struct DefaultSingleXBinTimeBinner<VT> {
|
||||
_m1: PhantomData<VT>,
|
||||
}
|
||||
|
||||
impl<NTY> EventsTimeBinner for DefaultSingleXBinTimeBinner<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
type Input = XBinnedScalarEvents<NTY>;
|
||||
// TODO is that output type good enough for now? Maybe better with a new one also
|
||||
// to distinguish from the earlier one.
|
||||
type Output = MinMaxAvgBins<NTY>;
|
||||
type Aggregator = SingleXBinAggregator<NTY>;
|
||||
|
||||
fn aggregator(range: NanoRange) -> Self::Aggregator {
|
||||
Self::Aggregator::new(range)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DefaultBinsTimeBinner<NTY> {
|
||||
_m1: PhantomData<NTY>,
|
||||
}
|
||||
|
||||
impl<NTY> BinsTimeBinner for DefaultBinsTimeBinner<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
type Input = MinMaxAvgBins<NTY>;
|
||||
type Output = MinMaxAvgBins<NTY>;
|
||||
|
||||
fn process(inp: Self::Input) -> Self::Output {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Aggregator3Tdim {
|
||||
type InputValue;
|
||||
type OutputValue;
|
||||
}
|
||||
|
||||
pub struct Agg3 {
|
||||
range: NanoRange,
|
||||
count: u64,
|
||||
min: f32,
|
||||
max: f32,
|
||||
sum: f32,
|
||||
sumc: u64,
|
||||
}
|
||||
|
||||
impl Agg3 {
|
||||
fn new(range: NanoRange) -> Self {
|
||||
Self {
|
||||
range,
|
||||
count: 0,
|
||||
min: f32::MAX,
|
||||
max: f32::MIN,
|
||||
sum: f32::NAN,
|
||||
sumc: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &MinMaxAvgScalarEventBatch) {
|
||||
for i1 in 0..item.tss.len() {
|
||||
let ts = item.tss[i1];
|
||||
if ts < self.range.beg {
|
||||
continue;
|
||||
} else if ts >= self.range.end {
|
||||
continue;
|
||||
} else {
|
||||
self.count += 1;
|
||||
self.min = self.min.min(item.mins[i1]);
|
||||
self.max = self.max.max(item.maxs[i1]);
|
||||
let x = item.avgs[i1];
|
||||
if x.is_nan() {
|
||||
} else {
|
||||
if self.sum.is_nan() {
|
||||
self.sum = x;
|
||||
} else {
|
||||
self.sum += x;
|
||||
}
|
||||
self.sumc += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn result(self) -> Vec<MinMaxAvgScalarBinBatch> {
|
||||
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
|
||||
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
|
||||
let avg = if self.sumc == 0 {
|
||||
f32::NAN
|
||||
} else {
|
||||
self.sum / self.sumc as f32
|
||||
};
|
||||
let v = MinMaxAvgScalarBinBatch {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![min],
|
||||
maxs: vec![max],
|
||||
avgs: vec![avg],
|
||||
};
|
||||
vec![v]
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TimeBinnableTypeAggregator: Send {
|
||||
type Input: TimeBinnableType;
|
||||
type Output: TimeBinnableType;
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
use crate::agg::binnedt2::AggregatableTdim2;
|
||||
use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
|
||||
use crate::agg::binnedt4::{
|
||||
DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TBinnerStream, TimeBinnableType,
|
||||
TimeBinnableTypeAggregator,
|
||||
};
|
||||
use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator};
|
||||
use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
|
||||
@@ -770,29 +767,6 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
|
||||
fn ts2s(&self) -> &Vec<u64>;
|
||||
}
|
||||
|
||||
// TODO remove in favor of the one in binnedt4
|
||||
pub trait EventsTimeBinner: Send + Unpin {
|
||||
type Input: Unpin + RangeOverlapInfo;
|
||||
type Output: TimeBins;
|
||||
type Aggregator: EventsTimeBinnerAggregator<Input = Self::Input, Output = Self::Output> + Unpin;
|
||||
fn aggregator(range: NanoRange) -> Self::Aggregator;
|
||||
}
|
||||
|
||||
// TODO remove in favor of the one in binnedt4
|
||||
pub trait EventsTimeBinnerAggregator: Send {
|
||||
type Input: Unpin;
|
||||
type Output: Unpin;
|
||||
fn range(&self) -> &NanoRange;
|
||||
fn ingest(&mut self, item: &Self::Input);
|
||||
fn result(self) -> Self::Output;
|
||||
}
|
||||
|
||||
pub trait BinsTimeBinner {
|
||||
type Input: TimeBins;
|
||||
type Output: TimeBins;
|
||||
fn process(inp: Self::Input) -> Self::Output;
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct MinMaxAvgBins<NTY> {
|
||||
pub ts1s: Vec<u64>,
|
||||
@@ -1120,27 +1094,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// TODO after refactor get rid of this impl:
|
||||
impl<NTY> EventsTimeBinnerAggregator for MinMaxAvgAggregator<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
type Input = EventValues<NTY>;
|
||||
type Output = MinMaxAvgBins<NTY>;
|
||||
|
||||
fn range(&self) -> &NanoRange {
|
||||
&self.range
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &Self::Input) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn result(self) -> Self::Output {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MinMaxAvgBinsAggregator<NTY> {
|
||||
range: NanoRange,
|
||||
count: u32,
|
||||
@@ -1262,27 +1215,6 @@ impl<NTY> SingleXBinAggregator<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> EventsTimeBinnerAggregator for SingleXBinAggregator<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
type Input = XBinnedScalarEvents<NTY>;
|
||||
// TODO do I need another type to carry the x-bin count as well? No xbincount is static anyways.
|
||||
type Output = MinMaxAvgBins<NTY>;
|
||||
|
||||
fn range(&self) -> &NanoRange {
|
||||
&self.range
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &Self::Input) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn result(self) -> Self::Output {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
|
||||
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> + Send;
|
||||
type XBinnedEvents: XBinnedEvents<Self>;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator};
|
||||
use crate::agg::streams::StreamItem;
|
||||
use crate::binned::query::{CacheUsage, PreBinnedQuery};
|
||||
use crate::binned::{EventsTimeBinner, RangeCompletableItem};
|
||||
use crate::binned::RangeCompletableItem;
|
||||
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
|
||||
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||
use crate::frame::makeframe::{decode_frame, FrameType};
|
||||
|
||||
@@ -3,8 +3,8 @@ use crate::agg::streams::{Appendable, StreamItem};
|
||||
use crate::binned::binnedfrompbv::FetchedPreBinned;
|
||||
use crate::binned::query::{CacheUsage, PreBinnedQuery};
|
||||
use crate::binned::{
|
||||
BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex,
|
||||
RangeCompletableItem, ReadableFromFile, StreamKind, WithLen,
|
||||
BinnedStreamKindScalar, EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile,
|
||||
StreamKind, WithLen,
|
||||
};
|
||||
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
|
||||
use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache};
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use crate::agg::binnedt4::{
|
||||
DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TimeBinnableType,
|
||||
};
|
||||
use crate::agg::binnedt4::{DefaultBinsTimeBinner, TimeBinnableType};
|
||||
use crate::agg::enp::{Identity, WaveXBinner};
|
||||
use crate::agg::streams::{Appendable, StreamItem};
|
||||
// use crate::binned::pbv2::{
|
||||
@@ -8,10 +6,7 @@ use crate::agg::streams::{Appendable, StreamItem};
|
||||
// };
|
||||
use crate::binned::pbv::PreBinnedValueStream;
|
||||
use crate::binned::query::PreBinnedQuery;
|
||||
use crate::binned::{
|
||||
BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem,
|
||||
ReadableFromFile, StreamKind,
|
||||
};
|
||||
use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, StreamKind};
|
||||
use crate::cache::node_ix_for_patch;
|
||||
use crate::decode::{
|
||||
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
||||
|
||||
Reference in New Issue
Block a user