From 0a05082da8f14fb4a4785f02cffd0520d8ac1185 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 3 Sep 2021 21:57:05 +0200 Subject: [PATCH] Count edge events --- daqbufp2/src/nodes.rs | 12 +- daqbufp2/src/test.rs | 10 +- daqbufp2/src/test/timeweightedjson.rs | 127 ++++++++++++++++++--- disk/src/agg/binnedt.rs | 17 +-- disk/src/aggtest.rs | 2 + disk/src/eventblobs.rs | 19 +++- disk/src/eventchunker.rs | 36 +++--- disk/src/gen.rs | 88 +++++++++++++-- disk/src/raw/conn.rs | 1 + httpret/src/lib.rs | 30 +++-- httpret/src/proxy.rs | 2 +- items/src/eventvalues.rs | 157 +++++++++++++++++++++----- items/src/lib.rs | 2 +- items/src/minmaxavgbins.rs | 13 ++- items/src/minmaxavgdim1bins.rs | 18 ++- items/src/minmaxavgwavebins.rs | 23 ++-- items/src/waveevents.rs | 18 ++- items/src/xbinnedscalarevents.rs | 13 ++- items/src/xbinnedwaveevents.rs | 23 ++-- netpod/src/lib.rs | 13 ++- parse/src/channelconfig.rs | 2 +- taskrun/src/lib.rs | 93 +++++++++------ 22 files changed, 544 insertions(+), 175 deletions(-) diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs index 22208a4..f6de991 100644 --- a/daqbufp2/src/nodes.rs +++ b/daqbufp2/src/nodes.rs @@ -9,6 +9,12 @@ pub struct RunningHosts { _jhs: Vec>>, } +impl Drop for RunningHosts { + fn drop(&mut self) { + netpod::log::error!("\n\n+++++++++++++++++++ impl Drop for RunningHost\n\n"); + } +} + lazy_static::lazy_static! { static ref HOSTS_RUNNING: Mutex>> = Mutex::new(None); } @@ -17,6 +23,7 @@ pub fn require_test_hosts_running() -> Result, Error> { let mut g = HOSTS_RUNNING.lock().unwrap(); match g.as_ref() { None => { + netpod::log::error!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n"); let cluster = taskrun::test_cluster(); let jhs = spawn_test_hosts(cluster.clone()); let ret = RunningHosts { @@ -27,6 +34,9 @@ pub fn require_test_hosts_running() -> Result, Error> { *g = Some(a.clone()); Ok(a) } - Some(gg) => Ok(gg.clone()), + Some(gg) => { + netpod::log::error!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n"); + Ok(gg.clone()) + } } } diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs index 9aedc16..94af6d3 100644 --- a/daqbufp2/src/test.rs +++ b/daqbufp2/src/test.rs @@ -9,10 +9,14 @@ use std::future::Future; fn run_test(f: F) where - F: Future>, + F: Future> + Send, { - std::env::set_current_dir("..").unwrap(); - taskrun::run(f).unwrap(); + //taskrun::run(f).unwrap(); + let runtime = taskrun::get_runtime(); + let _g = runtime.enter(); + runtime.block_on(f).unwrap(); + //let jh = tokio::spawn(f); + //jh.await; } #[test] diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index b3396ef..cf967d7 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -10,11 +10,86 @@ use std::time::Duration; use url::Url; #[test] -fn time_weighted_json_0() { - super::run_test(time_weighted_json_0_inner()); +fn time_weighted_json_00() { + // Each test must make sure that the nodes are running. + // Can I use absolute paths in the Node Configs to make me independent of the CWD? + // run_test must assume that the CWD when it is started, is the crate directory. + super::run_test(time_weighted_json_00_inner()); } -async fn time_weighted_json_0_inner() -> Result<(), Error> { +async fn time_weighted_json_00_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let res = get_json_common( + "const-regular-scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T04:20:30.000Z", + 20, + AggKind::DimXBins1, + cluster, + 25, + true, + ) + .await?; + let v = res.avgs[0]; + assert!(v > 41.9999 && v < 42.0001); + Ok(()) +} + +#[test] +fn time_weighted_json_01() { + super::run_test(time_weighted_json_01_inner()); +} + +async fn time_weighted_json_01_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let res = get_json_common( + "const-regular-scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T10:20:30.000Z", + 10, + AggKind::DimXBins1, + cluster, + 11, + true, + ) + .await?; + let v = res.avgs[0]; + assert!(v > 41.9999 && v < 42.0001); + Ok(()) +} + +#[test] +fn time_weighted_json_02() { + super::run_test(time_weighted_json_02_inner()); +} + +async fn time_weighted_json_02_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let res = get_json_common( + "const-regular-scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:20.000Z", + 20, + AggKind::TimeWeightedScalar, + cluster, + 100, + true, + ) + .await?; + let v = res.avgs[0]; + assert!(v > 41.9999 && v < 42.0001); + Ok(()) +} + +#[test] +fn time_weighted_json_10() { + super::run_test(time_weighted_json_10_inner()); +} + +async fn time_weighted_json_10_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; get_json_common( @@ -22,20 +97,21 @@ async fn time_weighted_json_0_inner() -> Result<(), Error> { "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:30.000Z", 10, - AggKind::TimeWeightedScalar, + AggKind::DimXBins1, cluster, 13, true, ) - .await + .await?; + Ok(()) } #[test] -fn time_weighted_json_1() { - super::run_test(time_weighted_json_1_inner()); +fn time_weighted_json_20() { + super::run_test(time_weighted_json_20_inner()); } -async fn time_weighted_json_1_inner() -> Result<(), Error> { +async fn time_weighted_json_20_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; get_json_common( @@ -48,11 +124,16 @@ async fn time_weighted_json_1_inner() -> Result<(), Error> { 13, true, ) - .await + .await?; + Ok(()) } // For waveform with N x-bins, see test::binnedjson +struct DataResult { + avgs: Vec, +} + async fn get_json_common( channel_name: &str, beg_date: &str, @@ -62,7 +143,7 @@ async fn get_json_common( cluster: &Cluster, expect_bin_count: u32, expect_finalised_range: bool, -) -> Result<(), Error> { +) -> Result { let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; @@ -74,7 +155,7 @@ async fn get_json_common( }; let range = NanoRange::from_date_time(beg_date, end_date); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); - query.set_timeout(Duration::from_millis(10000)); + query.set_timeout(Duration::from_millis(40000)); query.set_cache_usage(CacheUsage::Ignore); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; query.append_to_url(&mut url); @@ -116,17 +197,27 @@ async fn get_json_common( return Err(Error::with_msg("expect absent finalisedRange")); } } - if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + let counts = res.get("counts").unwrap().as_array().unwrap(); + let mins = res.get("mins").unwrap().as_array().unwrap(); + let maxs = res.get("maxs").unwrap().as_array().unwrap(); + let avgs = res.get("avgs").unwrap().as_array().unwrap(); + if counts.len() != expect_bin_count as usize { + return Err(Error::with_msg(format!( + "expect_bin_count {} got {}", + expect_bin_count, + counts.len() + ))); + } + if mins.len() != expect_bin_count as usize { return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); } - if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + if maxs.len() != expect_bin_count as usize { return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); } - if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { + let avgs: Vec<_> = avgs.into_iter().map(|k| k.as_f64().unwrap()).collect(); + if avgs.len() != expect_bin_count as usize { return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); } - if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize { - return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count))); - } - Ok(()) + let ret = DataResult { avgs }; + Ok(ret) } diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index f495eb6..09472ae 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -112,18 +112,13 @@ where } // TODO handle unwrap error, or use a mem replace type instead of option: - fn cycle_current_bin(&mut self) { + fn cycle_current_bin(&mut self, expand: bool) { self.curbin += 1; - let range = self.spec.get_range(self.curbin); let ret = self .aggtor - .replace(::aggregator( - range, - self.x_bin_count, - self.do_time_weight, - )) + .as_mut() .unwrap() - .result(); + .result_reset(self.spec.get_range(self.curbin), expand); // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive. // Only if the frequency would be high, that would require cpu time checks. Worth it? Measure.. self.tmp_agg_results.push_back(ret); @@ -159,7 +154,7 @@ where } else if item.starts_after(ag.range().clone()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); + self.cycle_current_bin(true); // TODO cycle_current_bin enqueues the bin, can I return here instead? None } else { @@ -167,7 +162,7 @@ where if item.ends_after(ag.range().clone()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); + self.cycle_current_bin(true); } // TODO cycle_current_bin enqueues the bin, can I return here instead? None @@ -185,7 +180,7 @@ where if self.all_bins_emitted { None } else { - self.cycle_current_bin(); + self.cycle_current_bin(false); // TODO cycle_current_bin enqueues the bin, can I return here instead? None } diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 5ea710d..f29df72 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -61,6 +61,7 @@ async fn agg_x_dim_0_inner() { 0, query.buffer_size as usize, event_chunker_conf, + false, ); let _ = fut1; // TODO add the binning and expectation and await the result. @@ -110,6 +111,7 @@ async fn agg_x_dim_1_inner() { 0, query.buffer_size as usize, event_chunker_conf, + false, ); let _ = fut1; // TODO add the binning and expectation and await the result. diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index b50caf2..8c3df76 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -27,6 +27,8 @@ pub struct EventChunkerMultifile { files_count: u32, node_ix: usize, seen_before_range_count: usize, + seen_after_range_count: usize, + expand: bool, } impl EventChunkerMultifile { @@ -37,8 +39,9 @@ impl EventChunkerMultifile { node_ix: usize, buffer_size: usize, event_chunker_conf: EventChunkerConf, + expand: bool, ) -> Self { - let file_chan = if true { + let file_chan = if expand { open_expanded_files(&range, &channel_config, node) } else { open_files(&range, &channel_config, node) @@ -57,6 +60,8 @@ impl EventChunkerMultifile { files_count: 0, node_ix, seen_before_range_count: 0, + seen_after_range_count: 0, + expand, } } @@ -113,6 +118,7 @@ impl Stream for EventChunkerMultifile { self.event_chunker_conf.clone(), path, self.max_ts.clone(), + self.expand, ); self.evs = Some(chunker); } @@ -174,8 +180,15 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E }; let task = async move { let mut event_count = 0; - let mut events = - EventChunkerMultifile::new(range, channel_config, node, node_ix, buffer_size, event_chunker_conf); + let mut events = EventChunkerMultifile::new( + range, + channel_config, + node, + node_ix, + buffer_size, + event_chunker_conf, + true, + ); while let Some(item) = events.next().await { match item { Ok(item) => match item { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 2c8214b..db93a8d 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -30,7 +30,9 @@ pub struct EventChunker { parsed_bytes: u64, path: PathBuf, max_ts: Arc, + expand: bool, seen_before_range_count: usize, + seen_after_range_count: usize, } enum DataFileState { @@ -62,6 +64,7 @@ impl EventChunker { stats_conf: EventChunkerConf, path: PathBuf, max_ts: Arc, + expand: bool, ) -> Self { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); @@ -81,7 +84,9 @@ impl EventChunker { parsed_bytes: 0, path, max_ts, + expand, seen_before_range_count: 0, + seen_after_range_count: 0, } } @@ -92,8 +97,9 @@ impl EventChunker { stats_conf: EventChunkerConf, path: PathBuf, max_ts: Arc, + expand: bool, ) -> Self { - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts); + let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts, expand); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -174,9 +180,12 @@ impl EventChunker { } self.max_ts.store(ts, Ordering::SeqCst); if ts >= self.range.end { - self.seen_beyond_range = true; - self.data_emit_complete = true; - break; + self.seen_after_range_count += 1; + if !self.expand || self.seen_after_range_count >= 2 { + self.seen_beyond_range = true; + self.data_emit_complete = true; + break; + } } if ts < self.range.beg { self.seen_before_range_count += 1; @@ -276,18 +285,13 @@ impl EventChunker { ) { Ok(c1) => { assert!(c1 as u32 == k1); - if ts < self.range.beg { - } else if ts >= self.range.end { - Err(Error::with_msg(format!("event after range {}", ts / SEC)))?; - } else { - ret.add_event( - ts, - pulse, - Some(decomp), - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - ); - } + ret.add_event( + ts, + pulse, + Some(decomp), + ScalarType::from_dtype_index(type_index)?, + is_big_endian, + ); } Err(e) => { Err(Error::with_msg(format!("decompression failed {:?}", e)))?; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index adeca43..64590cc 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -2,7 +2,7 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; use bytes::{BufMut, BytesMut}; use err::Error; -use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, Node, Shape}; +use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, GenVar, Node, Shape}; use netpod::{Nanos, ScalarType}; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; @@ -10,9 +10,8 @@ use tokio::io::AsyncWriteExt; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; -//#[test] +#[test] pub fn gen_test_data_test() { - std::env::set_current_dir("..").unwrap(); taskrun::run(gen_test_data()).unwrap(); } @@ -38,6 +37,7 @@ pub async fn gen_test_data() -> Result<(), Error> { array: false, compression: false, }, + gen_var: netpod::GenVar::Default, time_spacing: MS * 500, }; ensemble.channels.push(chn); @@ -55,6 +55,7 @@ pub async fn gen_test_data() -> Result<(), Error> { byte_order: ByteOrder::big_endian(), compression: true, }, + gen_var: netpod::GenVar::Default, time_spacing: MS * 4000, }; ensemble.channels.push(chn); @@ -66,12 +67,49 @@ pub async fn gen_test_data() -> Result<(), Error> { }, keyspace: 3, time_bin_size: Nanos { ns: DAY }, - array: true, scalar_type: ScalarType::U16, - shape: Shape::Wave(77), byte_order: ByteOrder::little_endian(), + shape: Shape::Wave(77), + array: true, compression: true, }, + gen_var: netpod::GenVar::Default, + time_spacing: MS * 500, + }; + ensemble.channels.push(chn); + let chn = ChannelGenProps { + config: ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "tw-scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::little_endian(), + shape: Shape::Scalar, + array: false, + compression: false, + }, + gen_var: netpod::GenVar::TimeWeight, + time_spacing: MS * 500, + }; + ensemble.channels.push(chn); + let chn = ChannelGenProps { + config: ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "const-regular-scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::little_endian(), + shape: Shape::Scalar, + array: false, + compression: false, + }, + gen_var: netpod::GenVar::ConstRegular, time_spacing: MS * 500, }; ensemble.channels.push(chn); @@ -105,6 +143,7 @@ struct Ensemble { pub struct ChannelGenProps { config: ChannelConfig, time_spacing: u64, + gen_var: GenVar, } async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> { @@ -138,6 +177,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> &chn.config, node, ensemble, + &chn.gen_var, ) .await?; evix = res.evix; @@ -279,6 +319,7 @@ async fn gen_timebin( config: &ChannelConfig, node: &Node, ensemble: &Ensemble, + gen_var: &GenVar, ) -> Result { let tb = ts.ns / config.time_bin_size.ns; let path = channel_path @@ -317,8 +358,25 @@ async fn gen_timebin( ns: (tb + 1) * config.time_bin_size.ns, }; while ts.ns < tsmax.ns { - if evix % ensemble.nodes.len() as u64 == node.split as u64 { - gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config).await?; + match gen_var { + GenVar::Default => { + if evix % ensemble.nodes.len() as u64 == node.split as u64 { + gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + } + } + GenVar::ConstRegular => { + if evix % ensemble.nodes.len() as u64 == node.split as u64 { + gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + } + } + GenVar::TimeWeight => { + let m = evix % 20; + if m == 0 || m == 1 { + if evix % ensemble.nodes.len() as u64 == node.split as u64 { + gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + } + } + } } evix += 1; ts.ns += ts_spacing; @@ -347,6 +405,7 @@ async fn gen_event( ts: Nanos, pulse: u64, config: &ChannelConfig, + gen_var: &GenVar, ) -> Result<(), Error> { let ttl = 0xcafecafe; let ioc_ts = 0xcafecafe; @@ -436,7 +495,20 @@ async fn gen_event( buf.put_u8(config.scalar_type.index()); match &config.scalar_type { ScalarType::I32 => { - let v = evix as i32; + let v = match gen_var { + GenVar::Default => evix as i32, + GenVar::ConstRegular => 42 as i32, + GenVar::TimeWeight => { + let m = evix % 20; + if m == 0 { + 200 + } else if m == 1 { + 400 + } else { + 0 + } + } + }; if config.byte_order.is_be() { buf.put_i32(v); } else { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 3fd943f..5202af0 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -179,6 +179,7 @@ pub async fn make_event_pipe( node_config.ix, evq.disk_io_buffer_size, event_chunker_conf, + true, ); let shape = entry.to_shape()?; let pipe = pipe1!( diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 6a126fa..124b13b 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -41,7 +41,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { use std::str::FromStr; let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; let make_service = make_service_fn({ - move |_conn| { + move |conn| { + info!("»»»»»»»»»»» new connection {:?}", conn); let node_config = node_config.clone(); async move { Ok::<_, Error>(service_fn({ @@ -54,6 +55,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { } }); Server::bind(&addr).serve(make_service).await?; + warn!("»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»» SERVICE DONE ««««««««««««««««««««««««««««««««««««««««"); rawjh.await??; Ok(()) } @@ -359,6 +361,11 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result binned_binary(query, node_config).await, Some(v) if v == APP_JSON => binned_json(query, node_config).await, @@ -399,19 +406,18 @@ async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Resu async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let query = PreBinnedQuery::from_request(&head)?; - let desc = format!("pre-b-{}", query.patch().bin_t_len() / SEC); - let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str()); - //span1.in_scope(|| {}); + let desc = format!( + "pre-W-{}-B-{}", + query.patch().bin_t_len() / SEC, + query.patch().patch_beg() / SEC + ); + let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); + span1.in_scope(|| { + info!("prebinned STARTING"); + }); let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1); let ret = match fut.await { - Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped( - s, - format!( - "pre-b-{}-p-{}", - query.patch().bin_t_len() / SEC, - query.patch().patch_beg() / SEC, - ), - ))?, + Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, desc))?, Err(e) => { if query.report_error() { response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index a0160ba..5f5a1f2 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -239,7 +239,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R let mut a = vec![]; if let Some(g) = k.first() { for c in &g.channels { - let mut z = ChannelSearchSingleResult { + let z = ChannelSearchSingleResult { backend: c.backend.clone(), description: String::new(), name: c.name.clone(), diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index 69cd51c..f8251df 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -7,7 +7,8 @@ use crate::{ WithLen, WithTimestamps, }; use err::Error; -use netpod::timeunits::SEC; +use netpod::log::*; +use netpod::timeunits::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::fmt; @@ -264,13 +265,18 @@ pub struct EventValuesAggregator { max: Option, sumc: u64, sum: f32, + int_ts: u64, last_ts: u64, last_val: Option, do_time_weight: bool, } -impl EventValuesAggregator { +impl EventValuesAggregator +where + NTY: NumOps, +{ pub fn new(range: NanoRange, do_time_weight: bool) -> Self { + let int_ts = range.beg; Self { range, count: 0, @@ -278,16 +284,43 @@ impl EventValuesAggregator { max: None, sum: 0f32, sumc: 0, + int_ts, last_ts: 0, last_val: None, do_time_weight, } } - fn apply_event(&mut self, ts: u64, val: Option) - where - NTY: NumOps, - { + fn apply_event_unweight(&mut self, val: NTY) { + self.min = match self.min { + None => Some(val), + Some(min) => { + if val < min { + Some(val) + } else { + Some(min) + } + } + }; + self.max = match self.max { + None => Some(val), + Some(max) => { + if val > max { + Some(val) + } else { + Some(max) + } + } + }; + let vf = val.as_(); + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + } + + fn apply_event_time_weight(&mut self, ts: u64, val: Option) { if let Some(v) = self.last_val { self.min = match self.min { None => Some(v), @@ -310,7 +343,7 @@ impl EventValuesAggregator { } }; let w = if self.do_time_weight { - (ts - self.last_ts) as f32 / 1000000000 as f32 + (ts - self.int_ts) as f32 * 1e-9 } else { 1. }; @@ -320,11 +353,89 @@ impl EventValuesAggregator { self.sum += vf * w; self.sumc += 1; } - self.count += 1; + self.int_ts = ts; } self.last_ts = ts; self.last_val = val; } + + fn ingest_unweight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1]; + if ts < self.range.beg { + } else if ts >= self.range.end { + } else { + self.count += 1; + self.apply_event_unweight(val); + } + } + } + + fn ingest_time_weight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1]; + if ts < self.int_ts { + self.last_ts = ts; + self.last_val = Some(val); + } else if ts >= self.range.end { + info!("event after {}", ts / MS); + return; + } else { + self.count += 1; + self.apply_event_time_weight(ts, Some(val)); + } + } + } + + fn result_reset_unweight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins { + let avg = if self.sumc == 0 { + None + } else { + Some(self.sum / self.sumc as f32) + }; + let ret = MinMaxAvgBins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![self.min], + maxs: vec![self.max], + avgs: vec![avg], + }; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sum = 0f32; + self.sumc = 0; + ret + } + + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins { + if expand { + self.apply_event_time_weight(self.range.end, self.last_val); + } + let avg = { + let sc = self.range.delta() as f32 * 1e-9; + Some(self.sum / sc) + }; + let ret = MinMaxAvgBins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![self.min], + maxs: vec![self.max], + avgs: vec![avg], + }; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sum = 0f32; + self.sumc = 0; + ret + } } impl TimeBinnableTypeAggregator for EventValuesAggregator @@ -339,32 +450,18 @@ where } fn ingest(&mut self, item: &Self::Input) { - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - if ts < self.range.beg { - self.last_ts = ts; - self.last_val = Some(item.values[i1]); - } else if ts >= self.range.end { - } else { - self.apply_event(ts, Some(item.values[i1])); - } + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) } } - fn result(mut self) -> Self::Output { - self.apply_event(self.range.end, None); - let avg = if self.sumc == 0 { - None + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + if self.do_time_weight { + self.result_reset_time_weight(range, expand) } else { - Some(self.sum / self.sumc as f32) - }; - Self::Output { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], - avgs: vec![avg], + self.result_reset_unweight(range, expand) } } } diff --git a/items/src/lib.rs b/items/src/lib.rs index 9e3124e..bbf384e 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -400,5 +400,5 @@ pub trait TimeBinnableTypeAggregator: Send { type Output: TimeBinnableType; fn range(&self) -> &NanoRange; fn ingest(&mut self, item: &Self::Input); - fn result(self) -> Self::Output; + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; } diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs index 066de18..8654da2 100644 --- a/items/src/minmaxavgbins.rs +++ b/items/src/minmaxavgbins.rs @@ -402,19 +402,26 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { let avg = if self.sumc == 0 { None } else { Some(self.sum / self.sumc as f32) }; - Self::Output { + let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], - } + }; + self.count = 0; + self.min = None; + self.max = None; + self.range = range; + self.sum = 0f32; + self.sumc = 0; + ret } } diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs index 9ddcf3b..42ecf5b 100644 --- a/items/src/minmaxavgdim1bins.rs +++ b/items/src/minmaxavgdim1bins.rs @@ -404,7 +404,7 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { let avg = if self.sumc == 0 { None } else { @@ -417,14 +417,22 @@ where .collect(); Some(avg) }; - Self::Output { + let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + // TODO replace with reset-value instead: + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], - } + }; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sum = None; + self.sumc = 0; + ret } } diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs index 081812f..b8799e0 100644 --- a/items/src/minmaxavgwavebins.rs +++ b/items/src/minmaxavgwavebins.rs @@ -397,26 +397,35 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + let ret; if self.sumc == 0 { - Self::Output { + ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![None], maxs: vec![None], avgs: vec![None], - } + }; } else { let avg = self.sum.iter().map(|j| *j / self.sumc as f32).collect(); - Self::Output { + ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![Some(self.min)], - maxs: vec![Some(self.max)], + // TODO replace with reset-value instead: + mins: vec![Some(self.min.clone())], + maxs: vec![Some(self.max.clone())], avgs: vec![Some(avg)], - } + }; } + self.range = range; + self.count = 0; + self.min = vec![NTY::max_or_nan(); self.min.len()]; + self.max = vec![NTY::min_or_nan(); self.min.len()]; + self.sum = vec![0f32; self.min.len()]; + self.sumc = 0; + ret } } diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index d50707b..6e4a4b5 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -245,7 +245,7 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { let avg = if self.sumc == 0 { None } else { @@ -258,14 +258,22 @@ where .collect(); Some(avg) }; - Self::Output { + let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min], - maxs: vec![self.max], + // TODO replace with reset-value instead. + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], avgs: vec![avg], - } + }; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sum = None; + self.sumc = 0; + ret } } diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index b41aa21..5345124 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -248,20 +248,27 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { let avg = if self.sumc == 0 { None } else { Some(self.sum / self.sumc as f32) }; - Self::Output { + let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], - } + }; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sum = 0f32; + self.sumc = 0; + ret } } diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 683123f..c968b06 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -178,7 +178,7 @@ impl XBinnedWaveEventsAggregator where NTY: NumOps, { - pub fn new(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self { + pub fn new(range: NanoRange, bin_count: usize, _do_time_weight: bool) -> Self { if bin_count == 0 { panic!("bin_count == 0"); } @@ -237,31 +237,38 @@ where } } - fn result(self) -> Self::Output { + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + let ret; if self.sumc == 0 { - Self::Output { + ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![None], maxs: vec![None], avgs: vec![None], - } + }; } else { let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect(); - let ret = Self::Output { + ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![Some(self.min)], - maxs: vec![Some(self.max)], + // TODO replace with the reset-value instead. + mins: vec![Some(self.min.clone())], + maxs: vec![Some(self.max.clone())], avgs: vec![Some(avg)], }; if ret.ts1s[0] < 1300 { info!("XBinnedWaveEventsAggregator result {:?}", ret); } - ret } + self.range = range; + self.count = 0; + self.min = vec![NTY::max_or_nan(); self.min.len()]; + self.max = vec![NTY::min_or_nan(); self.min.len()]; + self.sum = vec![0f32; self.min.len()]; + ret } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 1801ed6..0338b9e 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -274,9 +274,11 @@ impl fmt::Debug for NanoRange { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, - "NanoRange {{ beg: {} s, end: {} s }}", + "NanoRange {{ beg: {}.{:03} s, end: {}.{:03} s }}", self.beg / SEC, - self.end / SEC + (self.beg % SEC) / MS, + self.end / SEC, + (self.end % SEC) / MS, ) } } @@ -334,6 +336,13 @@ impl ByteOrder { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum GenVar { + Default, + TimeWeight, + ConstRegular, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelConfig { pub channel: Channel, diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 8181279..3718109 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -282,7 +282,7 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result k, Err(e) => match e.kind() { - ErrorKind::NotFound => return Err(Error::with_msg("ErrorKind::NotFound")), + ErrorKind::NotFound => return Err(Error::with_msg(format!("ErrorKind::NotFound for {:?}", path))), _ => return Err(e.into()), }, }; diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 634032c..6192cae 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -2,7 +2,8 @@ use crate::log::*; use err::Error; use std::future::Future; use std::panic; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use tokio::runtime::Runtime; use tokio::task::JoinHandle; pub mod log { @@ -10,41 +11,58 @@ pub mod log { pub use tracing::{debug, error, info, trace, warn}; } +lazy_static::lazy_static! { + static ref RUNTIME: Mutex>> = Mutex::new(None); +} + +pub fn get_runtime() -> Arc { + let mut g = RUNTIME.lock().unwrap(); + match g.as_ref() { + None => { + tracing_init(); + let res = tokio::runtime::Builder::new_multi_thread() + .worker_threads(12) + .max_blocking_threads(256) + .enable_all() + .on_thread_start(|| { + let _old = panic::take_hook(); + panic::set_hook(Box::new(move |info| { + let payload = if let Some(k) = info.payload().downcast_ref::() { + format!("{:?}", k) + } + else if let Some(k) = info.payload().downcast_ref::() { + k.into() + } + else if let Some(&k) = info.payload().downcast_ref::<&str>() { + k.into() + } + else { + format!("unknown payload type") + }; + error!( + "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}\ninfo object: {:?}\nerr: {:?}", + Error::with_msg("catched panic in taskrun::run"), + info.location(), + info.payload(), + info, + payload, + ); + //old(info); + })); + }) + .build() + .unwrap(); + let a = Arc::new(res); + *g = Some(a.clone()); + a + } + Some(g) => g.clone(), + } +} + pub fn run>>(f: F) -> Result { - tracing_init(); - let res = tokio::runtime::Builder::new_multi_thread() - .worker_threads(12) - .max_blocking_threads(256) - .enable_all() - .on_thread_start(|| { - let _old = panic::take_hook(); - panic::set_hook(Box::new(move |info| { - let payload = if let Some(k) = info.payload().downcast_ref::() { - format!("{:?}", k) - } - else if let Some(k) = info.payload().downcast_ref::() { - k.into() - } - else if let Some(&k) = info.payload().downcast_ref::<&str>() { - k.into() - } - else { - format!("unknown payload type") - }; - error!( - "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}\ninfo object: {:?}\nerr: {:?}", - Error::with_msg("catched panic in taskrun::run"), - info.location(), - info.payload(), - info, - payload, - ); - //old(info); - })); - }) - .build() - .unwrap() - .block_on(async { f.await }); + let runtime = get_runtime(); + let res = runtime.block_on(async { f.await }); match res { Ok(k) => Ok(k), Err(e) => { @@ -70,6 +88,7 @@ pub fn tracing_init() { "info,daqbuffer=trace,daqbuffer::test=trace,disk::raw::conn=info", )) .init(); + warn!("tracing_init done"); *g = 1; } } @@ -90,8 +109,8 @@ pub fn test_cluster() -> netpod::Cluster { listen: "0.0.0.0".into(), port: 8360 + id as u16, port_raw: 8360 + id as u16 + 100, - data_base_path: format!("tmpdata/node{:02}", id).into(), - cache_base_path: format!("tmpdata/node{:02}", id).into(), + data_base_path: format!("../tmpdata/node{:02}", id).into(), + cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), split: id, backend: "testbackend".into(),