Refactor
@@ -6,7 +6,7 @@ use std::pin::Pin;
 use crate::EventFull;
 use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt, future::ready};
-use netpod::ScalarType;
+use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node};
 
 pub trait AggregatorTdim {
     type InputValue;

@@ -763,23 +763,34 @@ fn agg_x_dim_0() {
 }
 
 async fn agg_x_dim_0_inner() {
+    let node = Node {
+        host: "localhost".into(),
+        port: 8888,
+        data_base_path: "../tmpdata/node0".into(),
+        split: 0,
+        ksprefix: "ks".into(),
+    };
     let query = netpod::AggQuerySingleChannel {
-        ksprefix: "daq_swissfel".into(),
-        keyspace: 2,
-        channel: netpod::Channel {
-            name: "S10BC01-DBAM070:EOM1_T1".into(),
-            backend: "sf-databuffer".into(),
+        channel_config: ChannelConfig {
+            channel: Channel {
+                backend: "sf-databuffer".into(),
+                keyspace: 2,
+                name: "S10BC01-DBAM070:EOM1_T1".into(),
+            },
+            time_bin_size: DAY,
+            shape: Shape::Scalar,
+            scalar_type: ScalarType::F64,
+            big_endian: true,
+            compression: true,
         },
         timebin: 18723,
         tb_file_count: 1,
         split: 12,
         tbsize: 1000 * 60 * 60 * 24,
         buffer_size: 1024 * 4,
     };
     let bin_count = 20;
-    let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query)
+    let fut1 = crate::EventBlobsComplete::new(&query, &node)
         .into_dim_1_f32_stream()
         //.take(1000)
         .map(|q| {

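Note: the struct shapes implied by the new literals look roughly as follows. This is a sketch inferred from the field names and values in this diff, not netpod's actual definitions; every field type here is an assumption.

use std::path::PathBuf;

// Sketch only: inferred from the test literals above, not from netpod's source.
pub struct Node {
    pub host: String,
    pub port: u16,
    pub data_base_path: PathBuf, // per-node base directory, e.g. "../tmpdata/node0"
    pub split: u32,
    pub ksprefix: String,        // keyspace directory prefix: "ks" + keyspace -> "ks_2"
}

pub struct ChannelConfig {
    pub channel: Channel,        // backend, keyspace, name
    pub time_bin_size: u64,      // DAY from crate::timeunits
    pub shape: Shape,            // Shape::Scalar or Shape::Wave(n)
    pub scalar_type: ScalarType, // e.g. ScalarType::F64
    pub big_endian: bool,
    pub compression: bool,
}

(The gen_test_data hunk further down also sets a time_spacing field on ChannelConfig.)
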
@@ -809,23 +820,36 @@ fn agg_x_dim_1() {
 }
 
 async fn agg_x_dim_1_inner() {
     // sf-databuffer
     // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
+    let node = Node {
+        host: "localhost".into(),
+        port: 8888,
+        data_base_path: "../tmpdata/node0".into(),
+        split: 0,
+        ksprefix: "ks".into(),
+    };
     let query = netpod::AggQuerySingleChannel {
-        ksprefix: "daq_swissfel".into(),
-        keyspace: 3,
-        channel: netpod::Channel {
-            name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
-            backend: "sf-databuffer".into(),
+        channel_config: ChannelConfig {
+            channel: Channel {
+                backend: "ks".into(),
+                keyspace: 3,
+                name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
+            },
+            time_bin_size: DAY,
+            shape: Shape::Wave(1024),
+            scalar_type: ScalarType::F64,
+            big_endian: true,
+            compression: true,
         },
         timebin: 18722,
         tb_file_count: 1,
         split: 12,
         tbsize: 1000 * 60 * 60 * 24,
         buffer_size: 1024 * 4,
     };
     let bin_count = 100;
-    let ts1 = query.timebin as u64 * query.tbsize as u64 * MS;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query)
+    let fut1 = crate::EventBlobsComplete::new(&query, &node)
         .into_dim_1_f32_stream()
         //.take(1000)
         .map(|q| {

@@ -15,7 +15,7 @@ use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
 use std::sync::Arc;
 use crate::timeunits::*;
-use netpod::{Node, Channel, Shape};
+use netpod::{Node, Channel, ChannelConfig, Shape};
 
 #[test]
 fn test_gen_test_data() {

@@ -33,30 +33,32 @@ pub async fn gen_test_data() -> Result<(), Error> {
         channels: vec![],
     };
     {
-        let chan = Channel {
-            backend: "test".into(),
-            keyspace: 3,
-            name: "wave1".into(),
+        let config = ChannelConfig {
+            channel: Channel {
+                backend: "test".into(),
+                keyspace: 3,
+                name: "wave1".into(),
+            },
+            time_bin_size: DAY,
+            scalar_type: ScalarType::F64,
+            shape: Shape::Wave(9),
+            big_endian: true,
+            compression: true,
+            time_spacing: HOUR * 6,
         };
-        ensemble.channels.push(chan);
+        ensemble.channels.push(config);
     }
     let node0 = Node {
         host: "localhost".into(),
         port: 7780,
         split: 0,
-        data_base_path: data_base_path.clone(),
+        data_base_path: data_base_path.join("node0"),
         ksprefix: ksprefix.clone(),
     };
     let node1 = Node {
         host: "localhost".into(),
         port: 7781,
         split: 1,
-        data_base_path: data_base_path.clone(),
+        data_base_path: data_base_path.join("node1"),
         ksprefix: ksprefix.clone(),
     };
     ensemble.nodes.push(node0);

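Note: with data_base_path.join("node0") and join("node1"), the two test nodes now write disjoint directory trees under the shared base path instead of both writing into data_base_path itself.
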
@@ -69,27 +71,30 @@ pub async fn gen_test_data() -> Result<(), Error> {
 
 struct Ensemble {
     nodes: Vec<Node>,
-    channels: Vec<Channel>,
+    channels: Vec<ChannelConfig>,
 }
 
 async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    tokio::fs::create_dir_all(&ensemble.direnv.path).await?;
-    for channel in &ensemble.channels {
-        gen_channel(channel, node, ensemble).await?
+    for config in &ensemble.channels {
+        gen_channel(config, node, ensemble).await?
     }
     Ok(())
 }
 
-async fn gen_channel(channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    let channel_path = ensemble.direnv.path.clone()
-        .join(node.name())
-        .join(format!("{}_{}", ensemble.direnv.ksprefix, channel.keyspace))
+async fn gen_channel(config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
+    let config_path = node.data_base_path
+        .join("config")
+        .join(&config.channel.name);
+    tokio::fs::create_dir_all(&config_path).await?;
+    let channel_path = node.data_base_path
+        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
         .join("byTime")
-        .join(&channel.name);
+        .join(&config.channel.name);
     tokio::fs::create_dir_all(&channel_path).await?;
+    let ts_spacing = HOUR * 6;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(ts, &channel_path, channel, node, ensemble).await?;
+        let res = gen_timebin(ts, ts_spacing, &channel_path, config, node, ensemble).await?;
         ts = res.ts;
     }
     Ok(())

@@ -99,20 +104,20 @@ struct GenTimebinRes {
     ts: u64,
 }
 
-async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
-    let tb = ts / channel.time_bin_size;
+async fn gen_timebin(ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
+    let tb = ts / config.time_bin_size;
     let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split));
     tokio::fs::create_dir_all(&path).await?;
-    let path = path.join(format!("{:019}_{:05}_Data", channel.time_bin_size / MS, 0));
+    let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
     info!("open file {:?}", path);
     let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
-    gen_datafile_header(&mut file, channel).await?;
+    gen_datafile_header(&mut file, config).await?;
     let mut ts = ts;
-    let tsmax = (tb + 1) * channel.time_bin_size;
+    let tsmax = (tb + 1) * config.time_bin_size;
     while ts < tsmax {
         trace!("gen ts {}", ts);
-        gen_event(&mut file, ts, channel).await?;
-        ts += channel.time_spacing;
+        gen_event(&mut file, ts, config).await?;
+        ts += ts_spacing;
     }
     let ret = GenTimebinRes {
         ts,

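Worked example of the layout gen_channel and gen_timebin produce, assuming the crate::timeunits constants count nanoseconds (so DAY / MS = 86_400_000): for the wave1 config above on a node with ksprefix "ks", keyspace 3, split 0, and timebin 0, the data file path expands to

<data_base_path>/ks_3/byTime/wave1/0000000000000000000/0000000000/0000000000086400000_00000_Data

and gen_channel additionally creates the <data_base_path>/config/wave1 directory.
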
@@ -120,9 +125,9 @@ async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Nod
     Ok(ret)
 }
 
-async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), Error> {
+async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024);
-    let cnenc = channel.name.as_bytes();
+    let cnenc = config.channel.name.as_bytes();
     let len1 = cnenc.len() + 8;
     buf.put_i16(0);
     buf.put_i32(len1 as i32);

@@ -132,7 +137,7 @@ async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), E
     Ok(())
 }
 
-async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Error> {
+async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);

@@ -143,16 +148,16 @@ async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Er
     buf.put_u8(0);
     buf.put_i32(-1);
     use crate::dtflags::*;
-    if channel.compression {
-        match channel.shape {
+    if config.compression {
+        match config.shape {
             Shape::Wave(ele_count) => {
                 buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN);
-                buf.put_u8(channel.scalar_type.index());
+                buf.put_u8(config.scalar_type.index());
                 let comp_method = 0 as u8;
                 buf.put_u8(comp_method);
                 buf.put_u8(1);
                 buf.put_u32(ele_count as u32);
-                match &channel.scalar_type {
+                match &config.scalar_type {
                     ScalarType::F64 => {
                         let ele_size = 8;
                         let mut vals = vec![0; ele_size * ele_count];

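For readability, the compressed wave event header written above, field by field (the meaning of the fourth byte is an inference; the code only writes a literal 1, presumably the shape dimension count):

flags: u8        = COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN
scalar_type: u8  = config.scalar_type.index()
comp_method: u8  = 0
shape_dims: u8   = 1
ele_count: u32   = n from Shape::Wave(n)
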
@@ -143,6 +143,7 @@ unsafe impl Send for Fopen1 {}
 
 pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
     let mut query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
         use tokio::io::AsyncReadExt;
         let mut fopen = None;

@@ -157,7 +158,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
             {
                 if !fopen_avail && file_prep.is_none() && i1 < 16 {
                     info!("Prepare open task for next file {}", query.timebin + i1);
-                    fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, node)));
+                    fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
                     fopen_avail = true;
                     i1 += 1;
                 }

@@ -265,10 +266,11 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
 }
 
 
-pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
+pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let chrx = open_files(&query);
+        let chrx = open_files(&query, &node);
         while let Ok(file) = chrx.recv().await {
             let mut file = match file {
                 Ok(k) => k,

@@ -290,14 +292,15 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
     }
 }
 
-fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
+fn open_files(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let mut query = query.clone();
+    let node = node.clone();
     tokio::spawn(async move {
         let tb0 = query.timebin;
         for i1 in 0..query.tb_file_count {
             query.timebin = tb0 + i1;
-            let path = datapath(&query);
+            let path = datapath(query.timebin as u64, &query.channel_config, &node);
             let fileres = OpenOptions::new()
                 .read(true)
                 .open(&path)

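Design note: open_files clones query and node into the spawned task because tokio::spawn requires 'static data, and it hands opened files back through async_channel::bounded(2), so file opening runs ahead of the consumer by at most two queued files.
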
@@ -341,10 +344,11 @@ pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> imp
 }
 
 
-pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
+pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query);
+        let filerx = open_files(&query, &node);
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {

@@ -385,9 +389,9 @@ pub struct EventBlobsComplete {
 }
 
 impl EventBlobsComplete {
-    pub fn new(query: &netpod::AggQuerySingleChannel) -> Self {
+    pub fn new(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> Self {
         Self {
-            file_chan: open_files(query),
+            file_chan: open_files(query, node),
             evs: None,
             buffer_size: query.buffer_size,
         }

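A minimal call-site sketch for the two-argument constructor, with query and node built as in the tests above (the per-item handling is a placeholder):

// Sketch: build the event stream against one node, then drain it.
let stream = crate::EventBlobsComplete::new(&query, &node)
    .into_dim_1_f32_stream();
pin_mut!(stream);
while let Some(item) = stream.next().await {
    // inspect or aggregate each item here
    let _ = item;
}
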
@@ -438,10 +442,11 @@ impl Stream for EventBlobsComplete {
 }
 
 
-pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<EventFull, Error>> + Send {
+pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<EventFull, Error>> + Send {
     let query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query);
+        let filerx = open_files(&query, &node);
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {

@@ -812,14 +817,15 @@ impl Stream for NeedMinBuffer {
 
 
 
-pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
     let mut query = query.clone();
+    let node = node.clone();
     async_stream::stream! {
         let mut i1 = 0;
         loop {
             let timebin = 18700 + i1;
             query.timebin = timebin;
-            let s2 = raw_concat_channel_read_stream_timebin(&query);
+            let s2 = raw_concat_channel_read_stream_timebin(&query, &node);
             pin_mut!(s2);
             while let Some(item) = s2.next().await {
                 yield item;

@@ -833,16 +839,13 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) ->
 }
 
 
-pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
+pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> impl Stream<Item=Result<Bytes, Error>> {
     let query = query.clone();
-    let pre = "/data/sf-databuffer/daq_swissfel";
-    let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
+    let node = node.clone();
     async_stream::stream! {
-        debug!("try path: {}", path);
-        let mut fin = tokio::fs::OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
+        let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        debug!("try path: {:?}", path);
+        let mut fin = OpenOptions::new().read(true).open(path).await?;
         let meta = fin.metadata().await?;
         debug!("file meta {:?}", meta);
         let blen = query.buffer_size as usize;

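datapath itself appears in this diff only through its signature in the next hunk's header; given that gen_timebin writes the same layout, a plausible body is the following sketch (an inference, not the committed code):

// Sketch reconstructed from the path logic in gen_channel/gen_timebin; not the committed body.
fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -> std::path::PathBuf {
    node.data_base_path
        .join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
        .join("byTime")
        .join(&config.channel.name)
        .join(format!("{:019}", timebin))
        .join(format!("{:010}", node.split))
        .join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0))
}
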
@@ -878,21 +881,6 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -
 }
 
 
-pub async fn raw_concat_channel_read(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
-    let _reader = RawConcatChannelReader {
-        ksprefix: query.ksprefix.clone(),
-        keyspace: query.keyspace,
-        channel: query.channel.clone(),
-        split: query.split,
-        tbsize: query.tbsize,
-        buffer_size: query.buffer_size,
-        tb: 18714,
-        //file_reader: None,
-        fopen: None,
-    };
-    todo!()
-}
-
 /**
 Read all events from all timebins for the given channel and split.
 */