From 09adbc44055932b90f82165e374016ce0633c0ff Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 9 Apr 2021 11:05:18 +0200 Subject: [PATCH] WIP on test datafile generator --- .gitignore | 1 + disk/src/agg.rs | 8 +--- disk/src/gen.rs | 101 +++++++++++++++++++++++++++++++++++++++++------- disk/src/lib.rs | 29 +++++++++----- 4 files changed, 109 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 3e0782b..84eac3f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /Cargo.lock /target /.idea +/tmpdata diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 53de3c6..636da96 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -753,14 +753,8 @@ pub struct TimeRange { ts2: u64, } -const MU: u64 = 1000; -const MS: u64 = MU * 1000; -const SEC: u64 = MS * 1000; -const MIN: u64 = SEC * 60; -const HOUR: u64 = MIN * 60; -const DAY: u64 = HOUR * 24; -const WEEK: u64 = DAY * 7; +use crate::timeunits::*; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index bc9baf5..02838ef 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -7,40 +7,57 @@ use futures_core::Stream; use futures_util::future::FusedFuture; use futures_util::{pin_mut, StreamExt}; use std::pin::Pin; -use tokio::io::AsyncRead; -use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio::fs::{OpenOptions, File}; use bytes::{Bytes, BytesMut, BufMut, Buf}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use bitshuffle::bitshuffle_compress; use netpod::ScalarType; use std::sync::Arc; use crate::gen::Shape::Scalar; +use crate::timeunits::*; + +#[test] +fn test_gen_test_data() { + taskrun::run(async { + gen_test_data().await?; + Ok(()) + }).unwrap(); +} pub async fn gen_test_data() -> Result<(), Error> { + let direnv = DirEnv { + path: "../tmpdata".into(), + ksprefix: "ks".into(), + }; let mut ensemble = Ensemble { nodes: vec![], channels: vec![], + direnv: Arc::new(direnv), }; { let chan = Channel { backend: "test".into(), keyspace: 3, name: "wave1".into(), + time_bin_size: DAY, scalar_type: ScalarType::F32, shape: Shape::Wave(42), - time_spacing: 420, + time_spacing: HOUR * 6, }; ensemble.channels.push(chan); } - let direnv = DirEnv { - path: "./tmpdata".into(), - }; - let direnv = Arc::new(direnv); - let node1 = Node { + let node0 = Node { host: "localhost".into(), port: 7780, - direnv: direnv.clone(), + split: 0, }; + let node1 = Node { + host: "localhost".into(), + port: 7781, + split: 1, + }; + ensemble.nodes.push(node0); ensemble.nodes.push(node1); for node in &ensemble.nodes { gen_node(node, &ensemble).await?; @@ -51,21 +68,23 @@ pub async fn gen_test_data() -> Result<(), Error> { struct Ensemble { nodes: Vec, channels: Vec, + direnv: Arc, } struct DirEnv { path: PathBuf, + ksprefix: String, } struct Node { host: String, port: u16, - direnv: Arc, + split: u8, } impl Node { fn name(&self) -> String { - format!("{}:{}", self.host, self.port) + format!("{}-{}", self.host, self.port) } } @@ -78,12 +97,68 @@ struct Channel { keyspace: u8, backend: String, name: String, + time_bin_size: u64, scalar_type: ScalarType, shape: Shape, time_spacing: u64, } async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> { - tokio::fs::create_dir_all(&node.direnv.path).await?; + tokio::fs::create_dir_all(&ensemble.direnv.path).await?; + for channel in &ensemble.channels { + gen_channel(channel, node, ensemble).await? + } + Ok(()) +} + +async fn gen_channel(channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { + let mut channel_path = ensemble.direnv.path.clone() + .join(node.name()) + .join(format!("{}_{}", ensemble.direnv.ksprefix, channel.keyspace)) + .join("byTime") + .join(&channel.name); + tokio::fs::create_dir_all(&channel_path).await?; + let mut ts = 0; + while ts < DAY { + let res = gen_timebin(ts, &channel_path, channel, node, ensemble).await?; + ts = res.ts; + } + Ok(()) +} + +struct GenTimebinRes { + ts: u64, +} + +async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result { + let tb = ts / channel.time_bin_size; + let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split)); + tokio::fs::create_dir_all(&path).await?; + let path = path.join(format!("{:019}_{:05}_Data", channel.time_bin_size / MS, 0)); + info!("open file {:?}", path); + let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?; + gen_datafile_header(&mut file, channel).await?; + let mut ts = ts; + let tsmax = (tb + 1) * channel.time_bin_size; + while ts < tsmax { + trace!("gen ts {}", ts); + ts += channel.time_spacing; + } + let ret = GenTimebinRes { + ts, + }; + Ok(ret) +} + +async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), Error> { + let mut buf = BytesMut::with_capacity(1024); + //use bytes::BufMut; + let cnenc = channel.name.as_bytes(); + let len1 = cnenc.len() + 8; + buf.put_i16(0); + buf.put_i32(len1 as i32); + buf.put(cnenc); + buf.put_i32(len1 as i32); + file.write_all(&buf).await?; Ok(()) } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index ece88f1..c7ce7d9 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -8,10 +8,10 @@ use std::task::{Context, Poll}; use std::future::Future; use futures_core::Stream; use futures_util::future::FusedFuture; -use futures_util::{pin_mut, StreamExt}; +use futures_util::{FutureExt, StreamExt, pin_mut, select}; use std::pin::Pin; use tokio::io::AsyncRead; -use tokio::fs::File; +use tokio::fs::{OpenOptions, File}; use bytes::{Bytes, BytesMut, Buf}; use std::path::PathBuf; use bitshuffle::bitshuffle_decompress; @@ -84,8 +84,8 @@ impl Stream for FileReader { #[allow(dead_code)] struct Fopen1 { - opts: tokio::fs::OpenOptions, - fut: Pin>>>, + opts: OpenOptions, + fut: Pin>>>, term: bool, } @@ -93,18 +93,18 @@ impl Fopen1 { pub fn new(path: PathBuf) -> Self { let fut = Box::pin(async { - let mut o1 = tokio::fs::OpenOptions::new(); + let mut o1 = OpenOptions::new(); let o2 = o1.read(true); let res = o2.open(path); //() == res; //todo!() res.await - }) as Pin>>>; + }) as Pin>>>; let _fut2: Box> = Box::new(async { 123 }); Self { - opts: tokio::fs::OpenOptions::new(), + opts: OpenOptions::new(), fut, term: false, } @@ -113,7 +113,7 @@ impl Fopen1 { } impl Future for Fopen1 { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let g = self.fut.as_mut(); @@ -143,10 +143,9 @@ unsafe impl Send for Fopen1 {} pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - use futures_util::{FutureExt, select}; - use tokio::io::AsyncReadExt; let mut query = query.clone(); async_stream::stream! { + use tokio::io::AsyncReadExt; let mut fopen = None; let mut fopen_avail = false; let mut file_prep: Option = None; @@ -929,3 +928,13 @@ impl futures_core::Stream for RawConcatChannelReader { } } + +pub mod timeunits { + pub const MU: u64 = 1000; + pub const MS: u64 = MU * 1000; + pub const SEC: u64 = MS * 1000; + pub const MIN: u64 = SEC * 60; + pub const HOUR: u64 = MIN * 60; + pub const DAY: u64 = HOUR * 24; + pub const WEEK: u64 = DAY * 7; +}