Add support for scalar and add test
This commit is contained in:
@@ -84,6 +84,18 @@ fn get_binned_binary() {
|
|||||||
async fn get_binned_binary_inner() -> Result<(), Error> {
|
async fn get_binned_binary_inner() -> Result<(), Error> {
|
||||||
let rh = require_test_hosts_running()?;
|
let rh = require_test_hosts_running()?;
|
||||||
let cluster = &rh.cluster;
|
let cluster = &rh.cluster;
|
||||||
|
if true {
|
||||||
|
get_binned_channel::<i32>(
|
||||||
|
"scalar-i32-be",
|
||||||
|
"1970-01-01T00:20:10.000Z",
|
||||||
|
"1970-01-01T00:20:50.000Z",
|
||||||
|
3,
|
||||||
|
cluster,
|
||||||
|
true,
|
||||||
|
4,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
if true {
|
if true {
|
||||||
get_binned_channel::<f64>(
|
get_binned_channel::<f64>(
|
||||||
"wave-f64-be-n21",
|
"wave-f64-be-n21",
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ where
|
|||||||
type Input = NTY;
|
type Input = NTY;
|
||||||
type Output = EventValues<NTY>;
|
type Output = EventValues<NTY>;
|
||||||
|
|
||||||
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
|
fn process(inp: EventValues<Self::Input>) -> Self::Output {
|
||||||
todo!()
|
inp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,9 +150,10 @@ where
|
|||||||
NTY: NumOps,
|
NTY: NumOps,
|
||||||
{
|
{
|
||||||
range: NanoRange,
|
range: NanoRange,
|
||||||
|
count: u64,
|
||||||
min: Option<NTY>,
|
min: Option<NTY>,
|
||||||
max: Option<NTY>,
|
max: Option<NTY>,
|
||||||
sumc: u32,
|
sumc: u64,
|
||||||
sum: f32,
|
sum: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +164,7 @@ where
|
|||||||
pub fn new(range: NanoRange) -> Self {
|
pub fn new(range: NanoRange) -> Self {
|
||||||
Self {
|
Self {
|
||||||
range,
|
range,
|
||||||
|
count: 0,
|
||||||
min: None,
|
min: None,
|
||||||
max: None,
|
max: None,
|
||||||
sumc: 0,
|
sumc: 0,
|
||||||
@@ -216,6 +218,7 @@ where
|
|||||||
self.sum += x;
|
self.sum += x;
|
||||||
self.sumc += 1;
|
self.sumc += 1;
|
||||||
}
|
}
|
||||||
|
self.count += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -229,8 +232,7 @@ where
|
|||||||
Self::Output {
|
Self::Output {
|
||||||
ts1s: vec![self.range.beg],
|
ts1s: vec![self.range.beg],
|
||||||
ts2s: vec![self.range.end],
|
ts2s: vec![self.range.end],
|
||||||
// TODO
|
counts: vec![self.count],
|
||||||
counts: vec![0],
|
|
||||||
mins: vec![self.min],
|
mins: vec![self.min],
|
||||||
maxs: vec![self.max],
|
maxs: vec![self.max],
|
||||||
avgs: vec![avg],
|
avgs: vec![avg],
|
||||||
|
|||||||
@@ -999,10 +999,11 @@ where
|
|||||||
|
|
||||||
pub struct EventValuesAggregator<NTY> {
|
pub struct EventValuesAggregator<NTY> {
|
||||||
range: NanoRange,
|
range: NanoRange,
|
||||||
count: u32,
|
count: u64,
|
||||||
min: Option<NTY>,
|
min: Option<NTY>,
|
||||||
max: Option<NTY>,
|
max: Option<NTY>,
|
||||||
avg: Option<f32>,
|
sumc: u64,
|
||||||
|
sum: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> EventValuesAggregator<NTY> {
|
impl<NTY> EventValuesAggregator<NTY> {
|
||||||
@@ -1012,7 +1013,8 @@ impl<NTY> EventValuesAggregator<NTY> {
|
|||||||
count: 0,
|
count: 0,
|
||||||
min: None,
|
min: None,
|
||||||
max: None,
|
max: None,
|
||||||
avg: None,
|
sumc: 0,
|
||||||
|
sum: 0f32,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1028,13 +1030,60 @@ where
|
|||||||
&self.range
|
&self.range
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ingest(&mut self, _item: &Self::Input) {
|
fn ingest(&mut self, item: &Self::Input) {
|
||||||
// TODO construct test case to hit this:
|
for i1 in 0..item.tss.len() {
|
||||||
todo!()
|
let ts = item.tss[i1];
|
||||||
|
if ts < self.range.beg {
|
||||||
|
continue;
|
||||||
|
} else if ts >= self.range.end {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
let v = item.values[i1];
|
||||||
|
let vf = v.as_();
|
||||||
|
self.min = match self.min {
|
||||||
|
None => Some(v),
|
||||||
|
Some(min) => {
|
||||||
|
if v < min {
|
||||||
|
Some(v)
|
||||||
|
} else {
|
||||||
|
Some(min)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.max = match self.max {
|
||||||
|
None => Some(v),
|
||||||
|
Some(max) => {
|
||||||
|
if v > max {
|
||||||
|
Some(v)
|
||||||
|
} else {
|
||||||
|
Some(max)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if vf.is_nan() {
|
||||||
|
} else {
|
||||||
|
self.sum += vf;
|
||||||
|
self.sumc += 1;
|
||||||
|
}
|
||||||
|
self.count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn result(self) -> Self::Output {
|
fn result(self) -> Self::Output {
|
||||||
todo!()
|
let avg = if self.sumc == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(self.sum / self.sumc as f32)
|
||||||
|
};
|
||||||
|
Self::Output {
|
||||||
|
ts1s: vec![self.range.beg],
|
||||||
|
ts2s: vec![self.range.end],
|
||||||
|
counts: vec![self.count],
|
||||||
|
mins: vec![self.min],
|
||||||
|
maxs: vec![self.max],
|
||||||
|
avgs: vec![avg],
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1043,20 +1092,19 @@ pub struct MinMaxAvgBinsAggregator<NTY> {
|
|||||||
count: u64,
|
count: u64,
|
||||||
min: Option<NTY>,
|
min: Option<NTY>,
|
||||||
max: Option<NTY>,
|
max: Option<NTY>,
|
||||||
|
sumc: u64,
|
||||||
sum: f32,
|
sum: f32,
|
||||||
sumc: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<NTY> MinMaxAvgBinsAggregator<NTY> {
|
impl<NTY> MinMaxAvgBinsAggregator<NTY> {
|
||||||
pub fn new(range: NanoRange) -> Self {
|
pub fn new(range: NanoRange) -> Self {
|
||||||
Self {
|
Self {
|
||||||
range,
|
range,
|
||||||
// TODO: actually count events through the whole pipeline.
|
|
||||||
count: 0,
|
count: 0,
|
||||||
min: None,
|
min: None,
|
||||||
max: None,
|
max: None,
|
||||||
sum: 0f32,
|
|
||||||
sumc: 0,
|
sumc: 0,
|
||||||
|
sum: 0f32,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1115,6 +1163,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.count += item.counts[i1];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1128,7 +1177,7 @@ where
|
|||||||
Self::Output {
|
Self::Output {
|
||||||
ts1s: vec![self.range.beg],
|
ts1s: vec![self.range.beg],
|
||||||
ts2s: vec![self.range.end],
|
ts2s: vec![self.range.end],
|
||||||
counts: vec![self.count as u64],
|
counts: vec![self.count],
|
||||||
mins: vec![self.min],
|
mins: vec![self.min],
|
||||||
maxs: vec![self.max],
|
maxs: vec![self.max],
|
||||||
avgs: vec![avg],
|
avgs: vec![avg],
|
||||||
|
|||||||
Reference in New Issue
Block a user