diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index 5a5e4d8..1a511b4 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -276,6 +276,7 @@ macro_rules! arm1 {
                 $sty1,
                 $sty2
             ),
+            AggKind::TimeWeightedScalar => panic!(),
             AggKind::DimXBinsN(_) => arm2!(
                 $item,
                 EventValues,
@@ -298,6 +299,7 @@ macro_rules! arm1 {
                 $sty1,
                 $sty2
             ),
+            AggKind::TimeWeightedScalar => panic!(),
             AggKind::DimXBins1 => arm2!(
                 $item,
                 XBinnedScalarEvents,
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index 5bc8c05..102b8e7 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -141,6 +141,11 @@ macro_rules! wagg1 {
     ($k:expr, $ak:expr, $shape:expr, $sty:ident) => {
         match $ak {
             AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$sty($k))),
+            AggKind::TimeWeightedScalar => {
+                let p = WaveXBinner::create($shape, $ak.clone());
+                let j = p.process($k);
+                EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$sty(j)))
+            }
             AggKind::DimXBins1 => {
                 let p = WaveXBinner::create($shape, $ak.clone());
                 let j = p.process($k);
@@ -254,6 +259,7 @@ impl MultiBinWaveEvents {
         match self {
             Byte(k) => match ak {
                 AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::Byte(k))),
+                AggKind::TimeWeightedScalar => err::todoval(),
                 AggKind::DimXBins1 => err::todoval(),
                 AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
             },
@@ -340,6 +346,7 @@ impl SingleBinWaveEvents {
         match self {
             Byte(k) => match ak {
                 AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(k))),
+                AggKind::TimeWeightedScalar => err::todoval(),
                 AggKind::DimXBins1 => err::todoval(),
                 AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
             },
diff --git a/daqbufp2/src/lib.rs b/daqbufp2/src/lib.rs
index 5b08722..5671f9f 100644
--- a/daqbufp2/src/lib.rs
+++ b/daqbufp2/src/lib.rs
@@ -1,13 +1,12 @@
-use tokio::task::JoinHandle;
-
-use err::Error;
-use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
-
 pub mod client;
 pub mod nodes;
 #[cfg(test)]
 pub mod test;
 
+use err::Error;
+use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
+use tokio::task::JoinHandle;
+
 pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
     let mut ret = vec![];
     for node in &cluster.nodes {
diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs
index b011cd6..22208a4 100644
--- a/daqbufp2/src/nodes.rs
+++ b/daqbufp2/src/nodes.rs
@@ -1,6 +1,6 @@
 use crate::spawn_test_hosts;
 use err::Error;
-use netpod::{Cluster, Database, Node};
+use netpod::Cluster;
 use std::sync::{Arc, Mutex};
 use tokio::task::JoinHandle;
 
diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs
index c66a05a..9aedc16 100644
--- a/daqbufp2/src/test.rs
+++ b/daqbufp2/src/test.rs
@@ -1,8 +1,19 @@
-use bytes::BytesMut;
-
 pub mod binnedbinary;
 pub mod binnedjson;
 pub mod events;
+pub mod timeweightedjson;
+
+use bytes::BytesMut;
+use err::Error;
+use std::future::Future;
+
+fn run_test<F>(f: F)
+where
+    F: Future<Output = Result<(), Error>>,
+{
+    std::env::set_current_dir("..").unwrap();
+    taskrun::run(f).unwrap();
+}
 
 #[test]
 fn bufs() {
diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs
new file mode 100644
index 0000000..b3396ef
--- /dev/null
+++ b/daqbufp2/src/test/timeweightedjson.rs
@@ -0,0 +1,132 @@
+use crate::nodes::require_test_hosts_running;
+use chrono::{DateTime, Utc};
+use disk::binned::query::{BinnedQuery, CacheUsage};
+use err::Error;
+use http::StatusCode;
+use hyper::Body;
+use netpod::log::*;
+use netpod::{AggKind, AppendToUrl, Channel, Cluster, NanoRange, APP_JSON};
+use std::time::Duration;
+use url::Url;
+
+#[test]
+fn time_weighted_json_0() {
+    super::run_test(time_weighted_json_0_inner());
+}
+
+async fn time_weighted_json_0_inner() -> Result<(), Error> {
+    let rh = require_test_hosts_running()?;
+    let cluster = &rh.cluster;
+    get_json_common(
+        "scalar-i32-be",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T01:20:30.000Z",
+        10,
+        AggKind::TimeWeightedScalar,
+        cluster,
+        13,
+        true,
+    )
+    .await
+}
+
+#[test]
+fn time_weighted_json_1() {
+    super::run_test(time_weighted_json_1_inner());
+}
+
+async fn time_weighted_json_1_inner() -> Result<(), Error> {
+    let rh = require_test_hosts_running()?;
+    let cluster = &rh.cluster;
+    get_json_common(
+        "wave-f64-be-n21",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T01:20:45.000Z",
+        10,
+        AggKind::TimeWeightedScalar,
+        cluster,
+        13,
+        true,
+    )
+    .await
+}
+
+// For waveform with N x-bins, see test::binnedjson
+
+async fn get_json_common(
+    channel_name: &str,
+    beg_date: &str,
+    end_date: &str,
+    bin_count: u32,
+    agg_kind: AggKind,
+    cluster: &Cluster,
+    expect_bin_count: u32,
+    expect_finalised_range: bool,
+) -> Result<(), Error> {
+    let t1 = Utc::now();
+    let node0 = &cluster.nodes[0];
+    let beg_date: DateTime<Utc> = beg_date.parse()?;
+    let end_date: DateTime<Utc> = end_date.parse()?;
+    let channel_backend = "testbackend";
+    let channel = Channel {
+        backend: channel_backend.into(),
+        name: channel_name.into(),
+    };
+    let range = NanoRange::from_date_time(beg_date, end_date);
+    let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
+    query.set_timeout(Duration::from_millis(10000));
+    query.set_cache_usage(CacheUsage::Ignore);
+    let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
+    query.append_to_url(&mut url);
+    let url = url;
+    info!("get_json_common get {}", url);
+    let req = hyper::Request::builder()
+        .method(http::Method::GET)
+        .uri(url.to_string())
+        .header(http::header::ACCEPT, APP_JSON)
+        .body(Body::empty())?;
+    let client = hyper::Client::new();
+    let res = client.request(req).await?;
+    if res.status() != StatusCode::OK {
+        error!("get_json_common client response {:?}", res);
+    }
+    let res = hyper::body::to_bytes(res.into_body()).await?;
+    let t2 = chrono::Utc::now();
+    let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
+    info!("get_json_common DONE time {} ms", ms);
+    let res = String::from_utf8_lossy(&res).to_string();
+    //info!("get_json_common res: {}", res);
+    let res: serde_json::Value = serde_json::from_str(res.as_str())?;
+    info!(
+        "result from endpoint: --------------\n{}\n--------------",
+        serde_json::to_string_pretty(&res)?
+    );
+    // TODO enable in future:
+    if false {
+        if expect_finalised_range {
+            if !res
+                .get("finalisedRange")
+                .ok_or(Error::with_msg("missing finalisedRange"))?
+                .as_bool()
+                .ok_or(Error::with_msg("key finalisedRange not bool"))?
+            {
+                return Err(Error::with_msg("expected finalisedRange"));
+            }
+        } else if res.get("finalisedRange").is_some() {
+            return Err(Error::with_msg("expect absent finalisedRange"));
+        }
+    }
+    if res.get("counts").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
+        return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
+    }
+    if res.get("mins").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
+        return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
+    }
+    if res.get("maxs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
+        return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
+    }
+    if res.get("avgs").unwrap().as_array().unwrap().len() != expect_bin_count as usize {
+        return Err(Error::with_msg(format!("expect_bin_count {}", expect_bin_count)));
+    }
+    Ok(())
+}
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 1e4065c..f495eb6 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -3,13 +3,28 @@ use futures_util::StreamExt;
 use items::{RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, TimeBinnableTypeAggregator};
 use netpod::log::*;
 use netpod::BinnedRange;
+use netpod::NanoRange;
 use std::collections::VecDeque;
-use std::marker::PhantomData;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-pub struct DefaultBinsTimeBinner {
-    _m1: PhantomData,
+pub trait TimeBinningChoice {
+    type Output: TimeBinnableType;
+    type Aggregator: TimeBinnableTypeAggregator + Send + Unpin;
+    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
+}
+
+pub struct TimeWeightedBinMethodMarker {}
+
+pub struct TBinnerStreamPlay<S, TBT>
+where
+    S: Stream<Item = Sitemty<TBT>>,
+    TBT: TimeBinnableType,
+{
+    inp: Pin<Box<S>>,
+    left: Option>>>,
+    //aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
+    a: Option,
 }
 
 pub struct TBinnerStream<S, TBT>
@@ -30,6 +45,7 @@ where
     range_complete_emitted: bool,
     errored: bool,
     completed: bool,
+    do_time_weight: bool,
 }
 
 impl<S, TBT> TBinnerStream<S, TBT>
@@ -37,7 +53,7 @@ where
     S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
     TBT: TimeBinnableType,
 {
-    pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize) -> Self {
+    pub fn new(inp: S, spec: BinnedRange, x_bin_count: usize, do_time_weight: bool) -> Self {
         let range = spec.get_range(0);
         Self {
             inp: Box::pin(inp),
@@ -45,7 +61,11 @@ where
             x_bin_count,
             curbin: 0,
             left: None,
-            aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, x_bin_count)),
+            aggtor: Some(<TBT as TimeBinnableType>::aggregator(
+                range,
+                x_bin_count,
+                do_time_weight,
+            )),
             tmp_agg_results: VecDeque::new(),
             inp_completed: false,
             all_bins_emitted: false,
@@ -53,6 +73,7 @@ where
             range_complete_emitted: false,
             errored: false,
             completed: false,
+            do_time_weight,
         }
     }
 
@@ -96,7 +117,11 @@ where
         let range = self.spec.get_range(self.curbin);
         let ret = self
             .aggtor
-            .replace(<TBT as TimeBinnableType>::aggregator(range, self.x_bin_count))
+            .replace(<TBT as TimeBinnableType>::aggregator(
+                range,
+                self.x_bin_count,
+                self.do_time_weight,
+            ))
             .unwrap()
             .result();
         // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
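The binnedt.rs hunk above threads the new `do_time_weight` flag through `TBinnerStream`: the stream stores the flag at construction and forwards it each time it swaps in a fresh aggregator for the next bin. The following self-contained sketch illustrates that hand-off pattern with invented stand-in types (`Binnable`, `Scalar`, `Binner`), not the crate's real traits:

```rust
// Sketch only: stand-ins for TimeBinnableType / TBinnerStream from the patch.
#[derive(Clone, Copy, Debug)]
struct Range {
    beg: u64,
    end: u64,
}

trait Binnable {
    type Agg;
    // Mirrors the extended trait method: the weighting mode is now a parameter.
    fn aggregator(range: Range, x_bin_count: usize, do_time_weight: bool) -> Self::Agg;
}

struct Scalar;

struct ScalarAgg {
    range: Range,
    do_time_weight: bool,
}

impl Binnable for Scalar {
    type Agg = ScalarAgg;
    fn aggregator(range: Range, _x_bin_count: usize, do_time_weight: bool) -> ScalarAgg {
        ScalarAgg { range, do_time_weight }
    }
}

struct Binner {
    curbin: usize,
    do_time_weight: bool,
    agg: Option<ScalarAgg>,
}

impl Binner {
    fn new(range: Range, do_time_weight: bool) -> Self {
        Self {
            curbin: 0,
            do_time_weight,
            agg: Some(<Scalar as Binnable>::aggregator(range, 0, do_time_weight)),
        }
    }

    // Mirrors the replace() call in binnedt.rs: the finished aggregator is
    // swapped out and the stored flag is forwarded into the next bin's aggregator.
    fn cycle(&mut self, next: Range) -> ScalarAgg {
        self.curbin += 1;
        self.agg
            .replace(<Scalar as Binnable>::aggregator(next, 0, self.do_time_weight))
            .unwrap()
    }
}

fn main() {
    let mut b = Binner::new(Range { beg: 0, end: 10 }, true);
    let done = b.cycle(Range { beg: 10, end: 20 });
    println!("bin {} finished: {:?} time-weighted: {}", b.curbin, done.range, done.do_time_weight);
}
```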
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 1ce34f5..e9a935a 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -122,7 +122,12 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
         };
         let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
         let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
-        let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count);
+        let s = TBinnerStream::<_, ::Output>::new(
+            s,
+            range,
+            x_bin_count,
+            self.query.agg_kind().do_time_weighted(),
+        );
         let s = s.map(|item| match item.make_frame() {
             Ok(item) => Ok(item.freeze()),
             Err(e) => Err(e),
@@ -361,7 +366,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
         };
         let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
         let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
-        let s = TBinnerStream::<_, ::Output>::new(s, range, x_bin_count);
+        let s = TBinnerStream::<_, ::Output>::new(
+            s,
+            range,
+            x_bin_count,
+            self.query.agg_kind().do_time_weighted(),
+        );
         let f = collect_plain_events_json(s, self.timeout, t_bin_count, self.query.do_log());
         let s = futures_util::stream::once(f).map(|item| match item {
             Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)),
diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs
index fe328db..77af6e2 100644
--- a/disk/src/binned/binnedfrompbv.rs
+++ b/disk/src/binned/binnedfrompbv.rs
@@ -235,7 +235,7 @@ where
                 ready(g)
             }
         });
-        let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind));
+        let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind), agg_kind.do_time_weighted());
         Ok(Self {
             inp: Box::pin(inp),
             _m1: PhantomData,
diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs
index 1924605..39cd9a6 100644
--- a/disk/src/binned/pbv.rs
+++ b/disk/src/binned/pbv.rs
@@ -137,6 +137,7 @@ where
             s,
             range,
             x_bin_count(&self.shape, &self.agg_kind),
+            self.agg_kind.do_time_weighted(),
         );
         Ok(Box::pin(ret))
     }
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index 946e564..f860e6f 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -53,7 +53,7 @@ where
         Shape::Scalar => {
            let evs = EventValuesDim0Case::new();
             match agg_kind {
-                AggKind::DimXBins1 => {
+                AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
                     let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
                     make_num_pipeline_nty_end_evs_enp::(
                         shape,
@@ -83,7 +83,7 @@ where
         Shape::Wave(n) => {
             let evs = EventValuesDim1Case::new(n);
             match agg_kind {
-                AggKind::DimXBins1 => {
+                AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
                     let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
                     make_num_pipeline_nty_end_evs_enp::(
                         shape,
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
index d9be8ba..4dbefd7 100644
--- a/disk/src/binned/query.rs
+++ b/disk/src/binned/query.rs
@@ -69,7 +69,7 @@ impl PreBinnedQuery {
         let ret = Self {
             patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
             channel: channel_from_pairs(&pairs)?,
-            agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
+            agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar),
             cache_usage: CacheUsage::from_pairs(&pairs)?,
             disk_io_buffer_size: pairs
                 .get("diskIoBufferSize")
@@ -312,7 +312,7 @@ impl FromUrl for BinnedQuery {
                 .ok_or(Error::with_msg("missing binCount"))?
                 .parse()
                 .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
-            agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
+            agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar),
             cache_usage: CacheUsage::from_pairs(&pairs)?,
             disk_io_buffer_size: pairs
                 .get("diskIoBufferSize")
@@ -382,11 +382,14 @@ impl AppendToUrl for BinnedQuery {
 fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
     let mut g = url.query_pairs_mut();
     match agg_kind {
+        AggKind::TimeWeightedScalar => {
+            g.append_pair("binningScheme", "timeWeightedScalar");
+        }
         AggKind::Plain => {
             g.append_pair("binningScheme", "fullValue");
         }
         AggKind::DimXBins1 => {
-            g.append_pair("binningScheme", "toScalarX");
+            g.append_pair("binningScheme", "unweightedScalar");
         }
         AggKind::DimXBinsN(n) => {
             g.append_pair("binningScheme", "toScalarX");
@@ -402,7 +405,9 @@ fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result
             as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
             channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
         }
+        AggKind::TimeWeightedScalar => {
+            let evs = EventValuesDim0Case::new();
+            let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
+            channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
+        }
         AggKind::DimXBins1 => {
             let evs = EventValuesDim0Case::new();
             let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
@@ -118,6 +123,11 @@ where
             let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
             channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
         }
+        AggKind::TimeWeightedScalar => {
+            let evs = EventValuesDim1Case::new(n);
+            let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
+            channel_exec_nty_end_evs_enp(f, byte_order, shape, evs, events_node_proc)
+        }
         AggKind::DimXBins1 => {
             let evs = EventValuesDim1Case::new(n);
             let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
diff --git a/disk/src/raw/client.rs b/disk/src/raw/client.rs
index 1547fb0..3487c28 100644
--- a/disk/src/raw/client.rs
+++ b/disk/src/raw/client.rs
@@ -3,7 +3,7 @@ Delivers event data.
 
 Delivers event data (not yet time-binned) from local storage and provides client functions
 to request such data from nodes.
- */
+*/
 
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::raw::eventsfromframes::EventsFromFrames;
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index d37f665..3fd943f 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -49,11 +49,13 @@ where
 
 macro_rules! pipe4 {
     ($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
         match $agg_kind {
-            AggKind::DimXBins1 => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
-                $evsv,
-                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
-                $event_blobs,
-            ),
+            AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
+                make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
+                    $evsv,
+                    <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
+                    $event_blobs,
+                )
+            }
             AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
                 $evsv,
                 <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind),
@@ -128,9 +130,11 @@ pub async fn make_event_pipe(
     evq: &RawEventsQuery,
     node_config: &NodeConfigCached,
 ) -> Result> + Send>>, Error> {
-    match dbconn::channel_exists(&evq.channel, &node_config).await {
-        Ok(_) => (),
-        Err(e) => return Err(e)?,
+    if false {
+        match dbconn::channel_exists(&evq.channel, &node_config).await {
+            Ok(_) => (),
+            Err(e) => return Err(e)?,
+        }
     }
     let range = &evq.range;
     let channel_config = match read_local_config(&evq.channel, &node_config.node).await {
@@ -176,7 +180,7 @@ pub async fn make_event_pipe(
         evq.disk_io_buffer_size,
         event_chunker_conf,
     );
-    let shape = entry.to_shape().unwrap();
+    let shape = entry.to_shape()?;
     let pipe = pipe1!(
         entry.scalar_type,
         entry.byte_order,
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index d59a0b2..a0160ba 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -197,7 +197,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R
         source_regex: String,
         ordering: String,
         reload: bool,
-    };
+    }
     hosts.iter().zip(backends.iter()).for_each(|(sh, back)| {
         let url = Url::parse(&format!("{}/channels/config", sh)).unwrap();
         urls.push(url);
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index 5c34af7..64a071e 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -197,9 +197,11 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
   • binCount (number of requested bins in time-dimension, e.g. "6")
   • binningScheme (optional)
-    • if not specified: waveform gets first binned to a scalar.
-    • "binningScheme=binnedX&binnedXcount=13": waveform gets first binned to 13 bins in X-dimension (waveform-dimension).
-    • "binningScheme=binnedX&binnedXcount=0": waveform is not binned in X-dimension but kept at full length.
+    • if not specified: default is "binningScheme=timeWeightedScalar".
+    • "binningScheme=timeWeightedScalar": time-weighted binning, waveform gets first averaged to a scalar.
+    • "binningScheme=unweightedScalar": unweighted binning, waveform gets first averaged to a scalar.
+    • "binningScheme=binnedX&binnedXcount=13": waveform gets first binned to 13 bins in X-dimension (waveform-dimension).
+    • "binningScheme=binnedX&binnedXcount=0": waveform is not binned in X-dimension but kept at full length.

    Request header: "Accept" must be "application/json"
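As documented above, the patch renames the scheme values on the wire and makes `timeWeightedScalar` the default when `binningScheme` is omitted. A rough sketch of what a resulting request URL looks like; only `binCount` and `binningScheme` are taken from the documentation above, while the host and the remaining parameter names are placeholders for illustration:

```rust
// Hypothetical URL assembly for a /api/4/binned request under the new scheme names.
fn main() {
    let base = "https://data-api.example.org/api/4/binned";
    let params = [
        ("channelName", "scalar-i32-be"),
        ("begDate", "1970-01-01T00:20:10.000Z"),
        ("endDate", "1970-01-01T01:20:30.000Z"),
        ("binCount", "10"),
        // New scheme introduced by this patch; leaving the pair out now
        // defaults to timeWeightedScalar instead of the unweighted DimXBins1.
        ("binningScheme", "timeWeightedScalar"),
    ];
    let query: Vec<String> = params.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
    println!("GET {}?{}", base, query.join("&"));
}
```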

diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs
index 1558386..69cd51c 100644
--- a/items/src/eventvalues.rs
+++ b/items/src/eventvalues.rs
@@ -7,6 +7,7 @@ use crate::{
     WithLen, WithTimestamps,
 };
 use err::Error;
+use netpod::timeunits::SEC;
 use netpod::NanoRange;
 use serde::{Deserialize, Serialize};
 use std::fmt;
@@ -166,8 +167,12 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = EventValuesAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, _bin_count: usize) -> Self::Aggregator {
-        Self::Aggregator::new(range)
+    fn aggregator(range: NanoRange, _bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
+        // TODO remove output
+        if range.delta() > SEC * 5000 {
+            netpod::log::info!("TimeBinnableType for EventValues aggregator() range {:?}", range);
+        }
+        Self::Aggregator::new(range, do_time_weight)
     }
 }
 
@@ -259,10 +264,13 @@ pub struct EventValuesAggregator<NTY> {
     max: Option<NTY>,
     sumc: u64,
     sum: f32,
+    last_ts: u64,
+    last_val: Option<NTY>,
+    do_time_weight: bool,
 }
 
 impl<NTY> EventValuesAggregator<NTY> {
-    pub fn new(range: NanoRange) -> Self {
+    pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
         Self {
             range,
             count: 0,
             min: None,
             max: None,
             sum: 0f32,
             sumc: 0,
+            last_ts: 0,
+            last_val: None,
+            do_time_weight,
         }
     }
+
+    fn apply_event(&mut self, ts: u64, val: Option<NTY>)
+    where
+        NTY: NumOps,
+    {
+        if let Some(v) = self.last_val {
+            self.min = match self.min {
+                None => Some(v),
+                Some(min) => {
+                    if v < min {
+                        Some(v)
+                    } else {
+                        Some(min)
+                    }
+                }
+            };
+            self.max = match self.max {
+                None => Some(v),
+                Some(max) => {
+                    if v > max {
+                        Some(v)
+                    } else {
+                        Some(max)
+                    }
+                }
+            };
+            let w = if self.do_time_weight {
+                (ts - self.last_ts) as f32 / 1000000000 as f32
+            } else {
+                1.
+            };
+            let vf = v.as_();
+            if vf.is_nan() {
+            } else {
+                self.sum += vf * w;
+                self.sumc += 1;
+            }
+            self.count += 1;
+        }
+        self.last_ts = ts;
+        self.last_val = val;
+    }
 }
 
 impl<NTY> TimeBinnableTypeAggregator for EventValuesAggregator<NTY>
 where
     NTY: NumOps,
 {
     fn ingest(&mut self, item: &Self::Input) {
         for i1 in 0..item.tss.len() {
             let ts = item.tss[i1];
             if ts < self.range.beg {
-                continue;
+                self.last_ts = ts;
+                self.last_val = Some(item.values[i1]);
             } else if ts >= self.range.end {
-                continue;
             } else {
-                let v = item.values[i1];
-                self.min = match self.min {
-                    None => Some(v),
-                    Some(min) => {
-                        if v < min {
-                            Some(v)
-                        } else {
-                            Some(min)
-                        }
-                    }
-                };
-                self.max = match self.max {
-                    None => Some(v),
-                    Some(max) => {
-                        if v > max {
-                            Some(v)
-                        } else {
-                            Some(max)
-                        }
-                    }
-                };
-                let vf = v.as_();
-                if vf.is_nan() {
-                } else {
-                    self.sum += vf;
-                    self.sumc += 1;
-                }
-                self.count += 1;
+                self.apply_event(ts, Some(item.values[i1]));
             }
         }
     }
 
-    fn result(self) -> Self::Output {
+    fn result(mut self) -> Self::Output {
+        self.apply_event(self.range.end, None);
         let avg = if self.sumc == 0 {
             None
         } else {
diff --git a/items/src/lib.rs b/items/src/lib.rs
index 2e22288..9e3124e 100644
--- a/items/src/lib.rs
+++ b/items/src/lib.rs
@@ -308,7 +308,7 @@ pub trait TimeBinnableType:
 {
     type Output: TimeBinnableType;
     type Aggregator: TimeBinnableTypeAggregator + Send + Unpin;
-    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
+    fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator;
 }
 
 // TODO should get I/O and tokio dependence out of this crate
diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs
index 0e42e96..066de18 100644
--- a/items/src/minmaxavgbins.rs
+++ b/items/src/minmaxavgbins.rs
@@ -180,7 +180,7 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = MinMaxAvgBinsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, _x_bin_count: usize, _do_time_weight: bool) -> Self::Aggregator {
         Self::Aggregator::new(range)
     }
 }
diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs
index 93b4e88..9ddcf3b 100644
--- a/items/src/minmaxavgdim1bins.rs
+++ b/items/src/minmaxavgdim1bins.rs
@@ -181,7 +181,7 @@ where
     type Output = MinMaxAvgDim1Bins<NTY>;
     type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, x_bin_count: usize, _do_time_weight: bool) -> Self::Aggregator {
         Self::Aggregator::new(range, x_bin_count)
     }
 }
diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs
index 8ceeecb..081812f 100644
--- a/items/src/minmaxavgwavebins.rs
+++ b/items/src/minmaxavgwavebins.rs
@@ -179,7 +179,7 @@ where
     type Output = MinMaxAvgWaveBins<NTY>;
     type Aggregator = MinMaxAvgWaveBinsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, x_bin_count: usize, _do_time_weight: bool) -> Self::Aggregator {
         Self::Aggregator::new(range, x_bin_count)
     }
 }
diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs
index b93b435..d50707b 100644
--- a/items/src/waveevents.rs
+++ b/items/src/waveevents.rs
@@ -150,8 +150,8 @@ where
     type Output = MinMaxAvgDim1Bins<NTY>;
     type Aggregator = WaveEventsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
-        Self::Aggregator::new(range, bin_count)
+    fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
+        Self::Aggregator::new(range, bin_count, do_time_weight)
     }
 }
 
@@ -171,7 +171,7 @@ impl WaveEventsAggregator
 where
     NTY: NumOps,
 {
-    pub fn new(range: NanoRange, _x_bin_count: usize) -> Self {
+    pub fn new(range: NanoRange, _x_bin_count: usize, _do_time_weight: bool) -> Self {
         Self {
             range,
             count: 0,
@@ -196,6 +196,8 @@ where
     }
 
     fn ingest(&mut self, item: &Self::Input) {
+        error!("time-weighted binning not available");
+        err::todo();
         for i1 in 0..item.tss.len() {
             let ts = item.tss[i1];
             if ts < self.range.beg {
diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs
index 667a895..b41aa21 100644
--- a/items/src/xbinnedscalarevents.rs
+++ b/items/src/xbinnedscalarevents.rs
@@ -6,6 +6,8 @@ use crate::{
     ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
 };
 use err::Error;
+use netpod::log::error;
+use netpod::timeunits::SEC;
 use netpod::NanoRange;
 use serde::{Deserialize, Serialize};
 use tokio::fs::File;
@@ -154,8 +156,15 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = XBinnedScalarEventsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
-        Self::Aggregator::new(range)
+    fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
+        // TODO remove output
+        if range.delta() > SEC * 0 {
+            netpod::log::info!(
+                "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}",
+                range
+            );
+        }
+        Self::Aggregator::new(range, do_time_weight)
     }
 }
 
@@ -175,7 +184,7 @@ impl XBinnedScalarEventsAggregator
 where
     NTY: NumOps,
 {
-    pub fn new(range: NanoRange) -> Self {
+    pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
         Self {
             range,
             count: 0,
@@ -199,6 +208,8 @@ where
     }
 
     fn ingest(&mut self, item: &Self::Input) {
+        error!("time-weighted binning not available here.");
+        err::todo();
         for i1 in 0..item.tss.len() {
             let ts = item.tss[i1];
             if ts < self.range.beg {
diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs
index 5d4dc29..683123f 100644
--- a/items/src/xbinnedwaveevents.rs
+++ b/items/src/xbinnedwaveevents.rs
@@ -157,8 +157,8 @@ where
     type Output = MinMaxAvgWaveBins<NTY>;
     type Aggregator = XBinnedWaveEventsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
-        Self::Aggregator::new(range, bin_count)
+    fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
+        Self::Aggregator::new(range, bin_count, do_time_weight)
     }
 }
 
@@ -178,7 +178,7 @@ impl XBinnedWaveEventsAggregator
 where
     NTY: NumOps,
 {
-    pub fn new(range: NanoRange, bin_count: usize) -> Self {
+    pub fn new(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self {
         if bin_count == 0 {
             panic!("bin_count == 0");
         }
@@ -205,6 +205,8 @@ where
     }
 
     fn ingest(&mut self, item: &Self::Input) {
+        error!("time-weighted binning not available");
+        err::todo();
         //info!("XBinnedWaveEventsAggregator ingest item {:?}", item);
         for i1 in 0..item.tss.len() {
             let ts = item.tss[i1];
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 4a6a18a..1801ed6 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -4,7 +4,7 @@ pub mod status;
 pub mod streamext;
 
 use std::collections::BTreeMap;
-use std::fmt::{self, Debug, Display, Formatter};
+use std::fmt;
 use std::iter::FromIterator;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -270,8 +270,8 @@ pub struct NanoRange {
     pub end: u64,
 }
 
-impl std::fmt::Debug for NanoRange {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+impl fmt::Debug for NanoRange {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         write!(
             fmt,
             "NanoRange {{ beg: {} s, end: {} s }}",
@@ -714,10 +714,23 @@ pub enum AggKind {
     DimXBins1,
     DimXBinsN(u32),
     Plain,
+    TimeWeightedScalar,
+}
+
+impl AggKind {
+    pub fn do_time_weighted(&self) -> bool {
+        match self {
+            Self::TimeWeightedScalar => true,
+            Self::DimXBins1 => false,
+            Self::DimXBinsN(_) => false,
+            Self::Plain => false,
+        }
+    }
 }
 
 pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
     match agg_kind {
+        AggKind::TimeWeightedScalar => 0,
         AggKind::DimXBins1 => 0,
         AggKind::DimXBinsN(n) => {
             if *n == 0 {
@@ -736,8 +749,8 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
     }
 }
 
-impl Display for AggKind {
-    fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
+impl fmt::Display for AggKind {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> std::fmt::Result {
         match self {
             Self::DimXBins1 => {
                 write!(fmt, "DimXBins1")
@@ -748,13 +761,16 @@ impl Display for AggKind {
             Self::Plain => {
                 write!(fmt, "Plain")
             }
+            Self::TimeWeightedScalar => {
+                write!(fmt, "TimeWeightedScalar")
+            }
         }
     }
 }
 
-impl Debug for AggKind {
-    fn fmt(&self, fmt: &mut Formatter) -> std::fmt::Result {
-        Display::fmt(self, fmt)
+impl fmt::Debug for AggKind {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> std::fmt::Result {
+        fmt::Display::fmt(self, fmt)
     }
 }
 
@@ -765,6 +781,8 @@ impl FromStr for AggKind {
         let nmark = "DimXBinsN";
         if s == "DimXBins1" {
             Ok(AggKind::DimXBins1)
+        } else if s == "TimeWeightedScalar" {
+            Ok(AggKind::TimeWeightedScalar)
         } else if s.starts_with(nmark) {
             let nbins: u32 = s[nmark.len()..].parse()?;
             Ok(AggKind::DimXBinsN(nbins))
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index 41734f3..f6e8c5c 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -51,6 +51,7 @@ async fn events_conn_handler_inner(
     match events_conn_handler_inner_try(stream, addr, node_config).await {
         Ok(_) => (),
         Err(ce) => {
+            // TODO pass errors over network.
            error!("events_conn_handler_inner: {:?}", ce.err);
         }
     }
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index 1424144..634032c 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -90,8 +90,8 @@ pub fn test_cluster() -> netpod::Cluster {
             listen: "0.0.0.0".into(),
             port: 8360 + id as u16,
             port_raw: 8360 + id as u16 + 100,
-            data_base_path: format!("../tmpdata/node{:02}", id).into(),
-            cache_base_path: format!("../tmpdata/node{:02}", id).into(),
+            data_base_path: format!("tmpdata/node{:02}", id).into(),
+            cache_base_path: format!("tmpdata/node{:02}", id).into(),
             ksprefix: "ks".into(),
             split: id,
             backend: "testbackend".into(),
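The heart of the patch is `EventValuesAggregator::apply_event` in items/src/eventvalues.rs: a value is not counted at its own timestamp, it is held until the next event (or until the bin end, via the extra `apply_event(range.end, None)` in `result()`), and its weight is the held duration in seconds. The standalone sketch below replays that rule on one bin. Two points are this sketch's assumptions rather than the patch's code: the average is normalized by total covered time (the patch accumulates `sumc` and its final averaging step is not visible in the hunk above), and `last_ts` is clamped to the bin start (the patch records the raw timestamp of pre-range events).

```rust
// Time-weighted average over one bin, with nanosecond timestamps assumed non-decreasing.
struct Agg {
    beg: u64,
    end: u64,
    last_ts: u64,
    last_val: Option<f32>,
    sum: f32,
    wsum: f32,
}

impl Agg {
    fn new(beg: u64, end: u64) -> Self {
        Self { beg, end, last_ts: beg, last_val: None, sum: 0.0, wsum: 0.0 }
    }

    fn apply_event(&mut self, ts: u64, val: Option<f32>) {
        if let Some(v) = self.last_val {
            // Weight: seconds the previous value was "held" before this event.
            let w = (ts - self.last_ts) as f32 / 1e9;
            self.sum += v * w;
            self.wsum += w;
        }
        // Clamp to the bin start so pre-range events don't inflate the first weight
        // (a choice made for this sketch).
        self.last_ts = ts.max(self.beg);
        self.last_val = val;
    }

    fn result(mut self) -> Option<f32> {
        // Close the bin: the last value is held until the bin end,
        // mirroring apply_event(range.end, None) in the patch.
        self.apply_event(self.end, None);
        if self.wsum > 0.0 { Some(self.sum / self.wsum) } else { None }
    }
}

fn main() {
    // One bin covering [0 s, 10 s): value 1.0 held for 9 s, then 11.0 for 1 s.
    let mut a = Agg::new(0, 10_000_000_000);
    a.apply_event(0, Some(1.0));
    a.apply_event(9_000_000_000, Some(11.0));
    // Time-weighted average: (1.0 * 9 + 11.0 * 1) / 10 = 2.0,
    // whereas an unweighted average of the two events would give 6.0.
    println!("avg = {:?}", a.result()); // avg = Some(2.0)
}
```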