From 1067869f34f171732b248fe0363aa77e5e0033df Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Sat, 10 Apr 2021 09:17:04 +0200
Subject: [PATCH] Refactor

---
 disk/src/agg.rs   | 62 ++++++++++++++++++++++++++++-------------
 disk/src/gen.rs   | 71 +++++++++++++++++++++++++----------------------
 disk/src/lib.rs   | 60 ++++++++++++++++----------------------
 netpod/src/lib.rs |  7 ++---
 4 files changed, 108 insertions(+), 92 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 7bb18cf..a4d1c2a 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
 use crate::EventFull;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt, future::ready};
-use netpod::ScalarType;
+use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node};
 
 pub trait AggregatorTdim {
     type InputValue;
@@ -763,23 +763,34 @@ fn agg_x_dim_0() {
 }
 
 async fn agg_x_dim_0_inner() {
+    let node = Node {
+        host: "localhost".into(),
+        port: 8888,
+        data_base_path: "../tmpdata/node0".into(),
+        split: 0,
+        ksprefix: "ks".into(),
+    };
     let query = netpod::AggQuerySingleChannel {
-        ksprefix: "daq_swissfel".into(),
-        keyspace: 2,
-        channel: netpod::Channel {
-            name: "S10BC01-DBAM070:EOM1_T1".into(),
-            backend: "sf-databuffer".into(),
+        channel_config: ChannelConfig {
+            channel: Channel {
+                backend: "sf-databuffer".into(),
+                keyspace: 2,
+                name: "S10BC01-DBAM070:EOM1_T1".into(),
+            },
+            time_bin_size: DAY,
+            shape: Shape::Scalar,
+            scalar_type: ScalarType::F64,
+            big_endian: true,
+            compression: true,
         },
         timebin: 18723,
         tb_file_count: 1,
-        split: 12,
-        tbsize: 1000 * 60 * 60 * 24,
         buffer_size: 1024 * 4,
     };
     let bin_count = 20;
-    let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query)
+    let fut1 = crate::EventBlobsComplete::new(&query, &node)
         .into_dim_1_f32_stream()
         //.take(1000)
         .map(|q| {
@@ -809,23 +820,36 @@ fn agg_x_dim_1() {
 }
 
 async fn agg_x_dim_1_inner() {
+    // sf-databuffer
+    // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
+    let node = Node {
+        host: "localhost".into(),
+        port: 8888,
+        data_base_path: "../tmpdata/node0".into(),
+        split: 0,
+        ksprefix: "ks".into(),
+    };
     let query = netpod::AggQuerySingleChannel {
-        ksprefix: "daq_swissfel".into(),
-        keyspace: 3,
-        channel: netpod::Channel {
-            name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
-            backend: "sf-databuffer".into(),
+        channel_config: ChannelConfig {
+            channel: Channel {
+                backend: "sf-databuffer".into(),
+                keyspace: 3,
+                name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
+            },
+            time_bin_size: DAY,
+            shape: Shape::Wave(1024),
+            scalar_type: ScalarType::F64,
+            big_endian: true,
+            compression: true,
         },
         timebin: 18722,
         tb_file_count: 1,
-        split: 12,
-        tbsize: 1000 * 60 * 60 * 24,
         buffer_size: 1024 * 4,
     };
     let bin_count = 100;
-    let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query)
+    let fut1 = crate::EventBlobsComplete::new(&query, &node)
        .into_dim_1_f32_stream()
        //.take(1000)
        .map(|q| {
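Note on the time arithmetic in the tests above: ChannelConfig::time_bin_size is already a nanosecond count (DAY from crate::timeunits), so the bin start is a plain product; the removed millisecond-based tbsize field is what required the extra MS factor. A minimal standalone sketch of that arithmetic, assuming nanosecond-based constants like those in crate::timeunits (the values below are illustrative, not copied from the crate):

    // Assumed nanosecond-based units, mirroring crate::timeunits.
    const MS: u64 = 1_000_000;
    const HOUR: u64 = 3_600_000 * MS;
    const DAY: u64 = 24 * HOUR;

    fn main() {
        let timebin: u32 = 18723;     // time bin index, as in the test
        let time_bin_size: u64 = DAY; // ChannelConfig::time_bin_size
        // Old: timebin * tbsize(ms) * MS. New: timebin * time_bin_size(ns).
        let ts1 = timebin as u64 * time_bin_size;
        let ts2 = ts1 + HOUR * 24;
        assert_eq!(ts2 - ts1, DAY);
        println!("bin range: [{}, {})", ts1, ts2);
    }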
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 113cf8a..9d70eda 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -15,7 +15,7 @@ use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
 use std::sync::Arc;
 use crate::timeunits::*;
-use netpod::{Node, Channel, Shape};
+use netpod::{Node, Channel, ChannelConfig, Shape};
 
 #[test]
 fn test_gen_test_data() {
@@ -33,30 +33,32 @@ pub async fn gen_test_data() -> Result<(), Error> {
         channels: vec![],
     };
     {
-        let chan = Channel {
-            backend: "test".into(),
-            keyspace: 3,
-            name: "wave1".into(),
+        let config = ChannelConfig {
+            channel: Channel {
+                backend: "test".into(),
+                keyspace: 3,
+                name: "wave1".into(),
+            },
             time_bin_size: DAY,
             scalar_type: ScalarType::F64,
             shape: Shape::Wave(9),
+            big_endian: true,
             compression: true,
-            time_spacing: HOUR * 6,
         };
-        ensemble.channels.push(chan);
+        ensemble.channels.push(config);
     }
     let node0 = Node {
         host: "localhost".into(),
         port: 7780,
         split: 0,
-        data_base_path: data_base_path.clone(),
+        data_base_path: data_base_path.join("node0"),
         ksprefix: ksprefix.clone(),
     };
     let node1 = Node {
         host: "localhost".into(),
         port: 7781,
         split: 1,
-        data_base_path: data_base_path.clone(),
+        data_base_path: data_base_path.join("node1"),
         ksprefix: ksprefix.clone(),
     };
     ensemble.nodes.push(node0);
@@ -69,27 +71,30 @@ pub async fn gen_test_data() -> Result<(), Error> {
 
 struct Ensemble {
     nodes: Vec<Node>,
-    channels: Vec<Channel>,
+    channels: Vec<ChannelConfig>,
 }
 
 async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    tokio::fs::create_dir_all(&ensemble.direnv.path).await?;
-    for channel in &ensemble.channels {
-        gen_channel(channel, node, ensemble).await?
+    for config in &ensemble.channels {
+        gen_channel(config, node, ensemble).await?
     }
     Ok(())
 }
 
-async fn gen_channel(channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    let channel_path = ensemble.direnv.path.clone()
-        .join(node.name())
-        .join(format!("{}_{}", ensemble.direnv.ksprefix, channel.keyspace))
+async fn gen_channel(config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
+    let config_path = node.data_base_path
+        .join("config")
+        .join(&config.channel.name);
+    tokio::fs::create_dir_all(&config_path).await?;
+    let channel_path = node.data_base_path
+        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
         .join("byTime")
-        .join(&channel.name);
+        .join(&config.channel.name);
     tokio::fs::create_dir_all(&channel_path).await?;
+    let ts_spacing = HOUR * 6;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(ts, &channel_path, channel, node, ensemble).await?;
+        let res = gen_timebin(ts, ts_spacing, &channel_path, config, node, ensemble).await?;
         ts = res.ts;
     }
     Ok(())
@@ -99,20 +104,20 @@ struct GenTimebinRes {
     ts: u64,
 }
 
-async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
-    let tb = ts / channel.time_bin_size;
+async fn gen_timebin(ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
+    let tb = ts / config.time_bin_size;
     let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split));
     tokio::fs::create_dir_all(&path).await?;
-    let path = path.join(format!("{:019}_{:05}_Data", channel.time_bin_size / MS, 0));
+    let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
     info!("open file {:?}", path);
     let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
-    gen_datafile_header(&mut file, channel).await?;
+    gen_datafile_header(&mut file, config).await?;
     let mut ts = ts;
-    let tsmax = (tb + 1) * channel.time_bin_size;
+    let tsmax = (tb + 1) * config.time_bin_size;
     while ts < tsmax {
         trace!("gen ts {}", ts);
-        gen_event(&mut file, ts, channel).await?;
-        ts += channel.time_spacing;
+        gen_event(&mut file, ts, config).await?;
+        ts += ts_spacing;
     }
     let ret = GenTimebinRes {
         ts,
@@ -120,9 +125,9 @@
     Ok(ret)
 }
 
-async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), Error> {
+async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024);
-    let cnenc = channel.name.as_bytes();
+    let cnenc = config.channel.name.as_bytes();
     let len1 = cnenc.len() + 8;
     buf.put_i16(0);
     buf.put_i32(len1 as i32);
@@ -132,7 +137,7 @@ async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), E
     Ok(())
 }
 
-async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Error> {
+async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);
@@ -143,16 +148,16 @@ async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Er
     buf.put_u8(0);
     buf.put_i32(-1);
     use crate::dtflags::*;
-    if channel.compression {
-        match channel.shape {
+    if config.compression {
+        match config.shape {
             Shape::Wave(ele_count) => {
                 buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN);
-                buf.put_u8(channel.scalar_type.index());
+                buf.put_u8(config.scalar_type.index());
                 let comp_method = 0 as u8;
                 buf.put_u8(comp_method);
                 buf.put_u8(1);
                 buf.put_u32(ele_count as u32);
-                match &channel.scalar_type {
+                match &config.scalar_type {
                     ScalarType::F64 => {
                         let ele_size = 8;
                         let mut vals = vec![0; ele_size * ele_count];
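For orientation, gen_channel and gen_timebin above write each channel's data files under a per-node base path. A sketch of the resulting path scheme, reconstructed from the format! calls in the diff (this example_data_path helper is hypothetical, not part of the patch):

    use std::path::{Path, PathBuf};

    // Illustrative layout written by gen.rs:
    // <base>/<ksprefix>_<keyspace>/byTime/<channel>/<timebin:019>/<split:010>/<tbsize_ms:019>_00000_Data
    fn example_data_path(base: &Path, ksprefix: &str, keyspace: u32, channel: &str,
                         timebin: u64, split: u32, time_bin_size_ms: u64) -> PathBuf {
        base.join(format!("{}_{}", ksprefix, keyspace))
            .join("byTime")
            .join(channel)
            .join(format!("{:019}", timebin))
            .join(format!("{:010}", split))
            .join(format!("{:019}_{:05}_Data", time_bin_size_ms, 0))
    }

    fn main() {
        let p = example_data_path(Path::new("../tmpdata/node0"), "ks", 3, "wave1", 0, 0, 86_400_000);
        // ../tmpdata/node0/ks_3/byTime/wave1/0000000000000000000/0000000000/0000000000086400000_00000_Data
        println!("{}", p.display());
    }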
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index d782687..66c571c 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -143,6 +143,7 @@ unsafe impl Send for Fopen1 {}
 
 pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let mut query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
         use tokio::io::AsyncReadExt;
         let mut fopen = None;
@@ -157,7 +158,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
             {
                 if !fopen_avail && file_prep.is_none() && i1 < 16 {
                     info!("Prepare open task for next file {}", query.timebin + i1);
-                    fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, node)));
+                    fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
                     fopen_avail = true;
                     i1 += 1;
                 }
@@ -265,10 +266,11 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
 }
 
-pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let chrx = open_files(&query);
+        let chrx = open_files(&query, &node);
         while let Ok(file) = chrx.recv().await {
             let mut file = match file {
                 Ok(k) => k,
@@ -290,14 +292,15 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
     }
 }
 
-fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
+fn open_files(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let mut query = query.clone();
+    let node = node.clone();
     tokio::spawn(async move {
         let tb0 = query.timebin;
         for i1 in 0..query.tb_file_count {
             query.timebin = tb0 + i1;
-            let path = datapath(&query);
+            let path = datapath(query.timebin as u64, &query.channel_config, &node);
             let fileres = OpenOptions::new()
                 .read(true)
                 .open(&path)
                 .await;
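open_files keeps its existing shape after gaining the node parameter: a spawned task opens upcoming files and hands them across a bounded channel, so readahead is capped at the channel capacity. A self-contained sketch of that pattern with a placeholder item type (not the crate's actual signatures; assumes the tokio and async-channel crates):

    // Bounded producer/consumer as in open_files: the opener task can run
    // at most `capacity` items ahead of the reader.
    #[tokio::main]
    async fn main() {
        let (tx, rx) = async_channel::bounded::<Result<u32, String>>(2);
        tokio::spawn(async move {
            for timebin in 0..4u32 {
                // Stand-in for OpenOptions::new().read(true).open(&path).await
                if tx.send(Ok(timebin)).await.is_err() {
                    break; // receiver dropped, stop opening files
                }
            }
        });
        while let Ok(item) = rx.recv().await {
            match item {
                Ok(tb) => println!("file for timebin {} ready", tb),
                Err(e) => eprintln!("open failed: {}", e),
            }
        }
    }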
@@ -341,10 +344,11 @@ pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> imp
 }
 
-pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<EventFull, Error>> + Send {
+pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item = Result<EventFull, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query);
+        let filerx = open_files(&query, &node);
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {
@@ -385,9 +389,9 @@ pub struct EventBlobsComplete {
 }
 
 impl EventBlobsComplete {
-    pub fn new(query: &netpod::AggQuerySingleChannel) -> Self {
+    pub fn new(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> Self {
         Self {
-            file_chan: open_files(query),
+            file_chan: open_files(query, node),
             evs: None,
             buffer_size: query.buffer_size,
         }
@@ -438,10 +442,11 @@ impl Stream for EventBlobsComplete {
 }
 
-pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<EventFull, Error>> + Send {
+pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item = Result<EventFull, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query);
+        let filerx = open_files(&query, &node);
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {
@@ -812,14 +817,15 @@ impl Stream for NeedMinBuffer {
 
-pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let mut query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
         let mut i1 = 0;
         loop {
             let timebin = 18700 + i1;
             query.timebin = timebin;
-            let s2 = raw_concat_channel_read_stream_timebin(&query);
+            let s2 = raw_concat_channel_read_stream_timebin(&query, &node);
             pin_mut!(s2);
             while let Some(item) = s2.next().await {
                 yield item;
             }
             i1 += 1;
@@ -833,16 +839,13 @@
 }
 
-pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> {
+pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item = Result<Bytes, Error>> {
     let query = query.clone();
-    let pre = "/data/sf-databuffer/daq_swissfel";
-    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
+    let node = node.clone();
     async_stream::stream! {
-        debug!("try path: {}", path);
-        let mut fin = tokio::fs::OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
+        let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        debug!("try path: {:?}", path);
+        let mut fin = OpenOptions::new().read(true).open(path).await?;
         let meta = fin.metadata().await?;
         debug!("file meta {:?}", meta);
         let blen = query.buffer_size as usize;
@@ -878,21 +881,6 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -
 }
 
-pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
-    let _reader = RawConcatChannelReader {
-        ksprefix: query.ksprefix.clone(),
-        keyspace: query.keyspace,
-        channel: query.channel.clone(),
-        split: query.split,
-        tbsize: query.tbsize,
-        buffer_size: query.buffer_size,
-        tb: 18714,
-        //file_reader: None,
-        fopen: None,
-    };
-    todo!()
-}
-
 /**
 Read all events from all timebins for the given channel and split.
 */
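A recurring detail in these hunks: each function clones query and node into owned values before entering async_stream::stream!, because the returned stream outlives the borrowed arguments. A standalone sketch of that pattern with simplified, hypothetical types (pin_mut and StreamExt as used elsewhere in the crate):

    use futures_util::{pin_mut, StreamExt};

    // Clone borrowed inputs into owned values so the stream block is 'static,
    // mirroring `let node = node.clone();` in the patch.
    fn counting_stream(prefix: &str) -> impl futures_core::Stream<Item = String> + Send {
        let prefix = prefix.to_owned();
        async_stream::stream! {
            for i in 0..3 {
                yield format!("{}-{}", prefix, i);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let s = counting_stream("timebin");
        pin_mut!(s);
        while let Some(item) = s.next().await {
            println!("{}", item);
        }
    }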
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index fe90e2e..7632505 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -8,10 +8,8 @@ use std::path::PathBuf;
 pub struct AggQuerySingleChannel {
     pub channel_config: ChannelConfig,
     pub timebin: u32,
-    pub split: u32,
-    pub tbsize: u32,
-    pub buffer_size: u32,
     pub tb_file_count: u32,
+    pub buffer_size: u32,
 }
 
 pub struct BodyStream {
@@ -90,7 +88,7 @@ impl ScalarType {
 }
 
-
+#[derive(Clone)]
 pub struct Node {
     pub host: String,
     pub port: u16,
@@ -129,6 +127,7 @@ pub struct ChannelConfig {
     pub time_bin_size: u64,
     pub scalar_type: ScalarType,
     pub shape: Shape,
+    pub big_endian: bool,
     pub compression: bool,
 }