Fix warnings

This commit is contained in:
Dominik Werder
2021-04-19 10:41:54 +02:00
parent 7dc19ad404
commit 8c328cf6cf
16 changed files with 496 additions and 478 deletions

View File

@@ -1,12 +1,10 @@
use crate::merge::MergeDim1F32Stream;
use crate::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::{future::ready, pin_mut, StreamExt};
use futures_util::StreamExt;
use netpod::BinSpecDimT;
use netpod::{timeunits::*, Channel, ChannelConfig, Node, ScalarType, Shape};
use netpod::{Node, ScalarType};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -42,7 +40,7 @@ impl AggregatableXdim1Bin for () {
impl AggregatableTdim for () {
type Output = ();
type Aggregator = ();
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
@@ -50,19 +48,19 @@ impl AggregatorTdim for () {
type InputValue = ();
type OutputValue = ();
fn ends_before(&self, inp: &Self::InputValue) -> bool {
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, v: &Self::InputValue) {
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue {
@@ -305,6 +303,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
}
}
#[allow(dead_code)]
pub struct MinMaxAvgScalarBinBatch {
ts1s: Vec<u64>,
ts2s: Vec<u64>,
@@ -330,7 +329,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
impl AggregatableTdim for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
@@ -341,19 +340,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
type InputValue = MinMaxAvgScalarBinBatch;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, v: &Self::InputValue) {
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
@@ -384,7 +383,7 @@ impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
impl AggregatableTdim for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinSingleAggregator;
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
@@ -402,19 +401,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
type InputValue = MinMaxAvgScalarBinSingle;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, v: &Self::InputValue) {
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
@@ -455,7 +454,9 @@ where
// do the conversion
// TODO only a scalar!
todo!();
if true {
todo!();
}
let n1 = decomp.len();
assert!(n1 % ty.bytes() as usize == 0);
@@ -479,10 +480,10 @@ where
ret.tss.push(k.tss[i1]);
ret.values.push(j);
}
_ => todo!(),
_ => err::todoval(),
}
}
Ready(Some(Ok(todo!())))
Ready(Some(Ok(err::todoval())))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
@@ -744,193 +745,9 @@ pub fn make_test_node(id: u32) -> Node {
id,
host: "localhost".into(),
port: 8800 + id as u16,
port_raw: 8800 + id as u16 + 100,
data_base_path: format!("../tmpdata/node{:02}", id).into(),
split: id,
ksprefix: "ks".into(),
}
}
#[test]
fn agg_x_dim_0() {
taskrun::run(async {
agg_x_dim_0_inner().await;
Ok(())
})
.unwrap();
}
async fn agg_x_dim_0_inner() {
let node = make_test_node(0);
let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:EOM1_T1".into(),
},
keyspace: 2,
time_bin_size: DAY,
array: false,
shape: Shape::Scalar,
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 18723,
tb_file_count: 1,
buffer_size: 1024 * 4,
};
let bin_count = 20;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
if let Ok(ref k) = q {
//info!("vals: {:?}", k);
}
q
})
.into_binned_x_bins_1()
.map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap());
k
})
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap());
k
})
.for_each(|k| ready(()));
fut1.await;
}
#[test]
fn agg_x_dim_1() {
taskrun::run(async {
agg_x_dim_1_inner().await;
Ok(())
})
.unwrap();
}
async fn agg_x_dim_1_inner() {
// sf-databuffer
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
// S10BC01-DBAM070:BAM_CH1_NORM
let node = make_test_node(0);
let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
array: true,
shape: Shape::Wave(1024),
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 0,
tb_file_count: 1,
buffer_size: 17,
};
let bin_count = 10;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
if let Ok(ref k) = q {
//info!("vals: {:?}", k);
}
q
})
.into_binned_x_bins_1()
.map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap());
k
})
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap());
k
})
.for_each(|k| ready(()));
fut1.await;
}
#[test]
fn merge_0() {
taskrun::run(async {
merge_0_inner().await;
Ok(())
})
.unwrap();
}
async fn merge_0_inner() {
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
array: true,
shape: Shape::Wave(17),
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 0,
tb_file_count: 1,
buffer_size: 1024 * 8,
};
let streams = (0..13)
.into_iter()
.map(|k| make_test_node(k))
.map(|node| {
let node = Arc::new(node);
crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream()
})
.collect();
MergeDim1F32Stream::new(streams)
.map(|k| {
//info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
})
.fold(0, |k, q| ready(0))
.await;
}
pub fn tmp_some_older_things() {
let vals = ValuesDim1 {
tss: vec![0, 1, 2, 3],
values: vec![vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.]],
};
// I want to distinguish already in the outer part between dim-0 and dim-1 and generate
// separate code for these cases...
// That means that also the reading chain itself needs to be typed on that.
// Need to supply some event-payload converter type which has that type as Output type.
let vals2 = vals.into_agg();
// Now the T-binning:
/*
T-aggregator must be able to produce empty-values of correct type even if we never get
a single value of input data.
Therefore, it needs the bin range definition.
How do I want to drive the system?
If I write the T-binner as a Stream, then I also need to pass it the input!
Meaning, I need to pass the Stream which produces the actual numbers from disk.
readchannel() -> Stream of timestamped byte blobs
.to_f32() -> Stream ? indirection to branch on the underlying shape
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
*/
}

