diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 636da96..7bb18cf 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -385,7 +385,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
         todo!()
     }
-
     fn ingest(&mut self, v: &Self::InputValue) {
         todo!()
     }
 
@@ -810,34 +809,6 @@ fn agg_x_dim_1() {
 }
 
 async fn agg_x_dim_1_inner() {
-    let vals = ValuesDim1 {
-        tss: vec![0, 1, 2, 3],
-        values: vec![
-            vec![0., 0., 0.],
-            vec![1., 1., 1.],
-            vec![2., 2., 2.],
-            vec![3., 3., 3.],
-        ],
-    };
-    // I want to distinguish already in the outer part between dim-0 and dim-1 and generate
-    // separate code for these cases...
-    // That means that also the reading chain itself needs to be typed on that.
-    // Need to supply some event-payload converter type which has that type as Output type.
-    let vals2 = vals.into_agg();
-    // Now the T-binning:
-
-    /*
-    T-aggregator must be able to produce empty-values of correct type even if we never get
-    a single value of input data.
-    Therefore, it needs the bin range definition.
-    How do I want to drive the system?
-    If I write the T-binner as a Stream, then I also need to pass it the input!
-    Meaning, I need to pass the Stream which produces the actual numbers from disk.
-
-    readchannel() -> Stream of timestamped byte blobs
-    .to_f32() -> Stream ? indirection to branch on the underlying shape
-    .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
-    */
     let query = netpod::AggQuerySingleChannel {
         ksprefix: "daq_swissfel".into(),
         keyspace: 3,
@@ -876,3 +847,35 @@ async fn agg_x_dim_1_inner() {
         .for_each(|k| ready(()));
     fut1.await;
 }
+
+
+pub fn tmp_some_older_things() {
+    let vals = ValuesDim1 {
+        tss: vec![0, 1, 2, 3],
+        values: vec![
+            vec![0., 0., 0.],
+            vec![1., 1., 1.],
+            vec![2., 2., 2.],
+            vec![3., 3., 3.],
+        ],
+    };
+    // I want to distinguish already in the outer part between dim-0 and dim-1 and generate
+    // separate code for these cases...
+    // That means that also the reading chain itself needs to be typed on that.
+    // Need to supply some event-payload converter type which has that type as Output type.
+    let vals2 = vals.into_agg();
+    // Now the T-binning:
+
+    /*
+    T-aggregator must be able to produce empty-values of correct type even if we never get
+    a single value of input data.
+    Therefore, it needs the bin range definition.
+    How do I want to drive the system?
+    If I write the T-binner as a Stream, then I also need to pass it the input!
+    Meaning, I need to pass the Stream which produces the actual numbers from disk.
+
+    readchannel() -> Stream of timestamped byte blobs
+    .to_f32() -> Stream ? indirection to branch on the underlying shape
+    .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
+    */
+}
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 02838ef..113cf8a 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -14,8 +14,8 @@ use std::path::{Path, PathBuf};
 use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
 use std::sync::Arc;
-use crate::gen::Shape::Scalar;
 use crate::timeunits::*;
+use netpod::{Node, Channel, Shape};
 
 #[test]
 fn test_gen_test_data() {
@@ -26,14 +26,11 @@ fn test_gen_test_data() {
 }
 
 pub async fn gen_test_data() -> Result<(), Error> {
-    let direnv = DirEnv {
-        path: "../tmpdata".into(),
-        ksprefix: "ks".into(),
-    };
+    let data_base_path = PathBuf::from("../tmpdata");
+    let ksprefix = String::from("ks");
     let mut ensemble = Ensemble {
         nodes: vec![],
         channels: vec![],
-        direnv: Arc::new(direnv),
     };
     {
         let chan = Channel {
@@ -41,8 +38,9 @@ pub async fn gen_test_data() -> Result<(), Error> {
             keyspace: 3,
             name: "wave1".into(),
             time_bin_size: DAY,
-            scalar_type: ScalarType::F32,
-            shape: Shape::Wave(42),
+            scalar_type: ScalarType::F64,
+            shape: Shape::Wave(9),
+            compression: true,
             time_spacing: HOUR * 6,
         };
         ensemble.channels.push(chan);
@@ -51,11 +49,15 @@ let node0 = Node {
             host: "localhost".into(),
             port: 7780,
             split: 0,
+            data_base_path: data_base_path.clone(),
+            ksprefix: ksprefix.clone(),
         };
         let node1 = Node {
             host: "localhost".into(),
             port: 7781,
             split: 1,
+            data_base_path: data_base_path.clone(),
+            ksprefix: ksprefix.clone(),
         };
         ensemble.nodes.push(node0);
         ensemble.nodes.push(node1);
@@ -68,39 +70,6 @@
 struct Ensemble {
     nodes: Vec<Node>,
     channels: Vec<Channel>,
-    direnv: Arc<DirEnv>,
 }
-
-struct DirEnv {
-    path: PathBuf,
-    ksprefix: String,
-}
-
-struct Node {
-    host: String,
-    port: u16,
-    split: u8,
-}
-
-impl Node {
-    fn name(&self) -> String {
-        format!("{}-{}", self.host, self.port)
-    }
-}
-
-enum Shape {
-    Scalar,
-    Wave(usize),
-}
-
-struct Channel {
-    keyspace: u8,
-    backend: String,
-    name: String,
-    time_bin_size: u64,
-    scalar_type: ScalarType,
-    shape: Shape,
-    time_spacing: u64,
-}
 
 async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
@@ -112,7 +81,7 @@
 }
 
 async fn gen_channel(channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    let mut channel_path = ensemble.direnv.path.clone()
+    let channel_path = node.data_base_path.clone()
         .join(node.name())
-        .join(format!("{}_{}", ensemble.direnv.ksprefix, channel.keyspace))
+        .join(format!("{}_{}", node.ksprefix, channel.keyspace))
         .join("byTime")
@@ -142,6 +111,7 @@ async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Nod
     let tsmax = (tb + 1) * channel.time_bin_size;
     while ts < tsmax {
         trace!("gen ts {}", ts);
+        gen_event(&mut file, ts, channel).await?;
         ts += channel.time_spacing;
     }
     let ret = GenTimebinRes {
@@ -152,7 +122,6 @@
 
 async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024);
-    //use bytes::BufMut;
     let cnenc = channel.name.as_bytes();
     let len1 = cnenc.len() + 8;
     buf.put_i16(0);
@@ -162,3 +131,61 @@ async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), E
     file.write_all(&buf).await?;
     Ok(())
 }
+
+async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Error> {
+    let mut buf = BytesMut::with_capacity(1024 * 16);
+    buf.put_i32(0xcafecafe as u32 as i32);
+    buf.put_u64(0xcafecafe);
+    buf.put_u64(ts);
+    buf.put_u64(2323);
+    buf.put_u64(0xcafecafe);
+    buf.put_u8(0);
+    buf.put_u8(0);
+    buf.put_i32(-1);
+    use crate::dtflags::*;
+    if channel.compression {
+        match channel.shape {
+            Shape::Wave(ele_count) => {
+                buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN);
+                buf.put_u8(channel.scalar_type.index());
+                let comp_method = 0 as u8;
+                buf.put_u8(comp_method);
+                buf.put_u8(1);
+                buf.put_u32(ele_count as u32);
+                match &channel.scalar_type {
+                    ScalarType::F64 => {
+                        let ele_size = 8;
+                        let mut vals = vec![0; ele_size * ele_count];
+                        for i1 in 0..ele_count {
+                            let v = 1.22 as f64;
+                            let a = v.to_be_bytes();
+                            let mut c1 = std::io::Cursor::new(&mut vals);
+                            use std::io::{Seek, SeekFrom};
+                            c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
+                            std::io::Write::write_all(&mut c1, &a)?;
+                        }
+                        let mut comp = vec![0u8; ele_size * ele_count + 64];
+                        let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap();
+                        trace!("comp size {} {}e-2", n1, 100 * n1 / vals.len());
+                        buf.put_u64(vals.len() as u64);
+                        let comp_block_size = 0;
+                        buf.put_u32(comp_block_size);
+                        buf.put(&comp[..n1]);
+                    }
+                    _ => todo!()
+                }
+            }
+            _ => todo!()
+        }
+    }
+    else {
+        todo!()
+    }
+    {
+        let len = buf.len() as u32 + 4;
+        buf.put_u32(len);
+        buf.as_mut().put_u32(len);
+    }
+    file.write_all(buf.as_ref()).await?;
+    Ok(())
+}
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index c7ce7d9..d782687 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -15,14 +15,13 @@
 use tokio::fs::{OpenOptions, File};
 use bytes::{Bytes, BytesMut, Buf};
 use std::path::PathBuf;
 use bitshuffle::bitshuffle_decompress;
-use netpod::ScalarType;
+use netpod::{ScalarType, Node};
 
-pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
-    let pre = "/data/sf-databuffer/daq_swissfel";
-    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
-    debug!("try path: {}", path);
-    let fin = tokio::fs::OpenOptions::new()
+pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result<netpod::BodyStream, Error> {
+    let path = datapath(query.timebin as u64, &query.channel_config, node);
+    debug!("try path: {:?}", path);
+    let fin = OpenOptions::new()
         .read(true)
         .open(path)
         .await?;
@@ -142,7 +141,7 @@ impl FusedFuture for Fopen1 {
 
 unsafe impl Send for Fopen1 {}
 
-pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let mut query = query.clone();
     async_stream::stream! {
         use tokio::io::AsyncReadExt;
@@ -157,9 +156,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
         let blen = query.buffer_size as usize;
         {
             if !fopen_avail && file_prep.is_none() && i1 < 16 {
-                query.timebin = 18700 + i1;
-                info!("Prepare open task for next file {}", query.timebin);
-                fopen.replace(Fopen1::new(datapath(&query)));
+                info!("Prepare open task for next file {}", query.timebin + i1);
+                fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, node)));
                 fopen_avail = true;
                 i1 += 1;
             }
@@ -300,7 +298,7 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<
     for i1 in 0..query.tb_file_count {
         query.timebin = tb0 + i1;
         let path = datapath(&query);
-        let fileres = tokio::fs::OpenOptions::new()
+        let fileres = OpenOptions::new()
            .read(true)
            .open(&path)
            .await;
@@ -573,10 +571,11 @@ impl EventChunker {
                 let type_flags = sl.read_u8().unwrap();
                 let type_index = sl.read_u8().unwrap();
                 assert!(type_index <= 13);
-                let is_compressed = type_flags & 0x80 != 0;
-                let is_array = type_flags & 0x40 != 0;
-                let is_big_endian = type_flags & 0x20 != 0;
-                let is_shaped = type_flags & 0x10 != 0;
+                use dtflags::*;
+                let is_compressed = type_flags & COMPRESSION != 0;
+                let is_array = type_flags & ARRAY != 0;
+                let is_big_endian = type_flags & BIG_ENDIAN != 0;
+                let is_shaped = type_flags & SHAPE != 0;
                 let compression_method = if is_compressed {
                     sl.read_u8().unwrap()
                 }
@@ -595,7 +594,7 @@ impl EventChunker {
                 for i1 in 0..shape_dim {
                     shape_lens[i1 as usize] = sl.read_u8().unwrap();
                 }
-                if true && is_compressed {
+                if is_compressed {
                     //info!("event ts {} is_compressed {}", ts, is_compressed);
                     let value_bytes = sl.read_u64::<BigEndian>().unwrap();
                     let block_size = sl.read_u32::<BigEndian>().unwrap();
@@ -618,6 +617,9 @@
                     assert!(c1.unwrap() as u32 == k1);
                     ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
                 }
+                else {
+                    todo!()
+                }
                 buf.advance(len as usize);
                 need_min = 4;
             }
@@ -864,10 +866,15 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
 }
 
-fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
-    let pre = "/data/sf-databuffer/daq_swissfel";
-    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
-    path.into()
-}
+fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -> PathBuf {
+    //let pre = "/data/sf-databuffer/daq_swissfel";
+    node.data_base_path
+        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
+        .join("byTime")
+        .join(config.channel.name.clone())
+        .join(format!("{:019}", timebin))
+        .join(format!("{:010}", node.split))
+        .join(format!("{:019}_00000_Data", config.time_bin_size))
+}
 
@@ -938,3 +945,10 @@ pub mod timeunits {
     pub const DAY: u64 = HOUR * 24;
     pub const WEEK: u64 = DAY * 7;
 }
+
+pub mod dtflags {
+    pub const COMPRESSION: u8 = 0x80;
+    pub const ARRAY: u8 = 0x40;
+    pub const BIG_ENDIAN: u8 = 0x20;
+    pub const SHAPE: u8 = 0x10;
+}
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 84e9801..fe90e2e 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1,32 +1,12 @@
 use serde::{Serialize, Deserialize};
 use err::Error;
-//use std::pin::Pin;
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct Channel {
-    pub backend: String,
-    pub name: String,
-}
-
-impl Channel {
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-#[test]
-fn serde_channel() {
= "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}"; -} - +use std::path::PathBuf; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { - pub ksprefix: String, - pub keyspace: u32, - pub channel: Channel, + pub channel_config: ChannelConfig, pub timebin: u32, pub split: u32, pub tbsize: u32, @@ -39,6 +19,7 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum ScalarType { U8, U16, @@ -91,4 +72,68 @@ impl ScalarType { } } + pub fn index(&self) -> u8 { + use ScalarType::*; + match self { + U8 => 3, + U16 => 5, + U32 => 8, + U64 => 10, + I8 => 2, + I16 => 4, + I32 => 7, + I64 => 9, + F32 => 11, + F64 => 12, + } + } + +} + + +pub struct Node { + pub host: String, + pub port: u16, + pub split: u8, + pub data_base_path: PathBuf, + pub ksprefix: String, +} + +impl Node { + pub fn name(&self) -> String { + format!("{}-{}", self.host, self.port) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Channel { + pub keyspace: u8, + pub backend: String, + pub name: String, +} + +impl Channel { + pub fn name(&self) -> &str { + &self.name + } +} + +#[test] +fn serde_channel() { + let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}"; +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChannelConfig { + pub channel: Channel, + pub time_bin_size: u64, + pub scalar_type: ScalarType, + pub shape: Shape, + pub compression: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Shape { + Scalar, + Wave(usize), }