This commit is contained in:
Dominik Werder
2021-04-09 22:17:58 +02:00
parent 09adbc4405
commit ef00b6e923
4 changed files with 203 additions and 114 deletions

View File

@@ -385,7 +385,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
todo!()
}
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
@@ -810,34 +809,6 @@ fn agg_x_dim_1() {
}
async fn agg_x_dim_1_inner() {
let vals = ValuesDim1 {
tss: vec![0, 1, 2, 3],
values: vec![
vec![0., 0., 0.],
vec![1., 1., 1.],
vec![2., 2., 2.],
vec![3., 3., 3.],
],
};
// I want to distinguish between dim-0 and dim-1 already in the outer part and generate
// separate code for these cases...
// That means the reading chain itself also needs to be typed on the dimensionality.
// We need to supply an event-payload converter type which has that type as its Output type.
let vals2 = vals.into_agg();
// Now the T-binning:
/*
The T-aggregator must be able to produce empty values of the correct type even if we never
receive a single value of input data.
Therefore, it needs the bin range definition.
How do I want to drive the system?
If I write the T-binner as a Stream, then I also need to pass it the input!
Meaning, I need to pass the Stream which produces the actual numbers from disk.
readchannel() -> Stream of timestamped byte blobs
.to_f32() -> Stream ? indirection to branch on the underlying shape
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
*/
let query = netpod::AggQuerySingleChannel {
ksprefix: "daq_swissfel".into(),
keyspace: 3,
@@ -876,3 +847,35 @@ async fn agg_x_dim_1_inner() {
.for_each(|k| ready(()));
fut1.await;
}
pub fn tmp_some_older_things() {
let vals = ValuesDim1 {
tss: vec![0, 1, 2, 3],
values: vec![
vec![0., 0., 0.],
vec![1., 1., 1.],
vec![2., 2., 2.],
vec![3., 3., 3.],
],
};
// I want to distinguish between dim-0 and dim-1 already in the outer part and generate
// separate code for these cases...
// That means the reading chain itself also needs to be typed on the dimensionality.
// We need to supply an event-payload converter type which has that type as its Output type.
let vals2 = vals.into_agg();
// Now the T-binning:
/*
The T-aggregator must be able to produce empty values of the correct type even if we never
receive a single value of input data.
Therefore, it needs the bin range definition.
How do I want to drive the system?
If I write the T-binner as a Stream, then I also need to pass it the input!
Meaning, I need to pass the Stream which produces the actual numbers from disk.
readchannel() -> Stream of timestamped byte blobs
.to_f32() -> Stream ? indirection to branch on the underlying shape
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
*/
}
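The comments above sketch a chain of the form readchannel() -> .to_f32() -> .agg_x_bins_1(). None of those adapters exist under these names in this commit; purely as a hedged illustration, a minimal version of such a chain over futures streams could look like the following, with all types made up for the sketch:

use futures::stream::{self, StreamExt};

// Hypothetical event types for the sketch only.
struct EventBlob { ts: u64, payload: Vec<u8> }
struct EventF32 { ts: u64, values: Vec<f32> }
struct XBinned1 { ts: u64, min: f32, max: f32, avg: f32 }

// Decode a timestamped byte blob into big-endian f32 samples.
fn to_f32(e: EventBlob) -> EventF32 {
    let values = e.payload
        .chunks_exact(4)
        .map(|c| f32::from_be_bytes([c[0], c[1], c[2], c[3]]))
        .collect();
    EventF32 { ts: e.ts, values }
}

// Reduce one waveform over its X dimension into min/max/avg.
fn agg_x_bins_1(e: EventF32) -> XBinned1 {
    let mut min = f32::INFINITY;
    let mut max = f32::NEG_INFINITY;
    let mut sum = 0f32;
    for &v in &e.values {
        min = min.min(v);
        max = max.max(v);
        sum += v;
    }
    let avg = if e.values.is_empty() { f32::NAN } else { sum / e.values.len() as f32 };
    XBinned1 { ts: e.ts, min, max, avg }
}

async fn demo() {
    // Stand-in for readchannel(): a stream of timestamped byte blobs.
    let mut payload = Vec::new();
    for v in &[0f32, 1., 2.] {
        payload.extend_from_slice(&v.to_be_bytes());
    }
    let blobs = stream::iter(vec![EventBlob { ts: 0, payload }]);
    let mut binned = blobs.map(to_f32).map(agg_x_bins_1);
    while let Some(b) = binned.next().await {
        println!("ts {}  min {}  max {}  avg {}", b.ts, b.min, b.max, b.avg);
    }
}

Typing the chain at the outer level, as the comment argues, means the dim-0/dim-1 branch happens once when the stream is built rather than per event.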

View File

@@ -14,8 +14,8 @@ use std::path::{Path, PathBuf};
use bitshuffle::bitshuffle_compress;
use netpod::ScalarType;
use std::sync::Arc;
use crate::gen::Shape::Scalar;
use crate::timeunits::*;
use netpod::{Node, Channel, Shape};
#[test]
fn test_gen_test_data() {
@@ -26,14 +26,11 @@ fn test_gen_test_data() {
}
pub async fn gen_test_data() -> Result<(), Error> {
let direnv = DirEnv {
path: "../tmpdata".into(),
ksprefix: "ks".into(),
};
let data_base_path = PathBuf::from("../tmpdata");
let ksprefix = String::from("ks");
let mut ensemble = Ensemble {
nodes: vec![],
channels: vec![],
direnv: Arc::new(direnv),
};
{
let chan = Channel {
@@ -41,8 +38,9 @@ pub async fn gen_test_data() -> Result<(), Error> {
keyspace: 3,
name: "wave1".into(),
time_bin_size: DAY,
scalar_type: ScalarType::F32,
shape: Shape::Wave(42),
scalar_type: ScalarType::F64,
shape: Shape::Wave(9),
compression: true,
time_spacing: HOUR * 6,
};
ensemble.channels.push(chan);
@@ -51,11 +49,15 @@ pub async fn gen_test_data() -> Result<(), Error> {
host: "localhost".into(),
port: 7780,
split: 0,
data_base_path: data_base_path.clone(),
ksprefix: ksprefix.clone(),
};
let node1 = Node {
host: "localhost".into(),
port: 7781,
split: 1,
data_base_path: data_base_path.clone(),
ksprefix: ksprefix.clone(),
};
ensemble.nodes.push(node0);
ensemble.nodes.push(node1);
@@ -68,39 +70,6 @@ pub async fn gen_test_data() -> Result<(), Error> {
struct Ensemble {
nodes: Vec<Node>,
channels: Vec<Channel>,
direnv: Arc<DirEnv>,
}
struct DirEnv {
path: PathBuf,
ksprefix: String,
}
struct Node {
host: String,
port: u16,
split: u8,
}
impl Node {
fn name(&self) -> String {
format!("{}-{}", self.host, self.port)
}
}
enum Shape {
Scalar,
Wave(usize),
}
struct Channel {
keyspace: u8,
backend: String,
name: String,
time_bin_size: u64,
scalar_type: ScalarType,
shape: Shape,
time_spacing: u64,
}
async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
@@ -112,7 +81,7 @@ async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
}
async fn gen_channel(channel: &Channel, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
let mut channel_path = ensemble.direnv.path.clone()
let channel_path = ensemble.direnv.path.clone()
.join(node.name())
.join(format!("{}_{}", ensemble.direnv.ksprefix, channel.keyspace))
.join("byTime")
@@ -142,6 +111,7 @@ async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Nod
let tsmax = (tb + 1) * channel.time_bin_size;
while ts < tsmax {
trace!("gen ts {}", ts);
gen_event(&mut file, ts, channel).await?;
ts += channel.time_spacing;
}
let ret = GenTimebinRes {
@@ -152,7 +122,6 @@ async fn gen_timebin(ts: u64, channel_path: &Path, channel: &Channel, node: &Nod
async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(1024);
//use bytes::BufMut;
let cnenc = channel.name.as_bytes();
let len1 = cnenc.len() + 8;
buf.put_i16(0);
@@ -162,3 +131,61 @@ async fn gen_datafile_header(file: &mut File, channel: &Channel) -> Result<(), E
file.write_all(&buf).await?;
Ok(())
}
async fn gen_event(file: &mut File, ts: u64, channel: &Channel) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(1024 * 16);
buf.put_i32(0xcafecafe as u32 as i32);
buf.put_u64(0xcafecafe);
buf.put_u64(ts);
buf.put_u64(2323);
buf.put_u64(0xcafecafe);
buf.put_u8(0);
buf.put_u8(0);
buf.put_i32(-1);
use crate::dtflags::*;
if channel.compression {
match channel.shape {
Shape::Wave(ele_count) => {
buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN);
buf.put_u8(channel.scalar_type.index());
let comp_method = 0 as u8;
buf.put_u8(comp_method);
buf.put_u8(1);
buf.put_u32(ele_count as u32);
match &channel.scalar_type {
ScalarType::F64 => {
let ele_size = 8;
let mut vals = vec![0; ele_size * ele_count];
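// Write each sample as a big-endian f64 at its element offset in the flat byte buffer.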
for i1 in 0..ele_count {
let v = 1.22 as f64;
let a = v.to_be_bytes();
let mut c1 = std::io::Cursor::new(&mut vals);
use std::io::{Seek, SeekFrom};
c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
std::io::Write::write_all(&mut c1, &a)?;
}
let mut comp = vec![0u8; ele_size * ele_count + 64];
let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap();
trace!("comp size {} {}e-2", n1, 100 * n1 / vals.len());
buf.put_u64(vals.len() as u64);
let comp_block_size = 0;
buf.put_u32(comp_block_size);
buf.put(comp.as_slice());
}
_ => todo!()
}
}
_ => todo!()
}
}
else {
todo!()
}
{
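// Write the total record length at the tail and patch it over the 4-byte placeholder at the head of the buffer.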
let len = buf.len() as u32 + 4;
buf.put_u32(len);
buf.as_mut().put_u32(len);
}
file.write_all(buf.as_ref()).await?;
Ok(())
}
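For reference, a hedged round-trip sketch of the bitshuffle usage in gen_event. It assumes bitshuffle_decompress takes the same (input, output, element count, element size, block size) arguments as bitshuffle_compress and returns the number of compressed bytes consumed, which is how the reading side's assert appears to treat it; that signature is an assumption, not taken from this diff:

#[test]
fn bitshuffle_roundtrip_sketch() {
    use bitshuffle::{bitshuffle_compress, bitshuffle_decompress};
    let ele_count = 9;
    let ele_size = 8;
    // Flat big-endian f64 payload, like gen_event builds.
    let mut raw = vec![0u8; ele_size * ele_count];
    for i1 in 0..ele_count {
        raw[i1 * ele_size..(i1 + 1) * ele_size].copy_from_slice(&(i1 as f64).to_be_bytes());
    }
    let mut comp = vec![0u8; ele_size * ele_count + 64];
    let n1 = bitshuffle_compress(&raw, &mut comp, ele_count, ele_size, 0).unwrap();
    let mut decomp = vec![0u8; raw.len()];
    // Assumption: the return value is the number of compressed bytes consumed.
    let consumed = bitshuffle_decompress(&comp[..n1], &mut decomp, ele_count, ele_size, 0).unwrap();
    assert_eq!(consumed, n1);
    assert_eq!(decomp, raw);
}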

View File

@@ -15,14 +15,13 @@ use tokio::fs::{OpenOptions, File};
use bytes::{Bytes, BytesMut, Buf};
use std::path::PathBuf;
use bitshuffle::bitshuffle_decompress;
use netpod::ScalarType;
use netpod::{ScalarType, Node};
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
let pre = "/data/sf-databuffer/daq_swissfel";
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
debug!("try path: {}", path);
let fin = tokio::fs::OpenOptions::new()
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result<netpod::BodyStream, Error> {
let path = datapath(query.timebin as u64, &query.channel_config, node);
debug!("try path: {:?}", path);
let fin = OpenOptions::new()
.read(true)
.open(path)
.await?;
@@ -142,7 +141,7 @@ impl FusedFuture for Fopen1 {
unsafe impl Send for Fopen1 {}
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let mut query = query.clone();
async_stream::stream! {
use tokio::io::AsyncReadExt;
@@ -157,9 +156,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
let blen = query.buffer_size as usize;
{
if !fopen_avail && file_prep.is_none() && i1 < 16 {
query.timebin = 18700 + i1;
info!("Prepare open task for next file {}", query.timebin);
fopen.replace(Fopen1::new(datapath(&query)));
info!("Prepare open task for next file {}", query.timebin + i1);
fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, node)));
fopen_avail = true;
i1 += 1;
}
@@ -300,7 +298,7 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<
for i1 in 0..query.tb_file_count {
query.timebin = tb0 + i1;
let path = datapath(&query);
let fileres = tokio::fs::OpenOptions::new()
let fileres = OpenOptions::new()
.read(true)
.open(&path)
.await;
@@ -573,10 +571,11 @@ impl EventChunker {
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
assert!(type_index <= 13);
let is_compressed = type_flags & 0x80 != 0;
let is_array = type_flags & 0x40 != 0;
let is_big_endian = type_flags & 0x20 != 0;
let is_shaped = type_flags & 0x10 != 0;
use dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
let compression_method = if is_compressed {
sl.read_u8().unwrap()
}
@@ -595,7 +594,7 @@ impl EventChunker {
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u8().unwrap();
}
if true && is_compressed {
if is_compressed {
//info!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
@@ -618,6 +617,9 @@ impl EventChunker {
assert!(c1.unwrap() as u32 == k1);
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
}
else {
todo!()
}
buf.advance(len as usize);
need_min = 4;
}
@@ -864,10 +866,15 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
}
fn datapath(query: &netpod::AggQuerySingleChannel) -> PathBuf {
let pre = "/data/sf-databuffer/daq_swissfel";
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
path.into()
fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -> PathBuf {
//let pre = "/data/sf-databuffer/daq_swissfel";
node.data_base_path
.join(format!("{}_{}", node.ksprefix, config.channel.keyspace))
.join("byTime")
.join(config.channel.name.clone())
.join(format!("{:019}", timebin))
.join(format!("{:010}", node.split))
.join(format!("{:019}_00000_Data", config.time_bin_size))
}
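A hedged sketch of the directory layout the new datapath assembles, written as a test that could sit in the same module (so it can reach the private fn); every value below is made up for illustration:

#[test]
fn datapath_layout_sketch() {
    let node = netpod::Node {
        host: "localhost".into(),
        port: 7780,
        split: 0,
        data_base_path: "/data".into(),
        ksprefix: "ks".into(),
    };
    let config = netpod::ChannelConfig {
        channel: netpod::Channel {
            keyspace: 3,
            backend: "testbackend".into(),
            name: "wave1".into(),
        },
        time_bin_size: 86400,
        scalar_type: netpod::ScalarType::F64,
        shape: netpod::Shape::Wave(9),
        compression: true,
    };
    let path = datapath(18700, &config, &node);
    assert_eq!(
        path,
        std::path::PathBuf::from(
            "/data/ks_3/byTime/wave1/0000000000000018700/0000000000/0000000000000086400_00000_Data"
        )
    );
}

The substantive change in this commit is that the base directory and ksprefix now come from the Node instead of being hard-coded inside datapath.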
@@ -938,3 +945,10 @@ pub mod timeunits {
pub const DAY: u64 = HOUR * 24;
pub const WEEK: u64 = DAY * 7;
}
pub mod dtflags {
pub const COMPRESSION: u8 = 0x80;
pub const ARRAY: u8 = 0x40;
pub const BIG_ENDIAN: u8 = 0x20;
pub const SHAPE: u8 = 0x10;
}
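For clarity, a standalone sketch (not part of this commit) of how the dtflags byte is composed on the write side in gen_event and masked apart again in EventChunker:

#[test]
fn dtflags_sketch() {
    use crate::dtflags::*;
    // Write side (as in gen_event): a compressed, shaped, big-endian array event.
    let type_flags = COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN;
    // Read side (as in EventChunker): each property is recovered by masking the same constant.
    assert!(type_flags & COMPRESSION != 0);
    assert!(type_flags & ARRAY != 0);
    assert!(type_flags & BIG_ENDIAN != 0);
    assert!(type_flags & SHAPE != 0);
    // A bit that was not set tests as zero.
    assert_eq!(type_flags & 0x08, 0);
}

These named constants replace the bare 0x80/0x40/0x20/0x10 masks that EventChunker used before this commit.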

View File

@@ -1,32 +1,12 @@
use serde::{Serialize, Deserialize};
use err::Error;
//use std::pin::Pin;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Channel {
pub backend: String,
pub name: String,
}
impl Channel {
pub fn name(&self) -> &str {
&self.name
}
}
#[test]
fn serde_channel() {
let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}";
}
use std::path::PathBuf;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel {
pub ksprefix: String,
pub keyspace: u32,
pub channel: Channel,
pub channel_config: ChannelConfig,
pub timebin: u32,
pub split: u32,
pub tbsize: u32,
@@ -39,6 +19,7 @@ pub struct BodyStream {
pub inner: Box<dyn futures_core::Stream<Item=Result<bytes::Bytes, Error>> + Send + Unpin>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ScalarType {
U8,
U16,
@@ -91,4 +72,68 @@ impl ScalarType {
}
}
pub fn index(&self) -> u8 {
use ScalarType::*;
match self {
U8 => 3,
U16 => 5,
U32 => 8,
U64 => 10,
I8 => 2,
I16 => 4,
I32 => 7,
I64 => 9,
F32 => 11,
F64 => 12,
}
}
}
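A hedged sketch relating index() to from_dtype_index, which EventChunker calls but which is not shown in this diff; it assumes from_dtype_index(u8) -> ScalarType is the inverse mapping:

#[test]
fn scalar_type_index_roundtrip_sketch() {
    use ScalarType::*;
    for st in &[U8, U16, U32, U64, I8, I16, I32, I64, F32, F64] {
        let ix = st.index();
        // Assumed inverse: from_dtype_index maps the dtype index back to the variant.
        assert_eq!(ScalarType::from_dtype_index(ix).index(), ix);
    }
}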
pub struct Node {
pub host: String,
pub port: u16,
pub split: u8,
pub data_base_path: PathBuf,
pub ksprefix: String,
}
impl Node {
pub fn name(&self) -> String {
format!("{}-{}", self.host, self.port)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Channel {
pub keyspace: u8,
pub backend: String,
pub name: String,
}
impl Channel {
pub fn name(&self) -> &str {
&self.name
}
}
#[test]
fn serde_channel() {
let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}";
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelConfig {
pub channel: Channel,
pub time_bin_size: u64,
pub scalar_type: ScalarType,
pub shape: Shape,
pub compression: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Shape {
Scalar,
Wave(usize),
}
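A hedged round-trip sketch for the new ChannelConfig, in the spirit of the serde_channel test above; it assumes serde_json is available to the tests:

#[test]
fn serde_channel_config() {
    let config = ChannelConfig {
        channel: Channel {
            keyspace: 3,
            backend: "testbackend".into(),
            name: "wave1".into(),
        },
        time_bin_size: 86400,
        scalar_type: ScalarType::F64,
        shape: Shape::Wave(9),
        compression: true,
    };
    let js = serde_json::to_string(&config).unwrap();
    let back: ChannelConfig = serde_json::from_str(&js).unwrap();
    assert_eq!(back.time_bin_size, config.time_bin_size);
}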