Improve the fetch of binned data

This commit is contained in:
Dominik Werder
2022-09-07 23:03:40 +02:00
parent 0ea0711d46
commit 9cefa54a26
13 changed files with 725 additions and 446 deletions
+114 -14
View File
@@ -1,8 +1,12 @@
use crate::binsdim0::BinsDim0CollectedResult;
use crate::eventsdim0::EventsDim0;
use crate::{ChannelEvents, ChannelEventsMerger, ConnStatus, Empty};
use crate::{ConnStatusEvent, Error};
use crate::{binned_collected, ChannelEvents, ChannelEventsMerger, Empty, IsoDateTime};
use crate::{ConnStatus, ConnStatusEvent, Error};
use chrono::{TimeZone, Utc};
use futures_util::StreamExt;
use netpod::timeunits::SEC;
use netpod::timeunits::*;
use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape};
use std::time::Duration;
#[test]
fn merge01() {
@@ -23,7 +27,7 @@ fn merge01() {
let inp2: Vec<Result<ChannelEvents, Error>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(inp1, inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
let item = merger.next().await;
assert_eq!(item.as_ref(), events_vec2.get(0));
let item = merger.next().await;
@@ -59,7 +63,7 @@ fn merge02() {
let inp2: Vec<Result<ChannelEvents, Error>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(inp1, inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
let item = merger.next().await;
assert_eq!(item.as_ref(), events_vec2.get(0));
let item = merger.next().await;
@@ -122,7 +126,7 @@ fn merge03() {
let inp2: Vec<Result<ChannelEvents, Error>> = inp2_events_a;
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(inp1, inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
let item = merger.next().await;
assert_eq!(item.as_ref(), events_vec2.get(0));
let item = merger.next().await;
@@ -151,24 +155,23 @@ fn bin01() {
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty());
let mut stream = ChannelEventsMerger::new(inp1, inp2);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
let mut stream = ChannelEventsMerger::new(vec![inp1, inp2]);
let mut coll = None;
let mut binner = None;
let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect();
let bin_count_exp = (edges.len() - 1) as u32;
let do_time_weight = true;
while let Some(item) = stream.next().await {
let item = item?;
match item {
ChannelEvents::Events(events) => {
if binner.is_none() {
let bb = events
.as_time_binnable_dyn()
.time_binner_new(edges.clone(), do_time_weight);
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb);
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable_dyn());
binner.ingest(events.as_time_binnable());
eprintln!("bins_ready_count: {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
@@ -176,7 +179,7 @@ fn bin01() {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector());
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
@@ -204,7 +207,7 @@ fn bin01() {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector());
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
@@ -230,3 +233,100 @@ fn bin01() {
};
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap();
}
#[test]
fn bin02() {
const TSBASE: u64 = SEC * 1600000000;
fn val(ts: u64) -> f32 {
2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32
}
let fut = async {
let mut events_vec1 = Vec::new();
let mut t = TSBASE;
for _ in 0..20 {
let mut events = EventsDim0::empty();
for _ in 0..10 {
events.push(t, t, val(t));
t += MS * 100;
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
}
events_vec1.push(Ok(ChannelEvents::RangeComplete));
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
let stream = ChannelEventsMerger::new(vec![inp1, inp2]);
let range = NanoRange {
beg: TSBASE + SEC * 1,
end: TSBASE + SEC * 10,
};
let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?;
assert_eq!(covering.edges().len(), 10);
let stream = Box::pin(stream);
let collected = binned_collected(
ScalarType::F32,
Shape::Scalar,
AggKind::TimeWeightedScalar,
covering.edges(),
Duration::from_millis(2000),
stream,
)
.await?;
eprintln!("collected {:?}", collected);
Ok::<_, Error>(())
};
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap();
}
#[test]
fn bin03() {
const TSBASE: u64 = SEC * 1600000000;
fn val(ts: u64) -> f32 {
2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32
}
let fut = async {
let mut events_vec1 = Vec::new();
let mut t = TSBASE;
for _ in 0..20 {
let mut events = EventsDim0::empty();
for _ in 0..10 {
events.push(t, t, val(t));
t += MS * 100;
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
}
events_vec1.push(Ok(ChannelEvents::RangeComplete));
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move {
if i == 4 {
let _ = tokio::time::sleep(Duration::from_millis(10000)).await;
}
k
});
let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect();
let inp1 = Box::pin(inp1) as _;
let timeout = Duration::from_millis(400);
let res = binned_collected(
ScalarType::F32,
Shape::Scalar,
AggKind::TimeWeightedScalar,
edges,
timeout,
inp1,
)
.await?;
let r2: &BinsDim0CollectedResult<f32> = res.as_any().downcast_ref().expect("res seems wrong type");
assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC);
assert_eq!(r2.counts(), &[10, 10, 10]);
assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]);
assert_eq!(r2.maxs(), &[3.2, 2.2, 3.2]);
assert_eq!(r2.missing_bins(), 6);
assert_eq!(
r2.continue_at(),
Some(IsoDateTime(Utc.timestamp_nanos((TSBASE + SEC * 4) as i64)))
);
Ok::<_, Error>(())
};
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap();
}