Adapt binnedjson case

This commit is contained in:
Dominik Werder
2021-11-11 23:53:07 +01:00
parent ceb995f8ca
commit 8ce786d304
11 changed files with 246 additions and 40 deletions

View File

@@ -15,8 +15,20 @@ impl Drop for RunningHosts {
}
}
pub struct RunningSlsHost {
pub cluster: Cluster,
_jhs: Vec<JoinHandle<Result<(), Error>>>,
}
impl Drop for RunningSlsHost {
fn drop(&mut self) {
netpod::log::info!("\n\n+++++++++++++++++++ impl Drop for RunningSlsHost\n\n");
}
}
lazy_static::lazy_static! {
static ref HOSTS_RUNNING: Mutex<Option<Arc<RunningHosts>>> = Mutex::new(None);
static ref SLS_HOST_RUNNING: Mutex<Option<Arc<RunningSlsHost>>> = Mutex::new(None);
}
pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
@@ -40,3 +52,25 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
}
}
}
pub fn require_sls_test_host_running() -> Result<Arc<RunningSlsHost>, Error> {
let mut g = SLS_HOST_RUNNING.lock().unwrap();
match g.as_ref() {
None => {
netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningSlsHost\n\n");
let cluster = taskrun::sls_test_cluster();
let jhs = spawn_test_hosts(cluster.clone());
let ret = RunningSlsHost {
cluster: cluster.clone(),
_jhs: jhs,
};
let a = Arc::new(ret);
*g = Some(a.clone());
Ok(a)
}
Some(gg) => {
netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningSlsHost\n\n");
Ok(gg.clone())
}
}
}

View File

