Factor the data generator a bit

Dominik Werder
2021-04-11 12:24:16 +02:00
parent 6dc4ccd5a2
commit fdf8b38569
6 changed files with 70 additions and 53 deletions

View File

@@ -6,7 +6,7 @@ use std::pin::Pin;
 use crate::EventFull;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt, future::ready};
-use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node};
+use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
 
 pub trait AggregatorTdim {
     type InputValue;
@@ -192,10 +192,10 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
 pub struct MinMaxAvgScalarEventBatchAggregator {
     ts1: u64,
     ts2: u64,
+    count: u64,
     min: f32,
     max: f32,
     sum: f32,
-    count: u64,
 }
 
 impl MinMaxAvgScalarEventBatchAggregator {
@@ -272,6 +272,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
         MinMaxAvgScalarBinSingle {
             ts1: self.ts1,
             ts2: self.ts2,
+            count: self.count,
             min,
             max,
             avg,
@@ -284,6 +285,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
 pub struct MinMaxAvgScalarBinBatch {
     ts1s: Vec<u64>,
     ts2s: Vec<u64>,
+    counts: Vec<u64>,
     mins: Vec<f32>,
     maxs: Vec<f32>,
     avgs: Vec<f32>,
@@ -328,7 +330,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
         todo!()
     }
 
-
     fn ingest(&mut self, v: &Self::InputValue) {
         todo!()
     }
@@ -341,6 +342,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
 pub struct MinMaxAvgScalarBinSingle {
     ts1: u64,
     ts2: u64,
+    count: u64,
     min: f32,
     max: f32,
     avg: f32,
@@ -348,7 +350,7 @@ pub struct MinMaxAvgScalarBinSingle {
 impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.min, self.max, self.avg)
+        write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.count, self.min, self.max, self.avg)
     }
 }
@@ -389,7 +391,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
         todo!()
     }
 
-    fn result(self) -> Self::OutputValue { todo!() }
+    fn result(self) -> Self::OutputValue {
+        todo!()
+    }
 }
@@ -753,10 +757,6 @@ pub struct TimeRange {
 }
 
-use crate::timeunits::*;
-
 #[test]
 fn agg_x_dim_0() {
     taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
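The recurring change in this file is that every aggregate now carries its event count. The count matters once bins get merged: a combined average must be weighted by the per-bin counts, which min/max/avg alone cannot express. A standalone sketch of that reasoning (the merge logic here is an assumption, not the crate's code; the corresponding aggregator methods above are still todo!()):

    // Why a bin must carry its event `count`: merging two bins needs a
    // count-weighted average; averaging the averages is wrong for
    // unequal bins.
    #[derive(Debug, Clone)]
    struct BinSingle {
        ts1: u64,
        ts2: u64,
        count: u64,
        min: f32,
        max: f32,
        avg: f32,
    }

    impl BinSingle {
        // Hypothetical merge; the diff above leaves the real one as todo!().
        fn merge(a: &BinSingle, b: &BinSingle) -> BinSingle {
            let count = a.count + b.count;
            let avg = if count == 0 {
                f32::NAN
            } else {
                (a.avg * a.count as f32 + b.avg * b.count as f32) / count as f32
            };
            BinSingle {
                ts1: a.ts1.min(b.ts1),
                ts2: a.ts2.max(b.ts2),
                count,
                min: a.min.min(b.min),
                max: a.max.max(b.max),
                avg,
            }
        }
    }

    fn main() {
        let a = BinSingle { ts1: 0, ts2: 10, count: 1, min: 1.0, max: 1.0, avg: 1.0 };
        let b = BinSingle { ts1: 10, ts2: 20, count: 3, min: 2.0, max: 4.0, avg: 3.0 };
        // Weighted avg is (1*1.0 + 3*3.0) / 4 = 2.5, not (1.0 + 3.0) / 2 = 2.0.
        println!("{:?}", BinSingle::merge(&a, &b));
    }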

View File

@@ -14,8 +14,7 @@ use std::path::{Path, PathBuf};
 use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
 use std::sync::Arc;
-use crate::timeunits::*;
-use netpod::{Node, Channel, ChannelConfig, Shape};
+use netpod::{Node, Channel, ChannelConfig, Shape, timeunits::*};
 
 #[test]
 fn test_gen_test_data() {
@@ -33,19 +32,22 @@ pub async fn gen_test_data() -> Result<(), Error> {
         channels: vec![],
     };
     {
-        let config = ChannelConfig {
-            channel: Channel {
-                backend: "test".into(),
-                keyspace: 3,
-                name: "wave1".into(),
+        let chn = ChannelGenProps {
+            config: ChannelConfig {
+                channel: Channel {
+                    backend: "test".into(),
+                    keyspace: 3,
+                    name: "wave1".into(),
+                },
+                time_bin_size: DAY,
+                scalar_type: ScalarType::F64,
+                shape: Shape::Wave(9),
+                big_endian: true,
+                compression: true,
             },
-            time_bin_size: DAY,
-            scalar_type: ScalarType::F64,
-            shape: Shape::Wave(9),
-            big_endian: true,
-            compression: true,
+            time_spacing: SEC * 1,
         };
-        ensemble.channels.push(config);
+        ensemble.channels.push(chn);
     }
     let node0 = Node {
         host: "localhost".into(),
@@ -71,31 +73,35 @@ pub async fn gen_test_data() -> Result<(), Error> {
 struct Ensemble {
     nodes: Vec<Node>,
-    channels: Vec<ChannelConfig>,
+    channels: Vec<ChannelGenProps>,
 }
 
+pub struct ChannelGenProps {
+    config: ChannelConfig,
+    time_spacing: u64,
+}
+
 async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    for config in &ensemble.channels {
-        gen_channel(config, node, ensemble).await?
+    for chn in &ensemble.channels {
+        gen_channel(chn, node, ensemble).await?
     }
     Ok(())
 }
 
-async fn gen_channel(config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
+async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
     let config_path = node.data_base_path
         .join("config")
-        .join(&config.channel.name);
+        .join(&chn.config.channel.name);
     tokio::fs::create_dir_all(&config_path).await?;
     let channel_path = node.data_base_path
-        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
+        .join(format!("{}_{}", node.ksprefix, chn.config.channel.keyspace))
         .join("byTime")
-        .join(&config.channel.name);
+        .join(&chn.config.channel.name);
     tokio::fs::create_dir_all(&channel_path).await?;
-    let ts_spacing = HOUR * 1;
     let mut evix = 0;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(evix, ts, ts_spacing, &channel_path, config, node, ensemble).await?;
+        let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
        evix = res.evix;
        ts = res.ts;
    }
@@ -120,7 +126,6 @@ async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, c
     let tsmax = (tb + 1) * config.time_bin_size;
     while ts < tsmax {
         if evix % ensemble.nodes.len() as u64 == node.split as u64 {
-            trace!("gen ts {}", ts);
             gen_event(&mut file, evix, ts, config).await?;
         }
         evix += 1;
@@ -179,7 +184,6 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
     }
     let mut comp = vec![0u8; ele_size * ele_count + 64];
     let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap();
-    trace!("comp size {} {}e-2", n1, 100 * n1 / vals.len());
     buf.put_u64(vals.len() as u64);
     let comp_block_size = 0;
     buf.put_u32(comp_block_size);
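This file carries the refactor that names the commit: the ensemble now stores ChannelGenProps, pairing each ChannelConfig with generator-only settings, so the previously hardcoded `let ts_spacing = HOUR * 1;` becomes a per-channel `time_spacing` (one second for wave1). A compilable sketch with simplified stand-in types (not the real netpod definitions):

    // Stand-in types, only detailed enough to show the shape of the refactor.
    const MU: u64 = 1000; // constants copied from the timeunits module
    const MS: u64 = MU * 1000;
    const SEC: u64 = MS * 1000;
    const DAY: u64 = SEC * 60 * 60 * 24;

    #[allow(dead_code)]
    struct Channel {
        backend: String,
        keyspace: u32,
        name: String,
    }

    #[allow(dead_code)]
    struct ChannelConfig {
        channel: Channel,
        time_bin_size: u64,
    }

    struct ChannelGenProps {
        config: ChannelConfig,
        // Spacing between generated events; a property of the test data
        // generator, not of the channel itself, hence not in ChannelConfig.
        time_spacing: u64,
    }

    fn main() {
        let chn = ChannelGenProps {
            config: ChannelConfig {
                channel: Channel {
                    backend: "test".into(),
                    keyspace: 3,
                    name: "wave1".into(),
                },
                time_bin_size: DAY,
            },
            time_spacing: SEC * 1,
        };
        // One event per second over a one-day bin:
        println!(
            "{}: {} events per bin",
            chn.config.channel.name,
            chn.config.time_bin_size / chn.time_spacing
        );
    }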

View File

@@ -605,7 +605,7 @@ impl EventChunker {
         let block_size = sl.read_u32::<BE>().unwrap();
         let p1 = sl.position() as u32;
         let k1 = len as u32 - p1 - 4;
-        debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
+        //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
         assert!(value_bytes < 1024 * 256);
         assert!(block_size < 1024 * 32);
         //let value_bytes = value_bytes;
@@ -878,7 +878,7 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -
         .join(config.channel.name.clone())
         .join(format!("{:019}", timebin))
         .join(format!("{:010}", node.split))
-        .join(format!("{:019}_00000_Data", config.time_bin_size / timeunits::MS))
+        .join(format!("{:019}_00000_Data", config.time_bin_size / netpod::timeunits::MS))
 }
@@ -925,16 +925,6 @@ impl futures_core::Stream for RawConcatChannelReader {
     }
 }
 
-pub mod timeunits {
-    pub const MU: u64 = 1000;
-    pub const MS: u64 = MU * 1000;
-    pub const SEC: u64 = MS * 1000;
-    pub const MIN: u64 = SEC * 60;
-    pub const HOUR: u64 = MIN * 60;
-    pub const DAY: u64 = HOUR * 24;
-    pub const WEEK: u64 = DAY * 7;
-}
-
 pub mod dtflags {
     pub const COMPRESSION: u8 = 0x80;
     pub const ARRAY: u8 = 0x40;
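Besides silencing the per-event debug line, this file loses its local timeunits module (re-added in netpod below) and requalifies the constant in datapath accordingly. A standalone arithmetic check of that path component (hypothetical snippet; constants copied from the module):

    // With the nanosecond-based units, a one-day time_bin_size formats as
    // the 19-digit, zero-padded millisecond count in the data file's name.
    const MU: u64 = 1000;
    const MS: u64 = MU * 1000;
    const SEC: u64 = MS * 1000;
    const DAY: u64 = SEC * 60 * 60 * 24;

    fn main() {
        let name = format!("{:019}_00000_Data", DAY / MS);
        assert_eq!(name, "0000000000086400000_00000_Data");
        println!("{}", name);
    }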

View File

@@ -59,13 +59,14 @@ fn response<T>(status: T) -> http::response::Builder
 }
 
 async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
+    let node = todo!("get node from config");
     use netpod::AggQuerySingleChannel;
     let reqbody = req.into_body();
     let bodyslice = hyper::body::to_bytes(reqbody).await?;
     let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?;
     //let q = disk::read_test_1(&query).await?;
     //let s = q.inner;
-    let s = disk::parsed1(&query);
+    let s = disk::parsed1(&query, &node);
     let res = response(StatusCode::OK)
         .body(Body::wrap_stream(s))?;
     /*
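The handler now obtains a Node (stubbed with todo!("get node from config")) and passes it to disk::parsed1. The surrounding pattern is: deserialize the JSON request body, then stream the result out through Body::wrap_stream. A minimal, hypothetical version of that pattern (hyper 0.14 with the "stream" feature plus futures-util; none of these names come from httpret):

    use futures_util::stream;
    use hyper::{Body, Response, StatusCode};

    fn respond() -> Result<Response<Body>, hyper::http::Error> {
        // Each Ok item of the stream becomes one chunk of the response body.
        let chunks = stream::iter(vec![
            Ok::<_, std::convert::Infallible>("chunk 1\n"),
            Ok("chunk 2\n"),
        ]);
        Response::builder()
            .status(StatusCode::OK)
            .body(Body::wrap_stream(chunks))
    }

    fn main() {
        assert!(respond().is_ok());
    }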

View File

@@ -136,3 +136,13 @@ pub enum Shape {
     Scalar,
     Wave(usize),
 }
+
+pub mod timeunits {
+    pub const MU: u64 = 1000;
+    pub const MS: u64 = MU * 1000;
+    pub const SEC: u64 = MS * 1000;
+    pub const MIN: u64 = SEC * 60;
+    pub const HOUR: u64 = MIN * 60;
+    pub const DAY: u64 = HOUR * 24;
+    pub const WEEK: u64 = DAY * 7;
+}
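The module is moved here verbatim from the disk crate above. The constants imply a base tick of one nanosecond: MU, one microsecond, is 1000 ticks, and each larger unit builds on the previous one. A small usage sketch:

    pub mod timeunits {
        pub const MU: u64 = 1000;
        pub const MS: u64 = MU * 1000;
        pub const SEC: u64 = MS * 1000;
        pub const MIN: u64 = SEC * 60;
        pub const HOUR: u64 = MIN * 60;
        pub const DAY: u64 = HOUR * 24;
        pub const WEEK: u64 = DAY * 7;
    }

    fn main() {
        use timeunits::*;
        assert_eq!(SEC, 1_000_000_000); // one second in nanosecond ticks
        let ts = DAY + 3 * HOUR + 15 * MIN;
        // Integer division and modulo split a timestamp into unit fields:
        println!("{}d {}h {}m", ts / DAY, ts % DAY / HOUR, ts % HOUR / MIN);
    }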

View File

@@ -1,6 +1,7 @@
 #[allow(unused_imports)]
 use tracing::{error, warn, info, debug, trace};
 use err::Error;
+use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node};
 
 pub fn main() {
     match taskrun::run(go()) {
@@ -31,17 +32,28 @@ async fn go() -> Result<(), Error> {
 fn simple_fetch() {
     taskrun::run(async {
         let t1 = chrono::Utc::now();
-        let query = netpod::AggQuerySingleChannel {
+        let node = Node {
             host: "localhost".into(),
             port: 8888,
+            data_base_path: todo!(),
             ksprefix: "daq_swissfel".into(),
-            keyspace: 3,
-            channel: netpod::Channel {
-                name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
-                backend: "sf-databuffer".into(),
-            },
+            split: 0,
+        };
+        let query = netpod::AggQuerySingleChannel {
+            channel_config: ChannelConfig {
+                channel: Channel {
+                    backend: "sf-databuffer".into(),
+                    keyspace: 3,
+                    name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
+                },
+                time_bin_size: DAY,
+                scalar_type: ScalarType::F64,
+                shape: Shape::Wave(todo!()),
+                big_endian: true,
+                compression: true,
+            },
             timebin: 18720,
             tb_file_count: 1,
-            split: 12,
-            tbsize: 1000 * 60 * 60 * 24,
             buffer_size: 1024 * 8,
         };
         let query_string = serde_json::to_string(&query).unwrap();
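The function ends by serializing the query, presumably to POST it to the node's HTTP API, where parsed_raw above deserializes it with serde_json::from_slice. A simplified stand-in mirror of the types (field types assumed; scalar_type and shape omitted for brevity) shows the JSON wire shape this produces:

    // Requires serde = { version = "1", features = ["derive"] } and serde_json.
    use serde::Serialize;

    #[derive(Serialize)]
    struct Channel {
        backend: String,
        keyspace: u32,
        name: String,
    }

    #[derive(Serialize)]
    struct ChannelConfig {
        channel: Channel,
        time_bin_size: u64,
        big_endian: bool,
        compression: bool,
    }

    #[derive(Serialize)]
    struct AggQuerySingleChannel {
        channel_config: ChannelConfig,
        timebin: u64,
        tb_file_count: u32,
        buffer_size: u32,
    }

    fn main() {
        let query = AggQuerySingleChannel {
            channel_config: ChannelConfig {
                channel: Channel {
                    backend: "sf-databuffer".into(),
                    keyspace: 3,
                    name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
                },
                time_bin_size: 86_400_000_000_000, // DAY in the ns-based units
                big_endian: true,
                compression: true,
            },
            timebin: 18720,
            tb_file_count: 1,
            buffer_size: 1024 * 8,
        };
        println!("{}", serde_json::to_string_pretty(&query).unwrap());
    }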