203
disk/src/aggtest.rs Normal file
View File

@@ -0,0 +1,203 @@
use super::agg::{AggregatableXdim1Bin, IntoBinnedT, IntoBinnedXBins1, IntoDim1F32Stream, ValuesDim1};
use super::merge::MergeDim1F32Stream;
use crate::agg::make_test_node;
use futures_util::StreamExt;
use netpod::timeunits::*;
use netpod::{BinSpecDimT, Channel, ChannelConfig, ScalarType, Shape};
use std::future::ready;
use std::sync::Arc;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
#[test]
fn agg_x_dim_0() {
taskrun::run(async {
agg_x_dim_0_inner().await;
Ok(())
})
.unwrap();
}
async fn agg_x_dim_0_inner() {
let node = make_test_node(0);
let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:EOM1_T1".into(),
},
keyspace: 2,
time_bin_size: DAY,
array: false,
shape: Shape::Scalar,
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 18723,
tb_file_count: 1,
buffer_size: 1024 * 4,
};
let bin_count = 20;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
if false {
if let Ok(ref k) = q {
trace!("vals: {:?}", k);
}
}
q
})
.into_binned_x_bins_1()
.map(|k| {
if false {
trace!("after X binning {:?}", k.as_ref().unwrap());
}
k
})
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| {
if false {
trace!("after T binning {:?}", k.as_ref().unwrap());
}
k
})
.for_each(|_k| ready(()));
fut1.await;
}
#[test]
fn agg_x_dim_1() {
taskrun::run(async {
agg_x_dim_1_inner().await;
Ok(())
})
.unwrap();
}
async fn agg_x_dim_1_inner() {
// sf-databuffer
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
// S10BC01-DBAM070:BAM_CH1_NORM
let node = make_test_node(0);
let node = Arc::new(node);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
array: true,
shape: Shape::Wave(1024),
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 0,
tb_file_count: 1,
buffer_size: 17,
};
let bin_count = 10;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream()
//.take(1000)
.map(|q| {
if false {
if let Ok(ref k) = q {
info!("vals: {:?}", k);
}
}
q
})
.into_binned_x_bins_1()
.map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap());
k
})
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap());
k
})
.for_each(|_k| ready(()));
fut1.await;
}
#[test]
fn merge_0() {
taskrun::run(async {
merge_0_inner().await;
Ok(())
})
.unwrap();
}
async fn merge_0_inner() {
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
channel: Channel {
backend: "ks".into(),
name: "wave1".into(),
},
keyspace: 3,
time_bin_size: DAY,
array: true,
shape: Shape::Wave(17),
scalar_type: ScalarType::F64,
big_endian: true,
compression: true,
},
timebin: 0,
tb_file_count: 1,
buffer_size: 1024 * 8,
};
let streams = (0..13)
.into_iter()
.map(|k| make_test_node(k))
.map(|node| {
let node = Arc::new(node);
crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream()
})
.collect();
MergeDim1F32Stream::new(streams)
.map(|_k| {
//info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
})
.fold(0, |_k, _q| ready(0))
.await;
}
pub fn tmp_some_older_things() {
let vals = ValuesDim1 {
tss: vec![0, 1, 2, 3],
values: vec![vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.]],
};
// I want to distinguish already in the outer part between dim-0 and dim-1 and generate
// separate code for these cases...
// That means that also the reading chain itself needs to be typed on that.
// Need to supply some event-payload converter type which has that type as Output type.
let _vals2 = vals.into_agg();
// Now the T-binning:
/*
T-aggregator must be able to produce empty-values of correct type even if we never get
a single value of input data.
Therefore, it needs the bin range definition.
How do I want to drive the system?
If I write the T-binner as a Stream, then I also need to pass it the input!
Meaning, I need to pass the Stream which produces the actual numbers from disk.
readchannel() -> Stream of timestamped byte blobs
.to_f32() -> Stream ? indirection to branch on the underlying shape
.agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level?
*/
}