@@ -1,4 +1,4 @@
use crate::nodes::require_test_hosts_running;
use crate::nodes::{require_sls_test_host_running, require_test_hosts_running};
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
@@ -6,6 +6,7 @@ use hyper::Body;
use netpod::query::{BinnedQuery, CacheUsage};
use netpod::{log::*, AppendToUrl};
use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use url::Url;
@@ -72,6 +73,28 @@ async fn get_binned_json_2_inner() -> Result<(), Error> {
.await
}
#[test]
fn get_sls_archive_1() -> Result<(), Error> {
let fut = async move {
let rh = require_sls_test_host_running()?;
let cluster = &rh.cluster;
let channel = Channel {
backend: "sls-archive".into(),
name: "ABOMA-CH-6G:U-DCLINK".into(),
};
let begstr = "2021-10-20T22:00:00Z";
let endstr = "2021-11-12T00:00:00Z";
let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?;
assert_eq!(res.finalised_range, true);
assert_eq!(res.ts_anchor, 1634688000);
assert!((res.avgs[3].unwrap() - 1.01470947265625).abs() < 1e-4);
assert!((res.avgs[4].unwrap() - 24.06792449951172).abs() < 1e-4);
assert!((res.avgs[11].unwrap() - 0.00274658203125).abs() < 1e-4);
Ok(())
};
taskrun::run(fut)
}
async fn get_binned_json_common(
channel_name: &str,
beg_date: &str,
@@ -149,3 +172,62 @@ async fn get_binned_json_common(
}
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
struct BinnedResponse {
#[serde(rename = "tsAnchor")]
ts_anchor: u64,
#[serde(rename = "tsMs")]
ts_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_ns: Vec<u64>,
mins: Vec<Option<f64>>,
maxs: Vec<Option<f64>>,
avgs: Vec<Option<f64>>,
counts: Vec<u64>,
#[serde(rename = "finalisedRange", default = "bool_false")]
finalised_range: bool,
}
fn bool_false() -> bool {
false
}
async fn get_binned_json_common_res(
channel: Channel,
beg_date: &str,
end_date: &str,
bin_count: u32,
agg_kind: AggKind,
cluster: &Cluster,
) -> Result<BinnedResponse, Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_timeout(Duration::from_millis(15000));
query.set_cache_usage(CacheUsage::Ignore);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("get_binned_json_common get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;
if res.status() != StatusCode::OK {
error!("get_binned_json_common client response {:?}", res);
}
let res = hyper::body::to_bytes(res.into_body()).await?;
let t2 = chrono::Utc::now();
let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
let res = String::from_utf8_lossy(&res).to_string();
info!("GOT: {}", res);
let res: BinnedResponse = serde_json::from_str(res.as_str())?;
Ok(res)
}

View File

@@ -7,7 +7,7 @@ use crate::{
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::timeunits::*;
use netpod::log::*;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::fmt;
@@ -194,11 +194,11 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = EventValuesAggregator<NTY>;
fn aggregator(range: NanoRange, _bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
// TODO remove output
if range.delta() > SEC * 5000 {
netpod::log::info!("TimeBinnableType for EventValues aggregator() range {:?}", range);
}
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for EventValues aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
@@ -248,6 +248,8 @@ where
type Output = EventValuesCollectorOutput<NTY>;
fn ingest(&mut self, src: &Self::Input) {
// TODO should be able to remove this
err::todo();
self.vals.append(src);
}
@@ -260,6 +262,8 @@ where
}
fn result(self) -> Result<Self::Output, Error> {
// TODO should be able to remove this
err::todo();
let tst = ts_offs_from_abs(&self.vals.tss);
let ret = Self::Output {
ts_anchor_sec: tst.0,
@@ -350,8 +354,9 @@ where
}
}
fn apply_event_time_weight(&mut self, ts: u64, val: Option<NTY>) {
fn apply_event_time_weight(&mut self, ts: u64) {
if let Some(v) = self.last_val {
debug!("apply_event_time_weight");
self.apply_min_max(v);
let w = if self.do_time_weight {
(ts - self.int_ts) as f32 * 1e-9
@@ -365,9 +370,12 @@ where
self.sumc += 1;
}
self.int_ts = ts;
} else {
debug!(
"apply_event_time_weight NO VALUE {}",
ts as i64 - self.range.beg as i64
);
}
self.last_ts = ts;
self.last_val = val;
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
@@ -377,8 +385,8 @@ where
if ts < self.range.beg {
} else if ts >= self.range.end {
} else {
self.count += 1;
self.apply_event_unweight(val);
self.count += 1;
}
}
}
@@ -388,13 +396,18 @@ where
let ts = item.tss[i1];
let val = item.values[i1];
if ts < self.int_ts {
debug!("just set int_ts");
self.last_ts = ts;
self.last_val = Some(val);
} else if ts >= self.range.end {
debug!("after range");
return;
} else {
debug!("regular");
self.apply_event_time_weight(ts);
self.count += 1;
self.apply_event_time_weight(ts, Some(val));
self.last_ts = ts;
self.last_val = Some(val);
}
}
}
@@ -413,6 +426,7 @@ where
maxs: vec![self.max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = None;
@@ -423,8 +437,12 @@ where
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins<NTY> {
if expand {
self.apply_event_time_weight(self.range.end, self.last_val);
// TODO check callsite for correct expand status.
if true || expand {
debug!("result_reset_time_weight calls apply_event_time_weight");
self.apply_event_time_weight(self.range.end);
} else {
debug!("result_reset_time_weight NO EXPAND");
}
let avg = {
let sc = self.range.delta() as f32 * 1e-9;
@@ -438,6 +456,7 @@ where
maxs: vec![self.max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = None;
@@ -460,6 +479,7 @@ where
}
fn ingest(&mut self, item: &Self::Input) {
debug!("ingest len {}", item.len());
if self.do_time_weight {
self.ingest_time_weight(item)
} else {
@@ -468,6 +488,7 @@ where
}
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
debug!("Produce for {:?} next {:?}", self.range, range);
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
} else {

View File

@@ -7,6 +7,7 @@ use crate::{
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
@@ -180,7 +181,11 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = MinMaxAvgBinsAggregator<NTY>;
fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for XBinnedScalarEvents aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}

View File

@@ -182,6 +182,10 @@ where
type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for MinMaxAvgDim1Bins aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, x_bin_count, do_time_weight)
}
}

View File

@@ -7,6 +7,7 @@ use crate::{
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
@@ -180,6 +181,10 @@ where
type Aggregator = MinMaxAvgWaveBinsAggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for MinMaxAvgWaveBins aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, x_bin_count, do_time_weight)
}
}

View File

@@ -179,8 +179,12 @@ where
type Output = MinMaxAvgDim1Bins<NTY>;
type Aggregator = WaveEventsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
Self::Aggregator::new(range, bin_count, do_time_weight)
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for WaveEvents aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, x_bin_count, do_time_weight)
}
}

View File

