diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs index 2d1f320..3b81707 100644 --- a/daqbufp2/src/nodes.rs +++ b/daqbufp2/src/nodes.rs @@ -15,8 +15,20 @@ impl Drop for RunningHosts { } } +pub struct RunningSlsHost { + pub cluster: Cluster, + _jhs: Vec>>, +} + +impl Drop for RunningSlsHost { + fn drop(&mut self) { + netpod::log::info!("\n\n+++++++++++++++++++ impl Drop for RunningSlsHost\n\n"); + } +} + lazy_static::lazy_static! { static ref HOSTS_RUNNING: Mutex>> = Mutex::new(None); + static ref SLS_HOST_RUNNING: Mutex>> = Mutex::new(None); } pub fn require_test_hosts_running() -> Result, Error> { @@ -40,3 +52,25 @@ pub fn require_test_hosts_running() -> Result, Error> { } } } + +pub fn require_sls_test_host_running() -> Result, Error> { + let mut g = SLS_HOST_RUNNING.lock().unwrap(); + match g.as_ref() { + None => { + netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningSlsHost\n\n"); + let cluster = taskrun::sls_test_cluster(); + let jhs = spawn_test_hosts(cluster.clone()); + let ret = RunningSlsHost { + cluster: cluster.clone(), + _jhs: jhs, + }; + let a = Arc::new(ret); + *g = Some(a.clone()); + Ok(a) + } + Some(gg) => { + netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningSlsHost\n\n"); + Ok(gg.clone()) + } + } +} diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 00adb16..0ffd96c 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -1,4 +1,4 @@ -use crate::nodes::require_test_hosts_running; +use crate::nodes::{require_sls_test_host_running, require_test_hosts_running}; use chrono::{DateTime, Utc}; use err::Error; use http::StatusCode; @@ -6,6 +6,7 @@ use hyper::Body; use netpod::query::{BinnedQuery, CacheUsage}; use netpod::{log::*, AppendToUrl}; use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; +use serde::{Deserialize, Serialize}; use std::time::Duration; use url::Url; @@ -72,6 +73,28 @@ async fn get_binned_json_2_inner() -> Result<(), Error> { .await } +#[test] +fn get_sls_archive_1() -> Result<(), Error> { + let fut = async move { + let rh = require_sls_test_host_running()?; + let cluster = &rh.cluster; + let channel = Channel { + backend: "sls-archive".into(), + name: "ABOMA-CH-6G:U-DCLINK".into(), + }; + let begstr = "2021-10-20T22:00:00Z"; + let endstr = "2021-11-12T00:00:00Z"; + let res = get_binned_json_common_res(channel, begstr, endstr, 10, AggKind::TimeWeightedScalar, cluster).await?; + assert_eq!(res.finalised_range, true); + assert_eq!(res.ts_anchor, 1634688000); + assert!((res.avgs[3].unwrap() - 1.01470947265625).abs() < 1e-4); + assert!((res.avgs[4].unwrap() - 24.06792449951172).abs() < 1e-4); + assert!((res.avgs[11].unwrap() - 0.00274658203125).abs() < 1e-4); + Ok(()) + }; + taskrun::run(fut) +} + async fn get_binned_json_common( channel_name: &str, beg_date: &str, @@ -149,3 +172,62 @@ async fn get_binned_json_common( } Ok(()) } + +#[derive(Debug, Serialize, Deserialize)] +struct BinnedResponse { + #[serde(rename = "tsAnchor")] + ts_anchor: u64, + #[serde(rename = "tsMs")] + ts_ms: Vec, + #[serde(rename = "tsNs")] + ts_ns: Vec, + mins: Vec>, + maxs: Vec>, + avgs: Vec>, + counts: Vec, + #[serde(rename = "finalisedRange", default = "bool_false")] + finalised_range: bool, +} + +fn bool_false() -> bool { + false +} + +async fn get_binned_json_common_res( + channel: Channel, + beg_date: &str, + end_date: &str, + bin_count: u32, + agg_kind: AggKind, + cluster: &Cluster, +) -> Result { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; + let range = NanoRange::from_date_time(beg_date, end_date); + let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + query.set_timeout(Duration::from_millis(15000)); + query.set_cache_usage(CacheUsage::Ignore); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; + query.append_to_url(&mut url); + let url = url; + debug!("get_binned_json_common get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if res.status() != StatusCode::OK { + error!("get_binned_json_common client response {:?}", res); + } + let res = hyper::body::to_bytes(res.into_body()).await?; + let t2 = chrono::Utc::now(); + let _ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + let res = String::from_utf8_lossy(&res).to_string(); + info!("GOT: {}", res); + let res: BinnedResponse = serde_json::from_str(res.as_str())?; + Ok(res) +} diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index 5fdcaf4..b18befa 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -7,7 +7,7 @@ use crate::{ TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; -use netpod::timeunits::*; +use netpod::log::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::fmt; @@ -194,11 +194,11 @@ where type Output = MinMaxAvgBins; type Aggregator = EventValuesAggregator; - fn aggregator(range: NanoRange, _bin_count: usize, do_time_weight: bool) -> Self::Aggregator { - // TODO remove output - if range.delta() > SEC * 5000 { - netpod::log::info!("TimeBinnableType for EventValues aggregator() range {:?}", range); - } + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for EventValues aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); Self::Aggregator::new(range, do_time_weight) } } @@ -248,6 +248,8 @@ where type Output = EventValuesCollectorOutput; fn ingest(&mut self, src: &Self::Input) { + // TODO should be able to remove this + err::todo(); self.vals.append(src); } @@ -260,6 +262,8 @@ where } fn result(self) -> Result { + // TODO should be able to remove this + err::todo(); let tst = ts_offs_from_abs(&self.vals.tss); let ret = Self::Output { ts_anchor_sec: tst.0, @@ -350,8 +354,9 @@ where } } - fn apply_event_time_weight(&mut self, ts: u64, val: Option) { + fn apply_event_time_weight(&mut self, ts: u64) { if let Some(v) = self.last_val { + debug!("apply_event_time_weight"); self.apply_min_max(v); let w = if self.do_time_weight { (ts - self.int_ts) as f32 * 1e-9 @@ -365,9 +370,12 @@ where self.sumc += 1; } self.int_ts = ts; + } else { + debug!( + "apply_event_time_weight NO VALUE {}", + ts as i64 - self.range.beg as i64 + ); } - self.last_ts = ts; - self.last_val = val; } fn ingest_unweight(&mut self, item: &::Input) { @@ -377,8 +385,8 @@ where if ts < self.range.beg { } else if ts >= self.range.end { } else { - self.count += 1; self.apply_event_unweight(val); + self.count += 1; } } } @@ -388,13 +396,18 @@ where let ts = item.tss[i1]; let val = item.values[i1]; if ts < self.int_ts { + debug!("just set int_ts"); self.last_ts = ts; self.last_val = Some(val); } else if ts >= self.range.end { + debug!("after range"); return; } else { + debug!("regular"); + self.apply_event_time_weight(ts); self.count += 1; - self.apply_event_time_weight(ts, Some(val)); + self.last_ts = ts; + self.last_val = Some(val); } } } @@ -413,6 +426,7 @@ where maxs: vec![self.max], avgs: vec![avg], }; + self.int_ts = range.beg; self.range = range; self.count = 0; self.min = None; @@ -423,8 +437,12 @@ where } fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins { - if expand { - self.apply_event_time_weight(self.range.end, self.last_val); + // TODO check callsite for correct expand status. + if true || expand { + debug!("result_reset_time_weight calls apply_event_time_weight"); + self.apply_event_time_weight(self.range.end); + } else { + debug!("result_reset_time_weight NO EXPAND"); } let avg = { let sc = self.range.delta() as f32 * 1e-9; @@ -438,6 +456,7 @@ where maxs: vec![self.max], avgs: vec![avg], }; + self.int_ts = range.beg; self.range = range; self.count = 0; self.min = None; @@ -460,6 +479,7 @@ where } fn ingest(&mut self, item: &Self::Input) { + debug!("ingest len {}", item.len()); if self.do_time_weight { self.ingest_time_weight(item) } else { @@ -468,6 +488,7 @@ where } fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + debug!("Produce for {:?} next {:?}", self.range, range); if self.do_time_weight { self.result_reset_time_weight(range, expand) } else { diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs index 2c6d963..dd46d6f 100644 --- a/items/src/minmaxavgbins.rs +++ b/items/src/minmaxavgbins.rs @@ -7,6 +7,7 @@ use crate::{ }; use chrono::{TimeZone, Utc}; use err::Error; +use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; use num_traits::Zero; @@ -180,7 +181,11 @@ where type Output = MinMaxAvgBins; type Aggregator = MinMaxAvgBinsAggregator; - fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); Self::Aggregator::new(range, do_time_weight) } } diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs index 72824e4..82d094e 100644 --- a/items/src/minmaxavgdim1bins.rs +++ b/items/src/minmaxavgdim1bins.rs @@ -182,6 +182,10 @@ where type Aggregator = MinMaxAvgDim1BinsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for MinMaxAvgDim1Bins aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); Self::Aggregator::new(range, x_bin_count, do_time_weight) } } diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs index 08f1159..15de521 100644 --- a/items/src/minmaxavgwavebins.rs +++ b/items/src/minmaxavgwavebins.rs @@ -7,6 +7,7 @@ use crate::{ }; use chrono::{TimeZone, Utc}; use err::Error; +use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; use num_traits::Zero; @@ -180,6 +181,10 @@ where type Aggregator = MinMaxAvgWaveBinsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for MinMaxAvgWaveBins aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); Self::Aggregator::new(range, x_bin_count, do_time_weight) } } diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index f0cdeda..dd8554f 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -179,8 +179,12 @@ where type Output = MinMaxAvgDim1Bins; type Aggregator = WaveEventsAggregator; - fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator { - Self::Aggregator::new(range, bin_count, do_time_weight) + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for WaveEvents aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, x_bin_count, do_time_weight) } } diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 3573051..6fcb0df 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -7,7 +7,7 @@ use crate::{ TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; -use netpod::timeunits::SEC; +use netpod::log::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use tokio::fs::File; @@ -176,14 +176,11 @@ where type Output = MinMaxAvgBins; type Aggregator = XBinnedScalarEventsAggregator; - fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { - // TODO remove output - if range.delta() > SEC * 0 { - netpod::log::debug!( - "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}", - range - ); - } + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); Self::Aggregator::new(range, do_time_weight) } } @@ -252,6 +249,7 @@ where } fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) { + debug!("apply_event_unweight"); self.apply_min_max(min, max); let vf = avg; if vf.is_nan() { @@ -261,9 +259,10 @@ where } } - fn apply_event_time_weight(&mut self, ts: u64, avg: Option, min: Option, max: Option) { - if let Some(v) = self.last_avg { - self.apply_min_max(min.unwrap(), max.unwrap()); + fn apply_event_time_weight(&mut self, ts: u64) { + debug!("apply_event_time_weight"); + if let (Some(v), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) { + self.apply_min_max(min, max); let w = if self.do_time_weight { (ts - self.int_ts) as f32 * 1e-9 } else { @@ -277,10 +276,6 @@ where } self.int_ts = ts; } - self.last_ts = ts; - self.last_avg = avg; - self.last_min = min; - self.last_max = max; } fn ingest_unweight(&mut self, item: &XBinnedScalarEvents) { @@ -292,8 +287,8 @@ where if ts < self.range.beg { } else if ts >= self.range.end { } else { - self.count += 1; self.apply_event_unweight(avg, min, max); + self.count += 1; } } } @@ -312,8 +307,12 @@ where } else if ts >= self.range.end { return; } else { + self.apply_event_time_weight(ts); self.count += 1; - self.apply_event_time_weight(ts, Some(avg), Some(min), Some(max)); + self.last_ts = ts; + self.last_avg = Some(avg); + self.last_min = Some(min); + self.last_max = Some(max); } } } @@ -332,6 +331,7 @@ where maxs: vec![self.max], avgs: vec![avg], }; + self.int_ts = range.beg; self.range = range; self.count = 0; self.min = None; @@ -342,8 +342,9 @@ where } fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins { - if expand { - self.apply_event_time_weight(self.range.end, self.last_avg, self.last_min, self.last_max); + // TODO check callsite for correct expand status. + if true || expand { + self.apply_event_time_weight(self.range.end); } let avg = { let sc = self.range.delta() as f32 * 1e-9; @@ -357,6 +358,7 @@ where maxs: vec![self.max], avgs: vec![avg], }; + self.int_ts = range.beg; self.range = range; self.count = 0; self.min = None; @@ -379,6 +381,7 @@ where } fn ingest(&mut self, item: &Self::Input) { + debug!("ingest"); if self.do_time_weight { self.ingest_time_weight(item) } else { diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index eab1ad2..4b6943d 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -178,8 +178,12 @@ where type Output = MinMaxAvgWaveBins; type Aggregator = XBinnedWaveEventsAggregator; - fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator { - Self::Aggregator::new(range, bin_count, do_time_weight) + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + debug!( + "TimeBinnableType for XBinnedWaveEvents aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, x_bin_count, do_time_weight) } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index a115598..71778f0 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -115,7 +115,7 @@ async fn events_conn_handler_inner_try( return Err((Error::with_msg("json parse error"), netout))?; } }; - debug!("--- nodenet::conn got query -------------------\nevq {:?}", evq); + debug!("--- got query evq {:?}", evq); let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.channel_archiver { diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index 314d408..2be9c53 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -2,8 +2,10 @@ pub mod append; use crate::log::*; use err::Error; +use netpod::ChannelArchiver; use std::future::Future; use std::panic; +use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; @@ -102,7 +104,10 @@ pub fn tracing_init() { "archapp::archeng::pipe=info", "archapp::storagemerge=info", "streams::rangefilter=info", + "items::eventvalues=debug", + "items::xbinnedscalarevents=debug", "disk::binned=info", + "nodenet::conn=debug", "daqbuffer::test=info", ] .join(","), @@ -151,3 +156,42 @@ pub fn test_cluster() -> netpod::Cluster { file_io_buffer_size: Default::default(), } } + +pub fn sls_test_cluster() -> netpod::Cluster { + let nodes = (0..1) + .into_iter() + .map(|id| netpod::Node { + host: "localhost".into(), + listen: "0.0.0.0".into(), + port: 8362 + id as u16, + port_raw: 8362 + id as u16 + 100, + data_base_path: format!("NOdatapath{}", id).into(), + cache_base_path: format!("../tmpdata/node{:02}", id).into(), + ksprefix: "NOKS".into(), + backend: "sls-archive".into(), + splits: None, + archiver_appliance: None, + channel_archiver: Some(ChannelArchiver { + data_base_paths: vec![PathBuf::from("/data/daqbuffer-testdata/sls/gfa03")], + database: netpod::Database { + host: "localhost".into(), + name: "testingdaq".into(), + user: "testingdaq".into(), + pass: "testingdaq".into(), + }, + }), + }) + .collect(); + netpod::Cluster { + nodes, + database: netpod::Database { + host: "localhost".into(), + name: "testingdaq".into(), + user: "testingdaq".into(), + pass: "testingdaq".into(), + }, + run_map_pulse_task: false, + is_central_storage: false, + file_io_buffer_size: Default::default(), + } +}