Add json retrieve test for archiver appliance
This commit is contained in:
@@ -715,7 +715,9 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<C
|
|||||||
msgs.push(format!("path: {}", dir.to_string_lossy()));
|
msgs.push(format!("path: {}", dir.to_string_lossy()));
|
||||||
let mut scalar_type = None;
|
let mut scalar_type = None;
|
||||||
let mut shape = None;
|
let mut shape = None;
|
||||||
let mut rd = read_dir(&dir).await?;
|
let mut rd = read_dir(&dir)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::with_msg(format!("Can not open directory {dir:?} {e:?}")))?;
|
||||||
while let Some(de) = rd.next_entry().await? {
|
while let Some(de) = rd.next_entry().await? {
|
||||||
let s = de.file_name().to_string_lossy().into_owned();
|
let s = de.file_name().to_string_lossy().into_owned();
|
||||||
if s.starts_with(&prefix) && s.ends_with(".pb") {
|
if s.starts_with(&prefix) && s.ends_with(".pb") {
|
||||||
|
|||||||
@@ -26,9 +26,21 @@ impl Drop for RunningSlsHost {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct RunningArchappHost {
|
||||||
|
pub cluster: Cluster,
|
||||||
|
_jhs: Vec<JoinHandle<Result<(), Error>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for RunningArchappHost {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
netpod::log::info!("\n\n+++++++++++++++++++ impl Drop for RunningArchappHost\n\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref HOSTS_RUNNING: Mutex<Option<Arc<RunningHosts>>> = Mutex::new(None);
|
static ref HOSTS_RUNNING: Mutex<Option<Arc<RunningHosts>>> = Mutex::new(None);
|
||||||
static ref SLS_HOST_RUNNING: Mutex<Option<Arc<RunningSlsHost>>> = Mutex::new(None);
|
static ref SLS_HOST_RUNNING: Mutex<Option<Arc<RunningSlsHost>>> = Mutex::new(None);
|
||||||
|
static ref ARCHAPP_HOST_RUNNING: Mutex<Option<Arc<RunningArchappHost>>> = Mutex::new(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
|
pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
|
||||||
@@ -74,3 +86,25 @@ pub fn require_sls_test_host_running() -> Result<Arc<RunningSlsHost>, Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn require_archapp_test_host_running() -> Result<Arc<RunningArchappHost>, Error> {
|
||||||
|
let mut g = ARCHAPP_HOST_RUNNING.lock().unwrap();
|
||||||
|
match g.as_ref() {
|
||||||
|
None => {
|
||||||
|
netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningArchappHost\n\n");
|
||||||
|
let cluster = netpod::archapp_test_cluster();
|
||||||
|
let jhs = spawn_test_hosts(cluster.clone());
|
||||||
|
let ret = RunningArchappHost {
|
||||||
|
cluster: cluster.clone(),
|
||||||
|
_jhs: jhs,
|
||||||
|
};
|
||||||
|
let a = Arc::new(ret);
|
||||||
|
*g = Some(a.clone());
|
||||||
|
Ok(a)
|
||||||
|
}
|
||||||
|
Some(gg) => {
|
||||||
|
netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningArchappHost\n\n");
|
||||||
|
Ok(gg.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod archapp;
|
||||||
pub mod binnedbinary;
|
pub mod binnedbinary;
|
||||||
pub mod binnedjson;
|
pub mod binnedjson;
|
||||||
pub mod events;
|
pub mod events;
|
||||||
|
|||||||
@@ -0,0 +1,96 @@
|
|||||||
|
use super::binnedjson::ScalarEventsResponse;
|
||||||
|
use super::events::get_plain_events_json;
|
||||||
|
use crate::nodes::require_archapp_test_host_running;
|
||||||
|
use err::Error;
|
||||||
|
use netpod::{f64_close, log::*};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn get_events_1() -> Result<(), Error> {
|
||||||
|
let fut = async {
|
||||||
|
let rh = require_archapp_test_host_running()?;
|
||||||
|
let cluster = &rh.cluster;
|
||||||
|
let res = get_plain_events_json(
|
||||||
|
"SARUN16-MQUA080:X",
|
||||||
|
"2021-01-04T00:00:00Z",
|
||||||
|
"2021-01-30T00:00:00Z",
|
||||||
|
cluster,
|
||||||
|
true,
|
||||||
|
4,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let res: ScalarEventsResponse = serde_json::from_value(res)?;
|
||||||
|
info!("RESULT: {res:?}");
|
||||||
|
let ts_anchor = 1609763681;
|
||||||
|
let ts_ms = vec![
|
||||||
|
617, 2569805, 2936041, 3010344, 3049906, 3708678, 5909539, 6477893, 6610677, 6758112, 71757772, 786724766,
|
||||||
|
1308470149, 1890757180, 1915078958, 1915194844, 1915194947, 1915362469, 1915362571, 1915417056, 1915465737,
|
||||||
|
1915520190, 1915520293, 1915571058, 1915805484, 1915805589, 1915965029, 1915965133, 1916031220, 1916031324,
|
||||||
|
1916082787, 1916082889, 1916157130, 1916157233, 1916345254, 1916345356, 1916488147, 1916513221, 1916620067,
|
||||||
|
1916620173, 1916672379, 1916693598, 1916723207, 1916723309, 1916745319, 1916745420, 1916775502, 1916775609,
|
||||||
|
];
|
||||||
|
let ts_ns = vec![
|
||||||
|
584454, 368902, 427972, 160693, 58866, 902958, 192718, 479215, 450894, 681257, 19499, 84254, 273548,
|
||||||
|
721894, 78541, 169037, 501222, 573798, 341840, 736887, 939637, 906430, 566278, 630241, 189349, 565614,
|
||||||
|
447258, 899381, 129461, 21285, 901927, 791954, 915058, 435737, 379707, 850017, 251317, 283772, 869783,
|
||||||
|
687797, 556662, 527206, 790635, 502581, 307019, 218006, 121460, 750763,
|
||||||
|
];
|
||||||
|
let values = vec![
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.25704250552462327,
|
||||||
|
-0.22139999999990323,
|
||||||
|
-0.22139999999990323,
|
||||||
|
-0.20710000000008225,
|
||||||
|
-0.20710000000008225,
|
||||||
|
-0.20714250552464364,
|
||||||
|
-0.20704250552444137,
|
||||||
|
-0.1999999999998181,
|
||||||
|
-0.1999999999998181,
|
||||||
|
-0.2001425055245818,
|
||||||
|
-0.1999999999998181,
|
||||||
|
-0.1999999999998181,
|
||||||
|
-0.1950000000001637,
|
||||||
|
-0.1950000000001637,
|
||||||
|
-0.20499999999992724,
|
||||||
|
-0.20499999999992724,
|
||||||
|
-0.2100000000000364,
|
||||||
|
-0.2100000000000364,
|
||||||
|
-0.2199999999997999,
|
||||||
|
-0.2199999999997999,
|
||||||
|
-0.2300000000000182,
|
||||||
|
-0.2300000000000182,
|
||||||
|
-0.22994250552437737,
|
||||||
|
-0.22994250552437737,
|
||||||
|
-0.2300000000000182,
|
||||||
|
-0.2300000000000182,
|
||||||
|
-0.2300425055245796,
|
||||||
|
-0.22994250552437737,
|
||||||
|
-0.2157000000001972,
|
||||||
|
-0.2157000000001972,
|
||||||
|
-0.2015000000001237,
|
||||||
|
-0.2015000000001237,
|
||||||
|
-0.2015000000001237,
|
||||||
|
-0.2015000000001237,
|
||||||
|
];
|
||||||
|
assert_eq!(res.ts_anchor, ts_anchor);
|
||||||
|
assert_eq!(&res.ts_ms, &ts_ms);
|
||||||
|
assert_eq!(&res.ts_ns, &ts_ns);
|
||||||
|
for (_i, (&a, &b)) in res.values.iter().zip(values.iter()).enumerate() {
|
||||||
|
assert!(f64_close(a, b));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut)
|
||||||
|
}
|
||||||
@@ -253,21 +253,22 @@ async fn get_binned_json_common(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO reuse the types from server.
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
struct ScalarEventsResponse {
|
pub struct ScalarEventsResponse {
|
||||||
#[serde(rename = "tsAnchor")]
|
#[serde(rename = "tsAnchor")]
|
||||||
ts_anchor: u64,
|
pub ts_anchor: u64,
|
||||||
#[serde(rename = "tsMs")]
|
#[serde(rename = "tsMs")]
|
||||||
ts_ms: Vec<u64>,
|
pub ts_ms: Vec<u64>,
|
||||||
#[serde(rename = "tsNs")]
|
#[serde(rename = "tsNs")]
|
||||||
ts_ns: Vec<u64>,
|
pub ts_ns: Vec<u64>,
|
||||||
values: Vec<f64>,
|
pub values: Vec<f64>,
|
||||||
#[serde(rename = "finalisedRange", default = "bool_false")]
|
#[serde(rename = "finalisedRange", default = "bool_false")]
|
||||||
finalised_range: bool,
|
pub finalised_range: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
struct WaveEventsResponse {
|
pub struct WaveEventsResponse {
|
||||||
#[serde(rename = "tsAnchor")]
|
#[serde(rename = "tsAnchor")]
|
||||||
ts_anchor: u64,
|
ts_anchor: u64,
|
||||||
#[serde(rename = "tsMs")]
|
#[serde(rename = "tsMs")]
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ fn get_scalar_2_events() -> Result<(), Error> {
|
|||||||
assert_eq!(res.ts_anchor, 1636502401);
|
assert_eq!(res.ts_anchor, 1636502401);
|
||||||
assert_eq!(&res.ts_ms, &ts_ms);
|
assert_eq!(&res.ts_ms, &ts_ms);
|
||||||
assert_eq!(&res.ts_ns, &ts_ns);
|
assert_eq!(&res.ts_ns, &ts_ns);
|
||||||
|
assert_eq!(res.finalised_range, true);
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
taskrun::run(fut)
|
taskrun::run(fut)
|
||||||
@@ -87,6 +88,7 @@ fn get_wave_1_events() -> Result<(), Error> {
|
|||||||
assert_eq!(res.ts_anchor, 1636416014);
|
assert_eq!(res.ts_anchor, 1636416014);
|
||||||
assert_eq!(&res.ts_ms, &ts_ms);
|
assert_eq!(&res.ts_ms, &ts_ms);
|
||||||
assert_eq!(&res.ts_ns, &ts_ns);
|
assert_eq!(&res.ts_ns, &ts_ns);
|
||||||
|
assert_eq!(res.finalised_range, true);
|
||||||
assert_eq!(res.values.len(), 24);
|
assert_eq!(res.values.len(), 24);
|
||||||
assert_eq!(res.values[0].len(), 480);
|
assert_eq!(res.values[0].len(), 480);
|
||||||
assert_eq!(res.values[1].len(), 480);
|
assert_eq!(res.values[1].len(), 480);
|
||||||
|
|||||||
@@ -255,14 +255,15 @@ async fn get_plain_events_json_1_inner() -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_plain_events_json(
|
// TODO improve by a more information-rich return type.
|
||||||
|
pub async fn get_plain_events_json(
|
||||||
channel_name: &str,
|
channel_name: &str,
|
||||||
beg_date: &str,
|
beg_date: &str,
|
||||||
end_date: &str,
|
end_date: &str,
|
||||||
cluster: &Cluster,
|
cluster: &Cluster,
|
||||||
_expect_range_complete: bool,
|
_expect_range_complete: bool,
|
||||||
_expect_event_count: u64,
|
_expect_event_count: u64,
|
||||||
) -> Result<(), Error> {
|
) -> Result<JsonValue, Error> {
|
||||||
let t1 = Utc::now();
|
let t1 = Utc::now();
|
||||||
let node0 = &cluster.nodes[0];
|
let node0 = &cluster.nodes[0];
|
||||||
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
||||||
@@ -292,11 +293,11 @@ async fn get_plain_events_json(
|
|||||||
}
|
}
|
||||||
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
|
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
|
||||||
let s = String::from_utf8_lossy(&buf);
|
let s = String::from_utf8_lossy(&buf);
|
||||||
let _res: JsonValue = serde_json::from_str(&s)?;
|
let res: JsonValue = serde_json::from_str(&s)?;
|
||||||
// TODO assert more
|
// TODO assert more
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
// TODO add timeout
|
// TODO add timeout
|
||||||
debug!("time {} ms", ms);
|
debug!("time {} ms", ms);
|
||||||
Ok(())
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|||||||
+52
-2
@@ -184,6 +184,7 @@ impl ScalarType {
|
|||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ArchiverAppliance {
|
pub struct ArchiverAppliance {
|
||||||
pub data_base_paths: Vec<PathBuf>,
|
pub data_base_paths: Vec<PathBuf>,
|
||||||
|
pub database: Database,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
@@ -1501,9 +1502,9 @@ pub fn sls_test_cluster() -> Cluster {
|
|||||||
listen: "0.0.0.0".into(),
|
listen: "0.0.0.0".into(),
|
||||||
port: 6190 + id as u16,
|
port: 6190 + id as u16,
|
||||||
port_raw: 6190 + id as u16 + 100,
|
port_raw: 6190 + id as u16 + 100,
|
||||||
data_base_path: format!("NOdatapath{}", id).into(),
|
data_base_path: "UNUSED".into(),
|
||||||
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||||
ksprefix: "NOKS".into(),
|
ksprefix: "UNUSED".into(),
|
||||||
backend: "sls-archive".into(),
|
backend: "sls-archive".into(),
|
||||||
splits: None,
|
splits: None,
|
||||||
archiver_appliance: None,
|
archiver_appliance: None,
|
||||||
@@ -1532,6 +1533,45 @@ pub fn sls_test_cluster() -> Cluster {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn archapp_test_cluster() -> Cluster {
|
||||||
|
let nodes = (0..1)
|
||||||
|
.into_iter()
|
||||||
|
.map(|id| Node {
|
||||||
|
host: "localhost".into(),
|
||||||
|
listen: "0.0.0.0".into(),
|
||||||
|
port: 6200 + id as u16,
|
||||||
|
port_raw: 6200 + id as u16 + 100,
|
||||||
|
data_base_path: "UNUSED".into(),
|
||||||
|
cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)),
|
||||||
|
ksprefix: "UNUSED".into(),
|
||||||
|
backend: "sf-archive".into(),
|
||||||
|
splits: None,
|
||||||
|
channel_archiver: None,
|
||||||
|
archiver_appliance: Some(ArchiverAppliance {
|
||||||
|
data_base_paths: vec![test_data_base_path_archiver_appliance()],
|
||||||
|
database: Database {
|
||||||
|
host: "localhost".into(),
|
||||||
|
name: "testingdaq".into(),
|
||||||
|
user: "testingdaq".into(),
|
||||||
|
pass: "testingdaq".into(),
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Cluster {
|
||||||
|
nodes,
|
||||||
|
database: Database {
|
||||||
|
host: "localhost".into(),
|
||||||
|
name: "testingdaq".into(),
|
||||||
|
user: "testingdaq".into(),
|
||||||
|
pass: "testingdaq".into(),
|
||||||
|
},
|
||||||
|
run_map_pulse_task: false,
|
||||||
|
is_central_storage: false,
|
||||||
|
file_io_buffer_size: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn test_data_base_path_databuffer() -> PathBuf {
|
pub fn test_data_base_path_databuffer() -> PathBuf {
|
||||||
let homedir = std::env::var("HOME").unwrap();
|
let homedir = std::env::var("HOME").unwrap();
|
||||||
let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer");
|
let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer");
|
||||||
@@ -1546,3 +1586,13 @@ pub fn test_data_base_path_channel_archiver_sls() -> PathBuf {
|
|||||||
.join("gfa03");
|
.join("gfa03");
|
||||||
data_base_path
|
data_base_path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn test_data_base_path_archiver_appliance() -> PathBuf {
|
||||||
|
let homedir = std::env::var("HOME").unwrap();
|
||||||
|
let data_base_path = PathBuf::from(homedir)
|
||||||
|
.join("daqbuffer-testdata")
|
||||||
|
.join("archappdata")
|
||||||
|
.join("lts")
|
||||||
|
.join("ArchiverStore");
|
||||||
|
data_base_path
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user