@@ -7,7 +7,7 @@ use crate::{
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::timeunits::SEC;
use netpod::log::*;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
@@ -176,14 +176,11 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = XBinnedScalarEventsAggregator<NTY>;
fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
// TODO remove output
if range.delta() > SEC * 0 {
netpod::log::debug!(
"TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}",
range
);
}
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for XBinnedScalarEvents aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
@@ -252,6 +249,7 @@ where
}
fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) {
debug!("apply_event_unweight");
self.apply_min_max(min, max);
let vf = avg;
if vf.is_nan() {
@@ -261,9 +259,10 @@ where
}
}
fn apply_event_time_weight(&mut self, ts: u64, avg: Option<f32>, min: Option<NTY>, max: Option<NTY>) {
if let Some(v) = self.last_avg {
self.apply_min_max(min.unwrap(), max.unwrap());
fn apply_event_time_weight(&mut self, ts: u64) {
debug!("apply_event_time_weight");
if let (Some(v), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) {
self.apply_min_max(min, max);
let w = if self.do_time_weight {
(ts - self.int_ts) as f32 * 1e-9
} else {
@@ -277,10 +276,6 @@ where
}
self.int_ts = ts;
}
self.last_ts = ts;
self.last_avg = avg;
self.last_min = min;
self.last_max = max;
}
fn ingest_unweight(&mut self, item: &XBinnedScalarEvents<NTY>) {
@@ -292,8 +287,8 @@ where
if ts < self.range.beg {
} else if ts >= self.range.end {
} else {
self.count += 1;
self.apply_event_unweight(avg, min, max);
self.count += 1;
}
}
}
@@ -312,8 +307,12 @@ where
} else if ts >= self.range.end {
return;
} else {
self.apply_event_time_weight(ts);
self.count += 1;
self.apply_event_time_weight(ts, Some(avg), Some(min), Some(max));
self.last_ts = ts;
self.last_avg = Some(avg);
self.last_min = Some(min);
self.last_max = Some(max);
}
}
}
@@ -332,6 +331,7 @@ where
maxs: vec![self.max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = None;
@@ -342,8 +342,9 @@ where
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins<NTY> {
if expand {
self.apply_event_time_weight(self.range.end, self.last_avg, self.last_min, self.last_max);
// TODO check callsite for correct expand status.
if true || expand {
self.apply_event_time_weight(self.range.end);
}
let avg = {
let sc = self.range.delta() as f32 * 1e-9;
@@ -357,6 +358,7 @@ where
maxs: vec![self.max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = None;
@@ -379,6 +381,7 @@ where
}
fn ingest(&mut self, item: &Self::Input) {
debug!("ingest");
if self.do_time_weight {
self.ingest_time_weight(item)
} else {

View File

@@ -178,8 +178,12 @@ where
type Output = MinMaxAvgWaveBins<NTY>;
type Aggregator = XBinnedWaveEventsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
Self::Aggregator::new(range, bin_count, do_time_weight)
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
debug!(
"TimeBinnableType for XBinnedWaveEvents aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, x_bin_count, do_time_weight)
}
}

View File

@@ -115,7 +115,7 @@ async fn events_conn_handler_inner_try(
return Err((Error::with_msg("json parse error"), netout))?;
}
};
debug!("--- nodenet::conn got query -------------------\nevq {:?}", evq);
debug!("--- got query evq {:?}", evq);
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
if let Some(aa) = &node_config.node.channel_archiver {

View File

@@ -2,8 +2,10 @@ pub mod append;
use crate::log::*;
use err::Error;
use netpod::ChannelArchiver;
use std::future::Future;
use std::panic;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
@@ -102,7 +104,10 @@ pub fn tracing_init() {
"archapp::archeng::pipe=info",
"archapp::storagemerge=info",
"streams::rangefilter=info",
"items::eventvalues=debug",
"items::xbinnedscalarevents=debug",
"disk::binned=info",
"nodenet::conn=debug",
"daqbuffer::test=info",
]
.join(","),
@@ -151,3 +156,42 @@ pub fn test_cluster() -> netpod::Cluster {
file_io_buffer_size: Default::default(),
}
}
pub fn sls_test_cluster() -> netpod::Cluster {
let nodes = (0..1)
.into_iter()
.map(|id| netpod::Node {
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 8362 + id as u16,
port_raw: 8362 + id as u16 + 100,
data_base_path: format!("NOdatapath{}", id).into(),
cache_base_path: format!("../tmpdata/node{:02}", id).into(),
ksprefix: "NOKS".into(),
backend: "sls-archive".into(),
splits: None,
archiver_appliance: None,
channel_archiver: Some(ChannelArchiver {
data_base_paths: vec![PathBuf::from("/data/daqbuffer-testdata/sls/gfa03")],
database: netpod::Database {
host: "localhost".into(),
name: "testingdaq".into(),
user: "testingdaq".into(),
pass: "testingdaq".into(),
},
}),
})
.collect();
netpod::Cluster {
nodes,
database: netpod::Database {
host: "localhost".into(),
name: "testingdaq".into(),
user: "testingdaq".into(),
pass: "testingdaq".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: Default::default(),
}
}