From fdf8b38569c41e4cc9406b042c5c271d74d67a6a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Sun, 11 Apr 2021 12:24:16 +0200
Subject: [PATCH] Factor a bit the data generator

---
 disk/src/agg.rs                | 18 ++++++------
 disk/src/gen.rs                | 52 ++++++++++++++++++----------------
 disk/src/lib.rs                | 14 ++-------
 httpret/src/lib.rs             |  3 +-
 netpod/src/lib.rs              | 10 +++++++
 retrieval/src/bin/retrieval.rs | 26 ++++++++++++-----
 6 files changed, 70 insertions(+), 53 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 0cd4a9c..281a8ae 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
 use crate::EventFull;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt, future::ready};
-use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node};
+use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
 
 pub trait AggregatorTdim {
     type InputValue;
@@ -192,10 +192,10 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
 pub struct MinMaxAvgScalarEventBatchAggregator {
     ts1: u64,
     ts2: u64,
+    count: u64,
     min: f32,
     max: f32,
     sum: f32,
-    count: u64,
 }
 
 impl MinMaxAvgScalarEventBatchAggregator {
@@ -272,6 +272,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
         MinMaxAvgScalarBinSingle {
             ts1: self.ts1,
             ts2: self.ts2,
+            count: self.count,
             min,
             max,
             avg,
@@ -284,6 +285,7 @@
 pub struct MinMaxAvgScalarBinBatch {
     ts1s: Vec<u64>,
     ts2s: Vec<u64>,
+    counts: Vec<u64>,
     mins: Vec<f32>,
     maxs: Vec<f32>,
     avgs: Vec<f32>,
@@ -328,7 +330,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
         todo!()
     }
 
-
     fn ingest(&mut self, v: &Self::InputValue) {
         todo!()
     }
@@ -341,6 +342,7 @@
 pub struct MinMaxAvgScalarBinSingle {
     ts1: u64,
     ts2: u64,
+    count: u64,
     min: f32,
     max: f32,
     avg: f32,
@@ -348,7 +350,7 @@ pub struct MinMaxAvgScalarBinSingle {
 
 impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.min, self.max, self.avg)
+        write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.count, self.min, self.max, self.avg)
     }
 }
 
@@ -389,7 +391,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
         todo!()
     }
 
-    fn result(self) -> Self::OutputValue { todo!() }
+    fn result(self) -> Self::OutputValue {
+        todo!()
+    }
 }
 
 
@@ -753,10 +757,6 @@ pub struct TimeRange {
     ts1: u64,
     ts2: u64,
 }
-use crate::timeunits::*;
-
-
-
 #[test]
 fn agg_x_dim_0() {
     taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 1263259..e066aab 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -14,8 +14,7 @@ use std::path::{Path, PathBuf};
 use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
 use std::sync::Arc;
-use crate::timeunits::*;
-use netpod::{Node, Channel, ChannelConfig, Shape};
+use netpod::{Node, Channel, ChannelConfig, Shape, timeunits::*};
 
 #[test]
 fn test_gen_test_data() {
@@ -33,19 +32,22 @@ pub async fn gen_test_data() -> Result<(), Error> {
         channels: vec![],
     };
     {
-        let config = ChannelConfig {
-            channel: Channel {
-                backend: "test".into(),
-                keyspace: 3,
-                name: "wave1".into(),
+        let chn = ChannelGenProps {
+            config: ChannelConfig {
+                channel: Channel {
+                    backend: "test".into(),
+                    keyspace: 3,
+                    name: "wave1".into(),
+                },
+                time_bin_size: DAY,
+                scalar_type: ScalarType::F64,
+                shape: Shape::Wave(9),
+                big_endian: true,
+                compression: true,
             },
-            time_bin_size: DAY,
-            scalar_type: ScalarType::F64,
-            shape: Shape::Wave(9),
-            big_endian: true,
-            compression: true,
+            time_spacing: SEC * 1,
         };
-        ensemble.channels.push(config);
+        ensemble.channels.push(chn);
     }
     let node0 = Node {
         host: "localhost".into(),
@@ -71,31 +73,35 @@
 
 struct Ensemble {
     nodes: Vec<Node>,
-    channels: Vec<ChannelConfig>,
+    channels: Vec<ChannelGenProps>,
+}
+
+pub struct ChannelGenProps {
+    config: ChannelConfig,
+    time_spacing: u64,
 }
 
 async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    for config in &ensemble.channels {
-        gen_channel(config, node, ensemble).await?
+    for chn in &ensemble.channels {
+        gen_channel(chn, node, ensemble).await?
     }
     Ok(())
 }
 
-async fn gen_channel(config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
+async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
     let config_path = node.data_base_path
         .join("config")
-        .join(&config.channel.name);
+        .join(&chn.config.channel.name);
     tokio::fs::create_dir_all(&config_path).await?;
     let channel_path = node.data_base_path
-        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
+        .join(format!("{}_{}", node.ksprefix, chn.config.channel.keyspace))
         .join("byTime")
-        .join(&config.channel.name);
+        .join(&chn.config.channel.name);
     tokio::fs::create_dir_all(&channel_path).await?;
-    let ts_spacing = HOUR * 1;
     let mut evix = 0;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(evix, ts, ts_spacing, &channel_path, config, node, ensemble).await?;
+        let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
         evix = res.evix;
         ts = res.ts;
     }
@@ -120,7 +126,6 @@ async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, c
     let tsmax = (tb + 1) * config.time_bin_size;
     while ts < tsmax {
         if evix % ensemble.nodes.len() as u64 == node.split as u64 {
-            trace!("gen ts {}", ts);
             gen_event(&mut file, evix, ts, config).await?;
         }
         evix += 1;
@@ -179,7 +184,6 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
     }
     let mut comp = vec![0u8; ele_size * ele_count + 64];
     let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap();
-    trace!("comp size {} {}e-2", n1, 100 * n1 / vals.len());
     buf.put_u64(vals.len() as u64);
     let comp_block_size = 0;
     buf.put_u32(comp_block_size);
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 262debd..0466f18 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -605,7 +605,7 @@ impl EventChunker {
             let block_size = sl.read_u32::<BE>().unwrap();
             let p1 = sl.position() as u32;
             let k1 = len as u32 - p1 - 4;
-            debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
+            //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
             assert!(value_bytes < 1024 * 256);
             assert!(block_size < 1024 * 32);
             //let value_bytes = value_bytes;
@@ -878,7 +878,7 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -
         .join(config.channel.name.clone())
         .join(format!("{:019}", timebin))
         .join(format!("{:010}", node.split))
-        .join(format!("{:019}_00000_Data", config.time_bin_size / timeunits::MS))
+        .join(format!("{:019}_00000_Data", config.time_bin_size / netpod::timeunits::MS))
 }
 
 
@@ -925,16 +925,6 @@ impl futures_core::Stream for RawConcatChannelReader {
 }
 
 
-pub mod timeunits {
-    pub const MU: u64 = 1000;
-    pub const MS: u64 = MU * 1000;
-    pub const SEC: u64 = MS * 1000;
-    pub const MIN: u64 = SEC * 60;
-    pub const HOUR: u64 = MIN * 60;
-    pub const DAY: u64 = HOUR * 24;
-    pub const WEEK: u64 = DAY * 7;
-}
-
 pub mod dtflags {
     pub const COMPRESSION: u8 = 0x80;
     pub const ARRAY: u8 = 0x40;
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 7e3858c..7f8246d 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -59,13 +59,14 @@ fn response<T>(status: T) -> http::response::Builder
 }
 
 async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
+    let node = todo!("get node from config");
     use netpod::AggQuerySingleChannel;
     let reqbody = req.into_body();
     let bodyslice = hyper::body::to_bytes(reqbody).await?;
     let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
     //let q = disk::read_test_1(&query).await?;
     //let s = q.inner;
-    let s = disk::parsed1(&query);
+    let s = disk::parsed1(&query, &node);
     let res = response(StatusCode::OK)
         .body(Body::wrap_stream(s))?;
     /*
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 7632505..b04078d 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -136,3 +136,13 @@ pub enum Shape {
     Scalar,
     Wave(usize),
 }
+
+pub mod timeunits {
+    pub const MU: u64 = 1000;
+    pub const MS: u64 = MU * 1000;
+    pub const SEC: u64 = MS * 1000;
+    pub const MIN: u64 = SEC * 60;
+    pub const HOUR: u64 = MIN * 60;
+    pub const DAY: u64 = HOUR * 24;
+    pub const WEEK: u64 = DAY * 7;
+}
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index 36d26fa..072feba 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,6 +1,7 @@
 #[allow(unused_imports)]
 use tracing::{error, warn, info, debug, trace};
 use err::Error;
+use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node};
 
 pub fn main() {
     match taskrun::run(go()) {
@@ -31,17 +32,28 @@ async fn go() -> Result<(), Error> {
 fn simple_fetch() {
     taskrun::run(async {
         let t1 = chrono::Utc::now();
-        let query = netpod::AggQuerySingleChannel {
+        let node = Node {
+            host: "localhost".into(),
+            port: 8888,
+            data_base_path: todo!(),
             ksprefix: "daq_swissfel".into(),
-            keyspace: 3,
-            channel: netpod::Channel {
-                name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
-                backend: "sf-databuffer".into(),
+            split: 0,
+        };
+        let query = netpod::AggQuerySingleChannel {
+            channel_config: ChannelConfig {
+                channel: Channel {
+                    backend: "sf-databuffer".into(),
+                    keyspace: 3,
+                    name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
+                },
+                time_bin_size: DAY,
+                scalar_type: ScalarType::F64,
+                shape: Shape::Wave(todo!()),
+                big_endian: true,
+                compression: true,
             },
             timebin: 18720,
            tb_file_count: 1,
-            split: 12,
-            tbsize: 1000 * 60 * 60 * 24,
             buffer_size: 1024 * 8,
         };
         let query_string = serde_json::to_string(&query).unwrap();
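
As a compact reference for what this refactoring buys: ChannelGenProps pairs the storage-facing ChannelConfig with the generator-only event spacing (previously hard-coded as "let ts_spacing = HOUR * 1;" inside gen_channel), and the time constants now live in a single shared module, netpod::timeunits. The sketch below mirrors the shapes from this diff as a self-contained program; the structs are condensed stand-ins with partly assumed field types, not the real netpod definitions, and events_per_bin is a hypothetical helper added only for illustration.

    // Time constants as introduced in netpod::timeunits; the base unit is the nanosecond.
    pub mod timeunits {
        pub const MU: u64 = 1000;
        pub const MS: u64 = MU * 1000;
        pub const SEC: u64 = MS * 1000;
        pub const MIN: u64 = SEC * 60;
        pub const HOUR: u64 = MIN * 60;
        pub const DAY: u64 = HOUR * 24;
    }
    use timeunits::{DAY, SEC};

    // Condensed stand-ins for the netpod types the generator consumes.
    pub struct Channel {
        pub backend: String,
        pub keyspace: u16, // field type assumed for this sketch
        pub name: String,
    }

    pub struct ChannelConfig {
        pub channel: Channel,
        pub time_bin_size: u64,
    }

    // The new pairing from this patch: storage config plus a generator-only knob.
    pub struct ChannelGenProps {
        pub config: ChannelConfig,
        pub time_spacing: u64,
    }

    impl ChannelGenProps {
        // Hypothetical helper, not part of the patch: how many events one
        // channel emits per time bin at the configured spacing.
        pub fn events_per_bin(&self) -> u64 {
            self.config.time_bin_size / self.time_spacing
        }
    }

    fn main() {
        let chn = ChannelGenProps {
            config: ChannelConfig {
                channel: Channel {
                    backend: "test".into(),
                    keyspace: 3,
                    name: "wave1".into(),
                },
                time_bin_size: DAY,
            },
            time_spacing: SEC * 1,
        };
        // One event per second over a one-day bin.
        assert_eq!(chn.events_per_bin(), 86400);
        println!("{}: {} events per bin", chn.config.channel.name, chn.events_per_bin());
    }

Keeping time_spacing next to, but outside of, ChannelConfig means generator-only knobs do not leak into the storage config that the other crates share.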