Move binned type, add tests

Dominik Werder
2022-12-05 12:01:19 +01:00
parent 4a250227cd
commit aa74fd4f25
33 changed files with 1988 additions and 699 deletions

View File

@@ -14,6 +14,20 @@ use bytes::BytesMut;
use err::Error;
use std::future::Future;
fn f32_cmp_near(x: f32, y: f32) -> bool {
// Compare with slack: zero the low 4 bits of the least significant mantissa
// byte on both sides, then compare exactly. to_le_bytes/from_le_bytes are
// paired so the mask hits the same bits regardless of host endianness.
let x = {
let mut a = x.to_le_bytes();
a[0] &= 0xf0;
f32::from_le_bytes(a)
};
let y = {
let mut a = y.to_le_bytes();
a[0] &= 0xf0;
f32::from_le_bytes(a)
};
x == y
}
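For orientation, a small sketch (not part of the commit) of the slack this comparison gives: zeroing the low 4 bits of the least significant mantissa byte means two f32 values whose bit patterns differ only in those bits count as equal.
#[test]
fn f32_cmp_near_demo() {
    // 0x4233_3340 and 0x4233_334F differ only in the masked low 4 bits.
    let a = f32::from_bits(0x4233_3340);
    let b = f32::from_bits(0x4233_334F);
    assert!(f32_cmp_near(a, b));
    // A difference in bit 4 falls outside the mask, so this pair is not near.
    let c = f32::from_bits(0x4233_3350);
    assert!(!f32_cmp_near(a, c));
}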
fn f32_iter_cmp_near<A, B>(a: A, b: B) -> bool
where
A: IntoIterator<Item = f32>,
@@ -25,17 +39,7 @@ where
let x = a.next();
let y = b.next();
if let (Some(x), Some(y)) = (x, y) {
let x = {
let mut a = x.to_ne_bytes();
a[0] &= 0xf0;
f32::from_ne_bytes(a)
};
let y = {
let mut a = y.to_ne_bytes();
a[0] &= 0xf0;
f32::from_ne_bytes(a)
};
if x != y {
if !f32_cmp_near(x, y) {
return false;
}
} else if x.is_some() || y.is_some() {
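A similar sketch (hypothetical) for the iterator variant, which applies the same per-element slack to any two sources of f32:
#[test]
fn f32_iter_cmp_near_demo() {
    // Works across container types; items differing only in masked bits match.
    let a = [f32::from_bits(0x4233_3340), 1.0];
    let b = vec![f32::from_bits(0x4233_334F), 1.0];
    assert!(f32_iter_cmp_near(a, b));
}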

View File

@@ -1,5 +1,6 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use crate::test::f32_cmp_near;
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
@@ -16,7 +17,7 @@ fn binned_d0_json_00() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = binned_d0_json(
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "scalar-i32-be".into(),
@@ -28,17 +29,235 @@ fn binned_d0_json_00() -> Result<(), Error> {
cluster,
)
.await?;
info!("Receveided a response json value: {jsv:?}");
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.len(), 20);
assert_eq!(res.ts_anchor_sec(), 0);
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 8);
assert_eq!(res.ts1_off_ms()[0], 0);
assert_eq!(res.ts2_off_ms()[0], 5000);
assert_eq!(res.counts()[0], 5);
assert_eq!(res.counts()[1], 10);
assert_eq!(res.counts()[7], 7);
assert_eq!(res.mins()[0], 2405);
assert_eq!(res.maxs()[0], 2409);
assert_eq!(res.mins()[1], 2410);
assert_eq!(res.maxs()[1], 2419);
Ok(())
};
taskrun::run(fut)
}
async fn binned_d0_json(
#[test]
fn binned_d0_json_01() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "scalar-i32-be".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:30.000Z",
10,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 13);
assert_eq!(res.range_final(), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_02() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "wave-f64-be-n21".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:45.000Z",
10,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<f64> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 13);
assert_eq!(res.range_final(), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_03() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "wave-f64-be-n21".into(),
series: None,
},
// TODO This test was meant to ask `AggKind::DimXBinsN(3)`
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:20.000Z",
2,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<f64> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 4);
assert_eq!(res.range_final(), true);
assert_eq!(res.counts()[0], 300);
assert_eq!(res.counts()[3], 8);
assert_eq!(f32_cmp_near(res.avgs()[0], 44950.00390625), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_04() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "const-regular-scalar-i32-be".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T04:20:30.000Z",
// TODO must use AggKind::DimXBins1
20,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 17);
// TODO I would expect rangeFinal to be set, or?
assert_eq!(res.range_final(), false);
assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_05() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "const-regular-scalar-i32-be".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T10:20:30.000Z",
// TODO must use AggKind::DimXBins1
10,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 0);
assert_eq!(res.len(), 3);
assert_eq!(res.range_final(), false);
assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_06() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "const-regular-scalar-i32-be".into(),
series: None,
},
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:20.000Z",
// TODO must use AggKind::TimeWeightedScalar
20,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1210);
assert_eq!(res.len(), 20);
assert_eq!(res.range_final(), true);
assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
Ok(())
};
taskrun::run(fut)
}
#[test]
fn binned_d0_json_07() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "const-regular-scalar-i32-be".into(),
series: None,
},
"1970-01-01T00:20:11.000Z",
"1970-01-01T00:30:20.000Z",
// TODO must use AggKind::TimeWeightedScalar
10,
cluster,
)
.await?;
debug!("Receveided a response json value: {jsv:?}");
let res: items_2::binsdim0::BinsDim0CollectedResult<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.ts_anchor_sec(), 1200);
assert_eq!(res.len(), 11);
assert_eq!(res.range_final(), true);
assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
Ok(())
};
taskrun::run(fut)
}
async fn get_binned_json(
channel: Channel,
beg_date: &str,
end_date: &str,
@@ -55,7 +274,7 @@ async fn binned_d0_json(
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("http get {}", url);
debug!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
@@ -65,8 +284,11 @@ async fn binned_d0_json(
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
error!("error response {:?}", res);
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
error!("body of error response: {s}");
return Err(Error::with_msg_no_trace(format!("error response")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);

View File

@@ -31,8 +31,8 @@ fn events_plain_json_00() -> Result<(), Error> {
info!("Receveided a response json value: {jsv:?}");
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.len(), 20);
assert_eq!(res.ts_anchor_sec(), 0);
assert_eq!(res.len(), 60);
Ok(())
};
taskrun::run(fut)

View File

@@ -1,4 +1,4 @@
use super::binnedjson::ScalarEventsResponse;
#![allow(unused)]
use super::events::get_plain_events_json;
use crate::nodes::require_archapp_test_host_running;
use crate::test::events::ch_gen;
@@ -8,6 +8,8 @@ use netpod::log::*;
#[test]
fn get_events_1() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
// cfg(DISABLED) never matches, so the real test body below is compiled out
// and the stub error future above is the one that runs.
#[cfg(DISABLED)]
let fut = async {
let rh = require_archapp_test_host_running()?;
let cluster = &rh.cluster;

View File

@@ -1,117 +1,11 @@
mod channelarchiver;
use crate::err::ErrConv;
use crate::nodes::{require_sls_test_host_running, require_test_hosts_running};
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::query::{BinnedQuery, CacheUsage, PlainEventsQuery};
use netpod::{f64_close, AppendToUrl};
use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use url::Url;
#[test]
fn get_binned_json_0() {
taskrun::run(get_binned_json_0_inner()).unwrap();
}
async fn get_binned_json_0_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_binned_json_common(
"scalar-i32-be",
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:30.000Z",
10,
AggKind::DimXBins1,
cluster,
13,
true,
)
.await
}
#[test]
fn get_binned_json_1() {
taskrun::run(get_binned_json_1_inner()).unwrap();
}
async fn get_binned_json_1_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_binned_json_common(
"wave-f64-be-n21",
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:45.000Z",
10,
AggKind::DimXBins1,
cluster,
13,
true,
)
.await
}
#[test]
fn get_binned_json_2() {
taskrun::run(get_binned_json_2_inner()).unwrap();
}
async fn get_binned_json_2_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_binned_json_common(
"wave-f64-be-n21",
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:20.000Z",
2,
AggKind::DimXBinsN(3),
cluster,
2,
true,
)
.await
}
#[allow(unused)]
fn check_close_events(a: &WaveEventsResponse, b: &WaveEventsResponse, jsstr: &String) -> Result<(), Error> {
match a.is_close(b) {
Ok(true) => Ok(()),
Ok(false) => {
error!("Mismatch, original JSON:\n{}", jsstr);
Err(Error::with_msg_no_trace("mismatch"))
}
Err(e) => {
error!("Mismatch, original JSON:\n{}", jsstr);
Err(e)
}
}
}
fn check_close(a: &BinnedResponse, b: &BinnedResponse, jsstr: &String) -> Result<(), Error> {
match a.is_close(b) {
Ok(true) => Ok(()),
Ok(false) => {
error!("Mismatch, original JSON:\n{}", jsstr);
Err(Error::with_msg_no_trace("mismatch"))
}
Err(e) => {
error!("Mismatch, original JSON:\n{}", jsstr);
Err(e)
}
}
}
#[test]
fn get_sls_archive_1() -> Result<(), Error> {
// TODO OFFENDING TEST
if true {
return Ok(());
}
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -124,8 +18,8 @@ fn get_sls_archive_1() -> Result<(), Error> {
let endstr = "2021-11-10T01:01:00Z";
let (res, jsstr) =
get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"finalisedRange":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##;
let exp: BinnedResponse = serde_json::from_str(exp).unwrap();
let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"rangeFinal":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##;
let exp: BinnedResponse = serde_json::from_str(exp).unwrap();
check_close(&res, &exp, &jsstr)?;
Ok(())
};
@@ -134,6 +28,8 @@ fn get_sls_archive_1() -> Result<(), Error> {
#[test]
fn get_sls_archive_3() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -156,6 +52,8 @@ fn get_sls_archive_3() -> Result<(), Error> {
#[test]
fn get_sls_archive_wave_2() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -175,275 +73,3 @@ fn get_sls_archive_wave_2() -> Result<(), Error> {
};
taskrun::run(fut)
}
async fn get_binned_json_common(
channel_name: &str,
beg_date: &str,
end_date: &str,
bin_count: u32,
agg_kind: AggKind,
cluster: &Cluster,
expect_bin_count: u32,
expect_finalised_range: bool,
) -> Result<(), Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = "testbackend";
let channel = Channel {
backend: channel_backend.into(),
name: channel_name.into(),
series: None,
};
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_timeout(Duration::from_millis(15000));
query.set_cache_usage(CacheUsage::Ignore);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("get_binned_json_common get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("get_binned_json_common client response {:?}", res);
}
let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
debug!("get_binned_json_common DONE time {} ms", ms);
let res = String::from_utf8_lossy(&res).to_string();
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
// TODO assert more
debug!(
"result from endpoint: --------------\n{}\n--------------",
serde_json::to_string_pretty(&res)?
);
// TODO enable in future:
if false {
if expect_finalised_range {
if !res
.get("finalisedRange")
.ok_or(Error::with_msg("missing finalisedRange"))?
.as_bool()
.ok_or(Error::with_msg("key finalisedRange not bool"))?
{
return Err(Error::with_msg("expected finalisedRange"));
}
} else if res.get("finalisedRange").is_some() {
return Err(Error::with_msg("expect absent finalisedRange"));
}
}
if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
}
if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
}
if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
}
if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
}
Ok(())
}
// TODO reuse the types from server.
#[derive(Debug, Serialize, Deserialize)]
pub struct ScalarEventsResponse {
#[serde(rename = "tsAnchor")]
pub ts_anchor: u64,
#[serde(rename = "tsMs")]
pub ts_ms: Vec<u64>,
#[serde(rename = "tsNs")]
pub ts_ns: Vec<u64>,
pub values: Vec<f64>,
#[serde(rename = "finalisedRange", default = "bool_false")]
pub finalised_range: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WaveEventsResponse {
#[serde(rename = "tsAnchor")]
ts_anchor: u64,
#[serde(rename = "tsMs")]
ts_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_ns: Vec<u64>,
values: Vec<Vec<f64>>,
#[serde(rename = "finalisedRange", default = "bool_false")]
finalised_range: bool,
}
impl WaveEventsResponse {
pub fn is_close(&self, other: &Self) -> Result<bool, Error> {
let reterr = || -> Result<bool, Error> {
Err(Error::with_msg_no_trace(format!(
"Mismatch\n{:?}\nVS\n{:?}",
self, other
)))
};
if self.ts_anchor != other.ts_anchor {
return reterr();
}
if self.finalised_range != other.finalised_range {
return reterr();
}
let pairs = [(&self.values, &other.values)];
for (t, u) in pairs {
for (j, k) in t.iter().zip(u) {
for (&a, &b) in j.iter().zip(k) {
if !f64_close(a, b) {
return reterr();
}
}
}
}
Ok(true)
}
}
#[derive(Debug, Serialize, Deserialize)]
struct BinnedResponse {
#[serde(rename = "tsAnchor")]
ts_anchor: u64,
#[serde(rename = "tsMs")]
ts_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_ns: Vec<u64>,
mins: Vec<Option<f64>>,
maxs: Vec<Option<f64>>,
avgs: Vec<Option<f64>>,
counts: Vec<u64>,
#[serde(rename = "finalisedRange", default = "bool_false")]
finalised_range: bool,
}
impl BinnedResponse {
pub fn is_close(&self, other: &Self) -> Result<bool, Error> {
let reterr = || -> Result<bool, Error> {
Err(Error::with_msg_no_trace(format!(
"Mismatch\n{:?}\nVS\n{:?}",
self, other
)))
};
if self.ts_anchor != other.ts_anchor {
return reterr();
}
if self.finalised_range != other.finalised_range {
return reterr();
}
if self.counts != other.counts {
return reterr();
}
let pairs = [
(&self.mins, &other.mins),
(&self.maxs, &other.maxs),
(&self.avgs, &other.avgs),
];
for (t, u) in pairs {
for (&a, &b) in t.iter().zip(u) {
if let (Some(a), Some(b)) = (a, b) {
if !f64_close(a, b) {
return reterr();
}
} else if let (None, None) = (a, b) {
} else {
return reterr();
}
}
}
Ok(true)
}
}
fn bool_false() -> bool {
false
}
async fn get_binned_json_common_res(
channel: Channel,
beg_date: &str,
end_date: &str,
bin_count: u32,
agg_kind: AggKind,
cluster: &Cluster,
) -> Result<(BinnedResponse, String), Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_timeout(Duration::from_millis(15000));
query.set_cache_usage(CacheUsage::Ignore);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_binned_json_common_res get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
let msg = format!("client response {res:?}");
error!("{msg}");
return Err(msg.into());
}
let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
let t2 = chrono::Utc::now();
let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
let res = String::from_utf8_lossy(&res).to_string();
let ret: BinnedResponse = serde_json::from_str(res.as_str())?;
Ok((ret, res))
}
async fn get_events_json_common_res(
channel: Channel,
beg_date: &str,
end_date: &str,
cluster: &Cluster,
) -> Result<String, Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = PlainEventsQuery::new(channel, range, 4096, None, false);
query.set_timeout(Duration::from_millis(15000));
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_events_json_common_res get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
let msg = format!("client response {res:?}");
error!("{msg}");
return Err(msg.into());
}
let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
let t2 = chrono::Utc::now();
let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
let res = String::from_utf8_lossy(&res).to_string();
//info!("STRING RESULT:{}", res);
Ok(res)
}

View File

@@ -2,6 +2,8 @@ use super::*;
#[test]
fn get_scalar_2_events() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -45,6 +47,8 @@ fn get_scalar_2_events() -> Result<(), Error> {
#[test]
fn get_scalar_2_binned() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -57,7 +61,7 @@ fn get_scalar_2_binned() -> Result<(), Error> {
let endstr = "2021-11-10T00:10:00Z";
let (res, jsstr) =
get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"finalisedRange":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##;
let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"rangeFinal":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##;
let exp: BinnedResponse = serde_json::from_str(exp).unwrap();
check_close(&res, &exp, &jsstr)?;
Ok(())
@@ -67,6 +71,8 @@ fn get_scalar_2_binned() -> Result<(), Error> {
#[test]
fn get_wave_1_events() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -108,6 +114,8 @@ fn get_wave_1_events() -> Result<(), Error> {
#[test]
fn get_wave_1_binned() -> Result<(), Error> {
let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
#[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;

View File

@@ -10,79 +10,6 @@ use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
use std::time::Duration;
use url::Url;
#[test]
fn time_weighted_json_00() -> Result<(), Error> {
async fn inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let res = get_json_common(
"const-regular-scalar-i32-be",
"1970-01-01T00:20:10.000Z",
"1970-01-01T04:20:30.000Z",
20,
AggKind::DimXBins1,
cluster,
25,
true,
)
.await?;
let v = res.avgs[0];
assert!(v > 41.9999 && v < 42.0001);
Ok(())
}
super::run_test(inner())
}
#[test]
fn time_weighted_json_01() -> Result<(), Error> {
// TODO OFFENDING TEST
if true {
return Ok(());
}
async fn inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let res = get_json_common(
"const-regular-scalar-i32-be",
"1970-01-01T00:20:10.000Z",
"1970-01-01T10:20:30.000Z",
10,
AggKind::DimXBins1,
cluster,
9,
true,
)
.await?;
let v = res.avgs[0];
assert!(v > 41.9999 && v < 42.0001);
Ok(())
}
super::run_test(inner())
}
#[test]
fn time_weighted_json_02() -> Result<(), Error> {
async fn inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
let res = get_json_common(
"const-regular-scalar-i32-be",
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:20.000Z",
20,
AggKind::TimeWeightedScalar,
cluster,
100,
true,
)
.await?;
let v = res.avgs[0];
assert!(v > 41.9999 && v < 42.0001);
Ok(())
}
super::run_test(inner())
}
#[test]
fn time_weighted_json_03() -> Result<(), Error> {
async fn inner() -> Result<(), Error> {
@@ -209,15 +136,15 @@ async fn get_json_common(
if false {
if expect_finalised_range {
if !res
.get("finalisedRange")
.ok_or(Error::with_msg("missing finalisedRange"))?
.get("rangeFinal")
.ok_or(Error::with_msg("missing rangeFinal"))?
.as_bool()
.ok_or(Error::with_msg("key finalisedRange not bool"))?
.ok_or(Error::with_msg("key rangeFinal not bool"))?
{
return Err(Error::with_msg("expected finalisedRange"));
return Err(Error::with_msg("expected rangeFinal"));
}
} else if res.get("finalisedRange").is_some() {
return Err(Error::with_msg("expect absent finalisedRange"));
} else if res.get("rangeFinal").is_some() {
return Err(Error::with_msg("expect absent rangeFinal"));
}
}
let counts = res.get("counts").unwrap().as_array().unwrap();

View File

@@ -121,7 +121,7 @@ where
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
// Only if the frequency would be high, that would require cpu time checks. Worth it? Measure..
self.tmp_agg_results.push_back(ret);
if self.curbin >= self.spec.count as u32 {
if self.curbin >= self.spec.bin_count() as u32 {
self.all_bins_emitted = true;
}
}

View File

@@ -77,7 +77,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
debug!("BinnedBinaryChannelExec found pre_range: {pre_range:?}");
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
);
@@ -323,12 +323,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
{
let _ = event_value_shape;
let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?;
let t_bin_count = range.count as u32;
let t_bin_count = range.bin_count() as u32;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
info!("BinnedJsonChannelExec found pre_range: {pre_range:?}");
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"BinnedJsonChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
);

View File

@@ -49,20 +49,35 @@ where
let item = EventsDim0 { tss, pulses, values };
let item = ChannelEvents::Events(Box::new(item));
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else if let Some(item) = item.as_any_mut().downcast_mut::<items::waveevents::WaveEvents<NTY>>() {
warn!("WaveEvents");
let _tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let _pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
let _values: VecDeque<Vec<NTY>> = item.vals.iter().map(|x| x.clone()).collect();
//let item = EventsDim1 { tss, pulses, values };
//let item = ChannelEvents::Events(Box::new(item));
//Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
} else if let Some(item) = item
.as_any_mut()
.downcast_mut::<items::xbinnedscalarevents::XBinnedScalarEvents<NTY>>()
{
warn!("XBinnedScalarEvents");
let tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let pulses: VecDeque<u64> = (0..tss.len()).map(|_| 0).collect();
let _avgs: VecDeque<f32> = item.avgs.iter().map(|x| x.clone()).collect();
let mins: VecDeque<NTY> = item.mins.iter().map(|x| x.clone()).collect();
let _maxs: VecDeque<NTY> = item.maxs.iter().map(|x| x.clone()).collect();
let item = EventsDim0 {
tss,
pulses,
values: mins,
};
let item = ChannelEvents::Events(Box::new(item));
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
if let Some(item) = item.as_any_mut().downcast_mut::<items::waveevents::WaveEvents<NTY>>() {
warn!("WaveEvents");
let _tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let _pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
let _values: VecDeque<Vec<NTY>> = item.vals.iter().map(|x| x.clone()).collect();
//let item = EventsDim1 { tss, pulses, values };
//let item = ChannelEvents::Events(Box::new(item));
//Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
} else {
error!("TODO bad, no idea what this item is");
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
error!("TODO bad, no idea what this item is\n\n{:?}\n\n", item);
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
}
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),

View File

@@ -16,7 +16,7 @@ use url::Url;
async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("httpret plain_events_json req: {:?}", req);
let (head, _body) = req.into_parts();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
let msg = format!("can not parse query: {}", e.msg());
e.add_public_msg(msg)
@@ -27,7 +27,6 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
query.set_series_id(chconf.series);
let query = query;
// ---
let span1 = span!(
Level::INFO,
"httpret::binned",
@@ -38,15 +37,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
span1.in_scope(|| {
debug!("begin");
});
let _: Result<_, Error> = match head.headers.get(http::header::ACCEPT) {
//Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await,
//Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await,
_ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
};
// TODO analogous to `streams::plaineventsjson::plain_events_json`, create a function for binned json.
let item = streams::plaineventsjson::plain_events_json("", &node_config.node_config.cluster)
let item = streams::timebinnedjson::timebinned_json(&query, &node_config.node_config.cluster)
.instrument(span1)
.await?;
let buf = serde_json::to_vec(&item)?;

View File

@@ -43,24 +43,49 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
);
}
if channel.backend() == "test-inmem" {
if channel.name() == "inmem-d0-i32" {
let ret = if channel.name() == "inmem-d0-i32" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
return Ok(ret);
}
Ok(ret)
} else {
error!("no test information");
Err(Error::with_msg_no_trace("no test information")
.add_public_msg(format!("No channel config for test channel {:?}", channel)))
};
return ret;
}
if channel.backend() == "test-disk-databuffer" {
if channel.name() == "scalar-i32-be" {
// TODO the series-ids here are just random. Need to integrate with better test setup.
let ret = if channel.name() == "scalar-i32-be" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
return Ok(ret);
}
Ok(ret)
} else if channel.name() == "wave-f64-be-n21" {
let ret = ChConf {
series: 2,
scalar_type: ScalarType::F64,
shape: Shape::Wave(21),
};
Ok(ret)
} else if channel.name() == "const-regular-scalar-i32-be" {
let ret = ChConf {
series: 3,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
Ok(ret)
} else {
error!("no test information");
Err(Error::with_msg_no_trace("no test information")
.add_public_msg(format!("No channel config for test channel {:?}", channel)))
};
return ret;
}
// TODO use a common already running worker pool for these queries:
let dbconf = &ncc.node_config.cluster.database;

View File

@@ -199,7 +199,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
<p>Example response:</p>
<pre>
{
"finalisedRange": true,
"rangeFinal": true,
"tsAnchor": 1623763172,
"tsMs": [
5,
@@ -224,7 +224,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
<h4>Finalised range</h4>
<p>If the server can determine that no more data will be added to the requested time range
then it will add the flag <strong>finalisedRange: true</strong> to the response.</p>
then it will add the flag <strong>rangeFinal: true</strong> to the response.</p>
@@ -312,7 +312,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
0,
0
],
"finalisedRange": true
"rangeFinal": true
}
</pre>
@@ -321,7 +321,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
<h4>Finalised range</h4>
<p>If the server can determine that no more data will be added to the requested time range
then it will add the flag <strong>finalisedRange: true</strong> to the response.</p>
then it will add the flag <strong>rangeFinal: true</strong> to the response.</p>

View File

@@ -266,7 +266,7 @@ pub struct MinMaxAvgBinsCollectedResult<NTY> {
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,

View File

@@ -259,7 +259,7 @@ pub struct MinMaxAvgDim1BinsCollectedResult<NTY> {
mins: Vec<Option<Vec<NTY>>>,
maxs: Vec<Option<Vec<NTY>>>,
avgs: Vec<Option<Vec<f32>>>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
@@ -497,7 +497,7 @@ pub struct WaveEventsCollectedResult<NTY> {
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<Vec<NTY>>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,

View File

@@ -371,7 +371,7 @@ impl FrameType for EventQueryJsonStringFrame {
}
pub trait EventsNodeProcessorOutput:
Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
fmt::Debug + Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
{
fn as_any_mut(&mut self) -> &mut dyn Any;
fn into_parts(self) -> (Box<dyn Any>, VecDeque<u64>, VecDeque<u64>);

View File

@@ -290,7 +290,7 @@ pub struct EventValuesCollectorOutput<NTY> {
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<NTY>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,

View File

@@ -221,7 +221,7 @@ pub struct StatsEventsCollectorOutput {
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
// TODO what to collect? pulse min/max
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,

View File

@@ -429,7 +429,7 @@ pub struct XBinnedScalarEventsCollectedResult<NTY> {
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,

View File

@@ -450,7 +450,7 @@ pub struct XBinnedWaveEventsCollectedResult<NTY> {
mins: Vec<Vec<NTY>>,
maxs: Vec<Vec<NTY>>,
avgs: Vec<Vec<f32>>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,

View File

@@ -7,27 +7,18 @@ use std::any::Any;
use std::fmt;
pub trait Collector: fmt::Debug + Send {
// TODO should require here Collectable?
type Input;
type Output: Collected;
fn len(&self) -> usize;
fn ingest(&mut self, item: &mut Self::Input);
fn ingest(&mut self, item: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Self::Output, Error>;
fn result(&mut self) -> Result<Box<dyn Collected>, Error>;
}
pub trait Collectable: fmt::Debug {
type Collector: Collector<Input = Self>;
fn new_collector(&self) -> Self::Collector;
pub trait Collectable: fmt::Debug + crate::AsAnyMut {
fn new_collector(&self) -> Box<dyn Collector>;
}
// TODO can this get removed?
pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {}
erased_serde::serialize_trait_object!(Collected);
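Aside (not from the commit): removing the Input and Output associated types is what lets Collector be driven purely as a trait object; a minimal sketch, assuming only the traits declared above:
fn drain_into(coll: &mut dyn Collector, items: &mut Vec<Box<dyn Collectable>>) -> Result<Box<dyn Collected>, Error> {
    for item in items.iter_mut() {
        // Each concrete collector downcasts internally to the input types it knows.
        coll.ingest(item.as_mut());
    }
    coll.result()
}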
@@ -53,6 +44,7 @@ impl Collected for Box<dyn Collected> {}
#[derive(Debug)]
pub struct CollectorDynDefault {}
// TODO remove?
pub trait CollectorDyn: fmt::Debug + Send {
fn len(&self) -> usize;
@@ -70,50 +62,15 @@ pub trait CollectableWithDefault {
fn as_any_mut(&mut self) -> &mut dyn Any;
}
#[derive(Debug)]
pub struct EventsCollector {
coll: Box<dyn CollectorDyn>,
}
impl EventsCollector {
pub fn new(coll: Box<dyn CollectorDyn>) -> Self {
Self { coll }
}
}
impl Collector for EventsCollector {
type Input = Box<dyn Events>;
// TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc...
type Output = Box<dyn Collected>;
fn len(&self) -> usize {
self.coll.len()
}
fn ingest(&mut self, item: &mut Self::Input) {
self.coll.ingest(item.as_collectable_with_default_mut());
}
fn set_range_complete(&mut self) {
self.coll.set_range_complete()
}
fn set_timed_out(&mut self) {
self.coll.set_timed_out()
}
fn result(&mut self) -> Result<Self::Output, Error> {
self.coll.result()
impl crate::AsAnyMut for Box<dyn Events> {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl Collectable for Box<dyn Events> {
type Collector = EventsCollector;
fn new_collector(&self) -> Self::Collector {
let coll = CollectableWithDefault::new_collector(self.as_ref());
EventsCollector::new(coll)
fn new_collector(&self) -> Box<dyn Collector> {
todo!()
}
}
@@ -121,14 +78,11 @@ impl Collectable for Box<dyn Events> {
pub struct TimeBinnedCollector {}
impl Collector for TimeBinnedCollector {
type Input = Box<dyn crate::TimeBinned>;
type Output = Box<dyn Collected>;
fn len(&self) -> usize {
todo!()
}
fn ingest(&mut self, _item: &mut Self::Input) {
fn ingest(&mut self, _item: &mut dyn Collectable) {
todo!()
}
@@ -140,15 +94,19 @@ impl Collector for TimeBinnedCollector {
todo!()
}
fn result(&mut self) -> Result<Self::Output, Error> {
fn result(&mut self) -> Result<Box<dyn Collected>, Error> {
todo!()
}
}
impl crate::AsAnyMut for Box<dyn crate::TimeBinned> {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl Collectable for Box<dyn crate::TimeBinned> {
type Collector = TimeBinnedCollector;
fn new_collector(&self) -> Self::Collector {
todo!()
fn new_collector(&self) -> Box<dyn Collector> {
self.as_ref().new_collector()
}
}

View File

@@ -5,6 +5,7 @@ use serde::Serialize;
use std::any::Any;
use std::fmt;
// TODO rename to `Typed`
pub trait CollectorType: Send + Unpin + WithLen {
type Input: Collectable;
type Output: Collected + ToJsonResult + Serialize;
@@ -24,6 +25,7 @@ pub trait Collector: Send + Unpin + WithLen {
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, Error>;
}
// TODO rename to `Typed`
pub trait CollectableType {
type Collector: CollectorType<Input = Self>;
fn new_collector() -> Self::Collector;

View File

@@ -61,14 +61,8 @@ pub trait AsAnyMut {
fn as_any_mut(&mut self) -> &mut dyn Any;
}
/*impl AsAnyRef for Box<dyn AsAnyRef> {
fn as_any_ref(&self) -> &dyn Any {
self.as_ref().as_any_ref()
}
}*/
/// Data in time-binned form.
pub trait TimeBinned: Any + TimeBinnable {
pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);

View File

@@ -19,6 +19,12 @@ use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
#[allow(unused)]
macro_rules! trace4 {
// The first matching arm always wins, so trace4! currently expands to
// nothing; remove that arm to switch over to the eprintln! arm below.
($($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
}
// TODO make members private
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct BinsDim0<NTY> {
@@ -51,17 +57,6 @@ where
}
impl<NTY: ScalarOps> BinsDim0<NTY> {
pub fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
counts: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
}
}
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
@@ -119,6 +114,19 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
}
}
impl<NTY> Empty for BinsDim0<NTY> {
fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
counts: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
}
}
}
impl<NTY> WithLen for BinsDim0<NTY> {
fn len(&self) -> usize {
self.ts1s.len()
@@ -151,19 +159,6 @@ impl<NTY> RangeOverlapInfo for BinsDim0<NTY> {
}
}
impl<NTY> Empty for BinsDim0<NTY> {
fn empty() -> Self {
Self {
ts1s: Default::default(),
ts2s: Default::default(),
counts: Default::default(),
mins: Default::default(),
maxs: Default::default(),
avgs: Default::default(),
}
}
}
impl<NTY: ScalarOps> AppendEmptyBin for BinsDim0<NTY> {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
self.ts1s.push_back(ts1);
@@ -207,7 +202,8 @@ impl<NTY: ScalarOps> TimeBinnableType for BinsDim0<NTY> {
}
}
#[derive(Debug, Serialize)]
// TODO rename to BinsDim0CollectorOutput
#[derive(Debug, Serialize, Deserialize)]
pub struct BinsDim0CollectedResult<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
@@ -219,17 +215,23 @@ pub struct BinsDim0CollectedResult<NTY> {
ts1_off_ns: VecDeque<u64>,
#[serde(rename = "ts2Ns")]
ts2_off_ns: VecDeque<u64>,
#[serde(rename = "counts")]
counts: VecDeque<u64>,
#[serde(rename = "mins")]
mins: VecDeque<NTY>,
#[serde(rename = "maxs")]
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
#[serde(skip_serializing_if = "Option::is_none", rename = "finishedAt")]
#[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
finished_at: Option<IsoDateTime>,
}
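For reference, a hypothetical round-trip under the attributes above (field values invented): since Deserialize is now derived and the flag fields carry `default`, a payload may omit rangeFinal, timedOut and missingBins entirely.
let js = r##"{"tsAnchor":1200,"ts1Ms":[0],"ts2Ms":[5000],"ts1Ns":[0],"ts2Ns":[0],"counts":[5],"mins":[2405],"maxs":[2409],"avgs":[2407.0]}"##;
let res: BinsDim0CollectedResult<i32> = serde_json::from_str(js).unwrap();
assert_eq!(res.range_final(), false);
assert_eq!(res.counts()[0], 5);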
@@ -242,14 +244,30 @@ impl<NTY: ScalarOps> items_0::AsAnyRef for BinsDim0CollectedResult<NTY> {
impl<NTY: ScalarOps> items_0::collect_c::Collected for BinsDim0CollectedResult<NTY> {}
impl<NTY> BinsDim0CollectedResult<NTY> {
pub fn len(&self) -> usize {
self.ts1_off_ms.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
pub fn ts1_off_ms(&self) -> &VecDeque<u64> {
&self.ts1_off_ms
}
pub fn ts2_off_ms(&self) -> &VecDeque<u64> {
&self.ts2_off_ms
}
pub fn counts(&self) -> &VecDeque<u64> {
&self.counts
}
pub fn range_final(&self) -> bool {
self.finalised_range
}
pub fn missing_bins(&self) -> u32 {
self.missing_bins
}
@@ -265,6 +283,10 @@ impl<NTY> BinsDim0CollectedResult<NTY> {
pub fn maxs(&self) -> &VecDeque<NTY> {
&self.maxs
}
pub fn avgs(&self) -> &VecDeque<f32> {
&self.avgs
}
}
impl<NTY: ScalarOps> ToJsonResult for BinsDim0CollectedResult<NTY> {
@@ -278,6 +300,7 @@ impl<NTY: ScalarOps> ToJsonResult for BinsDim0CollectedResult<NTY> {
}
}
#[derive(Debug)]
pub struct BinsDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
@@ -305,6 +328,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
type Output = BinsDim0CollectedResult<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
trace!("\n\n----------- BinsDim0Collector ingest\n{:?}\n\n", src);
// TODO could be optimized by non-contiguous container.
self.vals.ts1s.append(&mut src.ts1s);
self.vals.ts2s.append(&mut src.ts2s);
@@ -362,6 +386,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
maxs,
avgs,
finalised_range: self.range_complete,
timed_out: self.timed_out,
missing_bins,
continue_at,
finished_at,
@@ -378,6 +403,73 @@ impl<NTY: ScalarOps> CollectableType for BinsDim0<NTY> {
}
}
impl<NTY> items_0::collect_c::Collector for BinsDim0Collector<NTY>
where
NTY: ScalarOps,
{
fn len(&self) -> usize {
self.vals.len()
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
{
{
//let tyid = item.type_id();
//let tyid = item.as_any_mut().type_id();
//let tyid = format!("{:?}", item.type_id().to_owned());
trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id());
}
trace4!("ty 1: {:40?}", std::any::TypeId::of::<BinsDim0<NTY>>());
trace4!("ty 2: {:40?}", std::any::TypeId::of::<BinsDim0<i32>>());
trace4!("ty 3: {:40?}", std::any::TypeId::of::<Box<BinsDim0<i32>>>());
trace4!(
"ty 4: {:?}",
std::any::TypeId::of::<Box<dyn items_0::collect_c::Collectable>>()
);
trace4!(
"ty 5: {:?}",
std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>()
);
trace4!("ty 6: {:?}", std::any::TypeId::of::<Box<dyn items_0::TimeBinned>>());
}
if let Some(item) = item.as_any_mut().downcast_mut::<BinsDim0<NTY>>() {
trace4!("ingest plain");
CollectorType::ingest(self, item)
} else if let Some(item) = item.as_any_mut().downcast_mut::<Box<BinsDim0<NTY>>>() {
trace4!("ingest boxed");
CollectorType::ingest(self, item)
} else if let Some(item) = item.as_any_mut().downcast_mut::<Box<dyn items_0::TimeBinned>>() {
trace4!("ingest boxed dyn TimeBinned");
if let Some(item) = item.as_any_mut().downcast_mut::<BinsDim0<NTY>>() {
trace4!("ingest boxed dyn TimeBinned match");
CollectorType::ingest(self, item)
} else {
warn!("BinsDim0Collector::ingest unexpected inner item");
trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item);
}
} else {
warn!("BinsDim0Collector::ingest unexpected item");
trace!("BinsDim0Collector::ingest unexpected item {:?}", item);
}
}
fn set_range_complete(&mut self) {
CollectorType::set_range_complete(self)
}
fn set_timed_out(&mut self) {
CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self) {
Ok(res) => Ok(Box::new(res)),
Err(e) => Err(e.into()),
}
}
}
pub struct BinsDim0Aggregator<NTY> {
range: NanoRange,
count: u64,
@@ -510,7 +602,7 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
return;
}
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges A");
warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item);
return;
}
// TODO optimize by remembering at which event array index we have arrived.
@@ -522,7 +614,7 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
}) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges B");
warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item);
return;
}
}
@@ -560,7 +652,7 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
if item.ends_after(agg.range().clone()) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges C");
warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item);
return;
}
} else {
@@ -692,3 +784,21 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
self
}
}
impl<NTY> items_0::AsAnyMut for BinsDim0<NTY>
where
NTY: 'static,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY> items_0::collect_c::Collectable for BinsDim0<NTY>
where
NTY: ScalarOps,
{
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(BinsDim0Collector::<NTY>::new())
}
}

items_2/src/binsxbindim0.rs (new file, 819 lines)
View File

@@ -0,0 +1,819 @@
use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor};
use crate::{IsoDateTime, RangeOverlapInfo};
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
use chrono::{TimeZone, Utc};
use err::Error;
use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult};
use items_0::scalar_ops::ScalarOps;
use items_0::AppendEmptyBin;
use items_0::Empty;
use items_0::TimeBinned;
use items_0::TimeBins;
use items_0::WithLen;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
#[allow(unused)]
macro_rules! trace4 {
// The first matching arm always wins, so trace4! currently expands to
// nothing; remove that arm to switch over to the eprintln! arm below.
($($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct BinsXbinDim0<NTY> {
ts1s: VecDeque<u64>,
ts2s: VecDeque<u64>,
counts: VecDeque<u64>,
mins: VecDeque<NTY>,
maxs: VecDeque<NTY>,
avgs: VecDeque<f32>,
// TODO could consider more variables:
// ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc...
}
impl<NTY> fmt::Debug for BinsXbinDim0<NTY>
where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = std::any::type_name::<Self>();
write!(
fmt,
"{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts,
self.mins,
self.maxs,
self.avgs,
)
}
}
impl<NTY: ScalarOps> BinsXbinDim0<NTY> {
pub fn from_content(
ts1s: VecDeque<u64>,
ts2s: VecDeque<u64>,
counts: VecDeque<u64>,
mins: VecDeque<NTY>,
maxs: VecDeque<NTY>,
avgs: VecDeque<f32>,
) -> Self {
Self {
ts1s,
ts2s,
counts,
mins,
maxs,
avgs,
}
}
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.counts.push_back(count);
self.mins.push_back(min);
self.maxs.push_back(max);
self.avgs.push_back(avg);
}
pub fn append_zero(&mut self, beg: u64, end: u64) {
self.ts1s.push_back(beg);
self.ts2s.push_back(end);
self.counts.push_back(0);
self.mins.push_back(NTY::zero_b());
self.maxs.push_back(NTY::zero_b());
self.avgs.push_back(0.);
}
pub fn append_all_from(&mut self, src: &mut Self) {
self.ts1s.extend(src.ts1s.drain(..));
self.ts2s.extend(src.ts2s.drain(..));
self.counts.extend(src.counts.drain(..));
self.mins.extend(src.mins.drain(..));
self.maxs.extend(src.maxs.drain(..));
self.avgs.extend(src.avgs.drain(..));
}
pub fn equal_slack(&self, other: &Self) -> bool {
for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) {
if a != b {
return false;
}
}
for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) {
if a != b {
return false;
}
}
for (a, b) in self.mins.iter().zip(other.mins.iter()) {
if !a.equal_slack(b) {
return false;
}
}
for (a, b) in self.maxs.iter().zip(other.maxs.iter()) {
if !a.equal_slack(b) {
return false;
}
}
for (a, b) in self.avgs.iter().zip(other.avgs.iter()) {
if !a.equal_slack(b) {
return false;
}
}
true
}
}
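A quick usage sketch (not part of the file), assuming nanosecond timestamps as elsewhere in the crate:
let mut bins = BinsXbinDim0::<i32>::empty();
// One populated bin over [0 s, 5 s), then a zero-count filler bin.
bins.push(0, 5 * SEC, 5, 2405, 2409, 2407.0);
bins.append_zero(5 * SEC, 10 * SEC);
assert_eq!(bins.len(), 2);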
impl<NTY> Empty for BinsXbinDim0<NTY> {
fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
counts: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
}
}
}
impl<NTY> WithLen for BinsXbinDim0<NTY> {
fn len(&self) -> usize {
self.ts1s.len()
}
}
impl<NTY> RangeOverlapInfo for BinsXbinDim0<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
if let Some(&max) = self.ts2s.back() {
max <= range.beg
} else {
true
}
}
fn ends_after(&self, range: NanoRange) -> bool {
if let Some(&max) = self.ts2s.back() {
max > range.end
} else {
true
}
}
fn starts_after(&self, range: NanoRange) -> bool {
if let Some(&min) = self.ts1s.front() {
min >= range.end
} else {
true
}
}
}
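Illustration (hypothetical values) of how the three predicates place the stored bins relative to a query range:
let mut bins = BinsXbinDim0::<i32>::empty();
bins.push(10 * SEC, 20 * SEC, 1, 0, 0, 0.0);
let later = NanoRange { beg: 20 * SEC, end: 30 * SEC };
// The last bin ends exactly at later.beg, so it ends before that range
// and does not reach past its end.
assert!(bins.ends_before(later.clone()));
assert!(!bins.ends_after(later));
assert!(bins.starts_after(NanoRange { beg: 0, end: 10 * SEC }));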
impl<NTY: ScalarOps> AppendEmptyBin for BinsXbinDim0<NTY> {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.counts.push_back(0);
self.mins.push_back(NTY::zero_b());
self.maxs.push_back(NTY::zero_b());
self.avgs.push_back(0.);
}
}
impl<NTY: ScalarOps> TimeBins for BinsXbinDim0<NTY> {
fn ts_min(&self) -> Option<u64> {
self.ts1s.front().map(Clone::clone)
}
fn ts_max(&self) -> Option<u64> {
self.ts2s.back().map(Clone::clone)
}
fn ts_min_max(&self) -> Option<(u64, u64)> {
if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) {
Some((min, max))
} else {
None
}
}
}
impl<NTY: ScalarOps> TimeBinnableType for BinsXbinDim0<NTY> {
type Output = BinsXbinDim0<NTY>;
type Aggregator = BinsXbinDim0Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
// TODO rename to BinsXbinDim0CollectorOutput
#[derive(Debug, Serialize, Deserialize)]
pub struct BinsXbinDim0CollectedResult<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "ts1Ms")]
ts1_off_ms: VecDeque<u64>,
#[serde(rename = "ts2Ms")]
ts2_off_ms: VecDeque<u64>,
#[serde(rename = "ts1Ns")]
ts1_off_ns: VecDeque<u64>,
#[serde(rename = "ts2Ns")]
ts2_off_ns: VecDeque<u64>,
#[serde(rename = "counts")]
counts: VecDeque<u64>,
#[serde(rename = "mins")]
mins: VecDeque<NTY>,
#[serde(rename = "maxs")]
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
missing_bins: u32,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
#[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
finished_at: Option<IsoDateTime>,
}
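// Serialized JSON sketch (illustrative values only, shaped by the serde
// renames above):
// {"tsAnchor": 1200, "ts1Ms": [0, 5000], "ts2Ms": [5000, 10000],
// "ts1Ns": [0, 0], "ts2Ns": [0, 0], "counts": [5, 10],
// "mins": [2405, 2410], "maxs": [2409, 2419], "avgs": [2407.0, 2414.5],
// "rangeFinal": true}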
impl<NTY: ScalarOps> items_0::AsAnyRef for BinsXbinDim0CollectedResult<NTY> {
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY: ScalarOps> items_0::collect_c::Collected for BinsXbinDim0CollectedResult<NTY> {}
impl<NTY> BinsXbinDim0CollectedResult<NTY> {
pub fn len(&self) -> usize {
self.ts1_off_ms.len()
}
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
pub fn ts1_off_ms(&self) -> &VecDeque<u64> {
&self.ts1_off_ms
}
pub fn ts2_off_ms(&self) -> &VecDeque<u64> {
&self.ts2_off_ms
}
pub fn counts(&self) -> &VecDeque<u64> {
&self.counts
}
pub fn range_final(&self) -> bool {
self.finalised_range
}
pub fn missing_bins(&self) -> u32 {
self.missing_bins
}
pub fn continue_at(&self) -> Option<IsoDateTime> {
self.continue_at.clone()
}
pub fn mins(&self) -> &VecDeque<NTY> {
&self.mins
}
pub fn maxs(&self) -> &VecDeque<NTY> {
&self.maxs
}
}
impl<NTY: ScalarOps> ToJsonResult for BinsXbinDim0CollectedResult<NTY> {
fn to_json_result(&self) -> Result<Box<dyn items_0::collect_s::ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
Ok(Box::new(k))
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug)]
pub struct BinsXbinDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
vals: BinsXbinDim0<NTY>,
}
impl<NTY> BinsXbinDim0Collector<NTY> {
pub fn new() -> Self {
Self {
timed_out: false,
range_complete: false,
vals: BinsXbinDim0::<NTY>::empty(),
}
}
}
impl<NTY> WithLen for BinsXbinDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.ts1s.len()
}
}
impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
type Input = BinsXbinDim0<NTY>;
type Output = BinsXbinDim0CollectedResult<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
trace!("\n\n----------- BinsXbinDim0Collector ingest\n{:?}\n\n", src);
// TODO could be optimized by non-contiguous container.
self.vals.ts1s.append(&mut src.ts1s);
self.vals.ts2s.append(&mut src.ts2s);
self.vals.counts.append(&mut src.counts);
self.vals.mins.append(&mut src.mins);
self.vals.maxs.append(&mut src.maxs);
self.vals.avgs.append(&mut src.avgs);
}
fn set_range_complete(&mut self) {
self.range_complete = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
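// TODO the expected bin count is not yet passed to this collector; with
// zero, the missing-bins / continue-at branch below is never taken.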
let bin_count_exp = 0;
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
(0, None, None)
};
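// `ts_offs_from_abs` works on plain slices, so the deques must be
// contiguous here; the two checks below assert that.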
if self.vals.ts1s.as_slices().1.len() != 0 {
panic!("ts1s deque is not contiguous");
}
if self.vals.ts2s.as_slices().1.len() != 0 {
panic!("ts2s deque is not contiguous");
}
let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0);
let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0);
let counts = mem::replace(&mut self.vals.counts, VecDeque::new());
let mins = mem::replace(&mut self.vals.mins, VecDeque::new());
let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new());
let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new());
let ret = BinsXbinDim0CollectedResult::<NTY> {
ts_anchor_sec: tst1.0,
ts1_off_ms: tst1.1,
ts1_off_ns: tst1.2,
ts2_off_ms: tst2.0,
ts2_off_ns: tst2.1,
counts,
mins,
maxs,
avgs,
finalised_range: self.range_complete,
timed_out: self.timed_out,
missing_bins,
continue_at,
finished_at,
};
Ok(ret)
}
}
impl<NTY: ScalarOps> CollectableType for BinsXbinDim0<NTY> {
type Collector = BinsXbinDim0Collector<NTY>;
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}
impl<NTY> items_0::collect_c::Collector for BinsXbinDim0Collector<NTY>
where
NTY: ScalarOps,
{
fn len(&self) -> usize {
self.vals.len()
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
{
{
//let tyid = item.type_id();
//let tyid = item.as_any_mut().type_id();
//let tyid = format!("{:?}", item.type_id().to_owned());
trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id());
}
trace4!("ty 1: {:40?}", std::any::TypeId::of::<BinsDim0<NTY>>());
trace4!("ty 2: {:40?}", std::any::TypeId::of::<BinsDim0<i32>>());
trace4!("ty 3: {:40?}", std::any::TypeId::of::<Box<BinsDim0<i32>>>());
trace4!(
"ty 4: {:?}",
std::any::TypeId::of::<Box<dyn items_0::collect_c::Collectable>>()
);
trace4!(
"ty 5: {:?}",
std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>()
);
trace4!("ty 6: {:?}", std::any::TypeId::of::<Box<dyn items_0::TimeBinned>>());
}
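// The item may arrive as the concrete bins type, boxed, or as a boxed
// `dyn TimeBinned`; try the downcasts from most to least specific.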
if let Some(item) = item.as_any_mut().downcast_mut::<BinsXbinDim0<NTY>>() {
trace4!("ingest plain");
CollectorType::ingest(self, item)
} else if let Some(item) = item.as_any_mut().downcast_mut::<Box<BinsXbinDim0<NTY>>>() {
trace4!("ingest boxed");
CollectorType::ingest(self, item)
} else if let Some(item) = item.as_any_mut().downcast_mut::<Box<dyn items_0::TimeBinned>>() {
trace4!("ingest boxed dyn TimeBinned");
if let Some(item) = item.as_any_mut().downcast_mut::<BinsXbinDim0<NTY>>() {
trace4!("ingest boxed dyn TimeBinned match");
CollectorType::ingest(self, item)
} else {
warn!("BinsDim0Collector::ingest unexpected inner item");
trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item);
}
} else {
warn!("BinsDim0Collector::ingest unexpected item");
trace!("BinsDim0Collector::ingest unexpected item {:?}", item);
}
}
fn set_range_complete(&mut self) {
CollectorType::set_range_complete(self)
}
fn set_timed_out(&mut self) {
CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self) {
Ok(res) => Ok(Box::new(res)),
Err(e) => Err(e.into()),
}
}
}
pub struct BinsXbinDim0Aggregator<NTY> {
range: NanoRange,
count: u64,
min: NTY,
max: NTY,
// Carry over to next bin:
avg: f32,
sumc: u64,
sum: f32,
}
impl<NTY: ScalarOps> BinsXbinDim0Aggregator<NTY> {
pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
Self {
range,
count: 0,
min: NTY::zero_b(),
max: NTY::zero_b(),
avg: 0.,
sumc: 0,
sum: 0f32,
}
}
}
impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY> {
type Input = BinsXbinDim0<NTY>;
type Output = BinsXbinDim0<NTY>;
fn range(&self) -> &NanoRange {
&self.range
}
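// Fold input bins into the current output bin: empty input bins and bins
// fully outside `self.range` are skipped. Note that the output average is
// the plain mean of the contributing bin averages, not weighted by count.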
fn ingest(&mut self, item: &Self::Input) {
for i1 in 0..item.ts1s.len() {
if item.counts[i1] == 0 {
} else if item.ts2s[i1] <= self.range.beg {
} else if item.ts1s[i1] >= self.range.end {
} else {
if self.count == 0 {
self.min = item.mins[i1].clone();
self.max = item.maxs[i1].clone();
} else {
if self.min > item.mins[i1] {
self.min = item.mins[i1].clone();
}
if self.max < item.maxs[i1] {
self.max = item.maxs[i1].clone();
}
}
self.count += item.counts[i1];
self.sum += item.avgs[i1];
self.sumc += 1;
}
}
}
fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
if self.sumc > 0 {
self.avg = self.sum / self.sumc as f32;
}
let ret = Self::Output {
ts1s: [self.range.beg].into(),
ts2s: [self.range.end].into(),
counts: [self.count].into(),
mins: [self.min.clone()].into(),
maxs: [self.max.clone()].into(),
avgs: [self.avg].into(),
};
self.range = range;
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
}
}
impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner> {
let ret = BinsXbinDim0TimeBinner::<NTY>::new(edges.into(), do_time_weight);
Box::new(ret)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
}
}
pub struct BinsXbinDim0TimeBinner<NTY: ScalarOps> {
edges: VecDeque<u64>,
do_time_weight: bool,
agg: Option<BinsXbinDim0Aggregator<NTY>>,
ready: Option<<BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
}
impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
fn new(edges: VecDeque<u64>, do_time_weight: bool) -> Self {
Self {
edges,
do_time_weight,
agg: None,
ready: None,
}
}
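// Pop the leading edge and return the next bin range; consecutive calls
// yield adjacent bins which share an edge.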
fn next_bin_range(&mut self) -> Option<NanoRange> {
if self.edges.len() >= 2 {
let ret = NanoRange {
beg: self.edges[0],
end: self.edges[1],
};
self.edges.pop_front();
Some(ret)
} else {
None
}
}
}
impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
if item.len() == 0 {
// Return early; RangeOverlapInfo would not make much sense on an empty item.
return;
}
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item);
return;
}
// TODO optimize by remembering at which event array index we have arrived.
// That needs modified interfaces which can take and yield the start and latest index.
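// Loop invariant: `edges[0]..edges[1]` bounds the bin currently being
// filled. `cycle()` closes that bin, padding with a zero bin when nothing
// was aggregated, and advances to the next pair of edges.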
loop {
while item.starts_after(NanoRange {
beg: 0,
end: self.edges[1],
}) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item);
return;
}
}
if item.ends_before(NanoRange {
beg: self.edges[0],
end: u64::MAX,
}) {
return;
} else {
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} edge list exhausted");
return;
} else {
let agg = if let Some(agg) = self.agg.as_mut() {
agg
} else {
self.agg = Some(BinsXbinDim0Aggregator::new(
// We know here that we have enough edges for another bin,
// and `next_bin_range` will pop the first edge.
self.next_bin_range().unwrap(),
self.do_time_weight,
));
self.agg.as_mut().unwrap()
};
if let Some(item) = item
.as_any()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
{
agg.ingest(item);
} else {
let tyid_item = std::any::Any::type_id(item.as_any());
error!("not correct item type {:?}", tyid_item);
};
if item.ends_after(agg.range().clone()) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item);
return;
}
} else {
break;
}
}
}
}
}
fn bins_ready_count(&self) -> usize {
match &self.ready {
Some(k) => k.len(),
None => 0,
}
}
fn bins_ready(&mut self) -> Option<Box<dyn items_0::TimeBinned>> {
match self.ready.take() {
Some(k) => Some(Box::new(k)),
None => None,
}
}
// TODO there is too much common code between implementors:
fn push_in_progress(&mut self, push_empty: bool) {
// TODO expand should be derived from AggKind. Is it still required after all?
let expand = true;
if let Some(agg) = self.agg.as_mut() {
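// The aggregator is discarded right after this reset, so the range passed
// to `result_reset` is only a placeholder.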
let dummy_range = NanoRange { beg: 4, end: 5 };
let mut bins = agg.result_reset(dummy_range, expand);
self.agg = None;
assert_eq!(bins.len(), 1);
if push_empty || bins.counts[0] != 0 {
match self.ready.as_mut() {
Some(ready) => {
ready.append_all_from(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
}
}
}
// TODO there is too much common code between implementors:
fn cycle(&mut self) {
let n = self.bins_ready_count();
self.push_in_progress(true);
if self.bins_ready_count() == n {
if let Some(range) = self.next_bin_range() {
let mut bins = BinsXbinDim0::<NTY>::empty();
bins.append_zero(range.beg, range.end);
match self.ready.as_mut() {
Some(ready) => {
ready.append_all_from(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
if self.bins_ready_count() <= n {
error!("failed to push a zero bin");
}
} else {
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
}
}
}
fn set_range_complete(&mut self) {}
}
impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
}
fn edges_slice(&self) -> (&[u64], &[u64]) {
if self.ts1s.as_slices().1.len() != 0 {
panic!("ts1s deque is not contiguous");
}
if self.ts2s.as_slices().1.len() != 0 {
panic!("ts2s deque is not contiguous");
}
(self.ts1s.as_slices().0, self.ts2s.as_slices().0)
}
fn counts(&self) -> &[u64] {
// TODO check for contiguous
self.counts.as_slices().0
}
// TODO is Vec needed?
fn mins(&self) -> Vec<f32> {
self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect()
}
// TODO is Vec needed?
fn maxs(&self) -> Vec<f32> {
self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect()
}
// TODO is Vec needed?
fn avgs(&self) -> Vec<f32> {
self.avgs.iter().copied().collect()
}
fn validate(&self) -> Result<(), String> {
use std::fmt::Write;
let mut msg = String::new();
if self.ts1s.len() != self.ts2s.len() {
write!(&mut msg, "ts1s and ts2s lengths differ\n").unwrap();
}
for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() {
if min.as_prim_f32_b() < 1. && *count != 0 {
write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap();
}
}
if msg.is_empty() {
Ok(())
} else {
Err(msg)
}
}
fn as_collectable_mut(&mut self) -> &mut dyn Collectable {
self
}
}
impl<NTY> items_0::AsAnyMut for BinsXbinDim0<NTY>
where
NTY: 'static,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY> items_0::collect_c::Collectable for BinsXbinDim0<NTY>
where
NTY: ScalarOps,
{
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(BinsXbinDim0Collector::<NTY>::new())
}
}

View File

@@ -110,6 +110,10 @@ mod serde_channel_events {
let obj: EventsDim0<f32> = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?;
Ok(Box::new(obj))
}
f64::SUB => {
let obj: EventsDim0<f64> = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?;
Ok(Box::new(obj))
}
_ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
}
} else {
@@ -422,9 +426,6 @@ impl ChannelEventsCollector {
}
impl items_0::collect_c::Collector for ChannelEventsCollector {
type Input = ChannelEvents;
type Output = Box<dyn items_0::collect_c::Collected>;
fn len(&self) -> usize {
match &self.coll {
Some(coll) => coll.len(),
@@ -432,19 +433,23 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
}
}
fn ingest(&mut self, item: &mut Self::Input) {
match item {
ChannelEvents::Events(item) => {
if self.coll.is_none() {
let coll = item.as_ref().as_collectable_with_default_ref().new_collector();
self.coll = Some(coll);
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {
match item {
ChannelEvents::Events(item) => {
if self.coll.is_none() {
let coll = item.as_ref().as_collectable_with_default_ref().new_collector();
self.coll = Some(coll);
}
let coll = self.coll.as_mut().unwrap();
coll.ingest(item.as_collectable_with_default_mut());
}
ChannelEvents::Status(_) => {
// TODO decide on output format to collect also the connection status events
}
let coll = self.coll.as_mut().unwrap();
coll.ingest(item.as_collectable_with_default_mut());
}
ChannelEvents::Status(_) => {
// TODO decide on output format to collect also the connection status events
}
} else {
error!("ChannelEventsCollector::ingest unexpected item {:?}", item);
}
}
@@ -456,7 +461,7 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, err::Error> {
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match self.coll.as_mut() {
Some(coll) => {
if self.range_complete {
@@ -479,10 +484,14 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
}
}
impl items_0::collect_c::Collectable for ChannelEvents {
type Collector = ChannelEventsCollector;
fn new_collector(&self) -> Self::Collector {
ChannelEventsCollector::new()
impl items_0::AsAnyMut for ChannelEvents {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl items_0::collect_c::Collectable for ChannelEvents {
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(ChannelEventsCollector::new())
}
}

View File

@@ -1,5 +1,6 @@
use crate::binsdim0::BinsDim0;
use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo};
use crate::{pulse_offs_from_abs, ts_offs_from_abs};
use crate::{IsoDateTime, RangeOverlapInfo};
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
use err::Error;
use items_0::scalar_ops::ScalarOps;
@@ -108,7 +109,10 @@ impl<NTY: ScalarOps> RangeOverlapInfo for EventsDim0<NTY> {
}
}
impl<NTY: ScalarOps> TimeBinnableType for EventsDim0<NTY> {
impl<NTY> TimeBinnableType for EventsDim0<NTY>
where
NTY: ScalarOps,
{
type Output = BinsDim0<NTY>;
type Aggregator = EventsDim0Aggregator<NTY>;
@@ -159,10 +163,12 @@ pub struct EventsDim0CollectorOutput<NTY> {
pulse_off: VecDeque<u64>,
#[serde(rename = "values")]
values: VecDeque<NTY>,
#[serde(rename = "finalisedRange", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
range_complete: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
}
impl<NTY: ScalarOps> EventsDim0CollectorOutput<NTY> {
@@ -206,8 +212,6 @@ impl<NTY: ScalarOps> items_0::AsAnyRef for EventsDim0CollectorOutput<NTY> {
}
}
impl<NTY: ScalarOps> items_0::collect_c::Collected for EventsDim0CollectorOutput<NTY> {}
impl<NTY: ScalarOps> items_0::collect_s::ToJsonResult for EventsDim0CollectorOutput<NTY> {
fn to_json_result(&self) -> Result<Box<dyn items_0::collect_s::ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
@@ -219,12 +223,13 @@ impl<NTY: ScalarOps> items_0::collect_s::ToJsonResult for EventsDim0CollectorOut
}
}
impl<NTY: ScalarOps> items_0::collect_c::Collected for EventsDim0CollectorOutput<NTY> {}
impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<NTY> {
type Input = EventsDim0<NTY>;
type Output = EventsDim0CollectorOutput<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
// TODO could be optimized by non-contiguous container.
self.vals.tss.append(&mut src.tss);
self.vals.pulses.append(&mut src.pulses);
self.vals.values.append(&mut src.values);
@@ -239,18 +244,38 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
}
fn result(&mut self) -> Result<Self::Output, Error> {
// TODO require contiguous slices
let tst = ts_offs_from_abs(&self.vals.tss.as_slices().0);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0);
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, the client cannot request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The size of the delta must take into account what timestamp precision the
// client can parse and handle.
let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
Some(IsoDateTime::from_u64(*ts))
} else {
// TODO tricky: should we yield the original range begin again? That leads to recursion.
// Range begin plus delta?
// In any case, we do not have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
None
}
} else {
None
};
let tss_sl = self.vals.tss.make_contiguous();
let pulses_sl = self.vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(pulses_sl);
let ret = Self::Output {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
ts_anchor_sec,
ts_off_ms,
ts_off_ns,
pulse_anchor,
pulse_off: pulse_off,
values: mem::replace(&mut self.vals.values, VecDeque::new()),
range_complete: self.range_complete,
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
}
@@ -265,15 +290,16 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<NTY> {
}
impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY> {
type Input = EventsDim0<NTY>;
type Output = EventsDim0CollectorOutput<NTY>;
fn len(&self) -> usize {
self.vals.len()
}
fn ingest(&mut self, item: &mut Self::Input) {
items_0::collect_s::CollectorType::ingest(self, item)
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
if let Some(item) = item.as_any_mut().downcast_mut::<EventsDim0<NTY>>() {
items_0::collect_s::CollectorType::ingest(self, item)
} else {
error!("EventsDim0Collector::ingest unexpected item {:?}", item);
}
}
fn set_range_complete(&mut self) {
@@ -284,8 +310,11 @@ impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY>
items_0::collect_s::CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Self::Output, err::Error> {
items_0::collect_s::CollectorType::result(self).map_err(Into::into)
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match items_0::collect_s::CollectorType::result(self) {
Ok(x) => Ok(Box::new(x)),
Err(e) => Err(e.into()),
}
}
}
@@ -888,6 +917,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
}
// TODO remove this struct?
#[derive(Debug)]
pub struct EventsDim0CollectorDyn {}
@@ -903,7 +933,6 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn {
}
fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) {
// TODO remove this struct?
todo!()
}
@@ -961,10 +990,14 @@ impl<NTY: ScalarOps> items_0::collect_c::CollectableWithDefault for EventsDim0<N
}
}
impl<NTY: ScalarOps> items_0::collect_c::Collectable for EventsDim0<NTY> {
type Collector = EventsDim0Collector<NTY>;
fn new_collector(&self) -> Self::Collector {
EventsDim0Collector::new()
impl<NTY: ScalarOps> items_0::AsAnyMut for EventsDim0<NTY> {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY: ScalarOps> items_0::collect_c::Collectable for EventsDim0<NTY> {
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(EventsDim0Collector::<NTY>::new())
}
}

View File

@@ -0,0 +1,498 @@
use crate::binsxbindim0::BinsXbinDim0;
use crate::RangeOverlapInfo;
use crate::{pulse_offs_from_abs, ts_offs_from_abs};
use crate::{TimeBinnableType, TimeBinnableTypeAggregator};
use err::Error;
use items_0::scalar_ops::ScalarOps;
use items_0::Empty;
use items_0::WithLen;
use netpod::log::*;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsXbinDim0<NTY> {
pub tss: VecDeque<u64>,
pub pulses: VecDeque<u64>,
pub mins: VecDeque<NTY>,
pub maxs: VecDeque<NTY>,
pub avgs: VecDeque<f32>,
// TODO maybe add variance?
}
impl<NTY> EventsXbinDim0<NTY> {
#[inline(always)]
pub fn push(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) {
self.tss.push_back(ts);
self.pulses.push_back(pulse);
self.mins.push_back(min);
self.maxs.push_back(max);
self.avgs.push_back(avg);
}
#[inline(always)]
pub fn push_front(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) {
self.tss.push_front(ts);
self.pulses.push_front(pulse);
self.mins.push_front(min);
self.maxs.push_front(max);
self.avgs.push_front(avg);
}
}
impl<NTY> fmt::Debug for EventsXbinDim0<NTY>
where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("EventsXbinDim0")
.field("tss", &self.tss)
.field("pulses", &self.pulses)
.field("mins", &self.mins)
.field("maxs", &self.maxs)
.field("avgs", &self.avgs)
.finish()
}
}
impl<NTY> items::ByteEstimate for EventsXbinDim0<NTY> {
fn byte_estimate(&self) -> u64 {
todo!("byte_estimate")
}
}
impl<NTY> Empty for EventsXbinDim0<NTY> {
fn empty() -> Self {
Self {
tss: VecDeque::new(),
pulses: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
}
}
}
impl<NTY> WithLen for EventsXbinDim0<NTY> {
fn len(&self) -> usize {
self.tss.len()
}
}
impl<NTY> RangeOverlapInfo for EventsXbinDim0<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
match self.tss.back() {
Some(&ts) => ts < range.beg,
None => true,
}
}
fn ends_after(&self, range: NanoRange) -> bool {
match self.tss.back() {
Some(&ts) => ts >= range.end,
None => panic!("ends_after called on empty EventsXbinDim0"),
}
}
fn starts_after(&self, range: NanoRange) -> bool {
match self.tss.front() {
Some(&ts) => ts >= range.end,
None => panic!("starts_after called on empty EventsXbinDim0"),
}
}
}
impl<NTY> TimeBinnableType for EventsXbinDim0<NTY>
where
NTY: ScalarOps,
{
type Output = BinsXbinDim0<NTY>;
type Aggregator = EventsXbinDim0Aggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let name = std::any::type_name::<Self>();
debug!(
"TimeBinnableType for {} aggregator() range {:?} x_bin_count {} do_time_weight {}",
name, range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
pub struct EventsXbinDim0Aggregator<NTY>
where
NTY: ScalarOps,
{
range: NanoRange,
count: u64,
min: NTY,
max: NTY,
sumc: u64,
sum: f32,
int_ts: u64,
last_ts: u64,
last_avg: Option<f32>,
last_min: Option<NTY>,
last_max: Option<NTY>,
do_time_weight: bool,
}
impl<NTY> EventsXbinDim0Aggregator<NTY>
where
NTY: ScalarOps,
{
pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
Self {
int_ts: range.beg,
range,
count: 0,
min: NTY::zero_b(),
max: NTY::zero_b(),
sumc: 0,
sum: 0f32,
last_ts: 0,
last_avg: None,
last_min: None,
last_max: None,
do_time_weight,
}
}
fn apply_min_max(&mut self, min: NTY, max: NTY) {
if self.count == 0 {
self.min = min;
self.max = max;
} else {
if min < self.min {
self.min = min;
}
if max > self.max {
self.max = max;
}
}
}
fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) {
//debug!("apply_event_unweight");
self.apply_min_max(min, max);
let vf = avg;
if vf.is_nan() {
} else {
self.sum += vf;
self.sumc += 1;
}
}
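// Time-weighted contribution: the last seen value is held constant from
// `int_ts` up to `ts` and enters the sum weighted by that hold duration.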
fn apply_event_time_weight(&mut self, ts: u64) {
//debug!("apply_event_time_weight");
if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) {
let min2 = min.clone();
let max2 = max.clone();
self.apply_min_max(min2, max2);
// Hold duration in seconds; `result_reset_time_weight` divides by the bin
// length in seconds, so the weight must not be pre-normalized by it here.
let w = (ts - self.int_ts) as f32 * 1e-9;
if avg.is_nan() {
} else {
self.sum += avg * w;
}
self.sumc += 1;
self.int_ts = ts;
}
}
fn ingest_unweight(&mut self, item: &EventsXbinDim0<NTY>) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1].clone();
let max = item.maxs[i1].clone();
if ts < self.range.beg {
} else if ts >= self.range.end {
} else {
self.apply_event_unweight(avg, min, max);
self.count += 1;
}
}
}
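// Events before `int_ts` only refresh the carried-over "last seen" state,
// which provides the value held at the left edge of the bin.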
fn ingest_time_weight(&mut self, item: &EventsXbinDim0<NTY>) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let avg = item.avgs[i1];
let min = item.mins[i1].clone();
let max = item.maxs[i1].clone();
if ts < self.int_ts {
self.last_ts = ts;
self.last_avg = Some(avg);
self.last_min = Some(min);
self.last_max = Some(max);
} else if ts >= self.range.end {
return;
} else {
self.apply_event_time_weight(ts);
self.count += 1;
self.last_ts = ts;
self.last_avg = Some(avg);
self.last_min = Some(min);
self.last_max = Some(max);
}
}
}
fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsXbinDim0<NTY> {
let avg = if self.sumc == 0 {
0f32
} else {
self.sum / self.sumc as f32
};
let ret = BinsXbinDim0::from_content(
[self.range.beg].into(),
[self.range.end].into(),
[self.count].into(),
[self.min.clone()].into(),
[self.max.clone()].into(),
[avg].into(),
);
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = NTY::zero_b();
self.max = NTY::zero_b();
self.sum = 0f32;
self.sumc = 0;
ret
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsXbinDim0<NTY> {
// TODO check callsite for correct expand status.
if true || expand {
self.apply_event_time_weight(self.range.end);
}
let avg = {
let sc = self.range.delta() as f32 * 1e-9;
self.sum / sc
};
let ret = BinsXbinDim0::from_content(
[self.range.beg].into(),
[self.range.end].into(),
[self.count].into(),
[self.min.clone()].into(),
[self.max.clone()].into(),
[avg].into(),
);
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = NTY::zero_b();
self.max = NTY::zero_b();
self.sum = 0f32;
self.sumc = 0;
ret
}
}
impl<NTY> TimeBinnableTypeAggregator for EventsXbinDim0Aggregator<NTY>
where
NTY: ScalarOps,
{
type Input = EventsXbinDim0<NTY>;
type Output = BinsXbinDim0<NTY>;
fn range(&self) -> &NanoRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
debug!("ingest");
if self.do_time_weight {
self.ingest_time_weight(item)
} else {
self.ingest_unweight(item)
}
}
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
} else {
self.result_reset_unweight(range, expand)
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EventsXbinDim0CollectorOutput<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: VecDeque<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: VecDeque<u64>,
#[serde(rename = "pulseAnchor")]
pulse_anchor: u64,
#[serde(rename = "pulseOff")]
pulse_off: VecDeque<u64>,
#[serde(rename = "mins")]
mins: VecDeque<NTY>,
#[serde(rename = "maxs")]
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
// TODO add continue-at
}
impl<NTY> items_0::AsAnyRef for EventsXbinDim0CollectorOutput<NTY>
where
NTY: ScalarOps,
{
fn as_any_ref(&self) -> &dyn Any {
self
}
}
impl<NTY> items_0::collect_s::ToJsonResult for EventsXbinDim0CollectorOutput<NTY>
where
NTY: ScalarOps,
{
fn to_json_result(&self) -> Result<Box<dyn items_0::collect_s::ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
Ok(Box::new(k))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl<NTY> items_0::collect_c::Collected for EventsXbinDim0CollectorOutput<NTY> where NTY: ScalarOps {}
#[derive(Debug)]
pub struct EventsXbinDim0Collector<NTY> {
vals: EventsXbinDim0<NTY>,
finalised_range: bool,
timed_out: bool,
}
impl<NTY> EventsXbinDim0Collector<NTY> {
pub fn new() -> Self {
Self {
finalised_range: false,
timed_out: false,
vals: EventsXbinDim0::empty(),
}
}
}
impl<NTY> WithLen for EventsXbinDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.tss.len()
}
}
impl<NTY> items_0::collect_s::CollectorType for EventsXbinDim0Collector<NTY>
where
NTY: ScalarOps,
{
type Input = EventsXbinDim0<NTY>;
type Output = EventsXbinDim0CollectorOutput<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
self.vals.tss.append(&mut src.tss);
self.vals.pulses.append(&mut src.pulses);
self.vals.mins.append(&mut src.mins);
self.vals.maxs.append(&mut src.maxs);
self.vals.avgs.append(&mut src.avgs);
}
fn set_range_complete(&mut self) {
self.finalised_range = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
use std::mem::replace;
let mins = replace(&mut self.vals.mins, VecDeque::new());
let maxs = replace(&mut self.vals.maxs, VecDeque::new());
let avgs = replace(&mut self.vals.avgs, VecDeque::new());
self.vals.tss.make_contiguous();
self.vals.pulses.make_contiguous();
let tst = ts_offs_from_abs(self.vals.tss.as_slices().0);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0);
let ret = Self::Output {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
pulse_anchor,
pulse_off,
mins,
maxs,
avgs,
finalised_range: self.finalised_range,
timed_out: self.timed_out,
};
Ok(ret)
}
}
impl<NTY> items_0::collect_s::CollectableType for EventsXbinDim0<NTY>
where
NTY: ScalarOps,
{
type Collector = EventsXbinDim0Collector<NTY>;
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}
impl<NTY> items_0::AsAnyMut for EventsXbinDim0<NTY>
where
NTY: ScalarOps,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY> items_0::collect_c::Collector for EventsXbinDim0Collector<NTY>
where
NTY: ScalarOps,
{
fn len(&self) -> usize {
todo!()
}
fn ingest(&mut self, _item: &mut dyn items_0::collect_c::Collectable) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn set_timed_out(&mut self) {
todo!()
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
todo!()
}
}
impl<NTY> items_0::collect_c::Collectable for EventsXbinDim0<NTY>
where
NTY: ScalarOps,
{
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(<Self as items_0::collect_s::CollectableType>::new_collector())
}
}

View File

@@ -1,6 +1,8 @@
pub mod binsdim0;
pub mod binsxbindim0;
pub mod channelevents;
pub mod eventsdim0;
pub mod eventsxbindim0;
pub mod merger;
pub mod merger_cev;
pub mod streams;
@@ -138,6 +140,12 @@ impl serde::de::Error for Error {
#[derive(Clone, Debug, PartialEq, Deserialize)]
pub struct IsoDateTime(DateTime<Utc>);
impl IsoDateTime {
pub fn from_u64(ts: u64) -> Self {
IsoDateTime(Utc.timestamp_nanos(ts as i64))
}
}
impl Serialize for IsoDateTime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -182,6 +190,7 @@ impl crate::merger::Mergeable for Box<dyn Events> {
}
}
// TODO rename to `Typed`
pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo {
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;

View File

@@ -1,8 +1,8 @@
pub mod api4;
pub mod histo;
pub mod query;
pub mod status;
pub mod streamext;
pub mod api4;
use crate::log::*;
use bytes::Bytes;
@@ -1399,9 +1399,9 @@ impl fmt::Debug for BinnedGridSpec {
#[derive(Clone, Debug)]
pub struct BinnedRange {
pub grid_spec: BinnedGridSpec,
pub offset: u64,
pub count: u64,
grid_spec: BinnedGridSpec,
offset: u64,
bin_count: u64,
}
impl BinnedRange {
@@ -1435,7 +1435,7 @@ impl BinnedRange {
let offset = ts1 / bl;
let ret = Self {
grid_spec,
count,
bin_count: count,
offset,
};
break Ok(ret);
@@ -1444,6 +1444,14 @@ impl BinnedRange {
}
}
pub fn bin_count(&self) -> u64 {
self.bin_count
}
pub fn grid_spec(&self) -> &BinnedGridSpec {
&self.grid_spec
}
pub fn get_range(&self, ix: u32) -> NanoRange {
NanoRange {
beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len,
@@ -1454,14 +1462,14 @@ impl BinnedRange {
pub fn full_range(&self) -> NanoRange {
NanoRange {
beg: self.offset * self.grid_spec.bin_t_len,
end: (self.offset + self.count) * self.grid_spec.bin_t_len,
end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len,
}
}
pub fn edges(&self) -> Vec<u64> {
let mut ret = Vec::new();
let mut t = self.offset * self.grid_spec.bin_t_len;
let end = (self.offset + self.count) * self.grid_spec.bin_t_len;
let end = (self.offset + self.bin_count) * self.grid_spec.bin_t_len;
while t <= end {
ret.push(t);
t += self.grid_spec.bin_t_len;
@@ -1470,6 +1478,36 @@ impl BinnedRange {
}
}
#[cfg(test)]
mod test_binned_range {
use super::*;
#[test]
fn binned_range_00() {
let range = NanoRange {
beg: HOUR * 72,
end: HOUR * 73,
};
let range = BinnedRange::covering_range(range, 10).unwrap();
assert_eq!(range.bin_count(), 12);
assert_eq!(range.edges()[0], HOUR * 72);
assert_eq!(range.edges()[2], HOUR * 72 + MIN * 5 * 2);
}
#[test]
fn binned_range_01() {
let range = NanoRange {
beg: MIN * 20 + SEC * 10,
end: HOUR * 10 + MIN * 20 + SEC * 30,
};
let range = BinnedRange::covering_range(range, 10).unwrap();
assert_eq!(range.bin_count(), 11);
assert_eq!(range.edges()[0], HOUR * 0);
assert_eq!(range.edges()[1], HOUR * 1);
assert_eq!(range.edges()[11], HOUR * 11);
}
}
#[derive(Clone, Serialize, Deserialize)]
pub enum AggKind {
EventBlobs,

View File

@@ -1,7 +1,7 @@
use err::Error;
use futures_util::{Stream, StreamExt};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use items_0::collect_c::{Collectable, Collector};
use items_0::collect_c::Collectable;
use netpod::log::*;
use std::fmt;
use std::time::{Duration, Instant};
@@ -29,14 +29,14 @@ pub async fn collect<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
) -> Result<<<T as Collectable>::Collector as Collector>::Output, Error>
) -> Result<Box<dyn items_0::collect_c::Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + fmt::Debug,
{
let span = tracing::span!(tracing::Level::TRACE, "collect");
let fut = async {
let mut collector: Option<<T as Collectable>::Collector> = None;
let mut collector: Option<Box<dyn items_0::collect_c::Collector>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;

View File

@@ -2,6 +2,7 @@ use crate::test::runfut;
use err::Error;
use futures_util::{stream, StreamExt};
use items::{sitem_data, RangeCompletableItem, StreamItem};
use items_0::Empty;
use items_2::binsdim0::BinsDim0;
use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent};
use items_2::testgen::make_some_boxed_d0_f32;

View File

@@ -3,24 +3,22 @@ use err::Error;
use futures_util::StreamExt;
#[allow(unused)]
use netpod::log::*;
use netpod::Cluster;
use serde::Serialize;
use netpod::query::{BinnedQuery, RawEventsQuery};
use netpod::{BinnedRange, Cluster};
use serde_json::Value as JsonValue;
use std::time::{Duration, Instant};
pub async fn timebinned_json<SER>(query: SER, cluster: &Cluster) -> Result<JsonValue, Error>
where
SER: Serialize,
{
// TODO should be able to ask for data-events only, instead of mixed data and status events.
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?;
pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result<JsonValue, Error> {
let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?;
let events_max = 10000;
let do_time_weight = query.agg_kind().do_time_weighted();
let deadline = Instant::now() + Duration::from_millis(7500);
let rawquery = RawEventsQuery::new(query.channel().clone(), query.range().clone(), query.agg_kind().clone());
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = { items_2::merger::Merger::new(inps, 1) };
let events_max = 10000;
let do_time_weight = true;
let deadline = Instant::now() + Duration::from_millis(7500);
let stream = Box::pin(stream);
let stream = crate::timebin::TimeBinnedStream::new(stream, Vec::new(), do_time_weight, deadline);
let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
if false {
let mut stream = stream;
let _: Option<items::Sitemty<Box<dyn items_0::TimeBinned>>> = stream.next().await;