From 75abb4140a2c41300e5e22628d650694622bb3a7 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 8 Sep 2021 13:45:46 +0200
Subject: [PATCH] WIP on time weight bins

---
 archapp/src/events.rs                 |  13 +-
 archapp/src/lib.rs                    | 102 ++++++------
 archapp/src/parse.rs                  |   5 +-
 archapp/src/parse/multi.rs            |   2 +-
 daqbufp2/src/nodes.rs                 |   6 +-
 daqbufp2/src/test.rs                  |   1 -
 daqbufp2/src/test/timeweightedjson.rs | 190 +++++++++++-----------
 disk/src/agg/binnedt.rs               |   7 +-
 disk/src/eventblobs.rs                |   9 +-
 h5out/src/lib.rs                      |  18 ++-
 httpret/src/gather.rs                 |   2 +-
 httpret/src/lib.rs                    |   9 +-
 httpret/src/proxy.rs                  |   4 +-
 items/src/eventvalues.rs              |  29 +---
 items/src/minmaxavgbins.rs            |   2 +-
 items/src/minmaxavgdim1bins.rs        |   2 +-
 items/src/minmaxavgwavebins.rs        |   2 +-
 items/src/waveevents.rs               |   2 +-
 items/src/xbinnedscalarevents.rs      | 218 +++++++++++++++++++-------
 items/src/xbinnedwaveevents.rs        |   2 +-
 20 files changed, 357 insertions(+), 268 deletions(-)

diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index 1a511b4..0f74e35 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -9,12 +9,10 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use items::eventvalues::EventValues;
-use items::waveevents::{WaveEvents, WaveXBinner};
+use items::waveevents::WaveEvents;
 use items::xbinnedscalarevents::XBinnedScalarEvents;
 use items::xbinnedwaveevents::XBinnedWaveEvents;
-use items::{
-    EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, WithLen, WithTimestamps,
-};
+use items::{Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, WithLen, WithTimestamps};
 use netpod::log::*;
 use netpod::query::RawEventsQuery;
 use netpod::timeunits::{DAY, SEC};
@@ -106,6 +104,7 @@ impl StorageMerge {
         let mut i1 = self.inprng;
         let mut j1 = not_found;
         let mut tsmin = u64::MAX;
+        #[allow(unused)]
         use items::{WithLen, WithTimestamps};
         loop {
             if self.completed_inps[i1] {
@@ -175,7 +174,8 @@ struct FrameMaker {
 }

 impl FrameMaker {
-    fn make_frame_gen<T>(item: Sitemty<T>) -> Box<dyn Framable>
+    #[allow(dead_code)]
+    fn make_frame_gen<T>(_item: Sitemty<T>) -> Box<dyn Framable>
     where
         T: SitemtyFrameType + Serialize + Send + 'static,
     {
@@ -183,6 +183,7 @@
     }
 }

+#[allow(unused_macros)]
 macro_rules! events_item_to_sitemty {
     ($ei:expr, $t1:ident, $t2:ident, $t3:ident) => {{
         let ret = match $ei {
@@ -501,7 +502,7 @@ async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -
     }
 }

-async fn position_file_for_evq_linear(mut file: File, evq: RawEventsQuery, year: u32) -> Result {
+async fn position_file_for_evq_linear(file: File, evq: RawEventsQuery, _year: u32) -> Result {
     let mut pbr = PbFileReader::new(file).await;
     pbr.read_header().await?;
     loop {
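For orientation, the FrameMaker change above keeps a generic helper that turns a typed Sitemty<T> item into a boxed, type-erased frame, keyed by the associated const SitemtyFrameType::FRAME_TYPE_ID. A minimal self-contained sketch of that pattern, with hypothetical stand-in types (Payload, Item, boxed) rather than the real items crate:

    // Sketch only: Payload/Item/boxed are illustrative names, not the crate's API.
    trait Payload {
        const FRAME_TYPE_ID: u32;
    }

    trait Framable {
        fn make_frame(&self) -> Vec<u8>;
    }

    struct Item<T>(T);

    impl<T: Payload> Framable for Item<T> {
        // A real frame would also serialize the payload; here only the id header.
        fn make_frame(&self) -> Vec<u8> {
            T::FRAME_TYPE_ID.to_le_bytes().to_vec()
        }
    }

    fn boxed<T: Payload + 'static>(item: Item<T>) -> Box<dyn Framable> {
        Box::new(item)
    }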
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index 102b8e7..25b6f06 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -13,7 +13,7 @@ use items::numops::NumOps;
 use items::waveevents::{WaveEvents, WaveXBinner};
 use items::xbinnedscalarevents::XBinnedScalarEvents;
 use items::xbinnedwaveevents::XBinnedWaveEvents;
-use items::{EventsNodeProcessor, Framable, SitemtyFrameType, WithLen, WithTimestamps};
+use items::{EventsNodeProcessor, SitemtyFrameType, WithLen, WithTimestamps};
 use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
 #[cfg(not(feature = "devread"))]
 pub use parsestub as parse;
@@ -60,11 +60,11 @@ impl ScalarPlainEvents {
     pub fn variant_name(&self) -> String {
         use ScalarPlainEvents::*;
         match self {
-            Byte(h) => format!("Byte"),
-            Short(h) => format!("Short"),
-            Int(h) => format!("Int"),
-            Float(h) => format!("Float"),
-            Double(h) => format!("Double"),
+            Byte(_) => format!("Byte"),
+            Short(_) => format!("Short"),
+            Int(_) => format!("Int"),
+            Float(_) => format!("Float"),
+            Double(_) => format!("Double"),
         }
     }
 }
@@ -97,7 +97,6 @@ impl WithTimestamps for ScalarPlainEvents {

 impl HasShape for ScalarPlainEvents {
     fn shape(&self) -> Shape {
-        use ScalarPlainEvents::*;
         match self {
             _ => Shape::Scalar,
         }
@@ -108,11 +107,11 @@ impl HasScalarType for ScalarPlainEvents {
     fn scalar_type(&self) -> ScalarType {
         use ScalarPlainEvents::*;
         match self {
-            Byte(h) => ScalarType::I8,
-            Short(h) => ScalarType::I16,
-            Int(h) => ScalarType::I32,
-            Float(h) => ScalarType::F32,
-            Double(h) => ScalarType::F64,
+            Byte(_) => ScalarType::I8,
+            Short(_) => ScalarType::I16,
+            Int(_) => ScalarType::I32,
+            Float(_) => ScalarType::F32,
+            Double(_) => ScalarType::F64,
         }
     }
 }
@@ -126,13 +125,16 @@ pub enum WavePlainEvents {
     Byte(WaveEvents<i8>),
     Short(WaveEvents<i16>),
     Int(WaveEvents<i32>),
     Float(WaveEvents<f32>),
     Double(WaveEvents<f64>),
 }

-fn tmp1() {
-    let ev = EventValues::<f64> {
+fn _tmp1() {
+    let _ev = EventValues::<f64> {
         tss: vec![],
         values: vec![],
     };
     <f64 as NumOps>::is_nan(err::todoval());
-    <EventValues<f64> as SitemtyFrameType>::FRAME_TYPE_ID;
+    if <EventValues<f64> as SitemtyFrameType>::FRAME_TYPE_ID == 0 {
+        // Just a dummy..
+        panic!();
+    }
     // <f64 as NumOps>::is_nan(err::todoval());
     //<EventValues<f64> as SitemtyFrameType>::FRAME_TYPE_ID;
 }
@@ -224,11 +226,11 @@ impl HasScalarType for WavePlainEvents {
     fn scalar_type(&self) -> ScalarType {
         use WavePlainEvents::*;
         match self {
-            Byte(h) => ScalarType::I8,
-            Short(h) => ScalarType::I16,
-            Int(h) => ScalarType::I32,
-            Float(h) => ScalarType::F32,
-            Double(h) => ScalarType::F64,
+            Byte(_) => ScalarType::I8,
+            Short(_) => ScalarType::I16,
+            Int(_) => ScalarType::I32,
+            Float(_) => ScalarType::F32,
+            Double(_) => ScalarType::F64,
         }
     }
 }
@@ -246,11 +248,11 @@ impl MultiBinWaveEvents {
     pub fn variant_name(&self) -> String {
         use MultiBinWaveEvents::*;
         match self {
-            Byte(h) => format!("Byte"),
-            Short(h) => format!("Short"),
-            Int(h) => format!("Int"),
-            Float(h) => format!("Float"),
-            Double(h) => format!("Double"),
+            Byte(_) => format!("Byte"),
+            Short(_) => format!("Short"),
+            Int(_) => format!("Int"),
+            Float(_) => format!("Float"),
+            Double(_) => format!("Double"),
         }
     }

@@ -298,11 +300,11 @@ impl HasShape for MultiBinWaveEvents {
     fn shape(&self) -> Shape {
         use MultiBinWaveEvents::*;
         match self {
-            Byte(h) => Shape::Scalar,
-            Short(h) => Shape::Scalar,
-            Int(h) => Shape::Scalar,
-            Float(h) => Shape::Scalar,
-            Double(h) => Shape::Scalar,
+            Byte(_) => Shape::Scalar,
+            Short(_) => Shape::Scalar,
+            Int(_) => Shape::Scalar,
+            Float(_) => Shape::Scalar,
+            Double(_) => Shape::Scalar,
         }
     }
 }
@@ -311,11 +313,11 @@ impl HasScalarType for MultiBinWaveEvents {
     fn scalar_type(&self) -> ScalarType {
         use MultiBinWaveEvents::*;
         match self {
-            Byte(h) => ScalarType::I8,
-            Short(h) => ScalarType::I16,
-            Int(h) => ScalarType::I32,
-            Float(h) => ScalarType::F32,
-            Double(h) => ScalarType::F64,
+            Byte(_) => ScalarType::I8,
+            Short(_) => ScalarType::I16,
+            Int(_) => ScalarType::I32,
+            Float(_) => ScalarType::F32,
+            Double(_) => ScalarType::F64,
         }
     }
 }
@@ -333,11 +335,11 @@ impl SingleBinWaveEvents {
     pub fn variant_name(&self) -> String {
         use SingleBinWaveEvents::*;
         match self {
-            Byte(h) => format!("Byte"),
-            Short(h) => format!("Short"),
-            Int(h) => format!("Int"),
-            Float(h) => format!("Float"),
-            Double(h) => format!("Double"),
+            Byte(_) => format!("Byte"),
+            Short(_) => format!("Short"),
+            Int(_) => format!("Int"),
+            Float(_) => format!("Float"),
+            Double(_) => format!("Double"),
         }
     }

@@ -385,11 +387,11 @@ impl HasShape for SingleBinWaveEvents {
     fn shape(&self) -> Shape {
         use SingleBinWaveEvents::*;
         match self {
-            Byte(h) => Shape::Scalar,
-            Short(h) => Shape::Scalar,
-            Int(h) => Shape::Scalar,
-            Float(h) => Shape::Scalar,
-            Double(h) => Shape::Scalar,
+            Byte(_) => Shape::Scalar,
+            Short(_) => Shape::Scalar,
+            Int(_) => Shape::Scalar,
+            Float(_) => Shape::Scalar,
+            Double(_) => Shape::Scalar,
         }
     }
 }
@@ -398,11 +400,11 @@ impl HasScalarType for SingleBinWaveEvents {
     fn scalar_type(&self) -> ScalarType {
         use SingleBinWaveEvents::*;
         match self {
-            Byte(h) => ScalarType::I8,
-            Short(h) => ScalarType::I16,
-            Int(h) => ScalarType::I32,
-            Float(h) => ScalarType::F32,
-            Double(h) => ScalarType::F64,
+            Byte(_) => ScalarType::I8,
+            Short(_) => ScalarType::I16,
+            Int(_) => ScalarType::I32,
+            Float(_) => ScalarType::F32,
+            Double(_) => ScalarType::F64,
         }
     }
 }
diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs
index f2069a2..6e36035 100644
--- a/archapp/src/parse.rs
+++ b/archapp/src/parse.rs
@@ -387,7 +387,7 @@ impl LruCache {
         tss.sort_unstable();
         let thr = tss[1500];
         let m1 = std::mem::replace(&mut self.map, BTreeMap::new());
-        self.map = m1.into_iter().filter(|(j, k)| k > &thr).collect();
+        self.map = m1.into_iter().filter(|(_j, k)| k > &thr).collect();
     }
 }

@@ -421,7 +421,6 @@ pub async fn scan_files_inner(
     let proots = proot.to_str().unwrap().to_string();
     let meta = tokio::fs::metadata(&proot).await?;
     let mut paths = VecDeque::new();
-    let mut waves_found = 0;
     paths.push_back(PE {
         path: proot,
         fty: meta.file_type(),
@@ -451,7 +450,7 @@ pub async fn scan_files_inner(
     } else if pe.fty.is_file() {
         //tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
         let fns = pe.path.to_str().ok_or_else(|| Error::with_msg("invalid path string"))?;
-        if let Ok(fnp) = parse_data_filename(&fns) {
+        if let Ok(_fnp) = parse_data_filename(&fns) {
             //tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?;
             let channel_path = &fns[proots.len() + 1..fns.len() - 11];
             if !lru.query(channel_path) {
diff --git a/archapp/src/parse/multi.rs b/archapp/src/parse/multi.rs
index f82e74c..a18ebe4 100644
--- a/archapp/src/parse/multi.rs
+++ b/archapp/src/parse/multi.rs
@@ -37,7 +37,7 @@ pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32)
                 ret.push(h);
             }
         }
-        Err(e) => {
+        Err(_e) => {
             // TODO ignore except if it's the last chunk.
         }
     }
diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs
index f6de991..2d1f320 100644
--- a/daqbufp2/src/nodes.rs
+++ b/daqbufp2/src/nodes.rs
@@ -11,7 +11,7 @@ pub struct RunningHosts {

 impl Drop for RunningHosts {
     fn drop(&mut self) {
-        netpod::log::error!("\n\n+++++++++++++++++++  impl Drop for RunningHost\n\n");
+        netpod::log::info!("\n\n+++++++++++++++++++  impl Drop for RunningHost\n\n");
     }
 }

@@ -23,7 +23,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
     let mut g = HOSTS_RUNNING.lock().unwrap();
     match g.as_ref() {
         None => {
-            netpod::log::error!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
+            netpod::log::info!("\n\n+++++++++++++++++++ MAKE NEW RunningHosts\n\n");
             let cluster = taskrun::test_cluster();
             let jhs = spawn_test_hosts(cluster.clone());
             let ret = RunningHosts {
@@ -35,7 +35,7 @@ pub fn require_test_hosts_running() -> Result<Arc<RunningHosts>, Error> {
             Ok(a)
         }
         Some(gg) => {
-            netpod::log::error!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
+            netpod::log::debug!("\n\n+++++++++++++++++++ REUSE RunningHost\n\n");
             Ok(gg.clone())
         }
     }
diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs
index 94af6d3..e0e3ca9 100644
--- a/daqbufp2/src/test.rs
+++ b/daqbufp2/src/test.rs
@@ -11,7 +11,6 @@ fn run_test<F>(f: F)
 where
     F: Future<Output = Result<(), Error>> + Send,
 {
-    //taskrun::run(f).unwrap();
     let runtime = taskrun::get_runtime();
     let _g = runtime.enter();
     runtime.block_on(f).unwrap();
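The parse.rs hunk above prunes the channel LRU by sorting the access stamps and dropping every entry at or below the 1500th one. A standalone sketch of the same idea, assuming a BTreeMap from channel path to last-access stamp (the `keep` parameter is illustrative):

    use std::collections::BTreeMap;

    // Keep only the `keep` most recently used entries (sketch of the LruCache pruning).
    fn prune(map: &mut BTreeMap<String, u64>, keep: usize) {
        if map.len() <= keep {
            return;
        }
        let mut stamps: Vec<u64> = map.values().copied().collect();
        stamps.sort_unstable();
        // Entries with a stamp at or below this threshold are evicted.
        let thr = stamps[map.len() - keep - 1];
        let old = std::mem::take(map);
        *map = old.into_iter().filter(|(_path, stamp)| *stamp > thr).collect();
    }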
diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs
index cf967d7..1b94009 100644
--- a/daqbufp2/src/test/timeweightedjson.rs
+++ b/daqbufp2/src/test/timeweightedjson.rs
@@ -11,121 +11,113 @@ use url::Url;

 #[test]
 fn time_weighted_json_00() {
-    // Each test must make sure that the nodes are running.
-    // Can I use absolute paths in the Node Configs to make me independent of the CWD?
-    // run_test must assume that the CWD when it is started, is the crate directory.
-    super::run_test(time_weighted_json_00_inner());
-}
-
-async fn time_weighted_json_00_inner() -> Result<(), Error> {
-    let rh = require_test_hosts_running()?;
-    let cluster = &rh.cluster;
-    let res = get_json_common(
-        "const-regular-scalar-i32-be",
-        "1970-01-01T00:20:10.000Z",
-        "1970-01-01T04:20:30.000Z",
-        20,
-        AggKind::DimXBins1,
-        cluster,
-        25,
-        true,
-    )
-    .await?;
-    let v = res.avgs[0];
-    assert!(v > 41.9999 && v < 42.0001);
-    Ok(())
+    async fn inner() -> Result<(), Error> {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        let res = get_json_common(
+            "const-regular-scalar-i32-be",
+            "1970-01-01T00:20:10.000Z",
+            "1970-01-01T04:20:30.000Z",
+            20,
+            AggKind::DimXBins1,
+            cluster,
+            25,
+            true,
+        )
+        .await?;
+        let v = res.avgs[0];
+        assert!(v > 41.9999 && v < 42.0001);
+        Ok(())
+    }
+    super::run_test(inner());
 }

 #[test]
 fn time_weighted_json_01() {
-    super::run_test(time_weighted_json_01_inner());
-}
-
-async fn time_weighted_json_01_inner() -> Result<(), Error> {
-    let rh = require_test_hosts_running()?;
-    let cluster = &rh.cluster;
-    let res = get_json_common(
-        "const-regular-scalar-i32-be",
-        "1970-01-01T00:20:10.000Z",
-        "1970-01-01T10:20:30.000Z",
-        10,
-        AggKind::DimXBins1,
-        cluster,
-        11,
-        true,
-    )
-    .await?;
-    let v = res.avgs[0];
-    assert!(v > 41.9999 && v < 42.0001);
-    Ok(())
+    async fn inner() -> Result<(), Error> {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        let res = get_json_common(
+            "const-regular-scalar-i32-be",
+            "1970-01-01T00:20:10.000Z",
+            "1970-01-01T10:20:30.000Z",
+            10,
+            AggKind::DimXBins1,
+            cluster,
+            11,
+            true,
+        )
+        .await?;
+        let v = res.avgs[0];
+        assert!(v > 41.9999 && v < 42.0001);
+        Ok(())
+    }
+    super::run_test(inner());
 }

 #[test]
 fn time_weighted_json_02() {
-    super::run_test(time_weighted_json_02_inner());
-}
-
-async fn time_weighted_json_02_inner() -> Result<(), Error> {
-    let rh = require_test_hosts_running()?;
-    let cluster = &rh.cluster;
-    let res = get_json_common(
-        "const-regular-scalar-i32-be",
-        "1970-01-01T00:20:10.000Z",
-        "1970-01-01T00:20:20.000Z",
-        20,
-        AggKind::TimeWeightedScalar,
-        cluster,
-        100,
-        true,
-    )
-    .await?;
-    let v = res.avgs[0];
-    assert!(v > 41.9999 && v < 42.0001);
-    Ok(())
+    async fn inner() -> Result<(), Error> {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        let res = get_json_common(
+            "const-regular-scalar-i32-be",
+            "1970-01-01T00:20:10.000Z",
+            "1970-01-01T00:20:20.000Z",
+            20,
+            AggKind::TimeWeightedScalar,
+            cluster,
+            100,
+            true,
+        )
+        .await?;
+        let v = res.avgs[0];
+        assert!(v > 41.9999 && v < 42.0001);
+        Ok(())
+    }
+    super::run_test(inner());
 }

 #[test]
 fn time_weighted_json_10() {
-    super::run_test(time_weighted_json_10_inner());
-}
-
-async fn time_weighted_json_10_inner() -> Result<(), Error> {
-    let rh = require_test_hosts_running()?;
-    let cluster = &rh.cluster;
-    get_json_common(
-        "scalar-i32-be",
-        "1970-01-01T00:20:10.000Z",
-        "1970-01-01T01:20:30.000Z",
-        10,
-        AggKind::DimXBins1,
-        cluster,
-        13,
-        true,
-    )
-    .await?;
-    Ok(())
+    async fn inner() -> Result<(), Error> {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        get_json_common(
+            "scalar-i32-be",
+            "1970-01-01T00:20:10.000Z",
+            "1970-01-01T01:20:30.000Z",
+            10,
+            AggKind::DimXBins1,
+            cluster,
+            13,
+            true,
+        )
+        .await?;
+        Ok(())
+    }
+    super::run_test(inner());
 }

 #[test]
 fn time_weighted_json_20() {
-    super::run_test(time_weighted_json_20_inner());
-}
-
-async fn time_weighted_json_20_inner() -> Result<(), Error> {
-    let rh = require_test_hosts_running()?;
-    let cluster = &rh.cluster;
-    get_json_common(
-        "wave-f64-be-n21",
-        "1970-01-01T00:20:10.000Z",
-        "1970-01-01T01:20:45.000Z",
-        10,
-        AggKind::TimeWeightedScalar,
-        cluster,
-        13,
-        true,
-    )
-    .await?;
-    Ok(())
+    async fn inner() -> Result<(), Error> {
+        let rh = require_test_hosts_running()?;
+        let cluster = &rh.cluster;
+        get_json_common(
+            "wave-f64-be-n21",
+            "1970-01-01T00:20:10.000Z",
+            "1970-01-01T01:20:45.000Z",
+            10,
+            AggKind::TimeWeightedScalar,
+            cluster,
+            13,
+            true,
+        )
+        .await?;
+        Ok(())
+    }
+    super::run_test(inner());
 }

 // For waveform with N x-bins, see test::binnedjson
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 09472ae..734ddc5 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -21,9 +21,12 @@ where
     S: Stream<Item = Sitemty<TBT>>,
     TBT: TimeBinnableType,
 {
+    #[allow(unused)]
     inp: Pin<Box<S>>,
+    #[allow(unused)]
     left: Option<Poll<Option<Sitemty<TBT>>>>,
     //aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
+    #[allow(unused)]
     a: Option<TBT>,
 }

@@ -34,7 +37,6 @@ where
 {
     inp: Pin<Box<S>>,
     spec: BinnedRange,
-    x_bin_count: usize,
     curbin: u32,
     left: Option<Poll<Option<Sitemty<TBT>>>>,
     aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
@@ -45,7 +47,6 @@ where
     range_complete_emitted: bool,
     errored: bool,
     completed: bool,
-    do_time_weight: bool,
 }

 impl<S, TBT> TBinnerStream<S, TBT>
 where
@@ -58,7 +59,6 @@ where
         Self {
             inp: Box::pin(inp),
             spec,
-            x_bin_count,
             curbin: 0,
             left: None,
             aggtor: Some(<TBT as TimeBinnableType>::aggregator(
@@ -73,7 +73,6 @@ where
             range_complete_emitted: false,
             errored: false,
             completed: false,
-            do_time_weight,
         }
     }
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 8c3df76..754dd5f 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -5,8 +5,8 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use items::{LogItem, RangeCompletableItem, StreamItem};
+use netpod::log::*;
 use netpod::timeunits::SEC;
-use netpod::{log::*, ByteSize};
 use netpod::{ChannelConfig, NanoRange, Node};
 use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
@@ -69,6 +69,10 @@ impl EventChunkerMultifile {
         self.seen_before_range_count
     }

+    pub fn seen_after_range_count(&self) -> usize {
+        self.seen_after_range_count
+    }
+
     pub fn close(&mut self) {
         if let Some(evs) = &mut self.evs {
             self.seen_before_range_count += evs.seen_before_range_count();
@@ -153,9 +157,10 @@ impl Stream for EventChunkerMultifile {
     }
 }

+#[cfg(test)]
 fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), Error> {
     use netpod::timeunits::*;
-    use netpod::Nanos;
+    use netpod::{ByteSize, Nanos};
     let chn = netpod::Channel {
         backend: "testbackend".into(),
         name: "scalar-i32-be".into(),
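The tests above query a constant 42-valued channel and expect every binned average, time-weighted or not, to land within 1e-4 of 42. That invariant is easy to check in isolation; a sketch with made-up sample times:

    // Time-weighted average over [beg, end): each value is weighted by the
    // time it stays current, i.e. until the next sample or the bin end.
    fn time_weighted_avg(beg: u64, end: u64, samples: &[(u64, f32)]) -> f32 {
        let mut sum = 0f32;
        let mut last: Option<(u64, f32)> = None;
        for &(ts, v) in samples {
            if let Some((t0, v0)) = last {
                sum += v0 * (ts - t0) as f32;
            }
            last = Some((ts.max(beg), v));
        }
        if let Some((t0, v0)) = last {
            sum += v0 * (end - t0) as f32;
        }
        sum / (end - beg) as f32
    }

    fn main() {
        // A constant signal averages to its value regardless of sample spacing.
        let avg = time_weighted_avg(0, 100, &[(0, 42.0), (13, 42.0), (70, 42.0)]);
        assert!(avg > 41.9999 && avg < 42.0001);
    }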
diff --git a/h5out/src/lib.rs b/h5out/src/lib.rs
index 8daf193..8f3da83 100644
--- a/h5out/src/lib.rs
+++ b/h5out/src/lib.rs
@@ -12,30 +12,30 @@ impl From<io::Error> for Error {
     }
 }

-struct Out {
+pub struct Out {
     cur: io::Cursor<Vec<u8>>,
 }

 impl Out {
-    fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             cur: Cursor::new(vec![]),
         }
     }

-    fn write_u8(&mut self, k: u8) -> io::Result<usize> {
+    pub fn write_u8(&mut self, k: u8) -> io::Result<usize> {
         self.write(&k.to_le_bytes())
     }

-    fn write_u16(&mut self, k: u16) -> io::Result<usize> {
+    pub fn write_u16(&mut self, k: u16) -> io::Result<usize> {
         self.write(&k.to_le_bytes())
     }

-    fn write_u32(&mut self, k: u32) -> io::Result<usize> {
+    pub fn write_u32(&mut self, k: u32) -> io::Result<usize> {
         self.write(&k.to_le_bytes())
     }

-    fn write_u64(&mut self, k: u64) -> io::Result<usize> {
+    pub fn write_u64(&mut self, k: u64) -> io::Result<usize> {
         self.write(&k.to_le_bytes())
     }
 }
@@ -61,6 +61,7 @@ fn emit() {
     write_h5().unwrap();
 }

+#[allow(unused)]
 fn write_h5() -> Result<(), Error> {
     let mut out = Out::new();
     write_superblock(&mut out)?;
@@ -72,6 +73,7 @@ fn write_h5() -> Result<(), Error> {
     Ok(())
 }

+#[allow(unused)]
 fn write_file(out: &Out) -> Result<(), Error> {
     eprintln!("Write {} bytes", out.cur.get_ref().len());
     let mut f = OpenOptions::new()
@@ -83,6 +85,7 @@ fn write_file(out: &Out) -> Result<(), Error> {
     Ok(())
 }

+#[allow(unused)]
 fn write_padding(out: &mut Out) -> Result<(), Error> {
     let n = out.cur.get_ref().len();
     let m = n % 8;
@@ -94,6 +97,7 @@ fn write_padding(out: &mut Out) -> Result<(), Error> {
     Ok(())
 }

+#[allow(unused)]
 fn write_superblock(out: &mut Out) -> Result<(), Error> {
     let super_ver = 0;
     let free_ver = 0;
@@ -140,6 +144,7 @@ fn write_superblock(out: &mut Out) -> Result<(), Error> {
     Ok(())
 }

+#[allow(unused)]
 fn write_root_object_header(out: &mut Out) -> Result<(), Error> {
     write_padding(out)?;
     let pos0 = out.cur.get_ref().len() as u64;
@@ -169,6 +174,7 @@ fn write_root_object_header(out: &mut Out) -> Result<(), Error> {
     Ok(())
 }

+#[allow(unused)]
 fn write_local_heap(out: &mut Out) -> Result<(), Error> {
     write_padding(out)?;
     let pos0 = out.cur.get_ref().len() as u64;
diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs
index c20bb87..765a02d 100644
--- a/httpret/src/gather.rs
+++ b/httpret/src/gather.rs
@@ -168,7 +168,7 @@ pub struct SubRes {
 }

 pub async fn gather_get_json_generic(
-    method: http::Method,
+    _method: http::Method,
     urls: Vec<Url>,
     bodies: Vec<Option<Body>>,
     tags: Vec<String>,
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 124b13b..99a88e8 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,6 +1,5 @@
 use crate::gather::gather_get_json;
 use bytes::Bytes;
-use disk::binned::prebinned::pre_binned_bytes_for_http;
 use disk::binned::query::{BinnedQuery, PreBinnedQuery};
 use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
 use err::Error;
@@ -8,6 +7,7 @@ use future::Future;
 use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
 use http::{HeaderMap, Method, StatusCode};
+use hyper::server::conn::AddrStream;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{server::Server, Body, Request, Response};
 use net::SocketAddr;
@@ -41,8 +41,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
     use std::str::FromStr;
     let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
     let make_service = make_service_fn({
-        move |conn| {
-            info!("»»»»»»»»»»» new connection {:?}", conn);
+        move |conn: &AddrStream| {
+            info!("new connection from {:?}", conn.remote_addr());
             let node_config = node_config.clone();
             async move {
                 Ok::<_, Error>(service_fn({
@@ -55,7 +55,6 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
         }
     });
     Server::bind(&addr).serve(make_service).await?;
-    warn!("»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»» SERVICE DONE ««««««««««««««««««««««««««««««««««««««««");
     rawjh.await??;
     Ok(())
 }
@@ -415,7 +414,7 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
     span1.in_scope(|| {
         info!("prebinned STARTING");
     });
-    let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1);
+    let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1);
     let ret = match fut.await {
         Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, desc))?,
         Err(e) => {
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index 5f5a1f2..64dbfd8 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -299,8 +299,8 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
 }

 pub async fn proxy_api1_single_backend_query(
-    req: Request<Body>,
-    proxy_config: &ProxyConfig,
+    _req: Request<Body>,
+    _proxy_config: &ProxyConfig,
 ) -> Result<Response<Body>, Error> {
     panic!()
 }
diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs
index f8251df..8c353f7 100644
--- a/items/src/eventvalues.rs
+++ b/items/src/eventvalues.rs
@@ -291,7 +291,7 @@ where
         }
     }

-    fn apply_event_unweight(&mut self, val: NTY) {
+    fn apply_min_max(&mut self, val: NTY) {
         self.min = match self.min {
             None => Some(val),
             Some(min) => {
@@ -312,6 +312,10 @@ where
                 }
             }
         };
+    }
+
+    fn apply_event_unweight(&mut self, val: NTY) {
+        self.apply_min_max(val);
         let vf = val.as_();
         if vf.is_nan() {
         } else {
@@ -322,26 +326,7 @@ where

     fn apply_event_time_weight(&mut self, ts: u64, val: Option<NTY>) {
         if let Some(v) = self.last_val {
-            self.min = match self.min {
-                None => Some(v),
-                Some(min) => {
-                    if v < min {
-                        Some(v)
-                    } else {
-                        Some(min)
-                    }
-                }
-            };
-            self.max = match self.max {
-                None => Some(v),
-                Some(max) => {
-                    if v > max {
-                        Some(v)
-                    } else {
-                        Some(max)
-                    }
-                }
-            };
+            self.apply_min_max(v);
             let w = if self.do_time_weight {
                 (ts - self.int_ts) as f32 * 1e-9
             } else {
@@ -389,7 +374,7 @@ where
         }
     }

-    fn result_reset_unweight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins<NTY> {
+    fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgBins<NTY> {
         let avg = if self.sumc == 0 {
             None
         } else {
diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs
index 8654da2..eed9efc 100644
--- a/items/src/minmaxavgbins.rs
+++ b/items/src/minmaxavgbins.rs
@@ -402,7 +402,7 @@ where
         }
     }

-    fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
+    fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
         let avg = if self.sumc == 0 {
             None
         } else {
diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs
index 42ecf5b..c5c8b72 100644
--- a/items/src/minmaxavgdim1bins.rs
+++ b/items/src/minmaxavgdim1bins.rs
@@ -404,7 +404,7 @@ where
         }
     }

-    fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
+    fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
         let avg = if self.sumc == 0 {
             None
         } else {
diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs
index b8799e0..1a1b97d 100644
--- a/items/src/minmaxavgwavebins.rs
+++ b/items/src/minmaxavgwavebins.rs
@@ -397,7 +397,7 @@ where
         }
     }

-    fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
+    fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
         let ret;
         if self.sumc == 0 {
             ret = Self::Output {
diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs
index 6e4a4b5..c758055 100644
--- a/items/src/waveevents.rs
+++ b/items/src/waveevents.rs
@@ -245,7 +245,7 @@ where
         }
     }

-    fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
+    fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
         let avg = if self.sumc == 0 {
             None
         } else {
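Both eventvalues.rs above and xbinnedscalarevents.rs below now share the same time-weighting scheme: remember the last seen value, and when the next event arrives credit that value with a weight proportional to the elapsed time (in seconds, hence the 1e-9 factor on nanosecond deltas). A condensed standalone sketch of that state machine for the scalar case (the struct and field names mirror the diff but are illustrative):

    // Reduced sketch of the time-weighted aggregator introduced in this patch.
    struct TwAgg {
        int_ts: u64,           // start of the not-yet-credited interval
        last_val: Option<f32>, // value that has been current since int_ts
        sum: f32,
        sumc: u64,
    }

    impl TwAgg {
        fn apply_event_time_weight(&mut self, ts: u64, val: Option<f32>) {
            if let Some(v) = self.last_val {
                // Credit the previous value for the interval [int_ts, ts).
                let w = (ts - self.int_ts) as f32 * 1e-9;
                if !v.is_nan() {
                    self.sum += v * w;
                    self.sumc += 1;
                }
                self.int_ts = ts;
            }
            self.last_val = val;
        }
    }

At bin close, result_reset_time_weight flushes the pending value once more at range.end (when expand is set) and divides by the bin length in seconds rather than by the event count.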
diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs
index 5345124..5ca5836 100644
--- a/items/src/xbinnedscalarevents.rs
+++ b/items/src/xbinnedscalarevents.rs
@@ -6,7 +6,6 @@ use crate::{
     ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
 };
 use err::Error;
-use netpod::log::error;
 use netpod::timeunits::SEC;
 use netpod::NanoRange;
 use serde::{Deserialize, Serialize};
@@ -178,13 +177,20 @@ where
     max: Option<NTY>,
     sumc: u64,
     sum: f32,
+    int_ts: u64,
+    last_ts: u64,
+    last_avg: Option<f32>,
+    last_min: Option<NTY>,
+    last_max: Option<NTY>,
+    do_time_weight: bool,
 }

 impl<NTY> XBinnedScalarEventsAggregator<NTY>
 where
     NTY: NumOps,
 {
-    pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
+    pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
+        let int_ts = range.beg;
         Self {
             range,
             count: 0,
@@ -192,8 +198,152 @@ where
             max: None,
             sumc: 0,
             sum: 0f32,
+            int_ts,
+            last_ts: 0,
+            last_avg: None,
+            last_min: None,
+            last_max: None,
+            do_time_weight,
         }
     }
+
+    fn apply_min_max(&mut self, min: NTY, max: NTY) {
+        self.min = match self.min {
+            None => Some(min),
+            Some(cmin) => {
+                if min < cmin {
+                    Some(min)
+                } else {
+                    Some(cmin)
+                }
+            }
+        };
+        self.max = match self.max {
+            None => Some(max),
+            Some(cmax) => {
+                if max > cmax {
+                    Some(max)
+                } else {
+                    Some(cmax)
+                }
+            }
+        };
+    }
+
+    fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) {
+        self.apply_min_max(min, max);
+        let vf = avg;
+        if vf.is_nan() {
+        } else {
+            self.sum += vf;
+            self.sumc += 1;
+        }
+    }
+
+    fn apply_event_time_weight(&mut self, ts: u64, avg: Option<f32>, min: Option<NTY>, max: Option<NTY>) {
+        if let Some(v) = self.last_avg {
+            self.apply_min_max(min.unwrap(), max.unwrap());
+            let w = if self.do_time_weight {
+                (ts - self.int_ts) as f32 * 1e-9
+            } else {
+                1.
+            };
+            let vf = v;
+            if vf.is_nan() {
+            } else {
+                self.sum += vf * w;
+                self.sumc += 1;
+            }
+            self.int_ts = ts;
+        }
+        self.last_ts = ts;
+        self.last_avg = avg;
+        self.last_min = min;
+        self.last_max = max;
+    }
+
+    fn ingest_unweight(&mut self, item: &XBinnedScalarEvents<NTY>) {
+        for i1 in 0..item.tss.len() {
+            let ts = item.tss[i1];
+            let avg = item.avgs[i1];
+            let min = item.mins[i1];
+            let max = item.maxs[i1];
+            if ts < self.range.beg {
+            } else if ts >= self.range.end {
+            } else {
+                self.count += 1;
+                self.apply_event_unweight(avg, min, max);
+            }
+        }
+    }
+
+    fn ingest_time_weight(&mut self, item: &XBinnedScalarEvents<NTY>) {
+        for i1 in 0..item.tss.len() {
+            let ts = item.tss[i1];
+            let avg = item.avgs[i1];
+            let min = item.mins[i1];
+            let max = item.maxs[i1];
+            if ts < self.int_ts {
+                self.last_ts = ts;
+                self.last_avg = Some(avg);
+                self.last_min = Some(min);
+                self.last_max = Some(max);
+            } else if ts >= self.range.end {
+                return;
+            } else {
+                self.count += 1;
+                self.apply_event_time_weight(ts, Some(avg), Some(min), Some(max));
+            }
+        }
+    }
+
+    fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgBins<NTY> {
+        let avg = if self.sumc == 0 {
+            None
+        } else {
+            Some(self.sum / self.sumc as f32)
+        };
+        let ret = MinMaxAvgBins {
+            ts1s: vec![self.range.beg],
+            ts2s: vec![self.range.end],
+            counts: vec![self.count],
+            mins: vec![self.min],
+            maxs: vec![self.max],
+            avgs: vec![avg],
+        };
+        self.range = range;
+        self.count = 0;
+        self.min = None;
+        self.max = None;
+        self.sum = 0f32;
+        self.sumc = 0;
+        ret
+    }
+
+    fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins<NTY> {
+        if expand {
+            self.apply_event_time_weight(self.range.end, self.last_avg, self.last_min, self.last_max);
+        }
+        let avg = {
+            let sc = self.range.delta() as f32 * 1e-9;
+            Some(self.sum / sc)
+        };
+        let ret = MinMaxAvgBins {
+            ts1s: vec![self.range.beg],
+            ts2s: vec![self.range.end],
+            counts: vec![self.count],
+            mins: vec![self.min],
+            maxs: vec![self.max],
+            avgs: vec![avg],
+        };
+        self.range = range;
+        self.count = 0;
+        self.min = None;
+        self.max = None;
+        self.sum = 0f32;
+        self.sumc = 0;
+        ret
+    }
 }

 impl<NTY> TimeBinnableTypeAggregator for XBinnedScalarEventsAggregator<NTY>
@@ -208,67 +358,19 @@ where
     }

     fn ingest(&mut self, item: &Self::Input) {
-        error!("time-weighted binning not available here.");
-        err::todo();
-        for i1 in 0..item.tss.len() {
-            let ts = item.tss[i1];
-            if ts < self.range.beg {
-                continue;
-            } else if ts >= self.range.end {
-                continue;
-            } else {
-                self.min = match self.min {
-                    None => Some(item.mins[i1]),
-                    Some(min) => {
-                        if item.mins[i1] < min {
-                            Some(item.mins[i1])
-                        } else {
-                            Some(min)
-                        }
-                    }
-                };
-                self.max = match self.max {
-                    None => Some(item.maxs[i1]),
-                    Some(max) => {
-                        if item.maxs[i1] > max {
-                            Some(item.maxs[i1])
-                        } else {
-                            Some(max)
-                        }
-                    }
-                };
-                let x = item.avgs[i1];
-                if x.is_nan() {
-                } else {
-                    self.sum += x;
-                    self.sumc += 1;
-                }
-                self.count += 1;
-            }
+        if self.do_time_weight {
+            self.ingest_time_weight(item)
+        } else {
+            self.ingest_unweight(item)
         }
     }

     fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
-        let avg = if self.sumc == 0 {
-            None
-        } else {
-            Some(self.sum / self.sumc as f32)
-        };
-        let ret = Self::Output {
-            ts1s: vec![self.range.beg],
-            ts2s: vec![self.range.end],
-            counts: vec![self.count],
-            mins: vec![self.min],
-            maxs: vec![self.max],
-            avgs: vec![avg],
-        };
-        self.range = range;
-        self.count = 0;
-        self.min = None;
-        self.max = None;
-        self.sum = 0f32;
-        self.sumc = 0;
-        ret
+        if self.do_time_weight {
+            self.result_reset_time_weight(range, expand)
+        } else {
+            self.result_reset_unweight(range, expand)
+        }
     }
 }
diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs
index c968b06..224da44 100644
--- a/items/src/xbinnedwaveevents.rs
+++ b/items/src/xbinnedwaveevents.rs
@@ -237,7 +237,7 @@ where
         }
     }

-    fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
+    fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
         let ret;
         if self.sumc == 0 {
             ret = Self::Output {
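A worked check of the bin-close arithmetic in result_reset_time_weight above (numbers hypothetical): a 10-second bin that holds the value 40.0 for 4 s and then 45.0 for 6 s accumulates sum = 40*4 + 45*6 = 430 and reports avg = 430 / 10 = 43.0, independent of how many events carried those values.

    // Minimal numeric check of the time-weighted bin average.
    fn main() {
        let bin_len_s = 10.0f32;
        let sum = 40.0f32 * 4.0 + 45.0 * 6.0; // time-weighted contributions
        let avg = sum / bin_len_s;
        assert!((avg - 43.0).abs() < 1e-6);
    }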