From aa74fd4f259ea04b2ece99b9a22cdc11ee62d043 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 5 Dec 2022 12:01:19 +0100
Subject: [PATCH] Move binned type, add tests
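
Move the collected binned result type into items_2, rename the JSON flag
finalisedRange to rangeFinal across responses and documentation, make the
collector traits in items_0 object safe, and add BinsXbinDim0 and
EventsXbinDim0 together with more binned-JSON tests.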
---
daqbufp2/src/test.rs | 26 +-
daqbufp2/src/test/api4/binnedjson.rs | 240 ++++-
daqbufp2/src/test/api4/eventsjson.rs | 2 +-
daqbufp2/src/test/archapp.rs | 4 +-
daqbufp2/src/test/binnedjson.rs | 390 +--------
.../src/test/binnedjson/channelarchiver.rs | 10 +-
daqbufp2/src/test/timeweightedjson.rs | 85 +-
disk/src/agg/binnedt.rs | 2 +-
disk/src/binned.rs | 6 +-
disk/src/raw/conn.rs | 41 +-
httpret/src/api4/binned.rs | 13 +-
httpret/src/channelconfig.rs | 37 +-
httpret/static/documentation/api4.html | 8 +-
items/src/binsdim0.rs | 2 +-
items/src/binsdim1.rs | 4 +-
items/src/items.rs | 2 +-
items/src/scalarevents.rs | 2 +-
items/src/statsevents.rs | 2 +-
items/src/xbinnedscalarevents.rs | 2 +-
items/src/xbinnedwaveevents.rs | 2 +-
items_0/src/collect_c.rs | 84 +-
items_0/src/collect_s.rs | 2 +
items_0/src/items_0.rs | 8 +-
items_2/src/binsdim0.rs | 174 +++-
items_2/src/binsxbindim0.rs | 819 ++++++++++++++++++
items_2/src/channelevents.rs | 49 +-
items_2/src/eventsdim0.rs | 83 +-
items_2/src/eventsxbindim0.rs | 498 +++++++++++
items_2/src/items_2.rs | 9 +
netpod/src/netpod.rs | 52 +-
streams/src/collect.rs | 6 +-
streams/src/test/timebin.rs | 1 +
streams/src/timebinnedjson.rs | 22 +-
33 files changed, 1988 insertions(+), 699 deletions(-)
create mode 100644 items_2/src/binsxbindim0.rs
create mode 100644 items_2/src/eventsxbindim0.rs
diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs
index 99dabb1..26709d2 100644
--- a/daqbufp2/src/test.rs
+++ b/daqbufp2/src/test.rs
@@ -14,6 +14,20 @@ use bytes::BytesMut;
use err::Error;
use std::future::Future;
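+/// Approximate f32 equality: zero the low four bits of the mantissa in the
+/// little-endian byte representation of both values before comparing, which
+/// tolerates tiny rounding differences in aggregated averages.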
+fn f32_cmp_near(x: f32, y: f32) -> bool {
+ let x = {
+ let mut a = x.to_le_bytes();
+ a[0] &= 0xf0;
+ f32::from_le_bytes(a)
+ };
+ let y = {
+ let mut a = y.to_le_bytes();
+ a[0] &= 0xf0;
+ f32::from_le_bytes(a)
+ };
+ x == y
+}
+
fn f32_iter_cmp_near<A, B>(a: A, b: B) -> bool
where
A: IntoIterator<Item = f32>,
@@ -25,17 +39,7 @@ where
let x = a.next();
let y = b.next();
if let (Some(x), Some(y)) = (x, y) {
- let x = {
- let mut a = x.to_ne_bytes();
- a[0] &= 0xf0;
- f32::from_ne_bytes(a)
- };
- let y = {
- let mut a = y.to_ne_bytes();
- a[0] &= 0xf0;
- f32::from_ne_bytes(a)
- };
- if x != y {
+ if !f32_cmp_near(x, y) {
return false;
}
} else if x.is_some() || y.is_some() {
diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs
index 257981a..21ace3d 100644
--- a/daqbufp2/src/test/api4/binnedjson.rs
+++ b/daqbufp2/src/test/api4/binnedjson.rs
@@ -1,5 +1,6 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
+use crate::test::f32_cmp_near;
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
@@ -16,7 +17,7 @@ fn binned_d0_json_00() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
- let jsv = binned_d0_json(
+ let jsv = get_binned_json(
Channel {
backend: "test-disk-databuffer".into(),
name: "scalar-i32-be".into(),
@@ -28,17 +29,235 @@ fn binned_d0_json_00() -> Result<(), Error> {
cluster,
)
.await?;
- info!("Receveided a response json value: {jsv:?}");
- let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
- assert_eq!(res.len(), 20);
- assert_eq!(res.ts_anchor_sec(), 0);
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 8);
+ assert_eq!(res.ts1_off_ms()[0], 0);
+ assert_eq!(res.ts2_off_ms()[0], 5000);
+ assert_eq!(res.counts()[0], 5);
+ assert_eq!(res.counts()[1], 10);
+ assert_eq!(res.counts()[7], 7);
+ assert_eq!(res.mins()[0], 2405);
+ assert_eq!(res.maxs()[0], 2409);
+ assert_eq!(res.mins()[1], 2410);
+ assert_eq!(res.maxs()[1], 2419);
Ok(())
};
taskrun::run(fut)
}
-async fn binned_d0_json(
+#[test]
+fn binned_d0_json_01() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "scalar-i32-be".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T01:20:30.000Z",
+ 10,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 13);
+ assert_eq!(res.range_final(), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_02() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "wave-f64-be-n21".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T01:20:45.000Z",
+ 10,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 13);
+ assert_eq!(res.range_final(), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_03() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "wave-f64-be-n21".into(),
+ series: None,
+ },
+ // TODO This test was meant to ask `AggKind::DimXBinsN(3)`
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T01:20:20.000Z",
+ 2,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 4);
+ assert_eq!(res.range_final(), true);
+ assert_eq!(res.counts()[0], 300);
+ assert_eq!(res.counts()[3], 8);
+ assert_eq!(f32_cmp_near(res.avgs()[0], 44950.00390625), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_04() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "const-regular-scalar-i32-be".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T04:20:30.000Z",
+ // TODO must use AggKind::DimXBins1
+ 20,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 17);
+ // TODO I would expect rangeFinal to be set, or?
+ assert_eq!(res.range_final(), false);
+ assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_05() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "const-regular-scalar-i32-be".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T10:20:30.000Z",
+ // TODO must use AggKind::DimXBins1
+ 10,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 0);
+ assert_eq!(res.len(), 3);
+ assert_eq!(res.range_final(), false);
+ assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_06() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "const-regular-scalar-i32-be".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:10.000Z",
+ "1970-01-01T00:20:20.000Z",
+ // TODO must use AggKind::TimeWeightedScalar
+ 20,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1210);
+ assert_eq!(res.len(), 20);
+ assert_eq!(res.range_final(), true);
+ assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+#[test]
+fn binned_d0_json_07() -> Result<(), Error> {
+ let fut = async {
+ let rh = require_test_hosts_running()?;
+ let cluster = &rh.cluster;
+ let jsv = get_binned_json(
+ Channel {
+ backend: "test-disk-databuffer".into(),
+ name: "const-regular-scalar-i32-be".into(),
+ series: None,
+ },
+ "1970-01-01T00:20:11.000Z",
+ "1970-01-01T00:30:20.000Z",
+ // TODO must use AggKind::TimeWeightedScalar
+ 10,
+ cluster,
+ )
+ .await?;
+ debug!("Receveided a response json value: {jsv:?}");
+ let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?;
+ // inmem was meant just for functional test, ignores the requested time range
+ assert_eq!(res.ts_anchor_sec(), 1200);
+ assert_eq!(res.len(), 11);
+ assert_eq!(res.range_final(), true);
+ assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true);
+ Ok(())
+ };
+ taskrun::run(fut)
+}
+
+async fn get_binned_json(
channel: Channel,
beg_date: &str,
end_date: &str,
@@ -55,7 +274,7 @@ async fn binned_d0_json(
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
- info!("http get {}", url);
+ debug!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
@@ -65,8 +284,11 @@ async fn binned_d0_json(
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
- error!("client response {:?}", res);
- return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
+ error!("error response {:?}", res);
+ let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
+ let s = String::from_utf8_lossy(&buf);
+ error!("body of error response: {s}");
+ return Err(Error::with_msg_no_trace(format!("error response")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs
index 1a67457..1a6c025 100644
--- a/daqbufp2/src/test/api4/eventsjson.rs
+++ b/daqbufp2/src/test/api4/eventsjson.rs
@@ -31,8 +31,8 @@ fn events_plain_json_00() -> Result<(), Error> {
info!("Receveided a response json value: {jsv:?}");
let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
- assert_eq!(res.len(), 20);
assert_eq!(res.ts_anchor_sec(), 0);
+ assert_eq!(res.len(), 60);
Ok(())
};
taskrun::run(fut)
diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs
index b9cba88..7f29855 100644
--- a/daqbufp2/src/test/archapp.rs
+++ b/daqbufp2/src/test/archapp.rs
@@ -1,4 +1,4 @@
-use super::binnedjson::ScalarEventsResponse;
+#![allow(unused)]
use super::events::get_plain_events_json;
use crate::nodes::require_archapp_test_host_running;
use crate::test::events::ch_gen;
@@ -8,6 +8,8 @@ use netpod::log::*;
#[test]
fn get_events_1() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async {
let rh = require_archapp_test_host_running()?;
let cluster = &rh.cluster;
diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs
index 5bd5ecc..5e2f575 100644
--- a/daqbufp2/src/test/binnedjson.rs
+++ b/daqbufp2/src/test/binnedjson.rs
@@ -1,117 +1,11 @@
mod channelarchiver;
-use crate::err::ErrConv;
-use crate::nodes::{require_sls_test_host_running, require_test_hosts_running};
-use chrono::{DateTime, Utc};
use err::Error;
-use http::StatusCode;
-use hyper::Body;
-use netpod::log::*;
-use netpod::query::{BinnedQuery, CacheUsage, PlainEventsQuery};
-use netpod::{f64_close, AppendToUrl};
-use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
-use serde::{Deserialize, Serialize};
-use std::time::Duration;
-use url::Url;
-
-#[test]
-fn get_binned_json_0() {
- taskrun::run(get_binned_json_0_inner()).unwrap();
-}
-
-async fn get_binned_json_0_inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- get_binned_json_common(
- "scalar-i32-be",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T01:20:30.000Z",
- 10,
- AggKind::DimXBins1,
- cluster,
- 13,
- true,
- )
- .await
-}
-
-#[test]
-fn get_binned_json_1() {
- taskrun::run(get_binned_json_1_inner()).unwrap();
-}
-
-async fn get_binned_json_1_inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- get_binned_json_common(
- "wave-f64-be-n21",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T01:20:45.000Z",
- 10,
- AggKind::DimXBins1,
- cluster,
- 13,
- true,
- )
- .await
-}
-
-#[test]
-fn get_binned_json_2() {
- taskrun::run(get_binned_json_2_inner()).unwrap();
-}
-
-async fn get_binned_json_2_inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- get_binned_json_common(
- "wave-f64-be-n21",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T00:20:20.000Z",
- 2,
- AggKind::DimXBinsN(3),
- cluster,
- 2,
- true,
- )
- .await
-}
-
-#[allow(unused)]
-fn check_close_events(a: &WaveEventsResponse, b: &WaveEventsResponse, jsstr: &String) -> Result<(), Error> {
- match a.is_close(b) {
- Ok(true) => Ok(()),
- Ok(false) => {
- error!("Mismatch, original JSON:\n{}", jsstr);
- Err(Error::with_msg_no_trace("mismatch"))
- }
- Err(e) => {
- error!("Mismatch, original JSON:\n{}", jsstr);
- Err(e)
- }
- }
-}
-
-fn check_close(a: &BinnedResponse, b: &BinnedResponse, jsstr: &String) -> Result<(), Error> {
- match a.is_close(b) {
- Ok(true) => Ok(()),
- Ok(false) => {
- error!("Mismatch, original JSON:\n{}", jsstr);
- Err(Error::with_msg_no_trace("mismatch"))
- }
- Err(e) => {
- error!("Mismatch, original JSON:\n{}", jsstr);
- Err(e)
- }
- }
-}
#[test]
fn get_sls_archive_1() -> Result<(), Error> {
- // TODO OFFENDING TEST
- if true {
- return Ok(());
- }
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -124,8 +18,8 @@ fn get_sls_archive_1() -> Result<(), Error> {
let endstr = "2021-11-10T01:01:00Z";
let (res, jsstr) =
get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
- let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"finalisedRange":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##;
- let exp: BinnedResponse = serde_json::from_str(exp).unwrap();
+ let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"rangeFinal":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##;
+ let exp: String = serde_json::from_str(exp).unwrap();
check_close(&res, &exp, &jsstr)?;
Ok(())
};
@@ -134,6 +28,8 @@ fn get_sls_archive_1() -> Result<(), Error> {
#[test]
fn get_sls_archive_3() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -156,6 +52,8 @@ fn get_sls_archive_3() -> Result<(), Error> {
#[test]
fn get_sls_archive_wave_2() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -175,275 +73,3 @@ fn get_sls_archive_wave_2() -> Result<(), Error> {
};
taskrun::run(fut)
}
-
-async fn get_binned_json_common(
- channel_name: &str,
- beg_date: &str,
- end_date: &str,
- bin_count: u32,
- agg_kind: AggKind,
- cluster: &Cluster,
- expect_bin_count: u32,
- expect_finalised_range: bool,
-) -> Result<(), Error> {
- let t1 = Utc::now();
- let node0 = &cluster.nodes[0];
- let beg_date: DateTime<Utc> = beg_date.parse()?;
- let end_date: DateTime<Utc> = end_date.parse()?;
- let channel_backend = "testbackend";
- let channel = Channel {
- backend: channel_backend.into(),
- name: channel_name.into(),
- series: None,
- };
- let range = NanoRange::from_date_time(beg_date, end_date);
- let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
- query.set_timeout(Duration::from_millis(15000));
- query.set_cache_usage(CacheUsage::Ignore);
- let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
- query.append_to_url(&mut url);
- let url = url;
- debug!("get_binned_json_common get {}", url);
- let req = hyper::Request::builder()
- .method(http::Method::GET)
- .uri(url.to_string())
- .header(http::header::ACCEPT, APP_JSON)
- .body(Body::empty())
- .ec()?;
- let client = hyper::Client::new();
- let res = client.request(req).await.ec()?;
- if res.status() != StatusCode::OK {
- error!("get_binned_json_common client response {:?}", res);
- }
- let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
- let t2 = chrono::Utc::now();
- let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
- debug!("get_binned_json_common DONE time {} ms", ms);
- let res = String::from_utf8_lossy(&res).to_string();
- let res: serde_json::Value = serde_json::from_str(res.as_str())?;
- // TODO assert more
- debug!(
- "result from endpoint: --------------\n{}\n--------------",
- serde_json::to_string_pretty(&res)?
- );
- // TODO enable in future:
- if false {
- if expect_finalised_range {
- if !res
- .get("finalisedRange")
- .ok_or(Error::with_msg("missing finalisedRange"))?
- .as_bool()
- .ok_or(Error::with_msg("key finalisedRange not bool"))?
- {
- return Err(Error::with_msg("expected finalisedRange"));
- }
- } else if res.get("finalisedRange").is_some() {
- return Err(Error::with_msg("expect absent finalisedRange"));
- }
- }
- if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
- return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
- }
- if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
- return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
- }
- if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
- return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
- }
- if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
- return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
- }
- Ok(())
-}
-
-// TODO reuse the types from server.
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ScalarEventsResponse {
- #[serde(rename = "tsAnchor")]
- pub ts_anchor: u64,
- #[serde(rename = "tsMs")]
- pub ts_ms: Vec<u64>,
- #[serde(rename = "tsNs")]
- pub ts_ns: Vec<u64>,
- pub values: Vec<f64>,
- #[serde(rename = "finalisedRange", default = "bool_false")]
- pub finalised_range: bool,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct WaveEventsResponse {
- #[serde(rename = "tsAnchor")]
- ts_anchor: u64,
- #[serde(rename = "tsMs")]
- ts_ms: Vec<u64>,
- #[serde(rename = "tsNs")]
- ts_ns: Vec<u64>,
- values: Vec<Vec<f64>>,
- #[serde(rename = "finalisedRange", default = "bool_false")]
- finalised_range: bool,
-}
-
-impl WaveEventsResponse {
- pub fn is_close(&self, other: &Self) -> Result {
- let reterr = || -> Result {
- Err(Error::with_msg_no_trace(format!(
- "Mismatch\n{:?}\nVS\n{:?}",
- self, other
- )))
- };
- if self.ts_anchor != other.ts_anchor {
- return reterr();
- }
- if self.finalised_range != other.finalised_range {
- return reterr();
- }
- let pairs = [(&self.values, &other.values)];
- for (t, u) in pairs {
- for (j, k) in t.iter().zip(u) {
- for (&a, &b) in j.iter().zip(k) {
- if !f64_close(a, b) {
- return reterr();
- }
- }
- }
- }
- Ok(true)
- }
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct BinnedResponse {
- #[serde(rename = "tsAnchor")]
- ts_anchor: u64,
- #[serde(rename = "tsMs")]
- ts_ms: Vec<u64>,
- #[serde(rename = "tsNs")]
- ts_ns: Vec<u64>,
- mins: Vec<Option<f64>>,
- maxs: Vec<Option<f64>>,
- avgs: Vec<Option<f64>>,
- counts: Vec<u64>,
- #[serde(rename = "finalisedRange", default = "bool_false")]
- finalised_range: bool,
-}
-
-impl BinnedResponse {
- pub fn is_close(&self, other: &Self) -> Result {
- let reterr = || -> Result {
- Err(Error::with_msg_no_trace(format!(
- "Mismatch\n{:?}\nVS\n{:?}",
- self, other
- )))
- };
- if self.ts_anchor != other.ts_anchor {
- return reterr();
- }
- if self.finalised_range != other.finalised_range {
- return reterr();
- }
- if self.counts != other.counts {
- return reterr();
- }
- let pairs = [
- (&self.mins, &other.mins),
- (&self.maxs, &other.maxs),
- (&self.avgs, &other.avgs),
- ];
- for (t, u) in pairs {
- for (&a, &b) in t.iter().zip(u) {
- if let (Some(a), Some(b)) = (a, b) {
- if !f64_close(a, b) {
- return reterr();
- }
- } else if let (None, None) = (a, b) {
- } else {
- return reterr();
- }
- }
- }
- Ok(true)
- }
-}
-
-fn bool_false() -> bool {
- false
-}
-
-async fn get_binned_json_common_res(
- channel: Channel,
- beg_date: &str,
- end_date: &str,
- bin_count: u32,
- agg_kind: AggKind,
- cluster: &Cluster,
-) -> Result<(BinnedResponse, String), Error> {
- let t1 = Utc::now();
- let node0 = &cluster.nodes[0];
- let beg_date: DateTime<Utc> = beg_date.parse()?;
- let end_date: DateTime<Utc> = end_date.parse()?;
- let range = NanoRange::from_date_time(beg_date, end_date);
- let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
- query.set_timeout(Duration::from_millis(15000));
- query.set_cache_usage(CacheUsage::Ignore);
- let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
- query.append_to_url(&mut url);
- let url = url;
- info!("get_binned_json_common_res get {}", url);
- let req = hyper::Request::builder()
- .method(http::Method::GET)
- .uri(url.to_string())
- .header(http::header::ACCEPT, APP_JSON)
- .body(Body::empty())
- .ec()?;
- let client = hyper::Client::new();
- let res = client.request(req).await.ec()?;
- if res.status() != StatusCode::OK {
- let msg = format!("client response {res:?}");
- error!("{msg}");
- return Err(msg.into());
- }
- let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
- let t2 = chrono::Utc::now();
- let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
- let res = String::from_utf8_lossy(&res).to_string();
- let ret: BinnedResponse = serde_json::from_str(res.as_str())?;
- Ok((ret, res))
-}
-
-async fn get_events_json_common_res(
- channel: Channel,
- beg_date: &str,
- end_date: &str,
- cluster: &Cluster,
-) -> Result<String, Error> {
- let t1 = Utc::now();
- let node0 = &cluster.nodes[0];
- let beg_date: DateTime<Utc> = beg_date.parse()?;
- let end_date: DateTime<Utc> = end_date.parse()?;
- let range = NanoRange::from_date_time(beg_date, end_date);
- let mut query = PlainEventsQuery::new(channel, range, 4096, None, false);
- query.set_timeout(Duration::from_millis(15000));
- let mut url = Url::parse(&format!("http://{}:{}/api/4/events", node0.host, node0.port))?;
- query.append_to_url(&mut url);
- let url = url;
- info!("get_events_json_common_res get {}", url);
- let req = hyper::Request::builder()
- .method(http::Method::GET)
- .uri(url.to_string())
- .header(http::header::ACCEPT, APP_JSON)
- .body(Body::empty())
- .ec()?;
- let client = hyper::Client::new();
- let res = client.request(req).await.ec()?;
- if res.status() != StatusCode::OK {
- let msg = format!("client response {res:?}");
- error!("{msg}");
- return Err(msg.into());
- }
- let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
- let t2 = chrono::Utc::now();
- let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
- let res = String::from_utf8_lossy(&res).to_string();
- //info!("STRING RESULT:{}", res);
- Ok(res)
-}
diff --git a/daqbufp2/src/test/binnedjson/channelarchiver.rs b/daqbufp2/src/test/binnedjson/channelarchiver.rs
index 140e872..0720ba6 100644
--- a/daqbufp2/src/test/binnedjson/channelarchiver.rs
+++ b/daqbufp2/src/test/binnedjson/channelarchiver.rs
@@ -2,6 +2,8 @@ use super::*;
#[test]
fn get_scalar_2_events() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -45,6 +47,8 @@ fn get_scalar_2_events() -> Result<(), Error> {
#[test]
fn get_scalar_2_binned() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -57,7 +61,7 @@ fn get_scalar_2_binned() -> Result<(), Error> {
let endstr = "2021-11-10T00:10:00Z";
let (res, jsstr) =
get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
- let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"finalisedRange":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##;
+ let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"rangeFinal":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##;
let exp: BinnedResponse = serde_json::from_str(exp).unwrap();
check_close(&res, &exp, &jsstr)?;
Ok(())
@@ -67,6 +71,8 @@ fn get_scalar_2_binned() -> Result<(), Error> {
#[test]
fn get_wave_1_events() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
@@ -108,6 +114,8 @@ fn get_wave_1_events() -> Result<(), Error> {
#[test]
fn get_wave_1_binned() -> Result<(), Error> {
+ let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) };
+ #[cfg(DISABLED)]
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs
index 22d6767..8a7ed8a 100644
--- a/daqbufp2/src/test/timeweightedjson.rs
+++ b/daqbufp2/src/test/timeweightedjson.rs
@@ -10,79 +10,6 @@ use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
use std::time::Duration;
use url::Url;
-#[test]
-fn time_weighted_json_00() -> Result<(), Error> {
- async fn inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- let res = get_json_common(
- "const-regular-scalar-i32-be",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T04:20:30.000Z",
- 20,
- AggKind::DimXBins1,
- cluster,
- 25,
- true,
- )
- .await?;
- let v = res.avgs[0];
- assert!(v > 41.9999 && v < 42.0001);
- Ok(())
- }
- super::run_test(inner())
-}
-
-#[test]
-fn time_weighted_json_01() -> Result<(), Error> {
- // TODO OFFENDING TEST
- if true {
- return Ok(());
- }
- async fn inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- let res = get_json_common(
- "const-regular-scalar-i32-be",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T10:20:30.000Z",
- 10,
- AggKind::DimXBins1,
- cluster,
- 9,
- true,
- )
- .await?;
- let v = res.avgs[0];
- assert!(v > 41.9999 && v < 42.0001);
- Ok(())
- }
- super::run_test(inner())
-}
-
-#[test]
-fn time_weighted_json_02() -> Result<(), Error> {
- async fn inner() -> Result<(), Error> {
- let rh = require_test_hosts_running()?;
- let cluster = &rh.cluster;
- let res = get_json_common(
- "const-regular-scalar-i32-be",
- "1970-01-01T00:20:10.000Z",
- "1970-01-01T00:20:20.000Z",
- 20,
- AggKind::TimeWeightedScalar,
- cluster,
- 100,
- true,
- )
- .await?;
- let v = res.avgs[0];
- assert!(v > 41.9999 && v < 42.0001);
- Ok(())
- }
- super::run_test(inner())
-}
-
#[test]
fn time_weighted_json_03() -> Result<(), Error> {
async fn inner() -> Result<(), Error> {
@@ -209,15 +136,15 @@ async fn get_json_common(
if false {
if expect_finalised_range {
if !res
- .get("finalisedRange")
- .ok_or(Error::with_msg("missing finalisedRange"))?
+ .get("rangeFinal")
+ .ok_or(Error::with_msg("missing rangeFinal"))?
.as_bool()
- .ok_or(Error::with_msg("key finalisedRange not bool"))?
+ .ok_or(Error::with_msg("key rangeFinal not bool"))?
{
- return Err(Error::with_msg("expected finalisedRange"));
+ return Err(Error::with_msg("expected rangeFinal"));
}
- } else if res.get("finalisedRange").is_some() {
- return Err(Error::with_msg("expect absent finalisedRange"));
+ } else if res.get("rangeFinal").is_some() {
+ return Err(Error::with_msg("expect absent rangeFinal"));
}
}
let counts = res.get("counts").unwrap().as_array().unwrap();
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 734ddc5..198ebf9 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -121,7 +121,7 @@ where
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
// Only if the frequency would be high, that would require cpu time checks. Worth it? Measure..
self.tmp_agg_results.push_back(ret);
- if self.curbin >= self.spec.count as u32 {
+ if self.curbin >= self.spec.bin_count() as u32 {
self.all_bins_emitted = true;
}
}
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 63f18e7..c9693e2 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -77,7 +77,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
debug!("BinnedBinaryChannelExec found pre_range: {pre_range:?}");
- if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
+ if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
);
@@ -323,12 +323,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
{
let _ = event_value_shape;
let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?;
- let t_bin_count = range.count as u32;
+ let t_bin_count = range.bin_count() as u32;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
Ok(Some(pre_range)) => {
info!("BinnedJsonChannelExec found pre_range: {pre_range:?}");
- if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
+ if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"BinnedJsonChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}"
);
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 3f64cb2..cb4b40a 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -49,20 +49,35 @@ where
let item = EventsDim0 { tss, pulses, values };
let item = ChannelEvents::Events(Box::new(item));
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+ } else if let Some(item) = item.as_any_mut().downcast_mut::<WaveEvents<NTY>>() {
+ warn!("WaveEvents");
+ let _tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
+ let _pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
+ let _values: VecDeque<Vec<NTY>> = item.vals.iter().map(|x| x.clone()).collect();
+ //let item = EventsDim1 { tss, pulses, values };
+ //let item = ChannelEvents::Events(Box::new(item));
+ //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
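+ // EventsDim1 is not yet wired through ChannelEvents, so this branch
+ // currently only signals RangeComplete as a placeholder.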
+ Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
+ } else if let Some(item) = item
+ .as_any_mut()
+ .downcast_mut::>()
+ {
+ warn!("XBinnedScalarEvents");
+ let tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
+ let pulses: VecDeque<u64> = (0..tss.len()).map(|_| 0).collect();
+ let _avgs: VecDeque<f32> = item.avgs.iter().map(|x| x.clone()).collect();
+ let mins: VecDeque<NTY> = item.mins.iter().map(|x| x.clone()).collect();
+ let _maxs: VecDeque<NTY> = item.maxs.iter().map(|x| x.clone()).collect();
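+ // Placeholder mapping: only the per-event minima are forwarded as values;
+ // the averages and maxima are dropped until a dedicated x-binned event
+ // type is carried through this path.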
+ let item = EventsDim0 {
+ tss,
+ pulses,
+ values: mins,
+ };
+ let item = ChannelEvents::Events(Box::new(item));
+ Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
- if let Some(item) = item.as_any_mut().downcast_mut::<WaveEvents<NTY>>() {
- warn!("WaveEvents");
- let _tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
- let _pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
- let _values: VecDeque<Vec<NTY>> = item.vals.iter().map(|x| x.clone()).collect();
- //let item = EventsDim1 { tss, pulses, values };
- //let item = ChannelEvents::Events(Box::new(item));
- //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
- Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
- } else {
- error!("TODO bad, no idea what this item is");
- Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
- }
+ error!("TODO bad, no idea what this item is\n\n{:?}\n\n", item);
+ Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
}
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs
index 8676ee2..21cd8c6 100644
--- a/httpret/src/api4/binned.rs
+++ b/httpret/src/api4/binned.rs
@@ -16,7 +16,7 @@ use url::Url;
async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("httpret plain_events_json req: {:?}", req);
- let (head, _body) = req.into_parts();
+ let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
let msg = format!("can not parse query: {}", e.msg());
e.add_public_msg(msg)
@@ -27,7 +27,6 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache
query.set_series_id(chconf.series);
let query = query;
// ---
-
let span1 = span!(
Level::INFO,
"httpret::binned",
@@ -38,15 +37,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache
span1.in_scope(|| {
debug!("begin");
});
- let _: Result<_, Error> = match head.headers.get(http::header::ACCEPT) {
- //Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await,
- //Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await,
- _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
- };
-
- // TODO analogue to `streams::plaineventsjson::plain_events_json` create a function for binned json.
-
- let item = streams::plaineventsjson::plain_events_json("", &node_config.node_config.cluster)
+ let item = streams::timebinnedjson::timebinned_json(&query, &node_config.node_config.cluster)
.instrument(span1)
.await?;
let buf = serde_json::to_vec(&item)?;
diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs
index 8d4a5eb..5c4e4ce 100644
--- a/httpret/src/channelconfig.rs
+++ b/httpret/src/channelconfig.rs
@@ -43,24 +43,49 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
);
}
if channel.backend() == "test-inmem" {
- if channel.name() == "inmem-d0-i32" {
+ let ret = if channel.name() == "inmem-d0-i32" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
- return Ok(ret);
- }
+ Ok(ret)
+ } else {
+ error!("no test information");
+ Err(Error::with_msg_no_trace(format!("no test information for {channel:?}"))
+ .add_public_msg(format!("No channel config for test channel {channel:?}")))
+ };
+ return ret;
}
if channel.backend() == "test-disk-databuffer" {
- if channel.name() == "scalar-i32-be" {
+ // TODO the series-ids here are just random. Need to integrate with better test setup.
+ let ret = if channel.name() == "scalar-i32-be" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
- return Ok(ret);
- }
+ Ok(ret)
+ } else if channel.name() == "wave-f64-be-n21" {
+ let ret = ChConf {
+ series: 2,
+ scalar_type: ScalarType::F64,
+ shape: Shape::Wave(21),
+ };
+ Ok(ret)
+ } else if channel.name() == "const-regular-scalar-i32-be" {
+ let ret = ChConf {
+ series: 3,
+ scalar_type: ScalarType::I32,
+ shape: Shape::Scalar,
+ };
+ Ok(ret)
+ } else {
+ error!("no test information");
+ Err(Error::with_msg_no_trace(format!("no test information for {channel:?}"))
+ .add_public_msg(format!("No channel config for test channel {channel:?}")))
+ };
+ return ret;
}
// TODO use a common already running worker pool for these queries:
let dbconf = &ncc.node_config.cluster.database;
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index 0da21df..01dbb80 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -199,7 +199,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
Example response:
{
- "finalisedRange": true,
+ "rangeFinal": true,
"tsAnchor": 1623763172,
"tsMs": [
5,
@@ -224,7 +224,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
Finalised range
If the server can determine that no more data will be added to the requested time range
- then it will add the flag finalisedRange: true to the response.
+ then it will add the flag rangeFinal: true to the response.
@@ -312,7 +312,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
0,
0
],
- "finalisedRange": true
+ "rangeFinal": true
}
@@ -321,7 +321,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
Finalised range
If the server can determine that no more data will be added to the requested time range
- then it will add the flag finalisedRange: true to the response.
+ then it will add the flag rangeFinal: true to the response.
diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs
index 6548a19..280d896 100644
--- a/items/src/binsdim0.rs
+++ b/items/src/binsdim0.rs
@@ -266,7 +266,7 @@ pub struct MinMaxAvgBinsCollectedResult {
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs
index a2591f7..535ded8 100644
--- a/items/src/binsdim1.rs
+++ b/items/src/binsdim1.rs
@@ -259,7 +259,7 @@ pub struct MinMaxAvgDim1BinsCollectedResult {
mins: Vec>>,
maxs: Vec >>,
avgs: Vec >>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
@@ -497,7 +497,7 @@ pub struct WaveEventsCollectedResult {
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<Vec<NTY>>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
diff --git a/items/src/items.rs b/items/src/items.rs
index a964dd1..b440c1c 100644
--- a/items/src/items.rs
+++ b/items/src/items.rs
@@ -371,7 +371,7 @@ impl FrameType for EventQueryJsonStringFrame {
}
pub trait EventsNodeProcessorOutput:
- Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
+ fmt::Debug + Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
{
fn as_any_mut(&mut self) -> &mut dyn Any;
fn into_parts(self) -> (Box, VecDeque, VecDeque);
diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs
index 2e26270..eb61ce8 100644
--- a/items/src/scalarevents.rs
+++ b/items/src/scalarevents.rs
@@ -290,7 +290,7 @@ pub struct EventValuesCollectorOutput {
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<NTY>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs
index bd0045f..cfd07f8 100644
--- a/items/src/statsevents.rs
+++ b/items/src/statsevents.rs
@@ -221,7 +221,7 @@ pub struct StatsEventsCollectorOutput {
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
// TODO what to collect? pulse min/max
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs
index 7e68845..338b8ea 100644
--- a/items/src/xbinnedscalarevents.rs
+++ b/items/src/xbinnedscalarevents.rs
@@ -429,7 +429,7 @@ pub struct XBinnedScalarEventsCollectedResult {
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs
index af06bff..516a564 100644
--- a/items/src/xbinnedwaveevents.rs
+++ b/items/src/xbinnedwaveevents.rs
@@ -450,7 +450,7 @@ pub struct XBinnedWaveEventsCollectedResult {
mins: Vec>,
maxs: Vec>,
avgs: Vec>,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")]
finalised_range: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs
index 0bba76a..4b74dae 100644
--- a/items_0/src/collect_c.rs
+++ b/items_0/src/collect_c.rs
@@ -7,27 +7,18 @@ use std::any::Any;
use std::fmt;
pub trait Collector: fmt::Debug + Send {
- // TODO should require here Collectable?
- type Input;
- type Output: Collected;
-
fn len(&self) -> usize;
-
- fn ingest(&mut self, item: &mut Self::Input);
-
+ fn ingest(&mut self, item: &mut dyn Collectable);
fn set_range_complete(&mut self);
-
fn set_timed_out(&mut self);
-
- fn result(&mut self) -> Result;
+ fn result(&mut self) -> Result, Error>;
}
-pub trait Collectable: fmt::Debug {
- type Collector: Collector ;
-
- fn new_collector(&self) -> Self::Collector;
+pub trait Collectable: fmt::Debug + crate::AsAnyMut {
+ fn new_collector(&self) -> Box<dyn Collector>;
}
+// TODO can this get removed?
pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {}
erased_serde::serialize_trait_object!(Collected);
@@ -53,6 +44,7 @@ impl Collected for Box {}
#[derive(Debug)]
pub struct CollectorDynDefault {}
+// TODO remove?
pub trait CollectorDyn: fmt::Debug + Send {
fn len(&self) -> usize;
@@ -70,50 +62,15 @@ pub trait CollectableWithDefault {
fn as_any_mut(&mut self) -> &mut dyn Any;
}
-#[derive(Debug)]
-pub struct EventsCollector {
- coll: Box<dyn CollectorDyn>,
-}
-
-impl EventsCollector {
- pub fn new(coll: Box) -> Self {
- Self { coll }
- }
-}
-
-impl Collector for EventsCollector {
- type Input = Box;
-
- // TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc...
- type Output = Box<dyn Collected>;
-
- fn len(&self) -> usize {
- self.coll.len()
- }
-
- fn ingest(&mut self, item: &mut Self::Input) {
- self.coll.ingest(item.as_collectable_with_default_mut());
- }
-
- fn set_range_complete(&mut self) {
- self.coll.set_range_complete()
- }
-
- fn set_timed_out(&mut self) {
- self.coll.set_timed_out()
- }
-
- fn result(&mut self) -> Result {
- self.coll.result()
+impl crate::AsAnyMut for Box<dyn CollectableWithDefault> {
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
}
}
impl Collectable for Box<dyn CollectableWithDefault> {
- type Collector = EventsCollector;
-
- fn new_collector(&self) -> Self::Collector {
- let coll = CollectableWithDefault::new_collector(self.as_ref());
- EventsCollector::new(coll)
+ fn new_collector(&self) -> Box {
+ todo!()
}
}
@@ -121,14 +78,11 @@ impl Collectable for Box {
pub struct TimeBinnedCollector {}
impl Collector for TimeBinnedCollector {
- type Input = Box<dyn TimeBinned>;
- type Output = Box<dyn Collected>;
-
fn len(&self) -> usize {
todo!()
}
- fn ingest(&mut self, _item: &mut Self::Input) {
+ fn ingest(&mut self, _item: &mut dyn Collectable) {
todo!()
}
@@ -140,15 +94,19 @@ impl Collector for TimeBinnedCollector {
todo!()
}
- fn result(&mut self) -> Result {
+ fn result(&mut self) -> Result, Error> {
todo!()
}
}
+impl crate::AsAnyMut for Box<dyn TimeBinned> {
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
+ }
+}
+
impl Collectable for Box<dyn TimeBinned> {
- type Collector = TimeBinnedCollector;
-
- fn new_collector(&self) -> Self::Collector {
- todo!()
+ fn new_collector(&self) -> Box {
+ self.as_ref().new_collector()
}
}
diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs
index 90dbefb..5efac83 100644
--- a/items_0/src/collect_s.rs
+++ b/items_0/src/collect_s.rs
@@ -5,6 +5,7 @@ use serde::Serialize;
use std::any::Any;
use std::fmt;
+// TODO rename to `Typed`
pub trait CollectorType: Send + Unpin + WithLen {
type Input: Collectable;
type Output: Collected + ToJsonResult + Serialize;
@@ -24,6 +25,7 @@ pub trait Collector: Send + Unpin + WithLen {
fn result(&mut self) -> Result, Error>;
}
+// TODO rename to `Typed`
pub trait CollectableType {
type Collector: CollectorType<Input = Self>;
fn new_collector() -> Self::Collector;
diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs
index 0444541..7564036 100644
--- a/items_0/src/items_0.rs
+++ b/items_0/src/items_0.rs
@@ -61,14 +61,8 @@ pub trait AsAnyMut {
fn as_any_mut(&mut self) -> &mut dyn Any;
}
-/*impl AsAnyRef for Box {
- fn as_any_ref(&self) -> &dyn Any {
- self.as_ref().as_any_ref()
- }
-}*/
-
/// Data in time-binned form.
-pub trait TimeBinned: Any + TimeBinnable {
+pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);
diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs
index 07da626..7930421 100644
--- a/items_2/src/binsdim0.rs
+++ b/items_2/src/binsdim0.rs
@@ -19,6 +19,12 @@ use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
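+// Local tracing switch: macro arms are matched in order, so the first, empty
+// arm always wins and trace4! compiles to nothing; swap the arms to route
+// the output through eprintln! while debugging.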
+#[allow(unused)]
+macro_rules! trace4 {
+ ($($arg:tt)*) => ();
+ ($($arg:tt)*) => (eprintln!($($arg)*));
+}
+
// TODO make members private
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct BinsDim0<NTY> {
@@ -51,17 +57,6 @@ where
}
impl BinsDim0 {
- pub fn empty() -> Self {
- Self {
- ts1s: VecDeque::new(),
- ts2s: VecDeque::new(),
- counts: VecDeque::new(),
- mins: VecDeque::new(),
- maxs: VecDeque::new(),
- avgs: VecDeque::new(),
- }
- }
-
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
@@ -119,6 +114,19 @@ impl BinsDim0 {
}
}
+impl<NTY> Empty for BinsDim0<NTY> {
+ fn empty() -> Self {
+ Self {
+ ts1s: VecDeque::new(),
+ ts2s: VecDeque::new(),
+ counts: VecDeque::new(),
+ mins: VecDeque::new(),
+ maxs: VecDeque::new(),
+ avgs: VecDeque::new(),
+ }
+ }
+}
+
impl WithLen for BinsDim0 {
fn len(&self) -> usize {
self.ts1s.len()
@@ -151,19 +159,6 @@ impl RangeOverlapInfo for BinsDim0 {
}
}
-impl<NTY> Empty for BinsDim0<NTY> {
- fn empty() -> Self {
- Self {
- ts1s: Default::default(),
- ts2s: Default::default(),
- counts: Default::default(),
- mins: Default::default(),
- maxs: Default::default(),
- avgs: Default::default(),
- }
- }
-}
-
impl AppendEmptyBin for BinsDim0 {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
self.ts1s.push_back(ts1);
@@ -207,7 +202,8 @@ impl TimeBinnableType for BinsDim0 {
}
}
-#[derive(Debug, Serialize)]
+// TODO rename to BinsDim0CollectorOutput
+#[derive(Debug, Serialize, Deserialize)]
pub struct BinsDim0CollectedResult<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
@@ -219,17 +215,23 @@ pub struct BinsDim0CollectedResult {
ts1_off_ns: VecDeque<u64>,
#[serde(rename = "ts2Ns")]
ts2_off_ns: VecDeque<u64>,
+ #[serde(rename = "counts")]
counts: VecDeque,
+ #[serde(rename = "mins")]
mins: VecDeque,
+ #[serde(rename = "maxs")]
maxs: VecDeque,
+ #[serde(rename = "avgs")]
avgs: VecDeque,
- #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
+ #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
- #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
+ #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
+ timed_out: bool,
+ #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
missing_bins: u32,
- #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
+ #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
- #[serde(skip_serializing_if = "Option::is_none", rename = "finishedAt")]
+ #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
finished_at: Option<IsoDateTime>,
}
@@ -242,14 +244,30 @@ impl items_0::AsAnyRef for BinsDim0CollectedResult {
impl items_0::collect_c::Collected for BinsDim0CollectedResult {}
impl BinsDim0CollectedResult {
+ pub fn len(&self) -> usize {
+ self.ts1_off_ms.len()
+ }
+
pub fn ts_anchor_sec(&self) -> u64 {
self.ts_anchor_sec
}
+ pub fn ts1_off_ms(&self) -> &VecDeque {
+ &self.ts1_off_ms
+ }
+
+ pub fn ts2_off_ms(&self) -> &VecDeque {
+ &self.ts2_off_ms
+ }
+
pub fn counts(&self) -> &VecDeque {
&self.counts
}
+ pub fn range_final(&self) -> bool {
+ self.finalised_range
+ }
+
pub fn missing_bins(&self) -> u32 {
self.missing_bins
}
@@ -265,6 +283,10 @@ impl BinsDim0CollectedResult {
pub fn maxs(&self) -> &VecDeque {
&self.maxs
}
+
+ pub fn avgs(&self) -> &VecDeque {
+ &self.avgs
+ }
}
impl ToJsonResult for BinsDim0CollectedResult {
@@ -278,6 +300,7 @@ impl ToJsonResult for BinsDim0CollectedResult {
}
}
+#[derive(Debug)]
pub struct BinsDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
@@ -305,6 +328,7 @@ impl CollectorType for BinsDim0Collector {
type Output = BinsDim0CollectedResult;
fn ingest(&mut self, src: &mut Self::Input) {
+ trace!("\n\n----------- BinsDim0Collector ingest\n{:?}\n\n", src);
// TODO could be optimized by non-contiguous container.
self.vals.ts1s.append(&mut src.ts1s);
self.vals.ts2s.append(&mut src.ts2s);
@@ -362,6 +386,7 @@ impl CollectorType for BinsDim0Collector {
maxs,
avgs,
finalised_range: self.range_complete,
+ timed_out: self.timed_out,
missing_bins,
continue_at,
finished_at,
@@ -378,6 +403,73 @@ impl CollectableType for BinsDim0 {
}
}
+impl<NTY> items_0::collect_c::Collector for BinsDim0Collector<NTY>
+where
+ NTY: ScalarOps,
+{
+ fn len(&self) -> usize {
+ self.vals.len()
+ }
+
+ fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
+ trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
+ {
+ {
+ //let tyid = item.type_id();
+ //let tyid = item.as_any_mut().type_id();
+ //let tyid = format!("{:?}", item.type_id().to_owned());
+ trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id());
+ }
+ trace4!("ty 1: {:40?}", std::any::TypeId::of::>());
+ trace4!("ty 2: {:40?}", std::any::TypeId::of::>());
+ trace4!("ty 3: {:40?}", std::any::TypeId::of::>>());
+ trace4!(
+ "ty 4: {:?}",
+ std::any::TypeId::of::>()
+ );
+ trace4!(
+ "ty 5: {:?}",
+ std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>()
+ );
+ trace4!("ty 6: {:?}", std::any::TypeId::of::>());
+ }
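+ // The incoming item may be a plain BinsDim0, a boxed BinsDim0, or a boxed
+ // dyn TimeBinned that wraps one; try each concrete shape in turn.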
+ if let Some(item) = item.as_any_mut().downcast_mut::>() {
+ trace4!("ingest plain");
+ CollectorType::ingest(self, item)
+ } else if let Some(item) = item.as_any_mut().downcast_mut::>>() {
+ trace4!("ingest boxed");
+ CollectorType::ingest(self, item)
+ } else if let Some(item) = item.as_any_mut().downcast_mut::>() {
+ trace4!("ingest boxed dyn TimeBinned");
+ if let Some(item) = item.as_any_mut().downcast_mut::>() {
+ trace4!("ingest boxed dyn TimeBinned match");
+ CollectorType::ingest(self, item)
+ } else {
+ warn!("BinsDim0Collector::ingest unexpected inner item");
+ trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item);
+ }
+ } else {
+ warn!("BinsDim0Collector::ingest unexpected item");
+ trace!("BinsDim0Collector::ingest unexpected item {:?}", item);
+ }
+ }
+
+ fn set_range_complete(&mut self) {
+ CollectorType::set_range_complete(self)
+ }
+
+ fn set_timed_out(&mut self) {
+ CollectorType::set_timed_out(self)
+ }
+
+ fn result(&mut self) -> Result, Error> {
+ match CollectorType::result(self) {
+ Ok(res) => Ok(Box::new(res)),
+ Err(e) => Err(e.into()),
+ }
+ }
+}
+
pub struct BinsDim0Aggregator<NTY> {
range: NanoRange,
count: u64,
@@ -510,7 +602,7 @@ impl TimeBinner for BinsDim0TimeBinner {
return;
}
if self.edges.len() < 2 {
- warn!("TimeBinnerDyn for {self_name} no more bin in edges A");
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item);
return;
}
// TODO optimize by remembering at which event array index we have arrived.
@@ -522,7 +614,7 @@ impl TimeBinner for BinsDim0TimeBinner {
}) {
self.cycle();
if self.edges.len() < 2 {
- warn!("TimeBinnerDyn for {self_name} no more bin in edges B");
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item);
return;
}
}
@@ -560,7 +652,7 @@ impl TimeBinner for BinsDim0TimeBinner {
if item.ends_after(agg.range().clone()) {
self.cycle();
if self.edges.len() < 2 {
- warn!("TimeBinnerDyn for {self_name} no more bin in edges C");
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item);
return;
}
} else {
@@ -692,3 +784,21 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
self
}
}
+
+impl<NTY> items_0::AsAnyMut for BinsDim0<NTY>
+where
+ NTY: 'static,
+{
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
+ }
+}
+
+impl<NTY> items_0::collect_c::Collectable for BinsDim0<NTY>
+where
+ NTY: ScalarOps,
+{
+ fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
+ Box::new(BinsDim0Collector::<NTY>::new())
+ }
+}
diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs
new file mode 100644
index 0000000..301d247
--- /dev/null
+++ b/items_2/src/binsxbindim0.rs
@@ -0,0 +1,819 @@
+use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor};
+use crate::{IsoDateTime, RangeOverlapInfo};
+use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
+use chrono::{TimeZone, Utc};
+use err::Error;
+use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult};
+use items_0::scalar_ops::ScalarOps;
+use items_0::AppendEmptyBin;
+use items_0::Empty;
+use items_0::TimeBinned;
+use items_0::TimeBins;
+use items_0::WithLen;
+use netpod::log::*;
+use netpod::timeunits::SEC;
+use netpod::NanoRange;
+use num_traits::Zero;
+use serde::{Deserialize, Serialize};
+use std::any::Any;
+use std::collections::VecDeque;
+use std::{fmt, mem};
+
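+// NOTE: both arms of this macro share the same pattern and macro_rules! always
+// takes the first matching arm, so trace4 currently expands to nothing. Swap
+// the two arms to enable the eprintln-based tracing while debugging.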
+#[allow(unused)]
+macro_rules! trace4 {
+ ($($arg:tt)*) => ();
+ ($($arg:tt)*) => (eprintln!($($arg)*));
+}
+
+#[derive(Clone, PartialEq, Serialize, Deserialize)]
+pub struct BinsXbinDim0<NTY> {
+ ts1s: VecDeque<u64>,
+ ts2s: VecDeque<u64>,
+ counts: VecDeque<u64>,
+ mins: VecDeque<NTY>,
+ maxs: VecDeque<NTY>,
+ avgs: VecDeque<f32>,
+ // TODO could consider more variables:
+ // ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc...
+}
+
+impl<NTY> fmt::Debug for BinsXbinDim0<NTY>
+where
+ NTY: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let self_name = std::any::type_name::<Self>();
+ write!(
+ fmt,
+ "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
+ self.ts1s.len(),
+ self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+ self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+ self.counts,
+ self.mins,
+ self.maxs,
+ self.avgs,
+ )
+ }
+}
+
+impl<NTY: ScalarOps> BinsXbinDim0<NTY> {
+ pub fn from_content(
+ ts1s: VecDeque<u64>,
+ ts2s: VecDeque<u64>,
+ counts: VecDeque<u64>,
+ mins: VecDeque<NTY>,
+ maxs: VecDeque<NTY>,
+ avgs: VecDeque<f32>,
+ ) -> Self {
+ Self {
+ ts1s,
+ ts2s,
+ counts,
+ mins,
+ maxs,
+ avgs,
+ }
+ }
+
+ pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
+ self.ts1s.push_back(ts1);
+ self.ts2s.push_back(ts2);
+ self.counts.push_back(count);
+ self.mins.push_back(min);
+ self.maxs.push_back(max);
+ self.avgs.push_back(avg);
+ }
+
+ pub fn append_zero(&mut self, beg: u64, end: u64) {
+ self.ts1s.push_back(beg);
+ self.ts2s.push_back(end);
+ self.counts.push_back(0);
+ self.mins.push_back(NTY::zero_b());
+ self.maxs.push_back(NTY::zero_b());
+ self.avgs.push_back(0.);
+ }
+
+ pub fn append_all_from(&mut self, src: &mut Self) {
+ self.ts1s.extend(src.ts1s.drain(..));
+ self.ts2s.extend(src.ts2s.drain(..));
+ self.counts.extend(src.counts.drain(..));
+ self.mins.extend(src.mins.drain(..));
+ self.maxs.extend(src.maxs.drain(..));
+ self.avgs.extend(src.avgs.drain(..));
+ }
+
+ pub fn equal_slack(&self, other: &Self) -> bool {
+ for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) {
+ if a != b {
+ return false;
+ }
+ }
+ for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) {
+ if a != b {
+ return false;
+ }
+ }
+ for (a, b) in self.mins.iter().zip(other.mins.iter()) {
+ if !a.equal_slack(b) {
+ return false;
+ }
+ }
+ for (a, b) in self.maxs.iter().zip(other.maxs.iter()) {
+ if !a.equal_slack(b) {
+ return false;
+ }
+ }
+ for (a, b) in self.avgs.iter().zip(other.avgs.iter()) {
+ if !a.equal_slack(b) {
+ return false;
+ }
+ }
+ true
+ }
+}
+
+impl<NTY> Empty for BinsXbinDim0<NTY> {
+ fn empty() -> Self {
+ Self {
+ ts1s: VecDeque::new(),
+ ts2s: VecDeque::new(),
+ counts: VecDeque::new(),
+ mins: VecDeque::new(),
+ maxs: VecDeque::new(),
+ avgs: VecDeque::new(),
+ }
+ }
+}
+
+impl<NTY> WithLen for BinsXbinDim0<NTY> {
+ fn len(&self) -> usize {
+ self.ts1s.len()
+ }
+}
+
+impl<NTY> RangeOverlapInfo for BinsXbinDim0<NTY> {
+ fn ends_before(&self, range: NanoRange) -> bool {
+ if let Some(&max) = self.ts2s.back() {
+ max <= range.beg
+ } else {
+ true
+ }
+ }
+
+ fn ends_after(&self, range: NanoRange) -> bool {
+ if let Some(&max) = self.ts2s.back() {
+ max > range.end
+ } else {
+ true
+ }
+ }
+
+ fn starts_after(&self, range: NanoRange) -> bool {
+ if let Some(&min) = self.ts1s.front() {
+ min >= range.end
+ } else {
+ true
+ }
+ }
+}
+
+impl<NTY: ScalarOps> AppendEmptyBin for BinsXbinDim0<NTY> {
+ fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
+ self.ts1s.push_back(ts1);
+ self.ts2s.push_back(ts2);
+ self.counts.push_back(0);
+ self.mins.push_back(NTY::zero_b());
+ self.maxs.push_back(NTY::zero_b());
+ self.avgs.push_back(0.);
+ }
+}
+
+impl<NTY> TimeBins for BinsXbinDim0<NTY> {
+ fn ts_min(&self) -> Option {
+ self.ts1s.front().map(Clone::clone)
+ }
+
+ fn ts_max(&self) -> Option {
+ self.ts2s.back().map(Clone::clone)
+ }
+
+ fn ts_min_max(&self) -> Option<(u64, u64)> {
+ if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) {
+ Some((min, max))
+ } else {
+ None
+ }
+ }
+}
+
+impl<NTY: ScalarOps> TimeBinnableType for BinsXbinDim0<NTY> {
+ type Output = BinsXbinDim0<NTY>;
+ type Aggregator = BinsXbinDim0Aggregator<NTY>;
+
+ fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
+ let self_name = std::any::type_name::<Self>();
+ debug!(
+ "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
+ range, x_bin_count, do_time_weight
+ );
+ Self::Aggregator::new(range, do_time_weight)
+ }
+}
+
+// TODO rename to BinsXbinDim0CollectorOutput
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BinsXbinDim0CollectedResult<NTY> {
+ #[serde(rename = "tsAnchor")]
+ ts_anchor_sec: u64,
+ #[serde(rename = "ts1Ms")]
+ ts1_off_ms: VecDeque<u64>,
+ #[serde(rename = "ts2Ms")]
+ ts2_off_ms: VecDeque<u64>,
+ #[serde(rename = "ts1Ns")]
+ ts1_off_ns: VecDeque<u64>,
+ #[serde(rename = "ts2Ns")]
+ ts2_off_ns: VecDeque<u64>,
+ #[serde(rename = "counts")]
+ counts: VecDeque<u64>,
+ #[serde(rename = "mins")]
+ mins: VecDeque<NTY>,
+ #[serde(rename = "maxs")]
+ maxs: VecDeque<NTY>,
+ #[serde(rename = "avgs")]
+ avgs: VecDeque<f32>,
+ #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
+ finalised_range: bool,
+ #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
+ timed_out: bool,
+ #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
+ missing_bins: u32,
+ #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
+ continue_at: Option<IsoDateTime>,
+ #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
+ finished_at: Option<IsoDateTime>,
+}
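+
+// For orientation, the serde renames above yield JSON of roughly this shape
+// (a sketch with made-up values; rangeFinal, timedOut, missingBins, continueAt
+// and finishedAt are omitted when false, zero or None):
+// {
+//   "tsAnchor": 1670000000,
+//   "ts1Ms": [0, 1000], "ts2Ms": [1000, 2000],
+//   "ts1Ns": [0, 0], "ts2Ns": [0, 0],
+//   "counts": [12, 9],
+//   "mins": [0.5, 0.7], "maxs": [1.5, 1.9], "avgs": [1.0, 1.2],
+//   "rangeFinal": true
+// }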
+
+impl<NTY: ScalarOps> items_0::AsAnyRef for BinsXbinDim0CollectedResult<NTY> {
+ fn as_any_ref(&self) -> &dyn Any {
+ self
+ }
+}
+
+impl<NTY: ScalarOps> items_0::collect_c::Collected for BinsXbinDim0CollectedResult<NTY> {}
+
+impl<NTY> BinsXbinDim0CollectedResult<NTY> {
+ pub fn len(&self) -> usize {
+ self.ts1_off_ms.len()
+ }
+
+ pub fn ts_anchor_sec(&self) -> u64 {
+ self.ts_anchor_sec
+ }
+
+ pub fn ts1_off_ms(&self) -> &VecDeque<u64> {
+ &self.ts1_off_ms
+ }
+
+ pub fn ts2_off_ms(&self) -> &VecDeque<u64> {
+ &self.ts2_off_ms
+ }
+
+ pub fn counts(&self) -> &VecDeque<u64> {
+ &self.counts
+ }
+
+ pub fn range_final(&self) -> bool {
+ self.finalised_range
+ }
+
+ pub fn missing_bins(&self) -> u32 {
+ self.missing_bins
+ }
+
+ pub fn continue_at(&self) -> Option<IsoDateTime> {
+ self.continue_at.clone()
+ }
+
+ pub fn mins(&self) -> &VecDeque<NTY> {
+ &self.mins
+ }
+
+ pub fn maxs(&self) -> &VecDeque<NTY> {
+ &self.maxs
+ }
+}
+
+impl<NTY: ScalarOps> ToJsonResult for BinsXbinDim0CollectedResult<NTY> {
+ fn to_json_result(&self) -> Result<Box<dyn items_0::collect_s::ToJsonValue>, Error> {
+ let k = serde_json::to_value(self)?;
+ Ok(Box::new(k))
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+}
+
+#[derive(Debug)]
+pub struct BinsXbinDim0Collector<NTY> {
+ timed_out: bool,
+ range_complete: bool,
+ vals: BinsXbinDim0<NTY>,
+}
+
+impl<NTY> BinsXbinDim0Collector<NTY> {
+ pub fn new() -> Self {
+ Self {
+ timed_out: false,
+ range_complete: false,
+ vals: BinsXbinDim0::<NTY>::empty(),
+ }
+ }
+}
+
+impl<NTY> WithLen for BinsXbinDim0Collector<NTY> {
+ fn len(&self) -> usize {
+ self.vals.ts1s.len()
+ }
+}
+
+impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
+ type Input = BinsXbinDim0<NTY>;
+ type Output = BinsXbinDim0CollectedResult<NTY>;
+
+ fn ingest(&mut self, src: &mut Self::Input) {
+ trace!("\n\n----------- BinsXbinDim0Collector ingest\n{:?}\n\n", src);
+ // TODO could be optimized by using a non-contiguous container.
+ self.vals.ts1s.append(&mut src.ts1s);
+ self.vals.ts2s.append(&mut src.ts2s);
+ self.vals.counts.append(&mut src.counts);
+ self.vals.mins.append(&mut src.mins);
+ self.vals.maxs.append(&mut src.maxs);
+ self.vals.avgs.append(&mut src.avgs);
+ }
+
+ fn set_range_complete(&mut self) {
+ self.range_complete = true;
+ }
+
+ fn set_timed_out(&mut self) {
+ self.timed_out = true;
+ }
+
+ fn result(&mut self) -> Result {
+ let bin_count_exp = 0;
+ let bin_count = self.vals.ts1s.len() as u32;
+ let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
+ match self.vals.ts2s.back() {
+ Some(&k) => {
+ let missing_bins = bin_count_exp - bin_count;
+ let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
+ let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
+ let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
+ (missing_bins, Some(continue_at), Some(finished_at))
+ }
+ None => Err(Error::with_msg("partial_content but no bin in result"))?,
+ }
+ } else {
+ (0, None, None)
+ };
+ // ts_offs_from_abs works on a single slice, so the deques must be contiguous here.
+ if self.vals.ts1s.as_slices().1.len() != 0 {
+ panic!("ts1s not contiguous");
+ }
+ if self.vals.ts2s.as_slices().1.len() != 0 {
+ panic!("ts2s not contiguous");
+ }
+ let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0);
+ let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0);
+ let counts = mem::replace(&mut self.vals.counts, VecDeque::new());
+ let mins = mem::replace(&mut self.vals.mins, VecDeque::new());
+ let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new());
+ let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new());
+ let ret = BinsXbinDim0CollectedResult::<NTY> {
+ ts_anchor_sec: tst1.0,
+ ts1_off_ms: tst1.1,
+ ts1_off_ns: tst1.2,
+ ts2_off_ms: tst2.0,
+ ts2_off_ns: tst2.1,
+ counts,
+ mins,
+ maxs,
+ avgs,
+ finalised_range: self.range_complete,
+ timed_out: self.timed_out,
+ missing_bins,
+ continue_at,
+ finished_at,
+ };
+ Ok(ret)
+ }
+}
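+
+// Collector flow (sketch): a stream driver repeatedly calls `ingest` with
+// partial batches of bins as they arrive, may flag `set_range_complete` or
+// `set_timed_out`, and finally calls `result()` once to obtain the
+// JSON-serializable `BinsXbinDim0CollectedResult`.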
+
+impl<NTY: ScalarOps> CollectableType for BinsXbinDim0<NTY> {
+ type Collector = BinsXbinDim0Collector<NTY>;
+
+ fn new_collector() -> Self::Collector {
+ Self::Collector::new()
+ }
+}
+
+impl<NTY> items_0::collect_c::Collector for BinsXbinDim0Collector<NTY>
+where
+ NTY: ScalarOps,
+{
+ fn len(&self) -> usize {
+ self.vals.len()
+ }
+
+ fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
+ trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item);
+ {
+ {
+ //let tyid = item.type_id();
+ //let tyid = item.as_any_mut().type_id();
+ //let tyid = format!("{:?}", item.type_id().to_owned());
+ trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id());
+ }
+ trace4!("ty 1: {:40?}", std::any::TypeId::of::>());
+ trace4!("ty 2: {:40?}", std::any::TypeId::of::>());
+ trace4!("ty 3: {:40?}", std::any::TypeId::of::>>());
+ trace4!(
+ "ty 4: {:?}",
+ std::any::TypeId::of::>()
+ );
+ trace4!(
+ "ty 5: {:?}",
+ std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>()
+ );
+ trace4!("ty 6: {:?}", std::any::TypeId::of::>());
+ }
+ if let Some(item) = item.as_any_mut().downcast_mut::<BinsXbinDim0<NTY>>() {
+ trace4!("ingest plain");
+ CollectorType::ingest(self, item)
+ } else if let Some(item) = item.as_any_mut().downcast_mut::<Box<BinsXbinDim0<NTY>>>() {
+ trace4!("ingest boxed");
+ CollectorType::ingest(self, item)
+ } else if let Some(item) = item.as_any_mut().downcast_mut::<Box<dyn TimeBinned>>() {
+ trace4!("ingest boxed dyn TimeBinned");
+ if let Some(item) = item.as_any_mut().downcast_mut::<BinsXbinDim0<NTY>>() {
+ trace4!("ingest boxed dyn TimeBinned match");
+ CollectorType::ingest(self, item)
+ } else {
+ warn!("BinsDim0Collector::ingest unexpected inner item");
+ trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item);
+ }
+ } else {
+ warn!("BinsDim0Collector::ingest unexpected item");
+ trace!("BinsDim0Collector::ingest unexpected item {:?}", item);
+ }
+ }
+
+ fn set_range_complete(&mut self) {
+ CollectorType::set_range_complete(self)
+ }
+
+ fn set_timed_out(&mut self) {
+ CollectorType::set_timed_out(self)
+ }
+
+ fn result(&mut self) -> Result, Error> {
+ match CollectorType::result(self) {
+ Ok(res) => Ok(Box::new(res)),
+ Err(e) => Err(e.into()),
+ }
+ }
+}
+
+pub struct BinsXbinDim0Aggregator<NTY> {
+ range: NanoRange,
+ count: u64,
+ min: NTY,
+ max: NTY,
+ // Carry over to next bin:
+ avg: f32,
+ sumc: u64,
+ sum: f32,
+}
+
+impl<NTY: ScalarOps> BinsXbinDim0Aggregator<NTY> {
+ pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
+ Self {
+ range,
+ count: 0,
+ min: NTY::zero_b(),
+ max: NTY::zero_b(),
+ avg: 0.,
+ sumc: 0,
+ sum: 0f32,
+ }
+ }
+}
+
+impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsXbinDim0Aggregator<NTY> {
+ type Input = BinsXbinDim0<NTY>;
+ type Output = BinsXbinDim0<NTY>;
+
+ fn range(&self) -> &NanoRange {
+ &self.range
+ }
+
+ fn ingest(&mut self, item: &Self::Input) {
+ for i1 in 0..item.ts1s.len() {
+ if item.counts[i1] == 0 {
+ // skip empty input bins
+ } else if item.ts2s[i1] <= self.range.beg {
+ // the input bin ends before our bin range
+ } else if item.ts1s[i1] >= self.range.end {
+ // the input bin starts after our bin range
+ } else {
+ if self.count == 0 {
+ self.min = item.mins[i1].clone();
+ self.max = item.maxs[i1].clone();
+ } else {
+ if self.min > item.mins[i1] {
+ self.min = item.mins[i1].clone();
+ }
+ if self.max < item.maxs[i1] {
+ self.max = item.maxs[i1].clone();
+ }
+ }
+ self.count += item.counts[i1];
+ self.sum += item.avgs[i1];
+ self.sumc += 1;
+ }
+ }
+ }
+
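+ // NOTE on averaging: `avg` is the unweighted mean of the contributing bin
+ // averages (`sum` / `sumc`). Example: merging bins (count 2, avg 1.0) and
+ // (count 4, avg 4.0) yields avg 2.5 here, while a count-weighted mean would
+ // give 3.0.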
+ fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
+ if self.sumc > 0 {
+ self.avg = self.sum / self.sumc as f32;
+ }
+ let ret = Self::Output {
+ ts1s: [self.range.beg].into(),
+ ts2s: [self.range.end].into(),
+ counts: [self.count].into(),
+ mins: [self.min.clone()].into(),
+ maxs: [self.max.clone()].into(),
+ avgs: [self.avg].into(),
+ };
+ self.range = range;
+ self.count = 0;
+ self.sum = 0f32;
+ self.sumc = 0;
+ ret
+ }
+}
+
+impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
+ fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner> {
+ let ret = BinsXbinDim0TimeBinner::<NTY>::new(edges.into(), do_time_weight);
+ Box::new(ret)
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self as &dyn Any
+ }
+
+ fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
+ let k = serde_json::to_value(self).unwrap();
+ Box::new(k) as _
+ }
+}
+
+pub struct BinsXbinDim0TimeBinner<NTY: ScalarOps> {
+ edges: VecDeque<u64>,
+ do_time_weight: bool,
+ agg: Option<BinsXbinDim0Aggregator<NTY>>,
+ ready: Option<<BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
+}
+
+impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
+ fn new(edges: VecDeque<u64>, do_time_weight: bool) -> Self {
+ Self {
+ edges,
+ do_time_weight,
+ agg: None,
+ ready: None,
+ }
+ }
+
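+ // Pops the leading edge and returns the first remaining bin range, e.g.
+ // edges [0, 10, 20] yield [0, 10) first and leave [10, 20) for the next call.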
+ fn next_bin_range(&mut self) -> Option<NanoRange> {
+ if self.edges.len() >= 2 {
+ let ret = NanoRange {
+ beg: self.edges[0],
+ end: self.edges[1],
+ };
+ self.edges.pop_front();
+ Some(ret)
+ } else {
+ None
+ }
+ }
+}
+
+impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
+ fn ingest(&mut self, item: &dyn TimeBinnable) {
+ let self_name = std::any::type_name::<Self>();
+ if item.len() == 0 {
+ // Return early here; the RangeOverlapInfo checks would not be meaningful for an empty item.
+ return;
+ }
+ if self.edges.len() < 2 {
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item);
+ return;
+ }
+ // TODO optimize by remembering at which event array index we have arrived.
+ // That needs modified interfaces which can take and yield the start and latest index.
+ loop {
+ while item.starts_after(NanoRange {
+ beg: 0,
+ end: self.edges[1],
+ }) {
+ self.cycle();
+ if self.edges.len() < 2 {
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item);
+ return;
+ }
+ }
+ if item.ends_before(NanoRange {
+ beg: self.edges[0],
+ end: u64::MAX,
+ }) {
+ return;
+ } else {
+ if self.edges.len() < 2 {
+ warn!("TimeBinnerDyn for {self_name} edge list exhausted");
+ return;
+ } else {
+ let agg = if let Some(agg) = self.agg.as_mut() {
+ agg
+ } else {
+ self.agg = Some(BinsXbinDim0Aggregator::new(
+ // We know here that we have enough edges for another bin,
+ // and `next_bin_range` will pop the first edge.
+ self.next_bin_range().unwrap(),
+ self.do_time_weight,
+ ));
+ self.agg.as_mut().unwrap()
+ };
+ if let Some(item) = item
+ .as_any()
+ // TODO make statically sure that we attempt to cast to the correct type here:
+ .downcast_ref::<<BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
+ {
+ agg.ingest(item);
+ } else {
+ let tyid_item = std::any::Any::type_id(item.as_any());
+ error!("not correct item type {:?}", tyid_item);
+ };
+ if item.ends_after(agg.range().clone()) {
+ self.cycle();
+ if self.edges.len() < 2 {
+ warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item);
+ return;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ fn bins_ready_count(&self) -> usize {
+ match &self.ready {
+ Some(k) => k.len(),
+ None => 0,
+ }
+ }
+
+ fn bins_ready(&mut self) -> Option> {
+ match self.ready.take() {
+ Some(k) => Some(Box::new(k)),
+ None => None,
+ }
+ }
+
+ // TODO there is too much common code between implementors:
+ fn push_in_progress(&mut self, push_empty: bool) {
+ // TODO expand should be derived from AggKind. Is it still required after all?
+ let expand = true;
+ if let Some(agg) = self.agg.as_mut() {
+ let dummy_range = NanoRange { beg: 4, end: 5 }; // placeholder only: the aggregator is dropped right below
+ let mut bins = agg.result_reset(dummy_range, expand);
+ self.agg = None;
+ assert_eq!(bins.len(), 1);
+ if push_empty || bins.counts[0] != 0 {
+ match self.ready.as_mut() {
+ Some(ready) => {
+ ready.append_all_from(&mut bins);
+ }
+ None => {
+ self.ready = Some(bins);
+ }
+ }
+ }
+ }
+ }
+
+ // TODO there is too much common code between implementors:
+ fn cycle(&mut self) {
+ let n = self.bins_ready_count();
+ self.push_in_progress(true);
+ if self.bins_ready_count() == n {
+ if let Some(range) = self.next_bin_range() {
+ let mut bins = BinsXbinDim0::<NTY>::empty();
+ bins.append_zero(range.beg, range.end);
+ match self.ready.as_mut() {
+ Some(ready) => {
+ ready.append_all_from(&mut bins);
+ }
+ None => {
+ self.ready = Some(bins);
+ }
+ }
+ if self.bins_ready_count() <= n {
+ error!("failed to push a zero bin");
+ }
+ } else {
+ warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
+ }
+ }
+ }
+
+ fn set_range_complete(&mut self) {}
+}
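+
+// Typical driving sequence for this binner (a sketch; the exact call order is
+// up to the stream driver, the names mirror the TimeBinner trait above):
+// let mut binner = bins.time_binner_new(edges, true);
+// binner.ingest(&more_bins);
+// binner.push_in_progress(false);
+// if let Some(ready) = binner.bins_ready() {
+//     // collect or forward the finished bins
+// }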
+
+impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
+ fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
+ self as &dyn TimeBinnable
+ }
+
+ fn edges_slice(&self) -> (&[u64], &[u64]) {
+ if self.ts1s.as_slices().1.len() != 0 {
+ panic!("ts1s not contiguous");
+ }
+ if self.ts2s.as_slices().1.len() != 0 {
+ panic!("ts2s not contiguous");
+ }
+ (&self.ts1s.as_slices().0, &self.ts2s.as_slices().0)
+ }
+
+ fn counts(&self) -> &[u64] {
+ // TODO check for contiguous
+ self.counts.as_slices().0
+ }
+
+ // TODO is Vec needed?
+ fn mins(&self) -> Vec {
+ self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect()
+ }
+
+ // TODO is Vec needed?
+ fn maxs(&self) -> Vec {
+ self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect()
+ }
+
+ // TODO is Vec needed?
+ fn avgs(&self) -> Vec {
+ self.avgs.iter().map(Clone::clone).collect()
+ }
+
+ fn validate(&self) -> Result<(), String> {
+ use std::fmt::Write;
+ let mut msg = String::new();
+ if self.ts1s.len() != self.ts2s.len() {
+ write!(&mut msg, "ts1s ≠ ts2s\n").unwrap();
+ }
+ for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() {
+ if min.as_prim_f32_b() < 1. && *count != 0 {
+ write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap();
+ }
+ }
+ if msg.is_empty() {
+ Ok(())
+ } else {
+ Err(msg)
+ }
+ }
+
+ fn as_collectable_mut(&mut self) -> &mut dyn Collectable {
+ self
+ }
+}
+
+impl<NTY> items_0::AsAnyMut for BinsXbinDim0<NTY>
+where
+ NTY: 'static,
+{
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
+ }
+}
+
+impl<NTY> items_0::collect_c::Collectable for BinsXbinDim0<NTY>
+where
+ NTY: ScalarOps,
+{
+ fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
+ Box::new(BinsXbinDim0Collector::<NTY>::new())