diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs index 6f96405..020ae02 100644 --- a/daqbufp2/src/test/api1.rs +++ b/daqbufp2/src/test/api1.rs @@ -1,5 +1,5 @@ -#[cfg(test)] mod api1_parse; +mod data_api_python; use crate::nodes::require_test_hosts_running; use crate::test::api1::api1_parse::Api1Frame; @@ -8,10 +8,14 @@ use futures_util::Future; use httpclient::http_post; use httpret::api1::Api1ScalarType; use netpod::log::*; -use netpod::query::api1::{Api1Query, Api1Range, ChannelTuple}; +use netpod::query::api1::Api1Query; +use netpod::query::api1::Api1Range; +use netpod::query::api1::ChannelTuple; use std::fmt; use url::Url; +const TEST_BACKEND: &str = "testbackend-00"; + fn testrun(fut: F) -> Result where F: Future>, @@ -52,7 +56,7 @@ fn events_f64_plain() -> Result<(), Error> { let accept = "application/octet-stream"; let range = Api1Range::new("1970-01-01T00:00:00Z".try_into()?, "1970-01-01T00:01:00Z".try_into()?)?; // TODO the channel list needs to get pre-processed to check for backend prefix! - let ch = ChannelTuple::new("test-disk-databuffer".into(), "scalar-i32-be".into()); + let ch = ChannelTuple::new(TEST_BACKEND.into(), "scalar-i32-be".into()); let qu = Api1Query::new(range, vec![ch]); let body = serde_json::to_string(&qu)?; let buf = http_post(url, accept, body.into()).await?; diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs new file mode 100644 index 0000000..759ecdb --- /dev/null +++ b/daqbufp2/src/test/api1/data_api_python.rs @@ -0,0 +1,83 @@ +use crate::err::ErrConv; +use crate::nodes::require_test_hosts_running; +use chrono::Utc; +use err::Error; +use http::StatusCode; +use hyper::Body; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::Channel; +use netpod::Cluster; +use netpod::HostPort; +use netpod::APP_JSON; +use url::Url; + +const TEST_BACKEND: &str = "testbackend-00"; + +// Fetches all data, not streaming, meant for basic test cases that fit in memory. +async fn fetch_data_api_python_blob( + channels: Vec, + beg_date: &str, + end_date: &str, + cluster: &Cluster, +) -> Result, Error> { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; + let _range = NanoRange::from_date_time(beg_date, end_date); + let start_date = beg_date.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + let end_date = end_date.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + let query = serde_json::json!({ + "range": { + "type": "date", + "startDate": start_date, + "endDate": end_date, + }, + "channels": channels.iter().map(|x| x.name()).collect::>(), + }); + let query_str = serde_json::to_string_pretty(&query)?; + let hp = HostPort::from_node(node0); + let url = Url::parse(&format!("http://{}:{}/api/1/query", hp.host, hp.port))?; + info!("http get {}", url); + let req = hyper::Request::builder() + .method(http::Method::POST) + .uri(url.to_string()) + .header(http::header::CONTENT_TYPE, APP_JSON) + //.header(http::header::ACCEPT, APP_JSON) + .body(Body::from(query_str)) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); + } + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + // TODO add timeout + info!("time {} ms body len {}", ms, buf.len()); + Ok(buf.into()) +} + +#[test] +fn api3_hdf_dim0_00() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = fetch_data_api_python_blob( + vec![Channel { + backend: TEST_BACKEND.into(), + name: "test-gen-i32-dim0-v00".into(), + series: None, + }], + "1970-01-01T00:20:04.000Z", + "1970-01-01T00:21:10.000Z", + cluster, + ) + .await?; + Ok(()) + }; + taskrun::run(fut) +} diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 5adc76a..0d6fb1b 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -19,6 +19,8 @@ use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; use url::Url; +const TEST_BACKEND: &str = "testbackend-00"; + pub fn make_query>( name: S, beg_date: &str, @@ -26,7 +28,7 @@ pub fn make_query>( bin_count_min: u32, ) -> Result { let channel = Channel { - backend: "test-inmem".into(), + backend: TEST_BACKEND.into(), name: name.into(), series: None, }; @@ -44,8 +46,9 @@ fn binned_d0_json_00() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), - name: "scalar-i32-be".into(), + backend: TEST_BACKEND.into(), + //name: "scalar-i32-be".into(), + name: "test-gen-i32-dim0-v01".into(), series: None, }, "1970-01-01T00:20:04.000Z", @@ -55,19 +58,36 @@ fn binned_d0_json_00() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range - assert_eq!(res.ts_anchor_sec(), 1200); + assert_eq!(res.range_final(), true); assert_eq!(res.len(), 8); - assert_eq!(res.ts1_off_ms()[0], 0); - assert_eq!(res.ts2_off_ms()[0], 5000); - assert_eq!(res.counts()[0], 5); - assert_eq!(res.counts()[1], 10); - assert_eq!(res.counts()[7], 7); - assert_eq!(res.mins()[0], 2405); - assert_eq!(res.maxs()[0], 2409); - assert_eq!(res.mins()[1], 2410); - assert_eq!(res.maxs()[1], 2419); + assert_eq!(res.ts_anchor_sec(), 1200); + { + let a1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|x| 5000 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|x| 5000 + 5000 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.counts().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|_| 10).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.mins().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|x| 2400 + 10 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.maxs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..8).into_iter().map(|x| 2409 + 10 * x).collect(); + assert_eq!(a1, a2); + } Ok(()) }; taskrun::run(fut) @@ -80,8 +100,8 @@ fn binned_d0_json_01() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), - name: "scalar-i32-be".into(), + backend: TEST_BACKEND.into(), + name: "test-gen-i32-dim0-v01".into(), series: None, }, "1970-01-01T00:20:10.000Z", @@ -91,11 +111,37 @@ fn binned_d0_json_01() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range - assert_eq!(res.ts_anchor_sec(), 1200); - assert_eq!(res.len(), 13); assert_eq!(res.range_final(), true); + assert_eq!(res.len(), 13); + assert_eq!(res.ts_anchor_sec(), 1200); + let nb = res.len(); + { + let a1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 300 * 1000 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 300 * 1000 * (1 + x)).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.counts().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|_| 600).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.mins().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2400 + 600 * x).collect(); + assert_eq!(a1, a2); + } + { + let a1: Vec<_> = res.maxs().iter().map(|x| *x).collect(); + let a2: Vec<_> = (0..nb as _).into_iter().map(|x| 2999 + 600 * x).collect(); + assert_eq!(a1, a2); + } Ok(()) }; taskrun::run(fut) @@ -108,8 +154,8 @@ fn binned_d0_json_02() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), - name: "wave-f64-be-n21".into(), + backend: TEST_BACKEND.into(), + name: "test-gen-f64-dim1-v00".into(), series: None, }, "1970-01-01T00:20:10.000Z", @@ -119,7 +165,7 @@ fn binned_d0_json_02() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1200); assert_eq!(res.len(), 13); @@ -136,7 +182,7 @@ fn binned_d0_json_03() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: "wave-f64-be-n21".into(), series: None, }, @@ -148,7 +194,7 @@ fn binned_d0_json_03() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1200); assert_eq!(res.len(), 4); @@ -168,7 +214,7 @@ fn binned_d0_json_04() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: "const-regular-scalar-i32-be".into(), series: None, }, @@ -180,7 +226,7 @@ fn binned_d0_json_04() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1200); assert_eq!(res.len(), 17); @@ -199,7 +245,7 @@ fn binned_d0_json_05() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: "const-regular-scalar-i32-be".into(), series: None, }, @@ -211,7 +257,7 @@ fn binned_d0_json_05() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 0); // TODO make disk parse faster and avoid timeout @@ -230,7 +276,7 @@ fn binned_d0_json_06() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: "const-regular-scalar-i32-be".into(), series: None, }, @@ -242,7 +288,7 @@ fn binned_d0_json_06() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1210); assert_eq!(res.len(), 20); @@ -260,7 +306,7 @@ fn binned_d0_json_07() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = get_binned_json( Channel { - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: "const-regular-scalar-i32-be".into(), series: None, }, @@ -272,7 +318,7 @@ fn binned_d0_json_07() -> Result<(), Error> { ) .await?; debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1200); assert_eq!(res.len(), 11); @@ -300,6 +346,7 @@ fn binned_inmem_d0_json_00() -> Result<(), Error> { assert_eq!(res.ts_anchor_sec(), 1200); assert_eq!(res.len(), 14); assert_eq!(res.range_final(), true); + assert_eq!(res.timed_out(), false); { let v1: Vec<_> = res.counts().iter().map(|x| *x).collect(); assert_eq!(&v1, &[5; 14]); @@ -326,9 +373,10 @@ fn binned_inmem_d0_json_00() -> Result<(), Error> { } { let v1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); - let v2: Vec<_> = (0..14).into_iter().map(|x| 1204 + 5 * x).collect(); - //assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); - //assert_eq!(&v1, &v2); + let v2: Vec<_> = (0..14).into_iter().map(|x| 1202. + 5. * x as f32).collect(); + for (a, b) in v1.into_iter().zip(v2.into_iter()) { + assert_eq!(f32_cmp_near(a, b), true); + } } Ok(()) }; diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 17d68d2..f266dcc 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -18,6 +18,8 @@ use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use url::Url; +const BACKEND: &str = "testbackend-00"; + #[test] fn events_plain_json_00() -> Result<(), Error> { let fut = async { @@ -25,7 +27,7 @@ fn events_plain_json_00() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = events_plain_json( Channel { - backend: "test-inmem".into(), + backend: BACKEND.into(), name: "inmem-d0-i32".into(), series: None, }, @@ -55,7 +57,7 @@ fn events_plain_json_01() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = events_plain_json( Channel { - backend: "test-disk-databuffer".into(), + backend: BACKEND.into(), name: "scalar-i32-be".into(), series: None, }, @@ -83,7 +85,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> { let cluster = &rh.cluster; let jsv = events_plain_json( Channel { - backend: "test-disk-databuffer".into(), + backend: BACKEND.into(), name: "scalar-i32-be".into(), series: None, }, diff --git a/daqbufp2/src/test/api4/pulseiddiff.rs b/daqbufp2/src/test/api4/pulseiddiff.rs index b22ba56..1295a4d 100644 --- a/daqbufp2/src/test/api4/pulseiddiff.rs +++ b/daqbufp2/src/test/api4/pulseiddiff.rs @@ -9,9 +9,11 @@ use netpod::range::evrange::NanoRange; use netpod::Channel; use query::api4::events::PlainEventsQuery; +const BACKEND: &str = "testbackend-00"; + pub fn make_query>(name: S, beg_date: &str, end_date: &str) -> Result { let channel = Channel { - backend: "test-inmem".into(), + backend: BACKEND.into(), name: name.into(), series: None, }; diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 27f6d52..9092f6b 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -27,6 +27,8 @@ use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; +const TEST_BACKEND: &str = "testbackend-00"; + #[test] fn get_binned_binary() { taskrun::run(get_binned_binary_inner()).unwrap(); @@ -105,7 +107,7 @@ where let node0 = &cluster.nodes[0]; let beg_date = beg_date.parse()?; let end_date = end_date.parse()?; - let channel_backend = "testbackend"; + let channel_backend = TEST_BACKEND; let perf_opts = PerfOpts::default(); let channel = Channel { backend: channel_backend.into(), diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 9e07fb9..8481883 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -27,10 +27,12 @@ use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; +const TEST_BACKEND: &str = "testbackend-00"; + fn ch_adhoc(name: &str) -> Channel { Channel { series: None, - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: name.into(), } } @@ -38,7 +40,7 @@ fn ch_adhoc(name: &str) -> Channel { pub fn ch_gen(name: &str) -> Channel { Channel { series: None, - backend: "test-disk-databuffer".into(), + backend: TEST_BACKEND.into(), name: name.into(), } } @@ -78,7 +80,7 @@ async fn get_plain_events_binary( let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; - let channel_backend = "testbackend"; + let channel_backend = TEST_BACKEND; let perf_opts = PerfOpts::default(); let channel = Channel { backend: channel_backend.into(), diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 1cd20ce..151f9d1 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -16,6 +16,8 @@ use query::api4::binned::BinnedQuery; use std::time::Duration; use url::Url; +const TEST_BACKEND: &str = "testbackend-00"; + #[test] fn time_weighted_json_03() -> Result<(), Error> { async fn inner() -> Result<(), Error> { @@ -103,7 +105,7 @@ async fn get_json_common( let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; - let channel_backend = "testbackend"; + let channel_backend = TEST_BACKEND; let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 3514182..214c4bb 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,3 +1,4 @@ +use crate::SfDbChConf; use err::Error; use netpod::range::evrange::NanoRange; use netpod::Channel; @@ -7,10 +8,8 @@ use parse::channelconfig::read_local_config; use parse::channelconfig::ChannelConfigs; use parse::channelconfig::MatchingConfigEntry; -use crate::SfDbChConf; - pub async fn config(range: NanoRange, channel: Channel, node_config: &NodeConfigCached) -> Result { - let channel_configs = read_local_config(channel.clone(), node_config.node.clone()).await?; + let channel_configs = read_local_config(channel.clone(), node_config.clone()).await?; let entry_res = match extract_matching_config_entry(&range, &channel_configs) { Ok(k) => k, Err(e) => return Err(e)?, @@ -48,5 +47,5 @@ pub async fn config(range: NanoRange, channel: Channel, node_config: &NodeConfig } pub async fn configs(channel: Channel, node_config: &NodeConfigCached) -> Result { - read_local_config(channel.clone(), node_config.node.clone()).await + read_local_config(channel.clone(), node_config.clone()).await } diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index cecdf0e..38e0de6 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -19,6 +19,8 @@ use tokio::io::AsyncSeekExt; use tokio::io::ErrorKind; use tokio::io::SeekFrom; +const BACKEND: &str = "testbackend-00"; + pub struct Positioned { pub file: OpenedFile, pub found: bool, @@ -821,7 +823,7 @@ mod test { end: DAY + HOUR * 8, }; let chn = netpod::Channel { - backend: "test-disk-databuffer".into(), + backend: BACKEND.into(), name: "scalar-i32-be".into(), series: None, }; diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 9a092e3..b2570f9 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -289,9 +289,11 @@ mod test { use netpod::TsNano; use streams::rangefilter2::RangeFilter2; + const BACKEND: &str = "testbackend-00"; + fn read_expanded_for_range(range: NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { let chn = netpod::Channel { - backend: "test-disk-databuffer".into(), + backend: BACKEND.into(), name: "scalar-i32-be".into(), series: None, }; diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index a5b0755..9a18fd8 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -575,11 +575,13 @@ mod test { //use netpod::timeunits::*; //use netpod::{ByteSize, Nanos}; + //const TEST_BACKEND: &str = "testbackend-00"; + /* #[test] fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { let chn = netpod::Channel { - backend: "testbackend".into(), + backend: TEST_BACKEND.into(), name: "scalar-i32-be".into(), }; // TODO read config from disk. diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 03b4373..51ee333 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -20,8 +20,10 @@ use tokio::fs::File; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; +const BACKEND: &str = "testbackend-00"; + pub async fn gen_test_data() -> Result<(), Error> { - let backend = String::from("test-disk-databuffer"); + let backend = String::from(BACKEND); let homedir = std::env::var("HOME").unwrap(); let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer"); let ksprefix = String::from("ks"); diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 80ac2a1..256c634 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -1 +1,2 @@ pub mod conn; +pub mod generated; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 776e764..0c7a281 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,5 +1,7 @@ use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventChunkerConf; +use crate::raw::generated::EventBlobsGeneratorI32Test00; +use crate::raw::generated::EventBlobsGeneratorI32Test01; use crate::SfDbChConf; use err::Error; use futures_util::stream; @@ -25,6 +27,8 @@ use parse::channelconfig::MatchingConfigEntry; use query::api4::events::PlainEventsQuery; use std::pin::Pin; +const TEST_BACKEND: &str = "testbackend-00"; + fn make_num_pipeline_stream_evs( chconf: ChConf, agg_kind: AggKind, @@ -130,7 +134,7 @@ pub async fn get_applicable_entry( node_config: &NodeConfigCached, ) -> Result { info!("---------- disk::raw::conn::get_applicable_entry"); - let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?; + let channel_config = read_local_config(channel.clone(), node_config.clone()).await?; let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, Err(e) => return Err(e)?, @@ -248,11 +252,10 @@ pub fn make_remote_event_blobs_stream( Ok(event_blobs) } -pub async fn make_event_blobs_pipe( +pub async fn make_event_blobs_pipe_real( evq: &PlainEventsQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - info!("make_event_blobs_pipe {evq:?}"); if false { match dbconn::channel_exists(evq.channel(), &node_config).await { Ok(_) => (), @@ -312,3 +315,58 @@ pub async fn make_event_blobs_pipe( }; Ok(pipe) } + +pub async fn make_event_blobs_pipe_test( + evq: &PlainEventsQuery, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + warn!("GENERATE INMEM TEST DATA"); + let node_count = node_config.node_config.cluster.nodes.len() as u64; + let node_ix = node_config.ix as u64; + let chn = evq.channel().name(); + let range = evq.range().clone(); + if chn == "test-gen-i32-dim0-v00" { + Ok(Box::pin(EventBlobsGeneratorI32Test00::new(node_ix, node_count, range))) + } else if chn == "test-gen-i32-dim0-v01" { + Ok(Box::pin(EventBlobsGeneratorI32Test01::new(node_ix, node_count, range))) + } else { + let na: Vec<_> = chn.split("-").collect(); + if na.len() != 3 { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } else { + if na[0] != "inmem" { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } else { + if na[1] == "d0" { + if na[2] == "i32" { + Ok(Box::pin(EventBlobsGeneratorI32Test00::new(node_ix, node_count, range))) + } else { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } + } else { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } + } + } + } +} + +pub async fn make_event_blobs_pipe( + evq: &PlainEventsQuery, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + info!("make_event_blobs_pipe {evq:?}"); + if evq.channel().backend() == TEST_BACKEND { + make_event_blobs_pipe_test(evq, node_config).await + } else { + make_event_blobs_pipe_real(evq, node_config).await + } +} diff --git a/disk/src/raw/generated.rs b/disk/src/raw/generated.rs new file mode 100644 index 0000000..e79c923 --- /dev/null +++ b/disk/src/raw/generated.rs @@ -0,0 +1,223 @@ +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use items_0::container::ByteEstimate; +use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::Empty; +use items_2::eventfull::EventFull; +use netpod::range::evrange::SeriesRange; +use netpod::timeunits::MS; +use netpod::ScalarType; +use netpod::Shape; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; + +pub trait TypedGenerator { + type RustScalar; +} + +pub struct EventBlobsGeneratorI32Test00 { + ts: u64, + dts: u64, + tsend: u64, + #[allow(unused)] + c1: u64, + scalar_type: ScalarType, + be: bool, + shape: Shape, + timeout: Option + Send>>>, + done: bool, + done_range_final: bool, +} + +impl TypedGenerator for EventBlobsGeneratorI32Test00 { + type RustScalar = i32; +} + +impl EventBlobsGeneratorI32Test00 { + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let dts = MS * 1000 * node_count as u64; + let ts = (range.beg / dts + node_ix) * dts; + let tsend = range.end; + Self { + ts, + dts, + tsend, + c1: 0, + scalar_type: ScalarType::I32, + be: true, + shape: Shape::Scalar, + timeout: None, + done: false, + done_range_final: false, + } + } + + fn make_batch(&mut self) -> Sitemty { + // TODO should not repeat self type name + type T = ::RustScalar; + let mut item = EventFull::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 200 { + break; + } + let pulse = ts; + let value = (ts / (MS * 100) % 1000) as T; + item.add_event( + ts, + pulse, + Some(value.to_be_bytes().to_vec()), + None, + self.scalar_type.clone(), + self.be, + self.shape.clone(), + None, + ); + ts += self.dts; + } + self.ts = ts; + let w = sitem_data(item); + w + } +} + +impl Stream for EventBlobsGeneratorI32Test00 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done_range_final { + Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else if false { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + continue; + }; + } + } +} + +pub struct EventBlobsGeneratorI32Test01 { + ts: u64, + dts: u64, + tsend: u64, + #[allow(unused)] + c1: u64, + scalar_type: ScalarType, + be: bool, + shape: Shape, + timeout: Option + Send>>>, + done: bool, + done_range_final: bool, +} + +impl TypedGenerator for EventBlobsGeneratorI32Test01 { + type RustScalar = i32; +} + +impl EventBlobsGeneratorI32Test01 { + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let dts = MS * 500 * node_count as u64; + let ts = (range.beg / dts + node_ix) * dts; + let tsend = range.end; + Self { + ts, + dts, + tsend, + c1: 0, + scalar_type: ScalarType::I32, + be: true, + shape: Shape::Scalar, + timeout: None, + done: false, + done_range_final: false, + } + } + + fn make_batch(&mut self) -> Sitemty { + type T = i32; + let mut item = EventFull::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 400 { + break; + } + let pulse = ts; + let value = (ts / self.dts) as T; + item.add_event( + ts, + pulse, + Some(value.to_be_bytes().to_vec()), + None, + self.scalar_type.clone(), + self.be, + self.shape.clone(), + None, + ); + ts += self.dts; + } + self.ts = ts; + let w = sitem_data(item); + w + } +} + +impl Stream for EventBlobsGeneratorI32Test01 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done_range_final { + Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else if false { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + continue; + }; + } + } +} diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 8fcbabc..20cec69 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -925,9 +925,8 @@ impl Stream for DataApiPython3DataStream { } else { let channel = self.channels[self.chan_ix].clone(); self.chan_ix += 1; - self.config_fut = Some(Box::pin( - read_local_config(channel.clone(), self.node_config.node.clone()).map_err(Error::from), - )); + let fut = read_local_config(channel.clone(), self.node_config.clone()).map_err(Error::from); + self.config_fut = Some(Box::pin(fut)); continue; } } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index a68c991..32b40ac 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -115,7 +115,7 @@ impl ChannelConfigHandler { } else if let Some(_) = &node_config.node.archiver_appliance { return Err(Error::with_msg_no_trace("no archapp")); } else { - parse::channelconfig::channel_config(&q, &node_config.node).await? + parse::channelconfig::channel_config(&q, node_config).await? }; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 5f73f9a..f5129ad 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -630,11 +630,11 @@ mod instant_serde { match res { LocalResult::None => Err(serde::ser::Error::custom(format!("Bad local instant conversion"))), LocalResult::Single(dt) => { - let s = dt.format("%Y-%m-%dT%H:%M:%S%.3f").to_string(); + let s = dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); ser.serialize_str(&s) } LocalResult::Ambiguous(dt, _dt2) => { - let s = dt.format("%Y-%m-%dT%H:%M:%S%.3f").to_string(); + let s = dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); ser.serialize_str(&s) } } diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 6677c47..f56da26 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -2,6 +2,7 @@ pub mod collect_s; pub mod container; pub mod framable; pub mod isodate; +pub mod overlap; pub mod scalar_ops; pub mod streamitem; pub mod subfr; @@ -26,12 +27,6 @@ pub trait WithLen { fn len(&self) -> usize; } -pub trait RangeOverlapInfo { - fn ends_before(&self, range: &SeriesRange) -> bool; - fn ends_after(&self, range: &SeriesRange) -> bool; - fn starts_after(&self, range: &SeriesRange) -> bool; -} - pub trait Empty { fn empty() -> Self; } diff --git a/items_0/src/overlap.rs b/items_0/src/overlap.rs new file mode 100644 index 0000000..24413db --- /dev/null +++ b/items_0/src/overlap.rs @@ -0,0 +1,156 @@ +use netpod::log::*; +use netpod::range::evrange::SeriesRange; + +pub trait HasTimestampDeque { + fn timestamp_min(&self) -> Option; + fn timestamp_max(&self) -> Option; + fn pulse_min(&self) -> Option; + fn pulse_max(&self) -> Option; +} + +pub trait RangeOverlapCmp { + fn range_overlap_cmp_beg(a: u64, b: u64) -> bool; + fn range_overlap_cmp_end(a: u64, b: u64) -> bool; +} + +pub trait RangeOverlapInfo { + fn ends_before(&self, range: &SeriesRange) -> bool; + fn ends_after(&self, range: &SeriesRange) -> bool; + fn starts_after(&self, range: &SeriesRange) -> bool; +} + +#[macro_export] +macro_rules! impl_range_overlap_info_events { + ($ty:ident) => { + impl RangeOverlapInfo for $ty + where + STY: ScalarOps, + { + fn ends_before(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(max) = HasTimestampDeque::timestamp_max(self) { + max < range.beg_u64() + //::range_overlap_cmp_beg(max, range.beg_u64()) + } else { + true + } + } else if range.is_pulse() { + if let Some(max) = HasTimestampDeque::pulse_max(self) { + max < range.beg_u64() + } else { + true + } + } else { + error!("unexpected"); + true + } + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(max) = HasTimestampDeque::timestamp_max(self) { + max >= range.beg_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(max) = HasTimestampDeque::pulse_max(self) { + max >= range.beg_u64() + } else { + true + } + } else { + error!("unexpected"); + false + } + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(min) = HasTimestampDeque::timestamp_min(self) { + min >= range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(min) = HasTimestampDeque::pulse_min(self) { + min >= range.end_u64() + } else { + true + } + } else { + error!("unexpected"); + true + } + } + } + }; +} + +#[macro_export] +macro_rules! impl_range_overlap_info_bins { + ($ty:ident) => { + impl RangeOverlapInfo for $ty + where + STY: ScalarOps, + { + fn ends_before(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.ts2s.back() { + max <= range.beg_u64() + } else { + true + } + } else if range.is_pulse() { + // TODO for the time being, the ts represent either ts or pulse + if let Some(&max) = self.ts2s.back() { + max <= range.beg_u64() + } else { + true + } + } else { + error!("unexpected"); + true + } + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&max) = self.ts2s.back() { + max > range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&max) = self.ts2s.back() { + max > range.end_u64() + } else { + true + } + } else { + error!("unexpected"); + false + } + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + if range.is_time() { + if let Some(&min) = self.ts1s.front() { + min >= range.end_u64() + } else { + true + } + } else if range.is_pulse() { + if let Some(&min) = self.ts1s.front() { + min >= range.end_u64() + } else { + true + } + } else { + error!("unexpected"); + true + } + } + } + }; +} diff --git a/items_0/src/scalar_ops.rs b/items_0/src/scalar_ops.rs index c071f0e..50e2db2 100644 --- a/items_0/src/scalar_ops.rs +++ b/items_0/src/scalar_ops.rs @@ -1,6 +1,7 @@ use crate::subfr::SubFrId; use serde::Serialize; use std::fmt; +use std::ops; #[allow(unused)] const fn is_nan_int(_x: &T) -> bool { @@ -64,10 +65,15 @@ pub trait ScalarOps: { fn zero_b() -> Self; fn equal_slack(&self, rhs: &Self) -> bool; + fn add(&mut self, rhs: &Self); + fn div(&mut self, n: usize); + fn find_vec_min(a: &Vec) -> Option; + fn find_vec_max(a: &Vec) -> Option; + fn avg_vec(a: &Vec) -> Option; } macro_rules! impl_scalar_ops { - ($ty:ident, $zero:expr, $equal_slack:ident) => { + ($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident) => { impl ScalarOps for $ty { fn zero_b() -> Self { $zero @@ -76,6 +82,57 @@ macro_rules! impl_scalar_ops { fn equal_slack(&self, rhs: &Self) -> bool { $equal_slack(self, rhs) } + + fn add(&mut self, rhs: &Self) { + $mac_add!(self, rhs); + } + + fn div(&mut self, n: usize) { + $mac_div!(self, n); + } + + fn find_vec_min(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut k = &a[0]; + for (i, v) in a.iter().enumerate() { + if *v < *k { + k = &a[i]; + } + } + Some(k.clone()) + } + } + + fn find_vec_max(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut k = &a[0]; + for (i, v) in a.iter().enumerate() { + if *v > *k { + k = &a[i]; + } + } + Some(k.clone()) + } + } + + fn avg_vec(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut sum = Self::zero_b(); + let mut c = 0; + for v in a.iter() { + sum.add(v); + c += 1; + } + ScalarOps::div(&mut sum, c); + Some(sum) + } + } } }; } @@ -100,15 +157,58 @@ fn equal_string(a: &String, b: &String) -> bool { a == b } -impl_scalar_ops!(u8, 0, equal_int); -impl_scalar_ops!(u16, 0, equal_int); -impl_scalar_ops!(u32, 0, equal_int); -impl_scalar_ops!(u64, 0, equal_int); -impl_scalar_ops!(i8, 0, equal_int); -impl_scalar_ops!(i16, 0, equal_int); -impl_scalar_ops!(i32, 0, equal_int); -impl_scalar_ops!(i64, 0, equal_int); -impl_scalar_ops!(f32, 0., equal_f32); -impl_scalar_ops!(f64, 0., equal_f64); -impl_scalar_ops!(bool, false, equal_bool); -impl_scalar_ops!(String, String::new(), equal_string); +fn add_int(a: &mut T, b: &T) { + ops::AddAssign::add_assign(a, todo!()); +} + +macro_rules! add_int { + ($a:expr, $b:expr) => { + *$a += $b; + }; +} + +macro_rules! add_bool { + ($a:expr, $b:expr) => { + *$a |= $b; + }; +} + +macro_rules! add_string { + ($a:expr, $b:expr) => { + $a.push_str($b); + }; +} + +macro_rules! div_int { + ($a:expr, $b:expr) => { + // TODO for average calculation, the accumulator must be large enough! + // Use u64 for all ints, and f32 for all floats. + // Therefore, the name "add" is too general. + //*$a /= $b; + }; +} + +macro_rules! div_bool { + ($a:expr, $b:expr) => { + // + }; +} + +macro_rules! div_string { + ($a:expr, $b:expr) => { + // + }; +} + +impl_scalar_ops!(u8, 0, equal_int, add_int, div_int); +impl_scalar_ops!(u16, 0, equal_int, add_int, div_int); +impl_scalar_ops!(u32, 0, equal_int, add_int, div_int); +impl_scalar_ops!(u64, 0, equal_int, add_int, div_int); +impl_scalar_ops!(i8, 0, equal_int, add_int, div_int); +impl_scalar_ops!(i16, 0, equal_int, add_int, div_int); +impl_scalar_ops!(i32, 0, equal_int, add_int, div_int); +impl_scalar_ops!(i64, 0, equal_int, add_int, div_int); +impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int); +impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int); +impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool); +impl_scalar_ops!(String, String::new(), equal_string, add_string, div_string); diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index ff8ce0b..5664477 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -1,10 +1,10 @@ use crate::collect_s::Collectable; use crate::collect_s::Collector; use crate::collect_s::ToJsonResult; +use crate::overlap::RangeOverlapInfo; use crate::AsAnyMut; use crate::AsAnyRef; use crate::Events; -use crate::RangeOverlapInfo; use crate::TypeName; use crate::WithLen; use netpod::log::*; diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 4b7596a..195ac74 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -375,6 +375,10 @@ impl BinsDim0CollectedResult { self.range_final } + pub fn timed_out(&self) -> bool { + self.timed_out + } + pub fn missing_bins(&self) -> u32 { self.missing_bins } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index a64b0a0..bb207ae 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -6,6 +6,7 @@ use items_0::collect_s::Collected; use items_0::collect_s::Collector; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; +use items_0::overlap::RangeOverlapInfo; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinnableTy; @@ -16,7 +17,6 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::EventsNonObj; use items_0::MergeError; -use items_0::RangeOverlapInfo; use items_0::TypeName; use items_0::WithLen; use netpod::log::*; @@ -159,6 +159,7 @@ mod serde_channel_events { use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use crate::eventsdim1::EventsDim1; + use crate::eventsxbindim0::EventsXbinDim0; use items_0::subfr::SubFrId; use serde::de::{self, EnumAccess, VariantAccess, Visitor}; use serde::ser::SerializeSeq; @@ -257,12 +258,35 @@ mod serde_channel_events { let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } + f64::SUB => { + let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } bool::SUB => { let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), } + } else if e0 == EventsXbinDim0::::serde_id() { + match e1 { + f32::SUB => { + let obj: EventsXbinDim0 = + seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + f64::SUB => { + let obj: EventsXbinDim0 = + seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + bool::SUB => { + let obj: EventsXbinDim0 = + seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), + } } else { Err(de::Error::custom(&format!("unknown cty {e0}"))) } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index c01ce1d..34a3240 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -14,6 +14,8 @@ use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; +use items_0::overlap::HasTimestampDeque; +use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; @@ -59,14 +61,14 @@ macro_rules! trace2 { } #[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsDim0 { +pub struct EventsDim0 { pub tss: VecDeque, pub pulses: VecDeque, - pub values: VecDeque, + pub values: VecDeque, } -impl EventsDim0 { - pub fn push_front(&mut self, ts: u64, pulse: u64, value: NTY) { +impl EventsDim0 { + pub fn push_front(&mut self, ts: u64, pulse: u64, value: STY) { self.tss.push_front(ts); self.pulses.push_front(pulse); self.values.push_front(value); @@ -81,25 +83,25 @@ impl EventsDim0 { } } -impl AsAnyRef for EventsDim0 +impl AsAnyRef for EventsDim0 where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_ref(&self) -> &dyn Any { self } } -impl AsAnyMut for EventsDim0 +impl AsAnyMut for EventsDim0 where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } -impl Empty for EventsDim0 { +impl Empty for EventsDim0 { fn empty() -> Self { Self { tss: VecDeque::new(), @@ -109,15 +111,16 @@ impl Empty for EventsDim0 { } } -impl fmt::Debug for EventsDim0 +impl fmt::Debug for EventsDim0 where - NTY: fmt::Debug, + STY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { if false { write!( fmt, - "EventsDim0 {{ count {} ts {:?} vals {:?} }}", + "{} {{ count {} ts {:?} vals {:?} }}", + self.type_name(), self.tss.len(), self.tss.iter().map(|x| x / SEC).collect::>(), self.values, @@ -125,7 +128,8 @@ where } else { write!( fmt, - "EventsDim0 {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", + "{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", + self.type_name(), self.tss.len(), self.tss.front().map(|x| x / SEC), self.tss.back().map(|x| x / SEC), @@ -136,7 +140,7 @@ where } } -impl WithLen for EventsDim0 { +impl WithLen for EventsDim0 { fn len(&self) -> usize { self.tss.len() } @@ -149,71 +153,32 @@ impl ByteEstimate for EventsDim0 { } } -impl RangeOverlapInfo for EventsDim0 { - fn ends_before(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(&max) = self.tss.back() { - max < range.beg_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(&max) = self.pulses.back() { - max < range.beg_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } +impl HasTimestampDeque for EventsDim0 { + fn timestamp_min(&self) -> Option { + self.tss.front().map(|x| *x) } - fn ends_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(&max) = self.tss.back() { - max >= range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(&max) = self.pulses.back() { - max >= range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - false - } + fn timestamp_max(&self) -> Option { + self.tss.back().map(|x| *x) } - fn starts_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(&min) = self.tss.front() { - min >= range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(&min) = self.pulses.front() { - min >= range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } + fn pulse_min(&self) -> Option { + self.pulses.front().map(|x| *x) + } + + fn pulse_max(&self) -> Option { + self.pulses.back().map(|x| *x) } } -impl TimeBinnableType for EventsDim0 +items_0::impl_range_overlap_info_events!(EventsDim0); + +impl TimeBinnableType for EventsDim0 where - NTY: ScalarOps, + STY: ScalarOps, { - type Output = BinsDim0; - type Aggregator = EventsDim0Aggregator; + type Output = BinsDim0; + type Aggregator = EventsDim0Aggregator; fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { let self_name = any::type_name::(); @@ -226,13 +191,13 @@ where } #[derive(Debug)] -pub struct EventsDim0Collector { - vals: Option>, +pub struct EventsDim0Collector { + vals: Option>, range_final: bool, timed_out: bool, } -impl EventsDim0Collector { +impl EventsDim0Collector { pub fn new() -> Self { Self { vals: None, @@ -242,14 +207,14 @@ impl EventsDim0Collector { } } -impl WithLen for EventsDim0Collector { +impl WithLen for EventsDim0Collector { fn len(&self) -> usize { self.vals.as_ref().map_or(0, |x| x.tss.len()) } } #[derive(Debug, Serialize, Deserialize)] -pub struct EventsDim0CollectorOutput { +pub struct EventsDim0CollectorOutput { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] @@ -261,7 +226,7 @@ pub struct EventsDim0CollectorOutput { #[serde(rename = "pulseOff")] pulse_off: VecDeque, #[serde(rename = "values")] - values: VecDeque, + values: VecDeque, #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] @@ -270,7 +235,7 @@ pub struct EventsDim0CollectorOutput { continue_at: Option, } -impl EventsDim0CollectorOutput { +impl EventsDim0CollectorOutput { pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } @@ -328,42 +293,42 @@ impl EventsDim0CollectorOutput { } } -impl AsAnyRef for EventsDim0CollectorOutput +impl AsAnyRef for EventsDim0CollectorOutput where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_ref(&self) -> &dyn Any { self } } -impl AsAnyMut for EventsDim0CollectorOutput +impl AsAnyMut for EventsDim0CollectorOutput where - NTY: ScalarOps, + STY: ScalarOps, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } -impl WithLen for EventsDim0CollectorOutput { +impl WithLen for EventsDim0CollectorOutput { fn len(&self) -> usize { self.values.len() } } -impl ToJsonResult for EventsDim0CollectorOutput { +impl ToJsonResult for EventsDim0CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } } -impl Collected for EventsDim0CollectorOutput {} +impl Collected for EventsDim0CollectorOutput {} -impl CollectorType for EventsDim0Collector { - type Input = EventsDim0; - type Output = EventsDim0CollectorOutput; +impl CollectorType for EventsDim0Collector { + type Input = EventsDim0; + type Output = EventsDim0CollectorOutput; fn ingest(&mut self, src: &mut Self::Input) { if self.vals.is_none() { @@ -451,8 +416,8 @@ impl CollectorType for EventsDim0Collector { } } -impl items_0::collect_s::CollectableType for EventsDim0 { - type Collector = EventsDim0Collector; +impl items_0::collect_s::CollectableType for EventsDim0 { + type Collector = EventsDim0Collector; fn new_collector() -> Self::Collector { Self::Collector::new() @@ -460,23 +425,23 @@ impl items_0::collect_s::CollectableType for EventsDim0 { } #[derive(Debug)] -pub struct EventsDim0Aggregator { +pub struct EventsDim0Aggregator { range: SeriesRange, count: u64, - min: NTY, - max: NTY, + min: STY, + max: STY, sumc: u64, sum: f32, int_ts: u64, last_seen_ts: u64, - last_seen_val: Option, + last_seen_val: Option, did_min_max: bool, do_time_weight: bool, events_taken_count: u64, events_ignored_count: u64, } -impl Drop for EventsDim0Aggregator { +impl Drop for EventsDim0Aggregator { fn drop(&mut self) { // TODO collect as stats for the request context: trace!( @@ -487,9 +452,9 @@ impl Drop for EventsDim0Aggregator { } } -impl EventsDim0Aggregator { +impl EventsDim0Aggregator { fn self_name() -> String { - format!("{}<{}>", any::type_name::(), any::type_name::()) + format!("{}<{}>", any::type_name::(), any::type_name::()) } pub fn new(binrange: SeriesRange, do_time_weight: bool) -> Self { @@ -497,8 +462,8 @@ impl EventsDim0Aggregator { Self { range: binrange, count: 0, - min: NTY::zero_b(), - max: NTY::zero_b(), + min: STY::zero_b(), + max: STY::zero_b(), sum: 0., sumc: 0, int_ts, @@ -512,7 +477,7 @@ impl EventsDim0Aggregator { } // TODO reduce clone.. optimize via more traits to factor the trade-offs? - fn apply_min_max(&mut self, val: NTY) { + fn apply_min_max(&mut self, val: STY) { trace_ingest!( "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", val, @@ -536,7 +501,7 @@ impl EventsDim0Aggregator { } } - fn apply_event_unweight(&mut self, val: NTY) { + fn apply_event_unweight(&mut self, val: STY) { error!("TODO check again result_reset_unweight"); err::todo(); let vf = val.as_prim_f32_b(); @@ -636,7 +601,7 @@ impl EventsDim0Aggregator { } } - fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0 { + fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0 { trace!("TODO check again result_reset_unweight"); err::todo(); let (min, max, avg) = if self.sumc > 0 { @@ -645,7 +610,7 @@ impl EventsDim0Aggregator { } else { let g = match &self.last_seen_val { Some(x) => x.clone(), - None => NTY::zero_b(), + None => STY::zero_b(), }; (g.clone(), g.clone(), g.as_prim_f32_b()) }; @@ -672,7 +637,7 @@ impl EventsDim0Aggregator { ret } - fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { + fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { // TODO check callsite for correct expand status. debug!("result_reset_time_weight calls apply_event_time_weight"); if self.range.is_time() { @@ -687,7 +652,7 @@ impl EventsDim0Aggregator { } else { let g = match &self.last_seen_val { Some(x) => x.clone(), - None => NTY::zero_b(), + None => STY::zero_b(), }; (g.clone(), g.clone(), g.as_prim_f32_b()) }; @@ -711,15 +676,15 @@ impl EventsDim0Aggregator { self.sum = 0.; self.sumc = 0; self.did_min_max = false; - self.min = NTY::zero_b(); - self.max = NTY::zero_b(); + self.min = STY::zero_b(); + self.max = STY::zero_b(); ret } } -impl TimeBinnableTypeAggregator for EventsDim0Aggregator { - type Input = EventsDim0; - type Output = BinsDim0; +impl TimeBinnableTypeAggregator for EventsDim0Aggregator { + type Input = EventsDim0; + type Output = BinsDim0; fn range(&self) -> &SeriesRange { &self.range @@ -751,9 +716,9 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } } -impl TimeBinnable for EventsDim0 { +impl TimeBinnable for EventsDim0 { fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); + let ret = EventsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 32a6440..1717efd 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -1,4 +1,5 @@ use crate::binsdim0::BinsDim0; +use crate::eventsxbindim0::EventsXbinDim0; use crate::framable::FrameType; use crate::IsoDateTime; use crate::RangeOverlapInfo; @@ -12,6 +13,8 @@ use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; +use items_0::overlap::HasTimestampDeque; +use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; @@ -143,20 +146,26 @@ impl ByteEstimate for EventsDim1 { } } -impl RangeOverlapInfo for EventsDim1 { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() +impl HasTimestampDeque for EventsDim1 { + fn timestamp_min(&self) -> Option { + self.tss.front().map(|x| *x) } - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() + fn timestamp_max(&self) -> Option { + self.tss.back().map(|x| *x) } - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() + fn pulse_min(&self) -> Option { + self.pulses.front().map(|x| *x) + } + + fn pulse_max(&self) -> Option { + self.pulses.back().map(|x| *x) } } +items_0::impl_range_overlap_info_events!(EventsDim1); + impl TimeBinnableType for EventsDim1 where NTY: ScalarOps, @@ -821,7 +830,33 @@ impl Events for EventsDim1 { } fn to_min_max_avg(&mut self) -> Box { - todo!() + let mins = self + .values + .iter() + .map(|x| STY::find_vec_min(x)) + .map(|x| x.unwrap_or_else(|| STY::zero_b())) + .collect(); + let maxs = self + .values + .iter() + .map(|x| STY::find_vec_max(x)) + .map(|x| x.unwrap_or_else(|| STY::zero_b())) + .collect(); + let avgs = self + .values + .iter() + .map(|x| STY::avg_vec(x)) + .map(|x| x.unwrap_or_else(|| STY::zero_b())) + .map(|x| x.as_prim_f32_b()) + .collect(); + let item = EventsXbinDim0 { + tss: mem::replace(&mut self.tss, VecDeque::new()), + pulses: mem::replace(&mut self.pulses, VecDeque::new()), + mins, + maxs, + avgs, + }; + Box::new(item) } } diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 08a4272..08ff546 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -4,16 +4,23 @@ use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; +use items_0::collect_s::Collectable; use items_0::collect_s::CollectableType; use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; +use items_0::overlap::HasTimestampDeque; +use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; +use items_0::Events; +use items_0::EventsNonObj; +use items_0::MergeError; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; @@ -26,6 +33,7 @@ use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; +use std::mem; #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsXbinDim0 { @@ -55,6 +63,10 @@ impl EventsXbinDim0 { self.maxs.push_front(max); self.avgs.push_front(avg); } + + pub fn serde_id() -> &'static str { + "EventsXbinDim0" + } } impl TypeName for EventsXbinDim0 { @@ -120,17 +132,207 @@ impl WithLen for EventsXbinDim0 { } } -impl RangeOverlapInfo for EventsXbinDim0 { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() +impl HasTimestampDeque for EventsXbinDim0 { + fn timestamp_min(&self) -> Option { + self.tss.front().map(|x| *x) } - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() + fn timestamp_max(&self) -> Option { + self.tss.back().map(|x| *x) } - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() + fn pulse_min(&self) -> Option { + self.pulses.front().map(|x| *x) + } + + fn pulse_max(&self) -> Option { + self.pulses.back().map(|x| *x) + } +} + +items_0::impl_range_overlap_info_events!(EventsXbinDim0); + +impl EventsNonObj for EventsXbinDim0 { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + info!( + "EventsXbinDim0::into_tss_pulses len {} len {}", + self.tss.len(), + self.pulses.len() + ); + (self.tss, self.pulses) + } +} + +impl Events for EventsXbinDim0 { + fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable { + self as &mut dyn TimeBinnable + } + + fn verify(&self) -> bool { + let mut good = true; + let mut ts_max = 0; + for ts in &self.tss { + let ts = *ts; + if ts < ts_max { + good = false; + error!("unordered event data ts {} ts_max {}", ts, ts_max); + } + ts_max = ts_max.max(ts); + } + good + } + + fn output_info(&self) { + if false { + info!("output_info len {}", self.tss.len()); + if self.tss.len() == 1 { + info!( + " only: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.avgs[0] + ); + } else if self.tss.len() > 1 { + info!( + " first: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.avgs[0] + ); + let n = self.tss.len() - 1; + info!( + " last: ts {} pulse {} value {:?}", + self.tss[n], self.pulses[n], self.avgs[n] + ); + } + } + } + + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { + self + } + + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { + self + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { + self + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + // TODO improve the search + let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); + let tss = self.tss.drain(..n1).collect(); + let pulses = self.pulses.drain(..n1).collect(); + let mins = self.mins.drain(..n1).collect(); + let maxs = self.maxs.drain(..n1).collect(); + let avgs = self.avgs.drain(..n1).collect(); + let ret = Self { + tss, + pulses, + mins, + maxs, + avgs, + }; + Box::new(ret) + } + + fn new_empty_evs(&self) -> Box { + Box::new(Self::empty()) + } + + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. + if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + // TODO make it harder to forget new members when the struct may get modified in the future + let r = range.0..range.1; + dst.tss.extend(self.tss.drain(r.clone())); + dst.pulses.extend(self.pulses.drain(r.clone())); + dst.mins.extend(self.mins.drain(r.clone())); + dst.maxs.extend(self.maxs.drain(r.clone())); + dst.avgs.extend(self.avgs.drain(r.clone())); + Ok(()) + } else { + error!("downcast to {} FAILED", self.type_name()); + Err(MergeError::NotCompatible) + } + } + + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m > ts { + return Some(i); + } + } + None + } + + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m >= ts { + return Some(i); + } + } + None + } + + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate().rev() { + if m < ts { + return Some(i); + } + } + None + } + + fn ts_min(&self) -> Option { + self.tss.front().map(|&x| x) + } + + fn ts_max(&self) -> Option { + self.tss.back().map(|&x| x) + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + if let Some(other) = other.as_any_ref().downcast_ref::() { + self == other + } else { + false + } + } + + fn serde_id(&self) -> &'static str { + Self::serde_id() + } + + fn nty_id(&self) -> u32 { + STY::SUB + } + + fn clone_dyn(&self) -> Box { + Box::new(self.clone()) + } + + fn tss(&self) -> &VecDeque { + &self.tss + } + + fn pulses(&self) -> &VecDeque { + &self.pulses + } + + fn frame_type_id(&self) -> u32 { + error!("TODO frame_type_id should not be called"); + // TODO make more nice + panic!() + } + + fn to_min_max_avg(&mut self) -> Box { + let dst = Self { + tss: mem::replace(&mut self.tss, Default::default()), + pulses: mem::replace(&mut self.pulses, Default::default()), + mins: mem::replace(&mut self.mins, Default::default()), + maxs: mem::replace(&mut self.maxs, Default::default()), + avgs: mem::replace(&mut self.avgs, Default::default()), + }; + Box::new(dst) } } @@ -151,6 +353,23 @@ where } } +impl TimeBinnable for EventsXbinDim0 +where + NTY: ScalarOps, +{ + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + ) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + pub struct EventsXbinDim0Aggregator where NTY: ScalarOps, diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 87c2cb2..2518b20 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -27,7 +27,7 @@ use items_0::transform::EventTransform; use items_0::Empty; use items_0::Events; use items_0::MergeError; -use items_0::RangeOverlapInfo; +use items_0::overlap::RangeOverlapInfo; use merger::Mergeable; use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 7a9fd0d..24a16c6 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -47,6 +47,8 @@ pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; +const TEST_BACKEND: &str = "testbackend-00"; + pub fn is_false(x: T) -> bool where T: std::borrow::Borrow, @@ -2359,7 +2361,7 @@ pub fn test_cluster() -> Cluster { }) .collect(); Cluster { - backend: "testbackend".into(), + backend: TEST_BACKEND.into(), nodes, database: Database { host: "127.0.0.1".into(), diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index bc7e89d..ec03289 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -8,11 +8,13 @@ use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; +const TEST_BACKEND: &str = "testbackend-00"; + pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfigCached) -> Result { - if channel.backend() == "test-disk-databuffer" { + if channel.backend() == TEST_BACKEND { let backend = channel.backend().into(); // TODO the series-ids here are just random. Need to integrate with better test setup. - let ret = if channel.name() == "scalar-i32-be" { + let ret = if channel.name() == "inmem-d0-i32" { let ret = ChConf { backend, series: Some(1), @@ -21,10 +23,19 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig shape: Shape::Scalar, }; Ok(ret) + } else if channel.name() == "scalar-i32-be" { + let ret = ChConf { + backend, + series: Some(2), + name: channel.name().into(), + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) } else if channel.name() == "wave-f64-be-n21" { let ret = ChConf { backend, - series: Some(2), + series: Some(3), name: channel.name().into(), scalar_type: ScalarType::F64, shape: Shape::Wave(21), @@ -33,29 +44,39 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig } else if channel.name() == "const-regular-scalar-i32-be" { let ret = ChConf { backend, - series: Some(3), + series: Some(4), name: channel.name().into(), scalar_type: ScalarType::I32, shape: Shape::Scalar, }; Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; - ret - } else if channel.backend() == "test-inmem" { - let backend = channel.backend().into(); - let ret = if channel.name() == "inmem-d0-i32" { + } else if channel.name() == "test-gen-i32-dim0-v00" { let ret = ChConf { backend, - series: Some(1), + series: Some(5), name: channel.name().into(), scalar_type: ScalarType::I32, shape: Shape::Scalar, }; Ok(ret) + } else if channel.name() == "test-gen-i32-dim0-v01" { + let ret = ChConf { + backend, + series: Some(6), + name: channel.name().into(), + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else if channel.name() == "test-gen-f64-dim1-v00" { + let ret = ChConf { + backend, + series: Some(7), + name: channel.name().into(), + scalar_type: ScalarType::F64, + shape: Shape::Wave(21), + }; + Ok(ret) } else { error!("no test information"); Err(Error::with_msg_no_trace(format!("no test information")) diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index f778e96..3d406ad 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -28,6 +28,8 @@ use query::api4::events::PlainEventsQuery; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; +use streams::generators::GenerateF64V00; +use streams::generators::GenerateI32V01; use streams::transform::build_event_transform; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedReadHalf; @@ -35,6 +37,8 @@ use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; +const TEST_BACKEND: &str = "testbackend-00"; + #[cfg(test)] mod test; @@ -72,37 +76,44 @@ async fn make_channel_events_stream_data( node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { info!("nodenet::conn::make_channel_events_stream"); - if evq.channel().backend() == "test-inmem" { + if evq.channel().backend() == TEST_BACKEND { warn!("GENERATE INMEM TEST DATA"); let node_count = node_config.node_config.cluster.nodes.len() as u64; let node_ix = node_config.ix as u64; let chn = evq.channel().name(); - let na: Vec<_> = chn.split("-").collect(); - if na.len() != 3 { - Err(Error::with_msg_no_trace(format!( - "can not understand test channel name: {chn:?}" - ))) + let range = evq.range().clone(); + if chn == "test-gen-i32-dim0-v01" { + Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range))) + } else if chn == "test-gen-f64-dim1-v00" { + Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range))) } else { - if na[0] != "inmem" { + let na: Vec<_> = chn.split("-").collect(); + if na.len() != 3 { Err(Error::with_msg_no_trace(format!( "can not understand test channel name: {chn:?}" ))) } else { - let range = evq.range().clone(); - if na[1] == "d0" { - if na[2] == "i32" { - generator::generate_i32(node_ix, node_count, range) - } else if na[2] == "f32" { - generator::generate_f32(node_ix, node_count, range) + if na[0] != "inmem" { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } else { + let range = evq.range().clone(); + if na[1] == "d0" { + if na[2] == "i32" { + generator::generate_i32(node_ix, node_count, range) + } else if na[2] == "f32" { + generator::generate_f32(node_ix, node_count, range) + } else { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } } else { Err(Error::with_msg_no_trace(format!( "can not understand test channel name: {chn:?}" ))) } - } else { - Err(Error::with_msg_no_trace(format!( - "can not understand test channel name: {chn:?}" - ))) } } } diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 70d0384..b15e997 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -28,6 +28,8 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; +const TEST_BACKEND: &str = "testbackend-00"; + #[test] fn raw_data_00() { let fut = async { @@ -38,8 +40,8 @@ fn raw_data_00() { node_config: NodeConfig { name: "node_name_dummy".into(), cluster: Cluster { - backend: "testbackend".into(), - nodes: vec![], + backend: TEST_BACKEND.into(), + nodes: Vec::new(), database: Database { name: "".into(), host: "".into(), @@ -73,7 +75,7 @@ fn raw_data_00() { }; let channel = Channel { series: None, - backend: "test-adhoc-dyn".into(), + backend: TEST_BACKEND.into(), name: "scalar-i32".into(), }; let range = NanoRange { diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 4f1349e..300ae95 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -1,11 +1,12 @@ use err::Error; use netpod::range::evrange::NanoRange; +use netpod::timeunits::DAY; use netpod::timeunits::MS; use netpod::ByteOrder; use netpod::Channel; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; -use netpod::Node; +use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; @@ -24,6 +25,8 @@ use serde::Serialize; use std::fmt; use tokio::io::ErrorKind; +const TEST_BACKEND: &str = "testbackend-00"; + #[derive(Debug)] pub struct NErr { msg: String, @@ -302,8 +305,8 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } -pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result { - let conf = read_local_config(q.channel.clone(), node.clone()).await?; +pub async fn channel_config(q: &ChannelConfigQuery, ncc: &NodeConfigCached) -> Result { + let conf = read_local_config(q.channel.clone(), ncc.clone()).await?; let entry_res = extract_matching_config_entry(&q.range, &conf)?; let entry = match entry_res { MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found")), @@ -319,9 +322,9 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result Result { - let path = node +async fn read_local_config_real(channel: Channel, ncc: &NodeConfigCached) -> Result { + let path = ncc + .node .sf_databuffer .as_ref() .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? @@ -353,6 +356,81 @@ pub async fn read_local_config(channel: Channel, node: Node) -> Result Result { + if channel.name() == "test-gen-i32-dim0-v00" { + let ret = ChannelConfigs { + format_version: 0, + channel_name: channel.name().into(), + entries: vec![ConfigEntry { + ts: TsNano(0), + pulse: 0, + ks: 2, + bs: TsNano(DAY), + split_count: ncc.node_config.cluster.nodes.len() as _, + status: -1, + bb: -1, + modulo: -1, + offset: -1, + precision: -1, + scalar_type: ScalarType::I32, + is_compressed: false, + is_shaped: false, + is_array: false, + byte_order: ByteOrder::Big, + compression_method: None, + shape: None, + source_name: None, + unit: None, + description: None, + optional_fields: None, + value_converter: None, + }], + }; + Ok(ret) + } else if channel.name() == "test-gen-i32-dim0-v01" { + let ret = ChannelConfigs { + format_version: 0, + channel_name: channel.name().into(), + entries: vec![ConfigEntry { + ts: TsNano(0), + pulse: 0, + ks: 2, + bs: TsNano(DAY), + split_count: ncc.node_config.cluster.nodes.len() as _, + status: -1, + bb: -1, + modulo: -1, + offset: -1, + precision: -1, + scalar_type: ScalarType::I32, + is_compressed: false, + is_shaped: false, + is_array: false, + byte_order: ByteOrder::Big, + compression_method: None, + shape: None, + source_name: None, + unit: None, + description: None, + optional_fields: None, + value_converter: None, + }], + }; + Ok(ret) + } else { + Err(Error::with_msg_no_trace(format!("unknown test channel {channel:?}"))) + } +} + +// TODO can I take parameters as ref, even when used in custom streams? +pub async fn read_local_config(channel: Channel, ncc: NodeConfigCached) -> Result { + if channel.backend() == TEST_BACKEND { + read_local_config_test(channel, &ncc).await + } else { + read_local_config_real(channel, &ncc).await + } +} + #[derive(Clone)] pub enum MatchingConfigEntry<'a> { None, diff --git a/streams/src/generators.rs b/streams/src/generators.rs index cc9d583..480c2e0 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -9,8 +9,12 @@ use items_0::streamitem::StreamItem; use items_0::Appendable; use items_0::Empty; use items_2::channelevents::ChannelEvents; +use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim1::EventsDim1; +use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; +use std::f64::consts::PI; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -49,7 +53,7 @@ impl GenerateI32 { fn make_batch(&mut self) -> Sitemty { type T = i32; - let mut item = items_2::eventsdim0::EventsDim0::empty(); + let mut item = EventsDim0::empty(); let mut ts = self.ts; loop { if self.ts >= self.tsend || item.byte_estimate() > 200 { @@ -97,3 +101,194 @@ impl Stream for GenerateI32 { } } } + +pub struct GenerateI32V01 { + ivl: u64, + ts: u64, + dts: u64, + tsend: u64, + #[allow(unused)] + c1: u64, + node_ix: u64, + timeout: Option + Send>>>, + done: bool, + done_range_final: bool, +} + +impl GenerateI32V01 { + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let ivl = MS * 500; + let dts = ivl * node_count as u64; + let ts = (range.beg / ivl + node_ix) * ivl; + let tsend = range.end; + info!("START GENERATOR GenerateI32V01 ivl {} dts {} ts {}", ivl, dts, ts); + Self { + ivl, + ts, + dts, + tsend, + c1: 0, + node_ix, + timeout: None, + done: false, + done_range_final: false, + } + } + + fn make_batch(&mut self) -> Sitemty { + type T = i32; + let mut item = EventsDim0::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 400 { + break; + } + let pulse = ts; + let value = (ts / self.ivl) as T; + if false { + info!( + "v01 node {} made event ts {} pulse {} value {}", + self.node_ix, ts, pulse, value + ); + } + item.push(ts, pulse, value); + ts += self.dts; + } + self.ts = ts; + let w = ChannelEvents::Events(Box::new(item) as _); + let w = sitem_data(w); + w + } +} + +impl Stream for GenerateI32V01 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done_range_final { + Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else if false { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + continue; + }; + } + } +} + +pub struct GenerateF64V00 { + ivl: u64, + ts: u64, + dts: u64, + tsend: u64, + node_ix: u64, + timeout: Option + Send>>>, + done: bool, + done_range_final: bool, +} + +impl GenerateF64V00 { + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let ivl = MS * 100; + let dts = ivl * node_count as u64; + let ts = (range.beg / ivl + node_ix) * ivl; + let tsend = range.end; + info!("START GENERATOR GenerateF64V00 ivl {} dts {} ts {}", ivl, dts, ts); + Self { + ivl, + ts, + dts, + tsend, + node_ix, + timeout: None, + done: false, + done_range_final: false, + } + } + + fn make_batch(&mut self) -> Sitemty { + type T = f64; + let mut item = EventsDim1::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 1024 * 4 { + break; + } + let pulse = ts; + let ampl = ((ts / self.ivl) as T).sin() + 2.; + let mut value = Vec::new(); + let pi = PI; + for i in 0..21 { + let x = ((-pi + (2. * pi / 20.) * i as f64).cos() + 1.) * ampl; + value.push(x); + } + if true { + info!( + "v01 node {} made event ts {} pulse {} value {:?}", + self.node_ix, ts, pulse, value + ); + } + item.push(ts, pulse, value); + ts += self.dts; + } + self.ts = ts; + let w = ChannelEvents::Events(Box::new(item) as _); + let w = sitem_data(w); + w + } +} + +impl Stream for GenerateF64V00 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done_range_final { + Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else if false { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + continue; + }; + } + } +} diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index ad20972..8d14806 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -18,19 +18,19 @@ use std::task::Poll; #[allow(unused)] macro_rules! trace2 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } @@ -178,7 +178,7 @@ where ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { use ControlFlow::*; use Poll::*; - info!("================= handle_none"); + trace2!("================= handle_none"); let self_range_complete = self.range_final; if let Some(binner) = self.binner.as_mut() { trace!("bins ready count before finish {}", binner.bins_ready_count()); @@ -220,7 +220,7 @@ where ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { use ControlFlow::*; use Poll::*; - info!("================= poll_input"); + trace2!("================= poll_input"); match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => self.handle_item(item), Ready(None) => self.handle_none(), @@ -239,7 +239,7 @@ where use Poll::*; let span = span!(Level::INFO, "TimeBinner"); let _spg = span.enter(); - info!("================= POLL"); + trace2!("================= POLL"); loop { break if self.complete { panic!("poll on complete")