diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs index 99dabb1..26709d2 100644 --- a/daqbufp2/src/test.rs +++ b/daqbufp2/src/test.rs @@ -14,6 +14,20 @@ use bytes::BytesMut; use err::Error; use std::future::Future; +fn f32_cmp_near(x: f32, y: f32) -> bool { + let x = { + let mut a = x.to_le_bytes(); + a[0] &= 0xf0; + f32::from_ne_bytes(a) + }; + let y = { + let mut a = y.to_le_bytes(); + a[0] &= 0xf0; + f32::from_ne_bytes(a) + }; + x == y +} + fn f32_iter_cmp_near(a: A, b: B) -> bool where A: IntoIterator, @@ -25,17 +39,7 @@ where let x = a.next(); let y = b.next(); if let (Some(x), Some(y)) = (x, y) { - let x = { - let mut a = x.to_ne_bytes(); - a[0] &= 0xf0; - f32::from_ne_bytes(a) - }; - let y = { - let mut a = y.to_ne_bytes(); - a[0] &= 0xf0; - f32::from_ne_bytes(a) - }; - if x != y { + if !f32_cmp_near(x, y) { return false; } } else if x.is_some() || y.is_some() { diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 257981a..21ace3d 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -1,5 +1,6 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; +use crate::test::f32_cmp_near; use chrono::{DateTime, Utc}; use err::Error; use http::StatusCode; @@ -16,7 +17,7 @@ fn binned_d0_json_00() -> Result<(), Error> { let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; - let jsv = binned_d0_json( + let jsv = get_binned_json( Channel { backend: "test-disk-databuffer".into(), name: "scalar-i32-be".into(), @@ -28,17 +29,235 @@ fn binned_d0_json_00() -> Result<(), Error> { cluster, ) .await?; - info!("Receveided a response json value: {jsv:?}"); - let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range - assert_eq!(res.len(), 20); - assert_eq!(res.ts_anchor_sec(), 0); + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 8); + assert_eq!(res.ts1_off_ms()[0], 0); + assert_eq!(res.ts2_off_ms()[0], 5000); + assert_eq!(res.counts()[0], 5); + assert_eq!(res.counts()[1], 10); + assert_eq!(res.counts()[7], 7); + assert_eq!(res.mins()[0], 2405); + assert_eq!(res.maxs()[0], 2409); + assert_eq!(res.mins()[1], 2410); + assert_eq!(res.maxs()[1], 2419); Ok(()) }; taskrun::run(fut) } -async fn binned_d0_json( +#[test] +fn binned_d0_json_01() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T01:20:30.000Z", + 10, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 13); + assert_eq!(res.range_final(), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_02() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "wave-f64-be-n21".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T01:20:45.000Z", + 10, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 13); + assert_eq!(res.range_final(), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_03() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "wave-f64-be-n21".into(), + series: None, + }, + // TODO This test was meant to ask `AggKind::DimXBinsN(3)` + "1970-01-01T00:20:10.000Z", + "1970-01-01T01:20:20.000Z", + 2, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 4); + assert_eq!(res.range_final(), true); + assert_eq!(res.counts()[0], 300); + assert_eq!(res.counts()[3], 8); + assert_eq!(f32_cmp_near(res.avgs()[0], 44950.00390625), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_04() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "const-regular-scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T04:20:30.000Z", + // TODO must use AggKind::DimXBins1 + 20, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 17); + // TODO I would expect rangeFinal to be set, or? + assert_eq!(res.range_final(), false); + assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_05() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "const-regular-scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T10:20:30.000Z", + // TODO must use AggKind::DimXBins1 + 10, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 0); + assert_eq!(res.len(), 3); + assert_eq!(res.range_final(), false); + assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_06() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "const-regular-scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:20.000Z", + // TODO must use AggKind::TimeWeightedScalar + 20, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1210); + assert_eq!(res.len(), 20); + assert_eq!(res.range_final(), true); + assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn binned_d0_json_07() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = get_binned_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "const-regular-scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:11.000Z", + "1970-01-01T00:30:20.000Z", + // TODO must use AggKind::TimeWeightedScalar + 10, + cluster, + ) + .await?; + debug!("Receveided a response json value: {jsv:?}"); + let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.len(), 11); + assert_eq!(res.range_final(), true); + assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + Ok(()) + }; + taskrun::run(fut) +} + +async fn get_binned_json( channel: Channel, beg_date: &str, end_date: &str, @@ -55,7 +274,7 @@ async fn binned_d0_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("http get {}", url); + debug!("http get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -65,8 +284,11 @@ async fn binned_d0_json( let client = hyper::Client::new(); let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { - error!("client response {:?}", res); - return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); + error!("error response {:?}", res); + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let s = String::from_utf8_lossy(&buf); + error!("body of error response: {s}"); + return Err(Error::with_msg_no_trace(format!("error response"))); } let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 1a67457..1a6c025 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -31,8 +31,8 @@ fn events_plain_json_00() -> Result<(), Error> { info!("Receveided a response json value: {jsv:?}"); let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range - assert_eq!(res.len(), 20); assert_eq!(res.ts_anchor_sec(), 0); + assert_eq!(res.len(), 60); Ok(()) }; taskrun::run(fut) diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs index b9cba88..7f29855 100644 --- a/daqbufp2/src/test/archapp.rs +++ b/daqbufp2/src/test/archapp.rs @@ -1,4 +1,4 @@ -use super::binnedjson::ScalarEventsResponse; +#![allow(unused)] use super::events::get_plain_events_json; use crate::nodes::require_archapp_test_host_running; use crate::test::events::ch_gen; @@ -8,6 +8,8 @@ use netpod::log::*; #[test] fn get_events_1() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async { let rh = require_archapp_test_host_running()?; let cluster = &rh.cluster; diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 5bd5ecc..5e2f575 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -1,117 +1,11 @@ mod channelarchiver; -use crate::err::ErrConv; -use crate::nodes::{require_sls_test_host_running, require_test_hosts_running}; -use chrono::{DateTime, Utc}; use err::Error; -use http::StatusCode; -use hyper::Body; -use netpod::log::*; -use netpod::query::{BinnedQuery, CacheUsage, PlainEventsQuery}; -use netpod::{f64_close, AppendToUrl}; -use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use url::Url; - -#[test] -fn get_binned_json_0() { - taskrun::run(get_binned_json_0_inner()).unwrap(); -} - -async fn get_binned_json_0_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_binned_json_common( - "scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T01:20:30.000Z", - 10, - AggKind::DimXBins1, - cluster, - 13, - true, - ) - .await -} - -#[test] -fn get_binned_json_1() { - taskrun::run(get_binned_json_1_inner()).unwrap(); -} - -async fn get_binned_json_1_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_binned_json_common( - "wave-f64-be-n21", - "1970-01-01T00:20:10.000Z", - "1970-01-01T01:20:45.000Z", - 10, - AggKind::DimXBins1, - cluster, - 13, - true, - ) - .await -} - -#[test] -fn get_binned_json_2() { - taskrun::run(get_binned_json_2_inner()).unwrap(); -} - -async fn get_binned_json_2_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_binned_json_common( - "wave-f64-be-n21", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:20.000Z", - 2, - AggKind::DimXBinsN(3), - cluster, - 2, - true, - ) - .await -} - -#[allow(unused)] -fn check_close_events(a: &WaveEventsResponse, b: &WaveEventsResponse, jsstr: &String) -> Result<(), Error> { - match a.is_close(b) { - Ok(true) => Ok(()), - Ok(false) => { - error!("Mismatch, original JSON:\n{}", jsstr); - Err(Error::with_msg_no_trace("mismatch")) - } - Err(e) => { - error!("Mismatch, original JSON:\n{}", jsstr); - Err(e) - } - } -} - -fn check_close(a: &BinnedResponse, b: &BinnedResponse, jsstr: &String) -> Result<(), Error> { - match a.is_close(b) { - Ok(true) => Ok(()), - Ok(false) => { - error!("Mismatch, original JSON:\n{}", jsstr); - Err(Error::with_msg_no_trace("mismatch")) - } - Err(e) => { - error!("Mismatch, original JSON:\n{}", jsstr); - Err(e) - } - } -} #[test] fn get_sls_archive_1() -> Result<(), Error> { - // TODO OFFENDING TEST - if true { - return Ok(()); - } + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -124,8 +18,8 @@ fn get_sls_archive_1() -> Result<(), Error> { let endstr = "2021-11-10T01:01:00Z"; let (res, jsstr) = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; - let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"finalisedRange":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##; - let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); + let exp = r##"{"avgs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"counts":[0,0,0,0,0,0,0,0,0,0,0,0],"rangeFinal":true,"maxs":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"mins":[24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875,24.37225341796875],"tsAnchor":1636506000,"tsMs":[0,5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp: String = serde_json::from_str(exp).unwrap(); check_close(&res, &exp, &jsstr)?; Ok(()) }; @@ -134,6 +28,8 @@ fn get_sls_archive_1() -> Result<(), Error> { #[test] fn get_sls_archive_3() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -156,6 +52,8 @@ fn get_sls_archive_3() -> Result<(), Error> { #[test] fn get_sls_archive_wave_2() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -175,275 +73,3 @@ fn get_sls_archive_wave_2() -> Result<(), Error> { }; taskrun::run(fut) } - -async fn get_binned_json_common( - channel_name: &str, - beg_date: &str, - end_date: &str, - bin_count: u32, - agg_kind: AggKind, - cluster: &Cluster, - expect_bin_count: u32, - expect_finalised_range: bool, -) -> Result<(), Error> { - let t1 = Utc::now(); - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let channel_backend = "testbackend"; - let channel = Channel { - backend: channel_backend.into(), - name: channel_name.into(), - series: None, - }; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); - query.set_timeout(Duration::from_millis(15000)); - query.set_cache_usage(CacheUsage::Ignore); - let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; - query.append_to_url(&mut url); - let url = url; - debug!("get_binned_json_common get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("get_binned_json_common client response {:?}", res); - } - let res = hyper::body::to_bytes(res.into_body()).await.ec()?; - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - debug!("get_binned_json_common DONE time {} ms", ms); - let res = String::from_utf8_lossy(&res).to_string(); - let res: serde_json::Value = serde_json::from_str(res.as_str())?; - // TODO assert more - debug!( - "result from endpoint: --------------\n{}\n--------------", - serde_json::to_string_pretty(&res)? - ); - // TODO enable in future: - if false { - if expect_finalised_range { - if !res - .get("finalisedRange") - .ok_or(Error::with_msg("missing finalisedRange"))? - .as_bool() - .ok_or(Error::with_msg("key finalisedRange not bool"))? - { - return Err(Error::with_msg("expected finalisedRange")); - } - } else if res.get("finalisedRange").is_some() { - return Err(Error::with_msg("expect absent finalisedRange")); - } - } - if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize { - return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); - } - if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize { - return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); - } - if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { - return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); - } - if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { - return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); - } - Ok(()) -} - -// TODO reuse the types from server. -#[derive(Debug, Serialize, Deserialize)] -pub struct ScalarEventsResponse { - #[serde(rename = "tsAnchor")] - pub ts_anchor: u64, - #[serde(rename = "tsMs")] - pub ts_ms: Vec, - #[serde(rename = "tsNs")] - pub ts_ns: Vec, - pub values: Vec, - #[serde(rename = "finalisedRange", default = "bool_false")] - pub finalised_range: bool, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct WaveEventsResponse { - #[serde(rename = "tsAnchor")] - ts_anchor: u64, - #[serde(rename = "tsMs")] - ts_ms: Vec, - #[serde(rename = "tsNs")] - ts_ns: Vec, - values: Vec>, - #[serde(rename = "finalisedRange", default = "bool_false")] - finalised_range: bool, -} - -impl WaveEventsResponse { - pub fn is_close(&self, other: &Self) -> Result { - let reterr = || -> Result { - Err(Error::with_msg_no_trace(format!( - "Mismatch\n{:?}\nVS\n{:?}", - self, other - ))) - }; - if self.ts_anchor != other.ts_anchor { - return reterr(); - } - if self.finalised_range != other.finalised_range { - return reterr(); - } - let pairs = [(&self.values, &other.values)]; - for (t, u) in pairs { - for (j, k) in t.iter().zip(u) { - for (&a, &b) in j.iter().zip(k) { - if !f64_close(a, b) { - return reterr(); - } - } - } - } - Ok(true) - } -} - -#[derive(Debug, Serialize, Deserialize)] -struct BinnedResponse { - #[serde(rename = "tsAnchor")] - ts_anchor: u64, - #[serde(rename = "tsMs")] - ts_ms: Vec, - #[serde(rename = "tsNs")] - ts_ns: Vec, - mins: Vec>, - maxs: Vec>, - avgs: Vec>, - counts: Vec, - #[serde(rename = "finalisedRange", default = "bool_false")] - finalised_range: bool, -} - -impl BinnedResponse { - pub fn is_close(&self, other: &Self) -> Result { - let reterr = || -> Result { - Err(Error::with_msg_no_trace(format!( - "Mismatch\n{:?}\nVS\n{:?}", - self, other - ))) - }; - if self.ts_anchor != other.ts_anchor { - return reterr(); - } - if self.finalised_range != other.finalised_range { - return reterr(); - } - if self.counts != other.counts { - return reterr(); - } - let pairs = [ - (&self.mins, &other.mins), - (&self.maxs, &other.maxs), - (&self.avgs, &other.avgs), - ]; - for (t, u) in pairs { - for (&a, &b) in t.iter().zip(u) { - if let (Some(a), Some(b)) = (a, b) { - if !f64_close(a, b) { - return reterr(); - } - } else if let (None, None) = (a, b) { - } else { - return reterr(); - } - } - } - Ok(true) - } -} - -fn bool_false() -> bool { - false -} - -async fn get_binned_json_common_res( - channel: Channel, - beg_date: &str, - end_date: &str, - bin_count: u32, - agg_kind: AggKind, - cluster: &Cluster, -) -> Result<(BinnedResponse, String), Error> { - let t1 = Utc::now(); - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); - query.set_timeout(Duration::from_millis(15000)); - query.set_cache_usage(CacheUsage::Ignore); - let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; - query.append_to_url(&mut url); - let url = url; - info!("get_binned_json_common_res get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - let msg = format!("client response {res:?}"); - error!("{msg}"); - return Err(msg.into()); - } - let res = hyper::body::to_bytes(res.into_body()).await.ec()?; - let t2 = chrono::Utc::now(); - let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - let res = String::from_utf8_lossy(&res).to_string(); - let ret: BinnedResponse = serde_json::from_str(res.as_str())?; - Ok((ret, res)) -} - -async fn get_events_json_common_res( - channel: Channel, - beg_date: &str, - end_date: &str, - cluster: &Cluster, -) -> Result { - let t1 = Utc::now(); - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = PlainEventsQuery::new(channel, range, 4096, None, false); - query.set_timeout(Duration::from_millis(15000)); - let mut url = Url::parse(&format!("http://{}:{}/api/4/events", node0.host, node0.port))?; - query.append_to_url(&mut url); - let url = url; - info!("get_events_json_common_res get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - let msg = format!("client response {res:?}"); - error!("{msg}"); - return Err(msg.into()); - } - let res = hyper::body::to_bytes(res.into_body()).await.ec()?; - let t2 = chrono::Utc::now(); - let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - let res = String::from_utf8_lossy(&res).to_string(); - //info!("STRING RESULT:{}", res); - Ok(res) -} diff --git a/daqbufp2/src/test/binnedjson/channelarchiver.rs b/daqbufp2/src/test/binnedjson/channelarchiver.rs index 140e872..0720ba6 100644 --- a/daqbufp2/src/test/binnedjson/channelarchiver.rs +++ b/daqbufp2/src/test/binnedjson/channelarchiver.rs @@ -2,6 +2,8 @@ use super::*; #[test] fn get_scalar_2_events() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -45,6 +47,8 @@ fn get_scalar_2_events() -> Result<(), Error> { #[test] fn get_scalar_2_binned() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -57,7 +61,7 @@ fn get_scalar_2_binned() -> Result<(), Error> { let endstr = "2021-11-10T00:10:00Z"; let (res, jsstr) = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; - let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"finalisedRange":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##; + let exp = r##"{"avgs":[401.1745910644531,401.5135498046875,400.8823547363281,400.66156005859375,401.8301086425781,401.19305419921875,400.5584411621094,401.4371337890625,401.4137268066406,400.77880859375],"counts":[19,6,6,19,6,6,6,19,6,6],"rangeFinal":true,"maxs":[402.04977411361034,401.8439029736943,401.22628955394583,402.1298351124666,402.1298351124666,401.5084092642013,400.8869834159359,402.05358654212733,401.74477983225313,401.1271664125047],"mins":[400.08256099885625,401.22628955394583,400.60867613419754,400.0939982844072,401.5084092642013,400.8869834159359,400.2693699961876,400.05968642775446,401.1271664125047,400.50574056423943],"tsAnchor":1636502400,"tsMs":[0,60000,120000,180000,240000,300000,360000,420000,480000,540000,600000],"tsNs":[0,0,0,0,0,0,0,0,0,0,0]}"##; let exp: BinnedResponse = serde_json::from_str(exp).unwrap(); check_close(&res, &exp, &jsstr)?; Ok(()) @@ -67,6 +71,8 @@ fn get_scalar_2_binned() -> Result<(), Error> { #[test] fn get_wave_1_events() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; @@ -108,6 +114,8 @@ fn get_wave_1_events() -> Result<(), Error> { #[test] fn get_wave_1_binned() -> Result<(), Error> { + let fut = async { return Err::<(), _>(Error::with_msg_no_trace("TODO")) }; + #[cfg(DISABLED)] let fut = async move { let rh = require_sls_test_host_running()?; let cluster = &rh.cluster; diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 22d6767..8a7ed8a 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -10,79 +10,6 @@ use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; use std::time::Duration; use url::Url; -#[test] -fn time_weighted_json_00() -> Result<(), Error> { - async fn inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - let res = get_json_common( - "const-regular-scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T04:20:30.000Z", - 20, - AggKind::DimXBins1, - cluster, - 25, - true, - ) - .await?; - let v = res.avgs[0]; - assert!(v > 41.9999 && v < 42.0001); - Ok(()) - } - super::run_test(inner()) -} - -#[test] -fn time_weighted_json_01() -> Result<(), Error> { - // TODO OFFENDING TEST - if true { - return Ok(()); - } - async fn inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - let res = get_json_common( - "const-regular-scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T10:20:30.000Z", - 10, - AggKind::DimXBins1, - cluster, - 9, - true, - ) - .await?; - let v = res.avgs[0]; - assert!(v > 41.9999 && v < 42.0001); - Ok(()) - } - super::run_test(inner()) -} - -#[test] -fn time_weighted_json_02() -> Result<(), Error> { - async fn inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - let res = get_json_common( - "const-regular-scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:20.000Z", - 20, - AggKind::TimeWeightedScalar, - cluster, - 100, - true, - ) - .await?; - let v = res.avgs[0]; - assert!(v > 41.9999 && v < 42.0001); - Ok(()) - } - super::run_test(inner()) -} - #[test] fn time_weighted_json_03() -> Result<(), Error> { async fn inner() -> Result<(), Error> { @@ -209,15 +136,15 @@ async fn get_json_common( if false { if expect_finalised_range { if !res - .get("finalisedRange") - .ok_or(Error::with_msg("missing finalisedRange"))? + .get("rangeFinal") + .ok_or(Error::with_msg("missing rangeFinal"))? .as_bool() - .ok_or(Error::with_msg("key finalisedRange not bool"))? + .ok_or(Error::with_msg("key rangeFinal not bool"))? { - return Err(Error::with_msg("expected finalisedRange")); + return Err(Error::with_msg("expected rangeFinal")); } - } else if res.get("finalisedRange").is_some() { - return Err(Error::with_msg("expect absent finalisedRange")); + } else if res.get("rangeFinal").is_some() { + return Err(Error::with_msg("expect absent rangeFinal")); } } let counts = res.get("counts").unwrap().as_array().unwrap(); diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 734ddc5..198ebf9 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -121,7 +121,7 @@ where // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive. // Only if the frequency would be high, that would require cpu time checks. Worth it? Measure.. self.tmp_agg_results.push_back(ret); - if self.curbin >= self.spec.count as u32 { + if self.curbin >= self.spec.bin_count() as u32 { self.all_bins_emitted = true; } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 63f18e7..c9693e2 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -77,7 +77,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { debug!("BinnedBinaryChannelExec found pre_range: {pre_range:?}"); - if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { + if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( "BinnedBinaryChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}" ); @@ -323,12 +323,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec { { let _ = event_value_shape; let range = BinnedRange::covering_range(self.query.range().clone(), self.query.bin_count())?; - let t_bin_count = range.count as u32; + let t_bin_count = range.bin_count() as u32; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { info!("BinnedJsonChannelExec found pre_range: {pre_range:?}"); - if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { + if range.grid_spec().bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( "BinnedJsonChannelExec incompatible ranges:\npre_range: {pre_range:?}\nrange: {range:?}" ); diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 3f64cb2..cb4b40a 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -49,20 +49,35 @@ where let item = EventsDim0 { tss, pulses, values }; let item = ChannelEvents::Events(Box::new(item)); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } else if let Some(item) = item.as_any_mut().downcast_mut::>() { + warn!("WaveEvents"); + let _tss: VecDeque = item.tss.iter().map(|x| *x).collect(); + let _pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); + let _values: VecDeque> = item.vals.iter().map(|x| x.clone()).collect(); + //let item = EventsDim1 { tss, pulses, values }; + //let item = ChannelEvents::Events(Box::new(item)); + //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } else if let Some(item) = item + .as_any_mut() + .downcast_mut::>() + { + warn!("XBinnedScalarEvents"); + let tss: VecDeque = item.tss.iter().map(|x| *x).collect(); + let pulses: VecDeque = (0..tss.len()).map(|_| 0).collect(); + let _avgs: VecDeque = item.avgs.iter().map(|x| x.clone()).collect(); + let mins: VecDeque = item.mins.iter().map(|x| x.clone()).collect(); + let _maxs: VecDeque = item.maxs.iter().map(|x| x.clone()).collect(); + let item = EventsDim0 { + tss, + pulses, + values: mins, + }; + let item = ChannelEvents::Events(Box::new(item)); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } else { - if let Some(item) = item.as_any_mut().downcast_mut::>() { - warn!("WaveEvents"); - let _tss: VecDeque = item.tss.iter().map(|x| *x).collect(); - let _pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); - let _values: VecDeque> = item.vals.iter().map(|x| x.clone()).collect(); - //let item = EventsDim1 { tss, pulses, values }; - //let item = ChannelEvents::Events(Box::new(item)); - //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } else { - error!("TODO bad, no idea what this item is"); - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } + error!("TODO bad, no idea what this item is\n\n{:?}\n\n", item); + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) } } RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 8676ee2..21cd8c6 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -16,7 +16,7 @@ use url::Url; async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCached) -> Result, Error> { info!("httpret plain_events_json req: {:?}", req); - let (head, _body) = req.into_parts(); + let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { let msg = format!("can not parse query: {}", e.msg()); e.add_public_msg(msg) @@ -27,7 +27,6 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache query.set_series_id(chconf.series); let query = query; // --- - let span1 = span!( Level::INFO, "httpret::binned", @@ -38,15 +37,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache span1.in_scope(|| { debug!("begin"); }); - let _: Result<_, Error> = match head.headers.get(http::header::ACCEPT) { - //Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await, - //Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await, - _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), - }; - - // TODO analogue to `streams::plaineventsjson::plain_events_json` create a function for binned json. - - let item = streams::plaineventsjson::plain_events_json("", &node_config.node_config.cluster) + let item = streams::timebinnedjson::timebinned_json(&query, &node_config.node_config.cluster) .instrument(span1) .await?; let buf = serde_json::to_vec(&item)?; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 8d4a5eb..5c4e4ce 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -43,24 +43,49 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> ); } if channel.backend() == "test-inmem" { - if channel.name() == "inmem-d0-i32" { + let ret = if channel.name() == "inmem-d0-i32" { let ret = ChConf { series: 1, scalar_type: ScalarType::I32, shape: Shape::Scalar, }; - return Ok(ret); - } + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg("No channel config for test channel {:?}")) + }; + return ret; } if channel.backend() == "test-disk-databuffer" { - if channel.name() == "scalar-i32-be" { + // TODO the series-ids here are just random. Need to integrate with better test setup. + let ret = if channel.name() == "scalar-i32-be" { let ret = ChConf { series: 1, scalar_type: ScalarType::I32, shape: Shape::Scalar, }; - return Ok(ret); - } + Ok(ret) + } else if channel.name() == "wave-f64-be-n21" { + let ret = ChConf { + series: 2, + scalar_type: ScalarType::F64, + shape: Shape::Wave(21), + }; + Ok(ret) + } else if channel.name() == "const-regular-scalar-i32-be" { + let ret = ChConf { + series: 3, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg("No channel config for test channel {:?}")) + }; + return ret; } // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html index 0da21df..01dbb80 100644 --- a/httpret/static/documentation/api4.html +++ b/httpret/static/documentation/api4.html @@ -199,7 +199,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel

Example response:

 {
-  "finalisedRange": true,
+  "rangeFinal": true,
   "tsAnchor": 1623763172,
   "tsMs": [
     5,
@@ -224,7 +224,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
 
   

Finalised range

If the server can determine that no more data will be added to the requested time range - then it will add the flag finalisedRange: true to the response.

+ then it will add the flag rangeFinal: true to the response.

@@ -312,7 +312,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel 0, 0 ], - "finalisedRange": true + "rangeFinal": true }
@@ -321,7 +321,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel

Finalised range

If the server can determine that no more data will be added to the requested time range - then it will add the flag finalisedRange: true to the response.

+ then it will add the flag rangeFinal: true to the response.

diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index 6548a19..280d896 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -266,7 +266,7 @@ pub struct MinMaxAvgBinsCollectedResult { mins: Vec, maxs: Vec, avgs: Vec, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index a2591f7..535ded8 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -259,7 +259,7 @@ pub struct MinMaxAvgDim1BinsCollectedResult { mins: Vec>>, maxs: Vec>>, avgs: Vec>>, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, @@ -497,7 +497,7 @@ pub struct WaveEventsCollectedResult { #[serde(rename = "pulseOff")] pulse_off: Vec, values: Vec>, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] range_complete: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, diff --git a/items/src/items.rs b/items/src/items.rs index a964dd1..b440c1c 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -371,7 +371,7 @@ impl FrameType for EventQueryJsonStringFrame { } pub trait EventsNodeProcessorOutput: - Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate + fmt::Debug + Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate { fn as_any_mut(&mut self) -> &mut dyn Any; fn into_parts(self) -> (Box, VecDeque, VecDeque); diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index 2e26270..eb61ce8 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -290,7 +290,7 @@ pub struct EventValuesCollectorOutput { #[serde(rename = "pulseOff")] pulse_off: Vec, values: Vec, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] range_complete: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index bd0045f..cfd07f8 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -221,7 +221,7 @@ pub struct StatsEventsCollectorOutput { #[serde(rename = "tsNs")] ts_off_ns: Vec, // TODO what to collect? pulse min/max - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] range_complete: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 7e68845..338b8ea 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -429,7 +429,7 @@ pub struct XBinnedScalarEventsCollectedResult { mins: Vec, maxs: Vec, avgs: Vec, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] finalised_range: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index af06bff..516a564 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -450,7 +450,7 @@ pub struct XBinnedWaveEventsCollectedResult { mins: Vec>, maxs: Vec>, avgs: Vec>, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "rangeFinal")] finalised_range: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs index 0bba76a..4b74dae 100644 --- a/items_0/src/collect_c.rs +++ b/items_0/src/collect_c.rs @@ -7,27 +7,18 @@ use std::any::Any; use std::fmt; pub trait Collector: fmt::Debug + Send { - // TODO should require here Collectable? - type Input; - type Output: Collected; - fn len(&self) -> usize; - - fn ingest(&mut self, item: &mut Self::Input); - + fn ingest(&mut self, item: &mut dyn Collectable); fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - - fn result(&mut self) -> Result; + fn result(&mut self) -> Result, Error>; } -pub trait Collectable: fmt::Debug { - type Collector: Collector; - - fn new_collector(&self) -> Self::Collector; +pub trait Collectable: fmt::Debug + crate::AsAnyMut { + fn new_collector(&self) -> Box; } +// TODO can this get removed? pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} erased_serde::serialize_trait_object!(Collected); @@ -53,6 +44,7 @@ impl Collected for Box {} #[derive(Debug)] pub struct CollectorDynDefault {} +// TODO remove? pub trait CollectorDyn: fmt::Debug + Send { fn len(&self) -> usize; @@ -70,50 +62,15 @@ pub trait CollectableWithDefault { fn as_any_mut(&mut self) -> &mut dyn Any; } -#[derive(Debug)] -pub struct EventsCollector { - coll: Box, -} - -impl EventsCollector { - pub fn new(coll: Box) -> Self { - Self { coll } - } -} - -impl Collector for EventsCollector { - type Input = Box; - - // TODO this Output trait does not differentiate between e.g. collected events, collected bins, different aggs, etc... - type Output = Box; - - fn len(&self) -> usize { - self.coll.len() - } - - fn ingest(&mut self, item: &mut Self::Input) { - self.coll.ingest(item.as_collectable_with_default_mut()); - } - - fn set_range_complete(&mut self) { - self.coll.set_range_complete() - } - - fn set_timed_out(&mut self) { - self.coll.set_timed_out() - } - - fn result(&mut self) -> Result { - self.coll.result() +impl crate::AsAnyMut for Box { + fn as_any_mut(&mut self) -> &mut dyn Any { + self } } impl Collectable for Box { - type Collector = EventsCollector; - - fn new_collector(&self) -> Self::Collector { - let coll = CollectableWithDefault::new_collector(self.as_ref()); - EventsCollector::new(coll) + fn new_collector(&self) -> Box { + todo!() } } @@ -121,14 +78,11 @@ impl Collectable for Box { pub struct TimeBinnedCollector {} impl Collector for TimeBinnedCollector { - type Input = Box; - type Output = Box; - fn len(&self) -> usize { todo!() } - fn ingest(&mut self, _item: &mut Self::Input) { + fn ingest(&mut self, _item: &mut dyn Collectable) { todo!() } @@ -140,15 +94,19 @@ impl Collector for TimeBinnedCollector { todo!() } - fn result(&mut self) -> Result { + fn result(&mut self) -> Result, Error> { todo!() } } +impl crate::AsAnyMut for Box { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + impl Collectable for Box { - type Collector = TimeBinnedCollector; - - fn new_collector(&self) -> Self::Collector { - todo!() + fn new_collector(&self) -> Box { + self.as_ref().new_collector() } } diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 90dbefb..5efac83 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -5,6 +5,7 @@ use serde::Serialize; use std::any::Any; use std::fmt; +// TODO rename to `Typed` pub trait CollectorType: Send + Unpin + WithLen { type Input: Collectable; type Output: Collected + ToJsonResult + Serialize; @@ -24,6 +25,7 @@ pub trait Collector: Send + Unpin + WithLen { fn result(&mut self) -> Result, Error>; } +// TODO rename to `Typed` pub trait CollectableType { type Collector: CollectorType; fn new_collector() -> Self::Collector; diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 0444541..7564036 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -61,14 +61,8 @@ pub trait AsAnyMut { fn as_any_mut(&mut self) -> &mut dyn Any; } -/*impl AsAnyRef for Box { - fn as_any_ref(&self) -> &dyn Any { - self.as_ref().as_any_ref() - } -}*/ - /// Data in time-binned form. -pub trait TimeBinned: Any + TimeBinnable { +pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; fn as_collectable_mut(&mut self) -> &mut dyn Collectable; fn edges_slice(&self) -> (&[u64], &[u64]); diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 07da626..7930421 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -19,6 +19,12 @@ use std::any::Any; use std::collections::VecDeque; use std::{fmt, mem}; +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + // TODO make members private #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct BinsDim0 { @@ -51,17 +57,6 @@ where } impl BinsDim0 { - pub fn empty() -> Self { - Self { - ts1s: VecDeque::new(), - ts2s: VecDeque::new(), - counts: VecDeque::new(), - mins: VecDeque::new(), - maxs: VecDeque::new(), - avgs: VecDeque::new(), - } - } - pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) { self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); @@ -119,6 +114,19 @@ impl BinsDim0 { } } +impl Empty for BinsDim0 { + fn empty() -> Self { + Self { + ts1s: VecDeque::new(), + ts2s: VecDeque::new(), + counts: VecDeque::new(), + mins: VecDeque::new(), + maxs: VecDeque::new(), + avgs: VecDeque::new(), + } + } +} + impl WithLen for BinsDim0 { fn len(&self) -> usize { self.ts1s.len() @@ -151,19 +159,6 @@ impl RangeOverlapInfo for BinsDim0 { } } -impl Empty for BinsDim0 { - fn empty() -> Self { - Self { - ts1s: Default::default(), - ts2s: Default::default(), - counts: Default::default(), - mins: Default::default(), - maxs: Default::default(), - avgs: Default::default(), - } - } -} - impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { self.ts1s.push_back(ts1); @@ -207,7 +202,8 @@ impl TimeBinnableType for BinsDim0 { } } -#[derive(Debug, Serialize)] +// TODO rename to BinsDim0CollectorOutput +#[derive(Debug, Serialize, Deserialize)] pub struct BinsDim0CollectedResult { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, @@ -219,17 +215,23 @@ pub struct BinsDim0CollectedResult { ts1_off_ns: VecDeque, #[serde(rename = "ts2Ns")] ts2_off_ns: VecDeque, + #[serde(rename = "counts")] counts: VecDeque, + #[serde(rename = "mins")] mins: VecDeque, + #[serde(rename = "maxs")] maxs: VecDeque, + #[serde(rename = "avgs")] avgs: VecDeque, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] finalised_range: bool, - #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] + #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + timed_out: bool, + #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")] missing_bins: u32, - #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, - #[serde(skip_serializing_if = "Option::is_none", rename = "finishedAt")] + #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] finished_at: Option, } @@ -242,14 +244,30 @@ impl items_0::AsAnyRef for BinsDim0CollectedResult { impl items_0::collect_c::Collected for BinsDim0CollectedResult {} impl BinsDim0CollectedResult { + pub fn len(&self) -> usize { + self.ts1_off_ms.len() + } + pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } + pub fn ts1_off_ms(&self) -> &VecDeque { + &self.ts1_off_ms + } + + pub fn ts2_off_ms(&self) -> &VecDeque { + &self.ts2_off_ms + } + pub fn counts(&self) -> &VecDeque { &self.counts } + pub fn range_final(&self) -> bool { + self.finalised_range + } + pub fn missing_bins(&self) -> u32 { self.missing_bins } @@ -265,6 +283,10 @@ impl BinsDim0CollectedResult { pub fn maxs(&self) -> &VecDeque { &self.maxs } + + pub fn avgs(&self) -> &VecDeque { + &self.avgs + } } impl ToJsonResult for BinsDim0CollectedResult { @@ -278,6 +300,7 @@ impl ToJsonResult for BinsDim0CollectedResult { } } +#[derive(Debug)] pub struct BinsDim0Collector { timed_out: bool, range_complete: bool, @@ -305,6 +328,7 @@ impl CollectorType for BinsDim0Collector { type Output = BinsDim0CollectedResult; fn ingest(&mut self, src: &mut Self::Input) { + trace!("\n\n----------- BinsDim0Collector ingest\n{:?}\n\n", src); // TODO could be optimized by non-contiguous container. self.vals.ts1s.append(&mut src.ts1s); self.vals.ts2s.append(&mut src.ts2s); @@ -362,6 +386,7 @@ impl CollectorType for BinsDim0Collector { maxs, avgs, finalised_range: self.range_complete, + timed_out: self.timed_out, missing_bins, continue_at, finished_at, @@ -378,6 +403,73 @@ impl CollectableType for BinsDim0 { } } +impl items_0::collect_c::Collector for BinsDim0Collector +where + NTY: ScalarOps, +{ + fn len(&self) -> usize { + self.vals.len() + } + + fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { + trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item); + { + { + //let tyid = item.type_id(); + //let tyid = item.as_any_mut().type_id(); + //let tyid = format!("{:?}", item.type_id().to_owned()); + trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id()); + } + trace4!("ty 1: {:40?}", std::any::TypeId::of::>()); + trace4!("ty 2: {:40?}", std::any::TypeId::of::>()); + trace4!("ty 3: {:40?}", std::any::TypeId::of::>>()); + trace4!( + "ty 4: {:?}", + std::any::TypeId::of::>() + ); + trace4!( + "ty 5: {:?}", + std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>() + ); + trace4!("ty 6: {:?}", std::any::TypeId::of::>()); + } + if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest plain"); + CollectorType::ingest(self, item) + } else if let Some(item) = item.as_any_mut().downcast_mut::>>() { + trace4!("ingest boxed"); + CollectorType::ingest(self, item) + } else if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest boxed dyn TimeBinned"); + if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest boxed dyn TimeBinned match"); + CollectorType::ingest(self, item) + } else { + warn!("BinsDim0Collector::ingest unexpected inner item"); + trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item); + } + } else { + warn!("BinsDim0Collector::ingest unexpected item"); + trace!("BinsDim0Collector::ingest unexpected item {:?}", item); + } + } + + fn set_range_complete(&mut self) { + CollectorType::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + CollectorType::set_timed_out(self) + } + + fn result(&mut self) -> Result, Error> { + match CollectorType::result(self) { + Ok(res) => Ok(Box::new(res)), + Err(e) => Err(e.into()), + } + } +} + pub struct BinsDim0Aggregator { range: NanoRange, count: u64, @@ -510,7 +602,7 @@ impl TimeBinner for BinsDim0TimeBinner { return; } if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges A"); + warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item); return; } // TODO optimize by remembering at which event array index we have arrived. @@ -522,7 +614,7 @@ impl TimeBinner for BinsDim0TimeBinner { }) { self.cycle(); if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges B"); + warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item); return; } } @@ -560,7 +652,7 @@ impl TimeBinner for BinsDim0TimeBinner { if item.ends_after(agg.range().clone()) { self.cycle(); if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges C"); + warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item); return; } } else { @@ -692,3 +784,21 @@ impl TimeBinned for BinsDim0 { self } } + +impl items_0::AsAnyMut for BinsDim0 +where + NTY: 'static, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_c::Collectable for BinsDim0 +where + NTY: ScalarOps, +{ + fn new_collector(&self) -> Box { + Box::new(BinsDim0Collector::::new()) + } +} diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs new file mode 100644 index 0000000..301d247 --- /dev/null +++ b/items_2/src/binsxbindim0.rs @@ -0,0 +1,819 @@ +use crate::{ts_offs_from_abs, ts_offs_from_abs_with_anchor}; +use crate::{IsoDateTime, RangeOverlapInfo}; +use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; +use chrono::{TimeZone, Utc}; +use err::Error; +use items_0::collect_s::{Collectable, CollectableType, CollectorType, ToJsonResult}; +use items_0::scalar_ops::ScalarOps; +use items_0::AppendEmptyBin; +use items_0::Empty; +use items_0::TimeBinned; +use items_0::TimeBins; +use items_0::WithLen; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::NanoRange; +use num_traits::Zero; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; +use std::{fmt, mem}; + +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (eprintln!($($arg)*)); +} + +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct BinsXbinDim0 { + ts1s: VecDeque, + ts2s: VecDeque, + counts: VecDeque, + mins: VecDeque, + maxs: VecDeque, + avgs: VecDeque, + // TODO could consider more variables: + // ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc... +} + +impl fmt::Debug for BinsXbinDim0 +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let self_name = std::any::type_name::(); + write!( + fmt, + "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.mins, + self.maxs, + self.avgs, + ) + } +} + +impl BinsXbinDim0 { + pub fn from_content( + ts1s: VecDeque, + ts2s: VecDeque, + counts: VecDeque, + mins: VecDeque, + maxs: VecDeque, + avgs: VecDeque, + ) -> Self { + Self { + ts1s, + ts2s, + counts, + mins, + maxs, + avgs, + } + } + + pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) { + self.ts1s.push_back(ts1); + self.ts2s.push_back(ts2); + self.counts.push_back(count); + self.mins.push_back(min); + self.maxs.push_back(max); + self.avgs.push_back(avg); + } + + pub fn append_zero(&mut self, beg: u64, end: u64) { + self.ts1s.push_back(beg); + self.ts2s.push_back(end); + self.counts.push_back(0); + self.mins.push_back(NTY::zero_b()); + self.maxs.push_back(NTY::zero_b()); + self.avgs.push_back(0.); + } + + pub fn append_all_from(&mut self, src: &mut Self) { + self.ts1s.extend(src.ts1s.drain(..)); + self.ts2s.extend(src.ts2s.drain(..)); + self.counts.extend(src.counts.drain(..)); + self.mins.extend(src.mins.drain(..)); + self.maxs.extend(src.maxs.drain(..)); + self.avgs.extend(src.avgs.drain(..)); + } + + pub fn equal_slack(&self, other: &Self) -> bool { + for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) { + if a != b { + return false; + } + } + for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) { + if a != b { + return false; + } + } + for (a, b) in self.mins.iter().zip(other.mins.iter()) { + if !a.equal_slack(b) { + return false; + } + } + for (a, b) in self.maxs.iter().zip(other.maxs.iter()) { + if !a.equal_slack(b) { + return false; + } + } + for (a, b) in self.avgs.iter().zip(other.avgs.iter()) { + if !a.equal_slack(b) { + return false; + } + } + true + } +} + +impl Empty for BinsXbinDim0 { + fn empty() -> Self { + Self { + ts1s: VecDeque::new(), + ts2s: VecDeque::new(), + counts: VecDeque::new(), + mins: VecDeque::new(), + maxs: VecDeque::new(), + avgs: VecDeque::new(), + } + } +} + +impl WithLen for BinsXbinDim0 { + fn len(&self) -> usize { + self.ts1s.len() + } +} + +impl RangeOverlapInfo for BinsXbinDim0 { + fn ends_before(&self, range: NanoRange) -> bool { + if let Some(&max) = self.ts2s.back() { + max <= range.beg + } else { + true + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + if let Some(&max) = self.ts2s.back() { + max > range.end + } else { + true + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + if let Some(&min) = self.ts1s.front() { + min >= range.end + } else { + true + } + } +} + +impl AppendEmptyBin for BinsXbinDim0 { + fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { + self.ts1s.push_back(ts1); + self.ts2s.push_back(ts2); + self.counts.push_back(0); + self.mins.push_back(NTY::zero_b()); + self.maxs.push_back(NTY::zero_b()); + self.avgs.push_back(0.); + } +} + +impl TimeBins for BinsXbinDim0 { + fn ts_min(&self) -> Option { + self.ts1s.front().map(Clone::clone) + } + + fn ts_max(&self) -> Option { + self.ts2s.back().map(Clone::clone) + } + + fn ts_min_max(&self) -> Option<(u64, u64)> { + if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) { + Some((min, max)) + } else { + None + } + } +} + +impl TimeBinnableType for BinsXbinDim0 { + type Output = BinsXbinDim0; + type Aggregator = BinsXbinDim0Aggregator; + + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let self_name = std::any::type_name::(); + debug!( + "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, do_time_weight) + } +} + +// TODO rename to BinsDim0CollectorOutput +#[derive(Debug, Serialize, Deserialize)] +pub struct BinsXbinDim0CollectedResult { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "ts1Ms")] + ts1_off_ms: VecDeque, + #[serde(rename = "ts2Ms")] + ts2_off_ms: VecDeque, + #[serde(rename = "ts1Ns")] + ts1_off_ns: VecDeque, + #[serde(rename = "ts2Ns")] + ts2_off_ns: VecDeque, + #[serde(rename = "counts")] + counts: VecDeque, + #[serde(rename = "mins")] + mins: VecDeque, + #[serde(rename = "maxs")] + maxs: VecDeque, + #[serde(rename = "avgs")] + avgs: VecDeque, + #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + finalised_range: bool, + #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + timed_out: bool, + #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")] + missing_bins: u32, + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, + #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] + finished_at: Option, +} + +impl items_0::AsAnyRef for BinsXbinDim0CollectedResult { + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl items_0::collect_c::Collected for BinsXbinDim0CollectedResult {} + +impl BinsXbinDim0CollectedResult { + pub fn len(&self) -> usize { + self.ts1_off_ms.len() + } + + pub fn ts_anchor_sec(&self) -> u64 { + self.ts_anchor_sec + } + + pub fn ts1_off_ms(&self) -> &VecDeque { + &self.ts1_off_ms + } + + pub fn ts2_off_ms(&self) -> &VecDeque { + &self.ts2_off_ms + } + + pub fn counts(&self) -> &VecDeque { + &self.counts + } + + pub fn range_final(&self) -> bool { + self.finalised_range + } + + pub fn missing_bins(&self) -> u32 { + self.missing_bins + } + + pub fn continue_at(&self) -> Option { + self.continue_at.clone() + } + + pub fn mins(&self) -> &VecDeque { + &self.mins + } + + pub fn maxs(&self) -> &VecDeque { + &self.maxs + } +} + +impl ToJsonResult for BinsXbinDim0CollectedResult { + fn to_json_result(&self) -> Result, Error> { + let k = serde_json::to_value(self)?; + Ok(Box::new(k)) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[derive(Debug)] +pub struct BinsXbinDim0Collector { + timed_out: bool, + range_complete: bool, + vals: BinsXbinDim0, +} + +impl BinsXbinDim0Collector { + pub fn new() -> Self { + Self { + timed_out: false, + range_complete: false, + vals: BinsXbinDim0::::empty(), + } + } +} + +impl WithLen for BinsXbinDim0Collector { + fn len(&self) -> usize { + self.vals.ts1s.len() + } +} + +impl CollectorType for BinsXbinDim0Collector { + type Input = BinsXbinDim0; + type Output = BinsXbinDim0CollectedResult; + + fn ingest(&mut self, src: &mut Self::Input) { + trace!("\n\n----------- BinsXbinDim0Collector ingest\n{:?}\n\n", src); + // TODO could be optimized by non-contiguous container. + self.vals.ts1s.append(&mut src.ts1s); + self.vals.ts2s.append(&mut src.ts2s); + self.vals.counts.append(&mut src.counts); + self.vals.mins.append(&mut src.mins); + self.vals.maxs.append(&mut src.maxs); + self.vals.avgs.append(&mut src.avgs); + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self) -> Result { + let bin_count_exp = 0; + let bin_count = self.vals.ts1s.len() as u32; + let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { + match self.vals.ts2s.back() { + Some(&k) => { + let missing_bins = bin_count_exp - bin_count; + let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); + let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; + let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); + (missing_bins, Some(continue_at), Some(finished_at)) + } + None => Err(Error::with_msg("partial_content but no bin in result"))?, + } + } else { + (0, None, None) + }; + if self.vals.ts1s.as_slices().1.len() != 0 { + panic!(); + } + if self.vals.ts2s.as_slices().1.len() != 0 { + panic!(); + } + let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0); + let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0); + let counts = mem::replace(&mut self.vals.counts, VecDeque::new()); + let mins = mem::replace(&mut self.vals.mins, VecDeque::new()); + let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new()); + let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new()); + let ret = BinsXbinDim0CollectedResult:: { + ts_anchor_sec: tst1.0, + ts1_off_ms: tst1.1, + ts1_off_ns: tst1.2, + ts2_off_ms: tst2.0, + ts2_off_ns: tst2.1, + counts, + mins, + maxs, + avgs, + finalised_range: self.range_complete, + timed_out: self.timed_out, + missing_bins, + continue_at, + finished_at, + }; + Ok(ret) + } +} + +impl CollectableType for BinsXbinDim0 { + type Collector = BinsXbinDim0Collector; + + fn new_collector() -> Self::Collector { + Self::Collector::new() + } +} + +impl items_0::collect_c::Collector for BinsXbinDim0Collector +where + NTY: ScalarOps, +{ + fn len(&self) -> usize { + self.vals.len() + } + + fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { + trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item); + { + { + //let tyid = item.type_id(); + //let tyid = item.as_any_mut().type_id(); + //let tyid = format!("{:?}", item.type_id().to_owned()); + trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id()); + } + trace4!("ty 1: {:40?}", std::any::TypeId::of::>()); + trace4!("ty 2: {:40?}", std::any::TypeId::of::>()); + trace4!("ty 3: {:40?}", std::any::TypeId::of::>>()); + trace4!( + "ty 4: {:?}", + std::any::TypeId::of::>() + ); + trace4!( + "ty 5: {:?}", + std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>() + ); + trace4!("ty 6: {:?}", std::any::TypeId::of::>()); + } + if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest plain"); + CollectorType::ingest(self, item) + } else if let Some(item) = item.as_any_mut().downcast_mut::>>() { + trace4!("ingest boxed"); + CollectorType::ingest(self, item) + } else if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest boxed dyn TimeBinned"); + if let Some(item) = item.as_any_mut().downcast_mut::>() { + trace4!("ingest boxed dyn TimeBinned match"); + CollectorType::ingest(self, item) + } else { + warn!("BinsDim0Collector::ingest unexpected inner item"); + trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item); + } + } else { + warn!("BinsDim0Collector::ingest unexpected item"); + trace!("BinsDim0Collector::ingest unexpected item {:?}", item); + } + } + + fn set_range_complete(&mut self) { + CollectorType::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + CollectorType::set_timed_out(self) + } + + fn result(&mut self) -> Result, Error> { + match CollectorType::result(self) { + Ok(res) => Ok(Box::new(res)), + Err(e) => Err(e.into()), + } + } +} + +pub struct BinsXbinDim0Aggregator { + range: NanoRange, + count: u64, + min: NTY, + max: NTY, + // Carry over to next bin: + avg: f32, + sumc: u64, + sum: f32, +} + +impl BinsXbinDim0Aggregator { + pub fn new(range: NanoRange, _do_time_weight: bool) -> Self { + Self { + range, + count: 0, + min: NTY::zero_b(), + max: NTY::zero_b(), + avg: 0., + sumc: 0, + sum: 0f32, + } + } +} + +impl TimeBinnableTypeAggregator for BinsXbinDim0Aggregator { + type Input = BinsXbinDim0; + type Output = BinsXbinDim0; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + for i1 in 0..item.ts1s.len() { + if item.counts[i1] == 0 { + } else if item.ts2s[i1] <= self.range.beg { + } else if item.ts1s[i1] >= self.range.end { + } else { + if self.count == 0 { + self.min = item.mins[i1].clone(); + self.max = item.maxs[i1].clone(); + } else { + if self.min > item.mins[i1] { + self.min = item.mins[i1].clone(); + } + if self.max < item.maxs[i1] { + self.max = item.maxs[i1].clone(); + } + } + self.count += item.counts[i1]; + self.sum += item.avgs[i1]; + self.sumc += 1; + } + } + } + + fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { + if self.sumc > 0 { + self.avg = self.sum / self.sumc as f32; + } + let ret = Self::Output { + ts1s: [self.range.beg].into(), + ts2s: [self.range.end].into(), + counts: [self.count].into(), + mins: [self.min.clone()].into(), + maxs: [self.max.clone()].into(), + avgs: [self.avg].into(), + }; + self.range = range; + self.count = 0; + self.sum = 0f32; + self.sumc = 0; + ret + } +} + +impl TimeBinnable for BinsXbinDim0 { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + let ret = BinsXbinDim0TimeBinner::::new(edges.into(), do_time_weight); + Box::new(ret) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + fn to_box_to_json_result(&self) -> Box { + let k = serde_json::to_value(self).unwrap(); + Box::new(k) as _ + } +} + +pub struct BinsXbinDim0TimeBinner { + edges: VecDeque, + do_time_weight: bool, + agg: Option>, + ready: Option< as TimeBinnableTypeAggregator>::Output>, +} + +impl BinsXbinDim0TimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Self { + Self { + edges, + do_time_weight, + agg: None, + ready: None, + } + } + + fn next_bin_range(&mut self) -> Option { + if self.edges.len() >= 2 { + let ret = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + self.edges.pop_front(); + Some(ret) + } else { + None + } + } +} + +impl TimeBinner for BinsXbinDim0TimeBinner { + fn ingest(&mut self, item: &dyn TimeBinnable) { + let self_name = std::any::type_name::(); + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + loop { + while item.starts_after(NanoRange { + beg: 0, + end: self.edges[1], + }) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item); + return; + } + } + if item.ends_before(NanoRange { + beg: self.edges[0], + end: u64::MAX, + }) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} edge list exhausted"); + return; + } else { + let agg = if let Some(agg) = self.agg.as_mut() { + agg + } else { + self.agg = Some(BinsXbinDim0Aggregator::new( + // We know here that we have enough edges for another bin. + // and `next_bin_range` will pop the first edge. + self.next_bin_range().unwrap(), + self.do_time_weight, + )); + self.agg.as_mut().unwrap() + }; + if let Some(item) = item + .as_any() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + agg.ingest(item); + } else { + let tyid_item = std::any::Any::type_id(item.as_any()); + error!("not correct item type {:?}", tyid_item); + }; + if item.ends_after(agg.range().clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item); + return; + } + } else { + break; + } + } + } + } + } + + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } + + // TODO there is too much common code between implementors: + fn push_in_progress(&mut self, push_empty: bool) { + // TODO expand should be derived from AggKind. Is it still required after all? + let expand = true; + if let Some(agg) = self.agg.as_mut() { + let dummy_range = NanoRange { beg: 4, end: 5 }; + let mut bins = agg.result_reset(dummy_range, expand); + self.agg = None; + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + // TODO there is too much common code between implementors: + fn cycle(&mut self) { + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(range) = self.next_bin_range() { + let mut bins = BinsXbinDim0::::empty(); + bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } + + fn set_range_complete(&mut self) {} +} + +impl TimeBinned for BinsXbinDim0 { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable { + self as &dyn TimeBinnable + } + + fn edges_slice(&self) -> (&[u64], &[u64]) { + if self.ts1s.as_slices().1.len() != 0 { + panic!(); + } + if self.ts2s.as_slices().1.len() != 0 { + panic!(); + } + (&self.ts1s.as_slices().0, &self.ts2s.as_slices().0) + } + + fn counts(&self) -> &[u64] { + // TODO check for contiguous + self.counts.as_slices().0 + } + + // TODO is Vec needed? + fn mins(&self) -> Vec { + self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect() + } + + // TODO is Vec needed? + fn maxs(&self) -> Vec { + self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect() + } + + // TODO is Vec needed? + fn avgs(&self) -> Vec { + self.avgs.iter().map(Clone::clone).collect() + } + + fn validate(&self) -> Result<(), String> { + use std::fmt::Write; + let mut msg = String::new(); + if self.ts1s.len() != self.ts2s.len() { + write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); + } + for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { + if min.as_prim_f32_b() < 1. && *count != 0 { + write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); + } + } + if msg.is_empty() { + Ok(()) + } else { + Err(msg) + } + } + + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { + self + } +} + +impl items_0::AsAnyMut for BinsXbinDim0 +where + NTY: 'static, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_c::Collectable for BinsXbinDim0 +where + NTY: ScalarOps, +{ + fn new_collector(&self) -> Box { + Box::new(BinsXbinDim0Collector::::new()) + } +} diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index e24eb29..7ca18b0 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -110,6 +110,10 @@ mod serde_channel_events { let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?; Ok(Box::new(obj)) } + f64::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?; + Ok(Box::new(obj)) + } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), } } else { @@ -422,9 +426,6 @@ impl ChannelEventsCollector { } impl items_0::collect_c::Collector for ChannelEventsCollector { - type Input = ChannelEvents; - type Output = Box; - fn len(&self) -> usize { match &self.coll { Some(coll) => coll.len(), @@ -432,19 +433,23 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { } } - fn ingest(&mut self, item: &mut Self::Input) { - match item { - ChannelEvents::Events(item) => { - if self.coll.is_none() { - let coll = item.as_ref().as_collectable_with_default_ref().new_collector(); - self.coll = Some(coll); + fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { + if let Some(item) = item.as_any_mut().downcast_mut::() { + match item { + ChannelEvents::Events(item) => { + if self.coll.is_none() { + let coll = item.as_ref().as_collectable_with_default_ref().new_collector(); + self.coll = Some(coll); + } + let coll = self.coll.as_mut().unwrap(); + coll.ingest(item.as_collectable_with_default_mut()); + } + ChannelEvents::Status(_) => { + // TODO decide on output format to collect also the connection status events } - let coll = self.coll.as_mut().unwrap(); - coll.ingest(item.as_collectable_with_default_mut()); - } - ChannelEvents::Status(_) => { - // TODO decide on output format to collect also the connection status events } + } else { + error!("ChannelEventsCollector::ingest unexpected item {:?}", item); } } @@ -456,7 +461,7 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { self.timed_out = true; } - fn result(&mut self) -> Result { + fn result(&mut self) -> Result, err::Error> { match self.coll.as_mut() { Some(coll) => { if self.range_complete { @@ -479,10 +484,14 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { } } -impl items_0::collect_c::Collectable for ChannelEvents { - type Collector = ChannelEventsCollector; - - fn new_collector(&self) -> Self::Collector { - ChannelEventsCollector::new() +impl items_0::AsAnyMut for ChannelEvents { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_c::Collectable for ChannelEvents { + fn new_collector(&self) -> Box { + Box::new(ChannelEventsCollector::new()) } } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 76afccb..c46eed9 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,5 +1,6 @@ use crate::binsdim0::BinsDim0; -use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo}; +use crate::{pulse_offs_from_abs, ts_offs_from_abs}; +use crate::{IsoDateTime, RangeOverlapInfo}; use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; use err::Error; use items_0::scalar_ops::ScalarOps; @@ -108,7 +109,10 @@ impl RangeOverlapInfo for EventsDim0 { } } -impl TimeBinnableType for EventsDim0 { +impl TimeBinnableType for EventsDim0 +where + NTY: ScalarOps, +{ type Output = BinsDim0; type Aggregator = EventsDim0Aggregator; @@ -159,10 +163,12 @@ pub struct EventsDim0CollectorOutput { pulse_off: VecDeque, #[serde(rename = "values")] values: VecDeque, - #[serde(rename = "finalisedRange", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] range_complete: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] timed_out: bool, + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, } impl EventsDim0CollectorOutput { @@ -206,8 +212,6 @@ impl items_0::AsAnyRef for EventsDim0CollectorOutput { } } -impl items_0::collect_c::Collected for EventsDim0CollectorOutput {} - impl items_0::collect_s::ToJsonResult for EventsDim0CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; @@ -219,12 +223,13 @@ impl items_0::collect_s::ToJsonResult for EventsDim0CollectorOut } } +impl items_0::collect_c::Collected for EventsDim0CollectorOutput {} + impl items_0::collect_s::CollectorType for EventsDim0Collector { type Input = EventsDim0; type Output = EventsDim0CollectorOutput; fn ingest(&mut self, src: &mut Self::Input) { - // TODO could be optimized by non-contiguous container. self.vals.tss.append(&mut src.tss); self.vals.pulses.append(&mut src.pulses); self.vals.values.append(&mut src.values); @@ -239,18 +244,38 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector Result { - // TODO require contiguous slices - let tst = ts_offs_from_abs(&self.vals.tss.as_slices().0); - let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0); + // If we timed out, we want to hint the client from where to continue. + // This is tricky: currently, client can not request a left-exclusive range. + // We currently give the timestamp of the last event plus a small delta. + // The amount of the delta must take into account what kind of timestamp precision the client + // can parse and handle. + let continue_at = if self.timed_out { + if let Some(ts) = self.vals.tss.back() { + Some(IsoDateTime::from_u64(*ts)) + } else { + // TODO tricky: should yield again the original range begin? Leads to recursion. + // Range begin plus delta? + // Anyway, we don't have the range begin here. + warn!("timed out without any result, can not yield a continue-at"); + None + } + } else { + None + }; + let tss_sl = self.vals.tss.make_contiguous(); + let pulses_sl = self.vals.pulses.make_contiguous(); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = pulse_offs_from_abs(pulses_sl); let ret = Self::Output { - ts_anchor_sec: tst.0, - ts_off_ms: tst.1, - ts_off_ns: tst.2, + ts_anchor_sec, + ts_off_ms, + ts_off_ns, pulse_anchor, pulse_off: pulse_off, values: mem::replace(&mut self.vals.values, VecDeque::new()), range_complete: self.range_complete, timed_out: self.timed_out, + continue_at, }; Ok(ret) } @@ -265,15 +290,16 @@ impl items_0::collect_s::CollectableType for EventsDim0 { } impl items_0::collect_c::Collector for EventsDim0Collector { - type Input = EventsDim0; - type Output = EventsDim0CollectorOutput; - fn len(&self) -> usize { self.vals.len() } - fn ingest(&mut self, item: &mut Self::Input) { - items_0::collect_s::CollectorType::ingest(self, item) + fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { + if let Some(item) = item.as_any_mut().downcast_mut::>() { + items_0::collect_s::CollectorType::ingest(self, item) + } else { + error!("EventsDim0Collector::ingest unexpected item {:?}", item); + } } fn set_range_complete(&mut self) { @@ -284,8 +310,11 @@ impl items_0::collect_c::Collector for EventsDim0Collector items_0::collect_s::CollectorType::set_timed_out(self) } - fn result(&mut self) -> Result { - items_0::collect_s::CollectorType::result(self).map_err(Into::into) + fn result(&mut self) -> Result, err::Error> { + match items_0::collect_s::CollectorType::result(self) { + Ok(x) => Ok(Box::new(x)), + Err(e) => Err(e.into()), + } } } @@ -888,6 +917,7 @@ impl TimeBinner for EventsDim0TimeBinner { } } +// TODO remove this struct? #[derive(Debug)] pub struct EventsDim0CollectorDyn {} @@ -903,7 +933,6 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn { } fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) { - // TODO remove this struct? todo!() } @@ -961,10 +990,14 @@ impl items_0::collect_c::CollectableWithDefault for EventsDim0 items_0::collect_c::Collectable for EventsDim0 { - type Collector = EventsDim0Collector; - - fn new_collector(&self) -> Self::Collector { - EventsDim0Collector::new() +impl items_0::AsAnyMut for EventsDim0 { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_c::Collectable for EventsDim0 { + fn new_collector(&self) -> Box { + Box::new(EventsDim0Collector::::new()) } } diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs new file mode 100644 index 0000000..7488c5a --- /dev/null +++ b/items_2/src/eventsxbindim0.rs @@ -0,0 +1,498 @@ +use crate::binsxbindim0::BinsXbinDim0; +use crate::RangeOverlapInfo; +use crate::{pulse_offs_from_abs, ts_offs_from_abs}; +use crate::{TimeBinnableType, TimeBinnableTypeAggregator}; +use err::Error; +use items_0::scalar_ops::ScalarOps; +use items_0::Empty; +use items_0::WithLen; +use netpod::log::*; +use netpod::NanoRange; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; +use std::fmt; + +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsXbinDim0 { + pub tss: VecDeque, + pub pulses: VecDeque, + pub mins: VecDeque, + pub maxs: VecDeque, + pub avgs: VecDeque, + // TODO maybe add variance? +} + +impl EventsXbinDim0 { + #[inline(always)] + pub fn push(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) { + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.mins.push_back(min); + self.maxs.push_back(max); + self.avgs.push_back(avg); + } + + #[inline(always)] + pub fn push_front(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) { + self.tss.push_front(ts); + self.pulses.push_front(pulse); + self.mins.push_front(min); + self.maxs.push_front(max); + self.avgs.push_front(avg); + } +} + +impl fmt::Debug for EventsXbinDim0 +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("EventsXbinDim0") + .field("tss", &self.tss) + .field("pulses", &self.pulses) + .field("mins", &self.mins) + .field("maxs", &self.maxs) + .field("avgs", &self.avgs) + .finish() + } +} + +impl items::ByteEstimate for EventsXbinDim0 { + fn byte_estimate(&self) -> u64 { + todo!("byte_estimate") + } +} + +impl Empty for EventsXbinDim0 { + fn empty() -> Self { + Self { + tss: VecDeque::new(), + pulses: VecDeque::new(), + mins: VecDeque::new(), + maxs: VecDeque::new(), + avgs: VecDeque::new(), + } + } +} + +impl WithLen for EventsXbinDim0 { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl RangeOverlapInfo for EventsXbinDim0 { + fn ends_before(&self, range: NanoRange) -> bool { + match self.tss.back() { + Some(&ts) => ts < range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.tss.back() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.tss.front() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} + +impl TimeBinnableType for EventsXbinDim0 +where + NTY: ScalarOps, +{ + type Output = BinsXbinDim0; + type Aggregator = EventsXbinDim0Aggregator; + + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let name = std::any::type_name::(); + debug!( + "TimeBinnableType for {} aggregator() range {:?} x_bin_count {} do_time_weight {}", + name, range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, do_time_weight) + } +} + +pub struct EventsXbinDim0Aggregator +where + NTY: ScalarOps, +{ + range: NanoRange, + count: u64, + min: NTY, + max: NTY, + sumc: u64, + sum: f32, + int_ts: u64, + last_ts: u64, + last_avg: Option, + last_min: Option, + last_max: Option, + do_time_weight: bool, +} + +impl EventsXbinDim0Aggregator +where + NTY: ScalarOps, +{ + pub fn new(range: NanoRange, do_time_weight: bool) -> Self { + Self { + int_ts: range.beg, + range, + count: 0, + min: NTY::zero_b(), + max: NTY::zero_b(), + sumc: 0, + sum: 0f32, + last_ts: 0, + last_avg: None, + last_min: None, + last_max: None, + do_time_weight, + } + } + + fn apply_min_max(&mut self, min: NTY, max: NTY) { + if self.count == 0 { + self.min = min; + self.max = max; + } else { + if min < self.min { + self.min = min; + } + if max > self.max { + self.max = max; + } + } + } + + fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) { + //debug!("apply_event_unweight"); + self.apply_min_max(min, max); + let vf = avg; + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + } + + fn apply_event_time_weight(&mut self, ts: u64) { + //debug!("apply_event_time_weight"); + if let (Some(avg), Some(min), Some(max)) = (self.last_avg, &self.last_min, &self.last_max) { + let min2 = min.clone(); + let max2 = max.clone(); + self.apply_min_max(min2, max2); + let w = (ts - self.int_ts) as f32 / self.range.delta() as f32; + if avg.is_nan() { + } else { + self.sum += avg * w; + } + self.sumc += 1; + self.int_ts = ts; + } + } + + fn ingest_unweight(&mut self, item: &EventsXbinDim0) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let avg = item.avgs[i1]; + let min = item.mins[i1].clone(); + let max = item.maxs[i1].clone(); + if ts < self.range.beg { + } else if ts >= self.range.end { + } else { + self.apply_event_unweight(avg, min, max); + self.count += 1; + } + } + } + + fn ingest_time_weight(&mut self, item: &EventsXbinDim0) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let avg = item.avgs[i1]; + let min = item.mins[i1].clone(); + let max = item.maxs[i1].clone(); + if ts < self.int_ts { + self.last_ts = ts; + self.last_avg = Some(avg); + self.last_min = Some(min); + self.last_max = Some(max); + } else if ts >= self.range.end { + return; + } else { + self.apply_event_time_weight(ts); + self.count += 1; + self.last_ts = ts; + self.last_avg = Some(avg); + self.last_min = Some(min); + self.last_max = Some(max); + } + } + } + + fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> BinsXbinDim0 { + let avg = if self.sumc == 0 { + 0f32 + } else { + self.sum / self.sumc as f32 + }; + let ret = BinsXbinDim0::from_content( + [self.range.beg].into(), + [self.range.end].into(), + [self.count].into(), + [self.min.clone()].into(), + [self.max.clone()].into(), + [avg].into(), + ); + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.min = NTY::zero_b(); + self.max = NTY::zero_b(); + self.sum = 0f32; + self.sumc = 0; + ret + } + + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> BinsXbinDim0 { + // TODO check callsite for correct expand status. + if true || expand { + self.apply_event_time_weight(self.range.end); + } + let avg = { + let sc = self.range.delta() as f32 * 1e-9; + self.sum / sc + }; + let ret = BinsXbinDim0::from_content( + [self.range.beg].into(), + [self.range.end].into(), + [self.count].into(), + [self.min.clone()].into(), + [self.max.clone()].into(), + [avg].into(), + ); + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.min = NTY::zero_b(); + self.max = NTY::zero_b(); + self.sum = 0f32; + self.sumc = 0; + ret + } +} + +impl TimeBinnableTypeAggregator for EventsXbinDim0Aggregator +where + NTY: ScalarOps, +{ + type Input = EventsXbinDim0; + type Output = BinsXbinDim0; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + debug!("ingest"); + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) + } + } + + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + if self.do_time_weight { + self.result_reset_time_weight(range, expand) + } else { + self.result_reset_unweight(range, expand) + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsXbinDim0CollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: VecDeque, + #[serde(rename = "tsNs")] + ts_off_ns: VecDeque, + #[serde(rename = "pulseAnchor")] + pulse_anchor: u64, + #[serde(rename = "pulseOff")] + pulse_off: VecDeque, + #[serde(rename = "mins")] + mins: VecDeque, + #[serde(rename = "maxs")] + maxs: VecDeque, + #[serde(rename = "avgs")] + avgs: VecDeque, + #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + finalised_range: bool, + #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + timed_out: bool, + // TODO add continue-at +} + +impl items_0::AsAnyRef for EventsXbinDim0CollectorOutput +where + NTY: ScalarOps, +{ + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl items_0::collect_s::ToJsonResult for EventsXbinDim0CollectorOutput +where + NTY: ScalarOps, +{ + fn to_json_result(&self) -> Result, Error> { + let k = serde_json::to_value(self)?; + Ok(Box::new(k)) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl items_0::collect_c::Collected for EventsXbinDim0CollectorOutput where NTY: ScalarOps {} + +#[derive(Debug)] +pub struct EventsXbinDim0Collector { + vals: EventsXbinDim0, + finalised_range: bool, + timed_out: bool, +} + +impl EventsXbinDim0Collector { + pub fn new() -> Self { + Self { + finalised_range: false, + timed_out: false, + vals: EventsXbinDim0::empty(), + } + } +} + +impl WithLen for EventsXbinDim0Collector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +impl items_0::collect_s::CollectorType for EventsXbinDim0Collector +where + NTY: ScalarOps, +{ + type Input = EventsXbinDim0; + type Output = EventsXbinDim0CollectorOutput; + + fn ingest(&mut self, src: &mut Self::Input) { + self.vals.tss.append(&mut src.tss); + self.vals.pulses.append(&mut src.pulses); + self.vals.mins.append(&mut src.mins); + self.vals.maxs.append(&mut src.maxs); + self.vals.avgs.append(&mut src.avgs); + } + + fn set_range_complete(&mut self) { + self.finalised_range = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self) -> Result { + use std::mem::replace; + let mins = replace(&mut self.vals.mins, VecDeque::new()); + let maxs = replace(&mut self.vals.maxs, VecDeque::new()); + let avgs = replace(&mut self.vals.avgs, VecDeque::new()); + self.vals.tss.make_contiguous(); + self.vals.pulses.make_contiguous(); + let tst = ts_offs_from_abs(self.vals.tss.as_slices().0); + let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0); + let ret = Self::Output { + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, + pulse_anchor, + pulse_off, + mins, + maxs, + avgs, + finalised_range: self.finalised_range, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl items_0::collect_s::CollectableType for EventsXbinDim0 +where + NTY: ScalarOps, +{ + type Collector = EventsXbinDim0Collector; + + fn new_collector() -> Self::Collector { + Self::Collector::new() + } +} + +impl items_0::AsAnyMut for EventsXbinDim0 +where + NTY: ScalarOps, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl items_0::collect_c::Collector for EventsXbinDim0Collector +where + NTY: ScalarOps, +{ + fn len(&self) -> usize { + todo!() + } + + fn ingest(&mut self, _item: &mut dyn items_0::collect_c::Collectable) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn result(&mut self) -> Result, Error> { + todo!() + } +} + +impl items_0::collect_c::Collectable for EventsXbinDim0 +where + NTY: ScalarOps, +{ + fn new_collector(&self) -> Box { + Box::new(::new_collector()) + } +} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index c2ea8b7..d3e3e92 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,6 +1,8 @@ pub mod binsdim0; +pub mod binsxbindim0; pub mod channelevents; pub mod eventsdim0; +pub mod eventsxbindim0; pub mod merger; pub mod merger_cev; pub mod streams; @@ -138,6 +140,12 @@ impl serde::de::Error for Error { #[derive(Clone, Debug, PartialEq, Deserialize)] pub struct IsoDateTime(DateTime); +impl IsoDateTime { + pub fn from_u64(ts: u64) -> Self { + IsoDateTime(Utc.timestamp_nanos(ts as i64)) + } +} + impl Serialize for IsoDateTime { fn serialize(&self, serializer: S) -> Result where @@ -182,6 +190,7 @@ impl crate::merger::Mergeable for Box { } } +// TODO rename to `Typed` pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 9ad22b2..d8223fd 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1,8 +1,8 @@ +pub mod api4; pub mod histo; pub mod query; pub mod status; pub mod streamext; -pub mod api4; use crate::log::*; use bytes::Bytes; @@ -1399,9 +1399,9 @@ impl fmt::Debug for BinnedGridSpec { #[derive(Clone, Debug)] pub struct BinnedRange { - pub grid_spec: BinnedGridSpec, - pub offset: u64, - pub count: u64, + grid_spec: BinnedGridSpec, + offset: u64, + bin_count: u64, } impl BinnedRange { @@ -1435,7 +1435,7 @@ impl BinnedRange { let offset = ts1 / bl; let ret = Self { grid_spec, - count, + bin_count: count, offset, }; break Ok(ret); @@ -1444,6 +1444,14 @@ impl BinnedRange { } } + pub fn bin_count(&self) -> u64 { + self.bin_count + } + + pub fn grid_spec(&self) -> &BinnedGridSpec { + &self.grid_spec + } + pub fn get_range(&self, ix: u32) -> NanoRange { NanoRange { beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len, @@ -1454,14 +1462,14 @@ impl BinnedRange { pub fn full_range(&self) -> NanoRange { NanoRange { beg: self.offset * self.grid_spec.bin_t_len, - end: (self.offset + self.count) * self.grid_spec.bin_t_len, + end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len, } } pub fn edges(&self) -> Vec { let mut ret = Vec::new(); let mut t = self.offset * self.grid_spec.bin_t_len; - let end = (self.offset + self.count) * self.grid_spec.bin_t_len; + let end = (self.offset + self.bin_count) * self.grid_spec.bin_t_len; while t <= end { ret.push(t); t += self.grid_spec.bin_t_len; @@ -1470,6 +1478,36 @@ impl BinnedRange { } } +#[cfg(test)] +mod test_binned_range { + use super::*; + + #[test] + fn binned_range_00() { + let range = NanoRange { + beg: HOUR * 72, + end: HOUR * 73, + }; + let range = BinnedRange::covering_range(range, 10).unwrap(); + assert_eq!(range.bin_count(), 12); + assert_eq!(range.edges()[0], HOUR * 72); + assert_eq!(range.edges()[2], HOUR * 72 + MIN * 5 * 2); + } + + #[test] + fn binned_range_01() { + let range = NanoRange { + beg: MIN * 20 + SEC * 10, + end: HOUR * 10 + MIN * 20 + SEC * 30, + }; + let range = BinnedRange::covering_range(range, 10).unwrap(); + assert_eq!(range.bin_count(), 11); + assert_eq!(range.edges()[0], HOUR * 0); + assert_eq!(range.edges()[1], HOUR * 1); + assert_eq!(range.edges()[11], HOUR * 11); + } +} + #[derive(Clone, Serialize, Deserialize)] pub enum AggKind { EventBlobs, diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 9d22589..a672e1b 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -1,7 +1,7 @@ use err::Error; use futures_util::{Stream, StreamExt}; use items::{RangeCompletableItem, Sitemty, StreamItem}; -use items_0::collect_c::{Collectable, Collector}; +use items_0::collect_c::Collectable; use netpod::log::*; use std::fmt; use std::time::{Duration, Instant}; @@ -29,14 +29,14 @@ pub async fn collect( stream: S, deadline: Instant, events_max: u64, -) -> Result<<::Collector as Collector>::Output, Error> +) -> Result, Error> where S: Stream> + Unpin, T: Collectable + fmt::Debug, { let span = tracing::span!(tracing::Level::TRACE, "collect"); let fut = async { - let mut collector: Option<::Collector> = None; + let mut collector: Option> = None; let mut stream = stream; let deadline = deadline.into(); let mut range_complete = false; diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index fb39974..a061447 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -2,6 +2,7 @@ use crate::test::runfut; use err::Error; use futures_util::{stream, StreamExt}; use items::{sitem_data, RangeCompletableItem, StreamItem}; +use items_0::Empty; use items_2::binsdim0::BinsDim0; use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent}; use items_2::testgen::make_some_boxed_d0_f32; diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index ae366fe..744b20a 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -3,24 +3,22 @@ use err::Error; use futures_util::StreamExt; #[allow(unused)] use netpod::log::*; -use netpod::Cluster; -use serde::Serialize; +use netpod::query::{BinnedQuery, RawEventsQuery}; +use netpod::{BinnedRange, Cluster}; use serde_json::Value as JsonValue; use std::time::{Duration, Instant}; -pub async fn timebinned_json(query: SER, cluster: &Cluster) -> Result -where - SER: Serialize, -{ - // TODO should be able to ask for data-events only, instead of mixed data and status events. - let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?; +pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result { + let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?; + let events_max = 10000; + let do_time_weight = query.agg_kind().do_time_weighted(); + let deadline = Instant::now() + Duration::from_millis(7500); + let rawquery = RawEventsQuery::new(query.channel().clone(), query.range().clone(), query.agg_kind().clone()); + let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: let stream = { items_2::merger::Merger::new(inps, 1) }; - let events_max = 10000; - let do_time_weight = true; - let deadline = Instant::now() + Duration::from_millis(7500); let stream = Box::pin(stream); - let stream = crate::timebin::TimeBinnedStream::new(stream, Vec::new(), do_time_weight, deadline); + let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline); if false { let mut stream = stream; let _: Option>> = stream.next().await;