From c6dba54a623064ccbecd3049a4debea703e0a79b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 17 Feb 2022 16:19:39 +0100 Subject: [PATCH] Add json retrieve test for archiver appliance --- archapp/src/events.rs | 4 +- daqbufp2/src/nodes.rs | 34 +++++++ daqbufp2/src/test.rs | 1 + daqbufp2/src/test/archapp.rs | 96 +++++++++++++++++++ daqbufp2/src/test/binnedjson.rs | 15 +-- .../src/test/binnedjson/channelarchiver.rs | 2 + daqbufp2/src/test/events.rs | 9 +- netpod/src/netpod.rs | 54 ++++++++++- 8 files changed, 201 insertions(+), 14 deletions(-) create mode 100644 daqbufp2/src/test/archapp.rs diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 1636dbf..1ea712d 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -715,7 +715,9 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result>>, +} + +impl Drop for RunningArchappHost { + fn drop(&mut self) { + netpod::log::info!("\n\n+++++++++++++++++++ impl Drop for RunningArchappHost\n\n"); + } +} + lazy_static::lazy_static! { static ref HOSTS_RUNNING: Mutex>> = Mutex::new(None); static ref SLS_HOST_RUNNING: Mutex>> = Mutex::new(None); + static ref ARCHAPP_HOST_RUNNING: Mutex>> = Mutex::new(None); } pub fn require_test_hosts_running() -> Result, Error> { @@ -74,3 +86,25 @@ pub fn require_sls_test_host_running() -> Result, Error> { } } } + +pub fn require_archapp_test_host_running() -> Result, Error> { + let mut g = ARCHAPP_HOST_RUNNING.lock().unwrap(); + match g.as_ref() { + None => { + netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningArchappHost\n\n"); + let cluster = netpod::archapp_test_cluster(); + let jhs = spawn_test_hosts(cluster.clone()); + let ret = RunningArchappHost { + cluster: cluster.clone(), + _jhs: jhs, + }; + let a = Arc::new(ret); + *g = Some(a.clone()); + Ok(a) + } + Some(gg) => { + netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningArchappHost\n\n"); + Ok(gg.clone()) + } + } +} diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs index cf05573..4845b04 100644 --- a/daqbufp2/src/test.rs +++ b/daqbufp2/src/test.rs @@ -1,3 +1,4 @@ +pub mod archapp; pub mod binnedbinary; pub mod binnedjson; pub mod events; diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs new file mode 100644 index 0000000..60cacf9 --- /dev/null +++ b/daqbufp2/src/test/archapp.rs @@ -0,0 +1,96 @@ +use super::binnedjson::ScalarEventsResponse; +use super::events::get_plain_events_json; +use crate::nodes::require_archapp_test_host_running; +use err::Error; +use netpod::{f64_close, log::*}; + +#[test] +fn get_events_1() -> Result<(), Error> { + let fut = async { + let rh = require_archapp_test_host_running()?; + let cluster = &rh.cluster; + let res = get_plain_events_json( + "SARUN16-MQUA080:X", + "2021-01-04T00:00:00Z", + "2021-01-30T00:00:00Z", + cluster, + true, + 4, + ) + .await?; + let res: ScalarEventsResponse = serde_json::from_value(res)?; + info!("RESULT: {res:?}"); + let ts_anchor = 1609763681; + let ts_ms = vec![ + 617, 2569805, 2936041, 3010344, 3049906, 3708678, 5909539, 6477893, 6610677, 6758112, 71757772, 786724766, + 1308470149, 1890757180, 1915078958, 1915194844, 1915194947, 1915362469, 1915362571, 1915417056, 1915465737, + 1915520190, 1915520293, 1915571058, 1915805484, 1915805589, 1915965029, 1915965133, 1916031220, 1916031324, + 1916082787, 1916082889, 1916157130, 1916157233, 1916345254, 1916345356, 1916488147, 1916513221, 1916620067, + 1916620173, 1916672379, 1916693598, 1916723207, 1916723309, 1916745319, 1916745420, 1916775502, 1916775609, + ]; + let ts_ns = vec![ + 584454, 368902, 427972, 160693, 58866, 902958, 192718, 479215, 450894, 681257, 19499, 84254, 273548, + 721894, 78541, 169037, 501222, 573798, 341840, 736887, 939637, 906430, 566278, 630241, 189349, 565614, + 447258, 899381, 129461, 21285, 901927, 791954, 915058, 435737, 379707, 850017, 251317, 283772, 869783, + 687797, 556662, 527206, 790635, 502581, 307019, 218006, 121460, 750763, + ]; + let values = vec![ + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.25704250552462327, + -0.22139999999990323, + -0.22139999999990323, + -0.20710000000008225, + -0.20710000000008225, + -0.20714250552464364, + -0.20704250552444137, + -0.1999999999998181, + -0.1999999999998181, + -0.2001425055245818, + -0.1999999999998181, + -0.1999999999998181, + -0.1950000000001637, + -0.1950000000001637, + -0.20499999999992724, + -0.20499999999992724, + -0.2100000000000364, + -0.2100000000000364, + -0.2199999999997999, + -0.2199999999997999, + -0.2300000000000182, + -0.2300000000000182, + -0.22994250552437737, + -0.22994250552437737, + -0.2300000000000182, + -0.2300000000000182, + -0.2300425055245796, + -0.22994250552437737, + -0.2157000000001972, + -0.2157000000001972, + -0.2015000000001237, + -0.2015000000001237, + -0.2015000000001237, + -0.2015000000001237, + ]; + assert_eq!(res.ts_anchor, ts_anchor); + assert_eq!(&res.ts_ms, &ts_ms); + assert_eq!(&res.ts_ns, &ts_ns); + for (_i, (&a, &b)) in res.values.iter().zip(values.iter()).enumerate() { + assert!(f64_close(a, b)); + } + Ok(()) + }; + taskrun::run(fut) +} diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 4692167..c205bd3 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -253,21 +253,22 @@ async fn get_binned_json_common( Ok(()) } +// TODO reuse the types from server. #[derive(Debug, Serialize, Deserialize)] -struct ScalarEventsResponse { +pub struct ScalarEventsResponse { #[serde(rename = "tsAnchor")] - ts_anchor: u64, + pub ts_anchor: u64, #[serde(rename = "tsMs")] - ts_ms: Vec, + pub ts_ms: Vec, #[serde(rename = "tsNs")] - ts_ns: Vec, - values: Vec, + pub ts_ns: Vec, + pub values: Vec, #[serde(rename = "finalisedRange", default = "bool_false")] - finalised_range: bool, + pub finalised_range: bool, } #[derive(Debug, Serialize, Deserialize)] -struct WaveEventsResponse { +pub struct WaveEventsResponse { #[serde(rename = "tsAnchor")] ts_anchor: u64, #[serde(rename = "tsMs")] diff --git a/daqbufp2/src/test/binnedjson/channelarchiver.rs b/daqbufp2/src/test/binnedjson/channelarchiver.rs index 05b359a..b1e8b49 100644 --- a/daqbufp2/src/test/binnedjson/channelarchiver.rs +++ b/daqbufp2/src/test/binnedjson/channelarchiver.rs @@ -36,6 +36,7 @@ fn get_scalar_2_events() -> Result<(), Error> { assert_eq!(res.ts_anchor, 1636502401); assert_eq!(&res.ts_ms, &ts_ms); assert_eq!(&res.ts_ns, &ts_ns); + assert_eq!(res.finalised_range, true); Ok(()) }; taskrun::run(fut) @@ -87,6 +88,7 @@ fn get_wave_1_events() -> Result<(), Error> { assert_eq!(res.ts_anchor, 1636416014); assert_eq!(&res.ts_ms, &ts_ms); assert_eq!(&res.ts_ns, &ts_ns); + assert_eq!(res.finalised_range, true); assert_eq!(res.values.len(), 24); assert_eq!(res.values[0].len(), 480); assert_eq!(res.values[1].len(), 480); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 0a8167e..82e96eb 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -255,14 +255,15 @@ async fn get_plain_events_json_1_inner() -> Result<(), Error> { Ok(()) } -async fn get_plain_events_json( +// TODO improve by a more information-rich return type. +pub async fn get_plain_events_json( channel_name: &str, beg_date: &str, end_date: &str, cluster: &Cluster, _expect_range_complete: bool, _expect_event_count: u64, -) -> Result<(), Error> { +) -> Result { let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; @@ -292,11 +293,11 @@ async fn get_plain_events_json( } let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); - let _res: JsonValue = serde_json::from_str(&s)?; + let res: JsonValue = serde_json::from_str(&s)?; // TODO assert more let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout debug!("time {} ms", ms); - Ok(()) + Ok(res) } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 4d04573..b2cfaea 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -184,6 +184,7 @@ impl ScalarType { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ArchiverAppliance { pub data_base_paths: Vec, + pub database: Database, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1501,9 +1502,9 @@ pub fn sls_test_cluster() -> Cluster { listen: "0.0.0.0".into(), port: 6190 + id as u16, port_raw: 6190 + id as u16 + 100, - data_base_path: format!("NOdatapath{}", id).into(), + data_base_path: "UNUSED".into(), cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), - ksprefix: "NOKS".into(), + ksprefix: "UNUSED".into(), backend: "sls-archive".into(), splits: None, archiver_appliance: None, @@ -1532,6 +1533,45 @@ pub fn sls_test_cluster() -> Cluster { } } +pub fn archapp_test_cluster() -> Cluster { + let nodes = (0..1) + .into_iter() + .map(|id| Node { + host: "localhost".into(), + listen: "0.0.0.0".into(), + port: 6200 + id as u16, + port_raw: 6200 + id as u16 + 100, + data_base_path: "UNUSED".into(), + cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), + ksprefix: "UNUSED".into(), + backend: "sf-archive".into(), + splits: None, + channel_archiver: None, + archiver_appliance: Some(ArchiverAppliance { + data_base_paths: vec![test_data_base_path_archiver_appliance()], + database: Database { + host: "localhost".into(), + name: "testingdaq".into(), + user: "testingdaq".into(), + pass: "testingdaq".into(), + }, + }), + }) + .collect(); + Cluster { + nodes, + database: Database { + host: "localhost".into(), + name: "testingdaq".into(), + user: "testingdaq".into(), + pass: "testingdaq".into(), + }, + run_map_pulse_task: false, + is_central_storage: false, + file_io_buffer_size: Default::default(), + } +} + pub fn test_data_base_path_databuffer() -> PathBuf { let homedir = std::env::var("HOME").unwrap(); let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer"); @@ -1546,3 +1586,13 @@ pub fn test_data_base_path_channel_archiver_sls() -> PathBuf { .join("gfa03"); data_base_path } + +pub fn test_data_base_path_archiver_appliance() -> PathBuf { + let homedir = std::env::var("HOME").unwrap(); + let data_base_path = PathBuf::from(homedir) + .join("daqbuffer-testdata") + .join("archappdata") + .join("lts") + .join("ArchiverStore"); + data_base_path +}