View File

@@ -3,19 +3,17 @@ use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt, StreamExt, TryFutureExt};
use http::uri::Scheme;
use futures_util::{pin_mut, FutureExt, StreamExt};
use netpod::{
AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchGridSpec,
PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange,
ToNanos,
};
use serde::{Deserialize, Serialize};
use std::future::{ready, Future};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tiny_keccak::Hasher;
use tokio::fs::OpenOptions;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -97,7 +95,7 @@ impl Stream for BinnedBytesForHttpStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
Ready(Some(Ok(_k))) => {
let mut buf = BytesMut::with_capacity(250);
buf.put(&b"TODO serialize to bytes\n"[..]);
Ready(Some(Ok(buf.freeze())))
@@ -168,7 +166,7 @@ impl Stream for PreBinnedValueByteStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
Ready(Some(Ok(_k))) => {
error!("TODO convert item to Bytes");
let buf = Bytes::new();
Ready(Some(Ok(buf)))
@@ -233,7 +231,7 @@ impl PreBinnedValueStream {
let channel = self.channel.clone();
let agg_kind = self.agg_kind.clone();
let node_config = self.node_config.clone();
let mut patch_it = PreBinnedPatchIterator::from_range(range);
let patch_it = PreBinnedPatchIterator::from_range(range);
let s = futures_util::stream::iter(patch_it)
.map(move |coord| {
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
@@ -264,13 +262,10 @@ impl Stream for PreBinnedValueStream {
fut.poll_next_unpin(cx)
} else if let Some(fut) = self.open_check_local_file.as_mut() {
match fut.poll_unpin(cx) {
Ready(Ok(file)) => {
todo!("IMPLEMENT READ FROM LOCAL CACHE");
Pending
}
Ready(Ok(_file)) => err::todoval(),
Ready(Err(e)) => match e.kind() {
std::io::ErrorKind::NotFound => {
warn!("TODO LOCAL CACHE FILE NOT FOUND");
error!("TODO LOCAL CACHE FILE NOT FOUND");
self.try_setup_fetch_prebinned_higher_res();
continue 'outer;
}
@@ -282,6 +277,7 @@ impl Stream for PreBinnedValueStream {
Pending => Pending,
}
} else {
#[allow(unused_imports)]
use std::os::unix::fs::OpenOptionsExt;
let mut opts = std::fs::OpenOptions::new();
opts.read(true);
@@ -295,7 +291,6 @@ impl Stream for PreBinnedValueStream {
pub struct PreBinnedValueFetchedStream {
uri: http::Uri,
patch_coord: PreBinnedPatchCoord,
resfut: Option<hyper::client::ResponseFuture>,
res: Option<hyper::Response<hyper::Body>>,
}
@@ -323,7 +318,6 @@ impl PreBinnedValueFetchedStream {
.unwrap();
Self {
uri,
patch_coord,
resfut: None,
res: None,
}
@@ -345,14 +339,14 @@ impl Stream for PreBinnedValueFetchedStream {
pin_mut!(res);
use hyper::body::HttpBody;
match res.poll_data(cx) {
Ready(Some(Ok(k))) => Pending,
Ready(Some(Ok(_k))) => todo!(),
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => Ready(None),
Pending => Pending,
}
}
None => match self.resfut.as_mut() {
Some(mut resfut) => match resfut.poll_unpin(cx) {
Some(resfut) => match resfut.poll_unpin(cx) {
Ready(res) => match res {
Ok(res) => {
info!("GOT result from SUB REQUEST: {:?}", res);
@@ -394,7 +388,6 @@ impl BinnedStream {
node_config: Arc<NodeConfig>,
) -> Self {
warn!("BinnedStream will open a PreBinnedValueStream");
let mut patch_it = patch_it;
let inp = futures_util::stream::iter(patch_it)
.map(move |coord| {
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
@@ -426,7 +419,7 @@ impl Stream for BinnedStream {
pub struct SomeReturnThing {}
impl From<SomeReturnThing> for Bytes {
fn from(k: SomeReturnThing) -> Self {
fn from(_k: SomeReturnThing) -> Self {
error!("TODO convert result to octets");
todo!("TODO convert result to octets")
}
@@ -441,7 +434,7 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c
hash.update(&patch_coord.bin_t_len().to_le_bytes());
let mut out = [0; 32];
hash.finalize(&mut out);
let mut a = [out[0], out[1], out[2], out[3]];
let a = [out[0], out[1], out[2], out[3]];
let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
info!("node_ix_for_patch {}", ix);
ix

View File

@@ -1,5 +1,4 @@
use err::Error;
use nom::error::{ErrorKind, VerboseError};
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
use nom::Needed;
#[allow(unused_imports)]
@@ -7,7 +6,6 @@ use nom::{
bytes::complete::{tag, take, take_while_m_n},
combinator::map_res,
sequence::tuple,
IResult,
};
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
@@ -15,6 +13,16 @@ use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>;
fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O>
where
S: Into<String>,
{
let e = Error::with_msg(msg);
Err(nom::Err::Error(e))
}
#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
pub enum DType {
Bool = 0,
@@ -56,7 +64,7 @@ pub struct ConfigEntry {
pub pulse: i64,
pub ks: i32,
pub bs: i64,
pub splitCount: i32,
pub split_count: i32,
pub status: i32,
pub bb: i8,
pub modulo: i32,
@@ -69,71 +77,62 @@ pub struct ConfigEntry {
*/
pub precision: i16,
pub dtype: DType,
pub isCompressed: bool,
pub isShaped: bool,
pub isArray: bool,
pub isBigEndian: bool,
pub compressionMethod: Option<CompressionMethod>,
pub is_compressed: bool,
pub is_shaped: bool,
pub is_array: bool,
pub is_big_endian: bool,
pub compression_method: Option<CompressionMethod>,
pub shape: Option<Vec<u32>>,
pub sourceName: Option<String>,
pub source_name: Option<String>,
unit: Option<String>,
description: Option<String>,
optionalFields: Option<String>,
valueConverter: Option<String>,
optional_fields: Option<String>,
value_converter: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub formatVersion: i16,
pub channelName: String,
pub format_version: i16,
pub channel_name: String,
pub entries: Vec<ConfigEntry>,
}
fn parse_short_string(inp: &[u8]) -> IResult<&[u8], Option<String>> {
fn parse_short_string(inp: &[u8]) -> NRes<Option<String>> {
let (inp, len1) = be_i32(inp)?;
if len1 == -1 {
return Ok((inp, None));
}
if len1 < 4 {
error!("bad string len {}", len1);
let err = nom::error::make_error(inp, ErrorKind::Verify);
return Err(nom::Err::Error(err));
return mkerr(format!("bad string len {}", len1));
}
if len1 > 500 {
error!("large string len {}", len1);
let err = nom::error::make_error(inp, ErrorKind::Verify);
return Err(nom::Err::Error(err));
return mkerr(format!("large string len {}", len1));
}
let (inp, snb) = take((len1 - 4) as usize)(inp)?;
match String::from_utf8(snb.to_vec()) {
Ok(s1) => Ok((inp, Some(s1))),
Err(e) => {
let err = nom::error::make_error(inp, ErrorKind::Verify);
Err(nom::Err::Error(err))
}
Err(e) => mkerr(format!("{:?}", e)),
}
}
pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
let t = be_i32(inp);
let (inp, len1) = t?;
//pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
let (inp, len1) = be_i32(inp)?;
if len1 < 0 || len1 > 4000 {
return Err(nom::Err::Error(nom::error::Error::new(inp, ErrorKind::Verify)));
//return Err(format!("ConfigEntry bad len1 {}", len1).into());
return mkerr(format!("ConfigEntry bad len1 {}", len1));
}
if inp.len() == 0 {
return Ok((inp, None));
}
if inp.len() < len1 as usize - 4 {
return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4)));
//return Err(format!("incomplete input").into());
}
let inpE = &inp[(len1 - 8) as usize..];
let inp_e = &inp[(len1 - 8) as usize..];
let (inp, ts) = be_i64(inp)?;
let (inp, pulse) = be_i64(inp)?;
let (inp, ks) = be_i32(inp)?;
let (inp, bs) = be_i64(inp)?;
let (inp, splitCount) = be_i32(inp)?;
let (inp, split_count) = be_i32(inp)?;
let (inp, status) = be_i32(inp)?;
let (inp, bb) = be_i8(inp)?;
let (inp, modulo) = be_i32(inp)?;
@@ -141,47 +140,41 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
let (inp, precision) = be_i16(inp)?;
let (inp, dtlen) = be_i32(inp)?;
if dtlen > 100 {
error!("unexpected data type len {}", dtlen);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
//return Err(format!("unexpected data type len {}", dtlen).into());
return mkerr(format!("unexpected data type len {}", dtlen));
}
let (inp, dtmask) = be_u8(inp)?;
let isCompressed = dtmask & 0x80 != 0;
let isArray = dtmask & 0x40 != 0;
let isBigEndian = dtmask & 0x20 != 0;
let isShaped = dtmask & 0x10 != 0;
let is_compressed = dtmask & 0x80 != 0;
let is_array = dtmask & 0x40 != 0;
let is_big_endian = dtmask & 0x20 != 0;
let is_shaped = dtmask & 0x10 != 0;
let (inp, dtype) = be_i8(inp)?;
if dtype > 13 {
error!("unexpected data type {}", dtype);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("unexpected data type {}", dtype));
}
let dtype = match num_traits::FromPrimitive::from_i8(dtype) {
Some(k) => k,
None => {
error!("Can not convert {} to DType", dtype);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("Can not convert {} to DType", dtype));
}
};
let (inp, compressionMethod) = match isCompressed {
let (inp, compression_method) = match is_compressed {
false => (inp, None),
true => {
let (inp, cm) = be_u8(inp)?;
match num_traits::FromPrimitive::from_u8(cm) {
Some(k) => (inp, Some(k)),
None => {
error!("unknown compression");
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("unknown compression"));
}
}
}
};
let (inp, shape) = match isShaped {
let (inp, shape) = match is_shaped {
false => (inp, None),
true => {
let (mut inp, dim) = be_u8(inp)?;
if dim > 4 {
error!("unexpected number of dimensions: {}", dim);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("unexpected number of dimensions: {}", dim));
}
let mut shape = vec![];
for _ in 0..dim {
@@ -192,42 +185,41 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
(inp, Some(shape))
}
};
let (inp, sourceName) = parse_short_string(inp)?;
let (inp, source_name) = parse_short_string(inp)?;
let (inp, unit) = parse_short_string(inp)?;
let (inp, description) = parse_short_string(inp)?;
let (inp, optionalFields) = parse_short_string(inp)?;
let (inp, valueConverter) = parse_short_string(inp)?;
assert_eq!(inp.len(), inpE.len());
let (inpE, len2) = be_i32(inpE)?;
let (inp, optional_fields) = parse_short_string(inp)?;
let (inp, value_converter) = parse_short_string(inp)?;
assert_eq!(inp.len(), inp_e.len());
let (inp_e, len2) = be_i32(inp_e)?;
if len1 != len2 {
error!("mismatch len1 {} len2 {}", len1, len2);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("mismatch len1 {} len2 {}", len1, len2));
}
Ok((
inpE,
inp_e,
Some(ConfigEntry {
ts,
pulse,
ks,
bs,
splitCount,
split_count: split_count,
status,
bb,
modulo,
offset,
precision,
dtype,
isCompressed,
isArray,
isShaped,
isBigEndian,
compressionMethod,
is_compressed: is_compressed,
is_array: is_array,
is_shaped: is_shaped,
is_big_endian: is_big_endian,
compression_method: compression_method,
shape,
sourceName,
source_name: source_name,
unit,
description,
optionalFields,
valueConverter,
optional_fields: optional_fields,
value_converter: value_converter,
}),
))
}
@@ -235,39 +227,36 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
/*
Parse the full configuration file.
*/
pub fn parseConfig(inp: &[u8]) -> IResult<&[u8], Config> {
pub fn parse_config(inp: &[u8]) -> NRes<Config> {
let (inp, ver) = be_i16(inp)?;
let (inp, len1) = be_i32(inp)?;
if len1 <= 8 || len1 > 500 {
error!("no channel name. len1 {}", len1);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("no channel name. len1 {}", len1));
}
let (inp, chn) = take((len1 - 8) as usize)(inp)?;
let (inp, len2) = be_i32(inp)?;
if len1 != len2 {
error!("Mismatch len1 {} len2 {}", len1, len2);
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2));
}
let mut entries = vec![];
let mut inpA = inp;
while inpA.len() > 0 {
let inp = inpA;
let mut inp_a = inp;
while inp_a.len() > 0 {
let inp = inp_a;
let (inp, e) = parse_entry(inp)?;
if let Some(e) = e {
entries.push(e);
}
inpA = inp;
inp_a = inp;
}
let channelName = match String::from_utf8(chn.to_vec()) {
let channel_name = match String::from_utf8(chn.to_vec()) {
Ok(k) => k,
Err(e) => {
error!("channelName utf8 error");
return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify)));
return mkerr(format!("channelName utf8 error {:?}", e));
}
};
let ret = Config {
formatVersion: ver,
channelName,
format_version: ver,
channel_name: channel_name,
entries: entries,
};
Ok((inp, ret))
@@ -285,15 +274,15 @@ fn read_data() -> Vec<u8> {
#[test]
fn parse_dummy() {
let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
assert_eq!(0, config.1.formatVersion);
assert_eq!("abc", config.1.channelName);
let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
assert_eq!(0, config.1.format_version);
assert_eq!("abc", config.1.channel_name);
}
#[test]
fn open_file() {
let config = parseConfig(&read_data()).unwrap().1;
assert_eq!(0, config.formatVersion);
let config = parse_config(&read_data()).unwrap().1;
assert_eq!(0, config.format_version);
assert_eq!(9, config.entries.len());
for e in &config.entries {
assert!(e.ts >= 631152000000000000);

View File

@@ -1,19 +1,12 @@
use crate::ChannelConfigExt;
use bitshuffle::bitshuffle_compress;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{BufMut, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{pin_mut, StreamExt};
use netpod::ScalarType;
use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
@@ -58,6 +51,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
id: i1,
host: "localhost".into(),
port: 7780 + i1 as u16,
port_raw: 7780 + i1 as u16 + 100,
split: i1,
data_base_path: data_base_path.join(format!("node{:02}", i1)),
ksprefix: ksprefix.clone(),
@@ -108,7 +102,12 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
Ok(())
}
async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
async fn gen_config(
config_path: &Path,
config: &ChannelConfig,
_node: &Node,
_ensemble: &Ensemble,
) -> Result<(), Error> {
let path = config_path.join("latest");
tokio::fs::create_dir_all(&path).await?;
let path = path.join("00000_Config");
@@ -173,7 +172,7 @@ async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ens
let len = p2 - p1 + 4;
buf.put_i32(len as i32);
buf.as_mut()[p1..].as_mut().put_i32(len as i32);
file.write(&buf);
file.write(&buf).await?;
Ok(())
}

View File

@@ -18,6 +18,8 @@ use tokio::io::AsyncRead;
use tracing::{debug, error, info, trace, warn};
pub mod agg;
#[cfg(test)]
pub mod aggtest;
pub mod cache;
pub mod channelconfig;
pub mod gen;
@@ -136,7 +138,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let mut query = query.clone();
let query = query.clone();
let node = node.clone();
async_stream::stream! {
use tokio::io::AsyncReadExt;
@@ -349,7 +351,7 @@ pub fn parsed1(
match fileres {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let mut chunker = EventChunker::new(inp, todo!());
let mut chunker = EventChunker::new(inp, err::todoval());
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {
@@ -414,7 +416,7 @@ impl Stream for EventBlobsComplete {
Ready(Some(k)) => match k {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let mut chunker = EventChunker::new(inp, self.channel_config.clone());
let chunker = EventChunker::new(inp, self.channel_config.clone());
self.evs.replace(chunker);
continue 'outer;
}
@@ -441,7 +443,7 @@ pub fn event_blobs_complete(
match fileres {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let mut chunker = EventChunker::new(inp, todo!());
let mut chunker = EventChunker::new(inp, err::todoval());
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {
@@ -463,10 +465,8 @@ pub fn event_blobs_complete(
pub struct EventChunker {
inp: NeedMinBuffer,
had_channel: bool,
polled: u32,
state: DataFileState,
tmpbuf: Vec<u8>,
channel_config: ChannelConfig,
}
@@ -484,10 +484,8 @@ impl EventChunker {
inp.set_need_min(6);
Self {
inp: inp,
had_channel: false,
polled: 0,
state: DataFileState::FileHeader,
tmpbuf: vec![0; 1024 * 1024 * 4],
channel_config,
}
}
@@ -545,7 +543,7 @@ impl EventChunker {
let mut sl = std::io::Cursor::new(buf.as_ref());
sl.read_i32::<BE>().unwrap();
sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap();
let _ts = sl.read_i64::<BE>().unwrap();
//info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts);
need_min = len as u32;
break;
@@ -569,7 +567,7 @@ impl EventChunker {
use dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let _is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
assert!(is_array);

View File

@@ -2,7 +2,7 @@ use crate::agg::{Dim1F32Stream, ValuesDim1};
use crate::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::{future::ready, pin_mut, StreamExt};
use futures_util::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};
#[allow(unused_imports)]
@@ -61,10 +61,9 @@ where
self.current[i1] = CurVal::Val(k);
}
Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e);
return Ready(Some(Err(e)));
// TODO emit this error, consider this stream as done.
todo!()
}
Ready(None) => {
self.current[i1] = CurVal::Finish;
@@ -72,7 +71,6 @@ where
Pending => {
// TODO is this behaviour correct?
return Pending;
todo!()
}
}
}
@@ -128,6 +126,7 @@ where
enum CurVal {
None,
Finish,
#[allow(dead_code)]
Err(Error),
Val(ValuesDim1),
}

View File

@@ -2,6 +2,7 @@
Provide ser/de of value data to a good net exchange format.
*/
#[allow(dead_code)]
async fn local_unpacked_test() {
// TODO what kind of query format? What information do I need here?
// Don't need exact details of channel because I need to parse the databuffer config anyway.
@@ -24,8 +25,8 @@ async fn local_unpacked_test() {
buffer_size: 1024 * 8,
};*/
let query = todo!();
let node = todo!();
let query = err::todoval();
let node = err::todoval();
// TODO generate channel configs for my test data.
@@ -34,5 +35,5 @@ async fn local_unpacked_test() {
// TODO find the matching config entry. (bonus: fuse consecutive compatible entries)
use crate::agg::IntoDim1F32Stream;
let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream();
let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream();
}