WIP merger tests

This commit is contained in:
Dominik Werder
2022-11-11 22:21:57 +01:00
parent 88fa03fb4a
commit 67bca9da5e
10 changed files with 504 additions and 387 deletions
+121 -86
View File
@@ -1,9 +1,11 @@
use crate::binsdim0::BinsDim0CollectedResult;
use crate::eventsdim0::EventsDim0;
use crate::{binned_collected, ChannelEvents, ChannelEventsMerger, Empty, IsoDateTime};
use crate::merger::ChannelEventsMerger;
use crate::{binned_collected, ChannelEvents, Empty, Events, IsoDateTime};
use crate::{ConnStatus, ConnStatusEvent, Error};
use chrono::{TimeZone, Utc};
use futures_util::StreamExt;
use items::{RangeCompletableItem, Sitemty, StreamItem};
use netpod::timeunits::*;
use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape};
use std::time::Duration;
@@ -11,20 +13,22 @@ use std::time::Duration;
#[test]
fn merge01() {
let fut = async {
let mut events_vec1 = Vec::new();
let mut events_vec2 = Vec::new();
let mut events_vec1: Vec<Sitemty<ChannelEvents>> = Vec::new();
let mut events_vec2: Vec<Sitemty<ChannelEvents>> = Vec::new();
{
let mut events = EventsDim0::empty();
for i in 0..10 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
let cev = ChannelEvents::Events(Box::new(events.clone()));
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev))));
let cev = ChannelEvents::Events(Box::new(events.clone()));
events_vec2.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev))));
}
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2: Vec<Result<ChannelEvents, Error>> = Vec::new();
let inp2: Vec<Sitemty<ChannelEvents>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
@@ -39,91 +43,106 @@ fn merge01() {
#[test]
fn merge02() {
let fut = async {
let mut events_vec1 = Vec::new();
let mut events_vec2 = Vec::new();
{
let events_vec1 = {
let mut vec = Vec::new();
let mut events = EventsDim0::empty();
for i in 0..10 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
}
{
push_evd0(&mut vec, Box::new(events.clone()));
let mut events = EventsDim0::empty();
for i in 10..20 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
}
push_evd0(&mut vec, Box::new(events.clone()));
vec
};
let exp = events_vec1.clone();
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2: Vec<Result<ChannelEvents, Error>> = Vec::new();
let inp2: Vec<Sitemty<ChannelEvents>> = Vec::new();
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
let item = merger.next().await;
assert_eq!(item.as_ref(), events_vec2.get(0));
assert_eq!(item.as_ref(), exp.get(0));
let item = merger.next().await;
assert_eq!(item.as_ref(), events_vec2.get(1));
assert_eq!(item.as_ref(), exp.get(1));
let item = merger.next().await;
assert_eq!(item.as_ref(), None);
};
tokio::runtime::Runtime::new().unwrap().block_on(fut);
}
fn push_evd0(vec: &mut Vec<Sitemty<ChannelEvents>>, events: Box<dyn Events>) {
let cev = ChannelEvents::Events(events);
vec.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev))));
}
#[test]
fn merge03() {
let fut = async {
let mut events_vec1 = Vec::new();
{
let events_vec1 = {
let mut vec = Vec::new();
let mut events = EventsDim0::empty();
for i in 0..10 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
push_evd0(&mut vec, Box::new(events));
let mut events = EventsDim0::empty();
for i in 10..20 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
}
let events_vec1 = events_vec1;
let mut events_vec2 = Vec::new();
{
push_evd0(&mut vec, Box::new(events));
vec
};
let events_vec2 = {
let mut vec = Vec::new();
let mut events = EventsDim0::empty();
for i in 0..10 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
push_evd0(&mut vec, Box::new(events));
let mut events = EventsDim0::empty();
for i in 10..12 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
push_evd0(&mut vec, Box::new(events));
let mut events = EventsDim0::empty();
for i in 12..20 {
events.push(i * 100, i, i as f32 * 100.);
}
events_vec2.push(Ok(ChannelEvents::Events(Box::new(events.clone()))));
}
let events_vec2 = events_vec2;
push_evd0(&mut vec, Box::new(events));
vec
};
let inp2_events_a: Vec<Result<_, Error>> = vec![Ok(ChannelEvents::Status(ConnStatusEvent {
ts: 1199,
status: ConnStatus::Disconnect,
}))];
let inp2_events_b: Vec<Result<_, Error>> = vec![Ok(ChannelEvents::Status(ConnStatusEvent {
ts: 1199,
status: ConnStatus::Disconnect,
}))];
let inp2_events_a = {
let ev = ConnStatusEvent {
ts: 1199,
status: ConnStatus::Disconnect,
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Status(ev),
)));
vec![item]
};
let inp2_events_b = {
let ev = ConnStatusEvent {
ts: 1199,
status: ConnStatus::Disconnect,
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Status(ev),
)));
vec![item]
};
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2: Vec<Result<ChannelEvents, Error>> = inp2_events_a;
let inp2: Vec<Sitemty<ChannelEvents>> = inp2_events_a;
let inp2 = futures_util::stream::iter(inp2);
let inp2 = Box::pin(inp2);
let mut merger = ChannelEventsMerger::new(vec![inp1, inp2]);
@@ -144,15 +163,17 @@ fn merge03() {
#[test]
fn bin01() {
let fut = async {
let mut events_vec1 = Vec::new();
for j in 0..2 {
let mut events = EventsDim0::empty();
for i in 10 * j..10 * (1 + j) {
events.push(SEC * i, i, 17f32);
let inp1 = {
let mut vec = Vec::new();
for j in 0..2 {
let mut events = EventsDim0::empty();
for i in 10 * j..10 * (1 + j) {
events.push(SEC * i, i, 17f32);
}
push_evd0(&mut vec, Box::new(events));
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
}
let inp1 = events_vec1;
vec
};
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
@@ -165,36 +186,44 @@ fn bin01() {
while let Some(item) = stream.next().await {
let item = item?;
match item {
ChannelEvents::Events(events) => {
if binner.is_none() {
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb);
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
eprintln!("bins_ready_count: {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => todo!(),
RangeCompletableItem::Data(item) => match item {
ChannelEvents::Events(events) => {
if binner.is_none() {
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb);
}
None => {
return Err(format!("bins_ready_count but no result").into());
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
eprintln!("bins_ready_count: {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
}
None => {
return Err(format!("bins_ready_count but no result").into());
}
}
}
}
}
ChannelEvents::Status(_) => {
eprintln!("TODO Status");
}
},
},
StreamItem::Log(_) => {
eprintln!("TODO Log");
}
ChannelEvents::Status(_) => {
eprintln!("TODO Status");
}
ChannelEvents::RangeComplete => {
eprintln!("TODO RangeComplete");
StreamItem::Stats(_) => {
eprintln!("TODO Stats");
}
}
}
@@ -249,26 +278,31 @@ fn bin02() {
events.push(t, t, val(t));
t += MS * 100;
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
let cev = ChannelEvents::Events(Box::new(events));
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev))));
}
events_vec1.push(Ok(ChannelEvents::RangeComplete));
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)));
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1);
let inp1 = Box::pin(inp1);
let inp2 = Box::pin(futures_util::stream::empty()) as _;
let stream = ChannelEventsMerger::new(vec![inp1, inp2]);
let range = NanoRange {
beg: TSBASE + SEC * 1,
end: TSBASE + SEC * 10,
};
let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?;
assert_eq!(covering.edges().len(), 10);
if false {
// covering_range result is subject to adjustments, instead, manually choose bin edges
let range = NanoRange {
beg: TSBASE + SEC * 1,
end: TSBASE + SEC * 10,
};
let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?;
assert_eq!(covering.edges().len(), 6);
}
let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect();
let stream = Box::pin(stream);
let collected = binned_collected(
ScalarType::F32,
Shape::Scalar,
AggKind::TimeWeightedScalar,
covering.edges(),
edges,
Duration::from_millis(2000),
stream,
)
@@ -294,9 +328,10 @@ fn bin03() {
events.push(t, t, val(t));
t += MS * 100;
}
events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
let cev = ChannelEvents::Events(Box::new(events));
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev))));
}
events_vec1.push(Ok(ChannelEvents::RangeComplete));
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)));
let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move {
if i == 4 {