Generate two tiny files and read back one of them
This commit is contained in:
+15
-7
@@ -91,20 +91,23 @@ async fn gen_channel(config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -
|
|||||||
.join("byTime")
|
.join("byTime")
|
||||||
.join(&config.channel.name);
|
.join(&config.channel.name);
|
||||||
tokio::fs::create_dir_all(&channel_path).await?;
|
tokio::fs::create_dir_all(&channel_path).await?;
|
||||||
let ts_spacing = HOUR * 6;
|
let ts_spacing = HOUR * 1;
|
||||||
|
let mut evix = 0;
|
||||||
let mut ts = 0;
|
let mut ts = 0;
|
||||||
while ts < DAY {
|
while ts < DAY {
|
||||||
let res = gen_timebin(ts, ts_spacing, &channel_path, config, node, ensemble).await?;
|
let res = gen_timebin(evix, ts, ts_spacing, &channel_path, config, node, ensemble).await?;
|
||||||
|
evix = res.evix;
|
||||||
ts = res.ts;
|
ts = res.ts;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
struct GenTimebinRes {
|
struct GenTimebinRes {
|
||||||
|
evix: u64,
|
||||||
ts: u64,
|
ts: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn gen_timebin(ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
|
async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
|
||||||
let tb = ts / config.time_bin_size;
|
let tb = ts / config.time_bin_size;
|
||||||
let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split));
|
let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split));
|
||||||
tokio::fs::create_dir_all(&path).await?;
|
tokio::fs::create_dir_all(&path).await?;
|
||||||
@@ -112,14 +115,19 @@ async fn gen_timebin(ts: u64, ts_spacing: u64, channel_path: &Path, config: &Cha
|
|||||||
info!("open file {:?}", path);
|
info!("open file {:?}", path);
|
||||||
let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
|
let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
|
||||||
gen_datafile_header(&mut file, config).await?;
|
gen_datafile_header(&mut file, config).await?;
|
||||||
|
let mut evix = evix;
|
||||||
let mut ts = ts;
|
let mut ts = ts;
|
||||||
let tsmax = (tb + 1) * config.time_bin_size;
|
let tsmax = (tb + 1) * config.time_bin_size;
|
||||||
while ts < tsmax {
|
while ts < tsmax {
|
||||||
trace!("gen ts {}", ts);
|
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
|
||||||
gen_event(&mut file, ts, config).await?;
|
trace!("gen ts {}", ts);
|
||||||
|
gen_event(&mut file, evix, ts, config).await?;
|
||||||
|
}
|
||||||
|
evix += 1;
|
||||||
ts += ts_spacing;
|
ts += ts_spacing;
|
||||||
}
|
}
|
||||||
let ret = GenTimebinRes {
|
let ret = GenTimebinRes {
|
||||||
|
evix,
|
||||||
ts,
|
ts,
|
||||||
};
|
};
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
@@ -137,7 +145,7 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
|
async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
|
||||||
let mut buf = BytesMut::with_capacity(1024 * 16);
|
let mut buf = BytesMut::with_capacity(1024 * 16);
|
||||||
buf.put_i32(0xcafecafe as u32 as i32);
|
buf.put_i32(0xcafecafe as u32 as i32);
|
||||||
buf.put_u64(0xcafecafe);
|
buf.put_u64(0xcafecafe);
|
||||||
@@ -162,7 +170,7 @@ async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<(
|
|||||||
let ele_size = 8;
|
let ele_size = 8;
|
||||||
let mut vals = vec![0; ele_size * ele_count];
|
let mut vals = vec![0; ele_size * ele_count];
|
||||||
for i1 in 0..ele_count {
|
for i1 in 0..ele_count {
|
||||||
let v = 1.22 as f64;
|
let v = evix as f64;
|
||||||
let a = v.to_be_bytes();
|
let a = v.to_be_bytes();
|
||||||
let mut c1 = std::io::Cursor::new(&mut vals);
|
let mut c1 = std::io::Cursor::new(&mut vals);
|
||||||
use std::io::{Seek, SeekFrom};
|
use std::io::{Seek, SeekFrom};
|
||||||
|
|||||||
+2
-2
@@ -617,9 +617,9 @@ impl EventChunker {
|
|||||||
unsafe {
|
unsafe {
|
||||||
decomp.set_len(decomp_bytes);
|
decomp.set_len(decomp_bytes);
|
||||||
}
|
}
|
||||||
debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
|
//debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
|
||||||
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap();
|
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap();
|
||||||
debug!("decompress result c1 {} k1 {}", c1, k1);
|
//debug!("decompress result c1 {} k1 {}", c1, k1);
|
||||||
assert!(c1 as u32 == k1);
|
assert!(c1 as u32 == k1);
|
||||||
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user