Factored into separate crate
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
/Cargo.lock
|
||||
/target
|
||||
20
Cargo.toml
Normal file
20
Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "daqbuf-parse"
|
||||
version = "0.0.3"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
humantime-serde = "1.1"
|
||||
chrono = { version = "0.4.26", features = ["serde"] }
|
||||
bytes = "1.8"
|
||||
byteorder = "1.4"
|
||||
hex = "0.4.3"
|
||||
nom = "7.1.3"
|
||||
daqbuf-err = { path = "../daqbuf-err" }
|
||||
netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
|
||||
[patch.crates-io]
|
||||
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
|
||||
507
src/api1_parse.rs
Normal file
507
src/api1_parse.rs
Normal file
@@ -0,0 +1,507 @@
|
||||
use crate::channelconfig::CompressionMethod;
|
||||
use crate::nom;
|
||||
use daqbuf_err as err;
|
||||
use netpod::log::*;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use nom::bytes::complete::take;
|
||||
use nom::error::context;
|
||||
use nom::error::ContextError;
|
||||
use nom::error::ErrorKind;
|
||||
use nom::error::ParseError;
|
||||
use nom::multi::many0;
|
||||
use nom::number::complete::be_u32;
|
||||
use nom::number::complete::be_u64;
|
||||
use nom::number::complete::be_u8;
|
||||
use nom::Err;
|
||||
use nom::IResult;
|
||||
use nom::Needed;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
type Nres<'a, O, E = nom::error::Error<&'a [u8]>> = Result<(&'a [u8], O), nom::Err<E>>;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub enum Api1ByteOrder {
|
||||
#[serde(rename = "LITTLE_ENDIAN")]
|
||||
Little,
|
||||
#[serde(rename = "BIG_ENDIAN")]
|
||||
Big,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub enum Api1ScalarType {
|
||||
#[serde(rename = "uint8")]
|
||||
U8,
|
||||
#[serde(rename = "uint16")]
|
||||
U16,
|
||||
#[serde(rename = "uint32")]
|
||||
U32,
|
||||
#[serde(rename = "uint64")]
|
||||
U64,
|
||||
#[serde(rename = "int8")]
|
||||
I8,
|
||||
#[serde(rename = "int16")]
|
||||
I16,
|
||||
#[serde(rename = "int32")]
|
||||
I32,
|
||||
#[serde(rename = "int64")]
|
||||
I64,
|
||||
#[serde(rename = "float32")]
|
||||
F32,
|
||||
#[serde(rename = "float64")]
|
||||
F64,
|
||||
#[serde(rename = "bool")]
|
||||
BOOL,
|
||||
#[serde(rename = "string")]
|
||||
STRING,
|
||||
}
|
||||
|
||||
impl Api1ScalarType {
|
||||
pub fn to_str(&self) -> &'static str {
|
||||
use Api1ScalarType as A;
|
||||
match self {
|
||||
A::U8 => "uint8",
|
||||
A::U16 => "uint16",
|
||||
A::U32 => "uint32",
|
||||
A::U64 => "uint64",
|
||||
A::I8 => "int8",
|
||||
A::I16 => "int16",
|
||||
A::I32 => "int32",
|
||||
A::I64 => "int64",
|
||||
A::F32 => "float32",
|
||||
A::F64 => "float64",
|
||||
A::BOOL => "bool",
|
||||
A::STRING => "string",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_custom_variant_name() {
|
||||
let val = Api1ScalarType::F32;
|
||||
assert_eq!(format!("{val:?}"), "F32");
|
||||
assert_eq!(format!("{val}"), "float32");
|
||||
let s = serde_json::to_string(&val).unwrap();
|
||||
assert_eq!(s, "\"float32\"");
|
||||
}
|
||||
|
||||
impl fmt::Display for Api1ScalarType {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "{}", self.to_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ScalarType> for Api1ScalarType {
|
||||
fn from(k: &ScalarType) -> Self {
|
||||
use Api1ScalarType as B;
|
||||
use ScalarType as A;
|
||||
match k {
|
||||
A::U8 => B::U8,
|
||||
A::U16 => B::U16,
|
||||
A::U32 => B::U32,
|
||||
A::U64 => B::U64,
|
||||
A::I8 => B::I8,
|
||||
A::I16 => B::I16,
|
||||
A::I32 => B::I32,
|
||||
A::I64 => B::I64,
|
||||
A::F32 => B::F32,
|
||||
A::F64 => B::F64,
|
||||
A::BOOL => B::BOOL,
|
||||
A::STRING => B::STRING,
|
||||
// TODO treat enum as number only
|
||||
A::Enum => B::U16,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ScalarType> for Api1ScalarType {
|
||||
fn from(x: ScalarType) -> Self {
|
||||
(&x).into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Api1ChannelHeader {
|
||||
name: String,
|
||||
#[serde(rename = "type")]
|
||||
ty: Api1ScalarType,
|
||||
#[serde(rename = "byteOrder")]
|
||||
byte_order: Api1ByteOrder,
|
||||
#[serde(default)]
|
||||
shape: Vec<u32>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none", with = "serde_compression_method")]
|
||||
compression: Option<CompressionMethod>,
|
||||
}
|
||||
|
||||
impl Api1ChannelHeader {
|
||||
pub fn new(
|
||||
name: String,
|
||||
ty: Api1ScalarType,
|
||||
byte_order: Api1ByteOrder,
|
||||
shape: Shape,
|
||||
compression: Option<CompressionMethod>,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
ty,
|
||||
byte_order,
|
||||
shape: shape.to_u32_vec(),
|
||||
compression,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn ty(&self) -> Api1ScalarType {
|
||||
self.ty.clone()
|
||||
}
|
||||
}
|
||||
|
||||
mod serde_compression_method {
|
||||
use super::CompressionMethod;
|
||||
use serde::de;
|
||||
use serde::de::Visitor;
|
||||
use serde::Deserializer;
|
||||
use serde::Serializer;
|
||||
use std::fmt;
|
||||
|
||||
pub fn serialize<S>(v: &Option<CompressionMethod>, ser: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match v {
|
||||
Some(v) => {
|
||||
let n = match v {
|
||||
CompressionMethod::BitshuffleLZ4 => 1,
|
||||
};
|
||||
ser.serialize_some(&n)
|
||||
}
|
||||
None => ser.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
struct VisC;
|
||||
|
||||
impl<'de> Visitor<'de> for VisC {
|
||||
type Value = Option<CompressionMethod>;
|
||||
|
||||
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "compression method index")
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
match v {
|
||||
0 => Ok(None),
|
||||
1 => Ok(Some(CompressionMethod::BitshuffleLZ4)),
|
||||
_ => Err(de::Error::unknown_variant("compression variant index", &["0"])),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Vis;
|
||||
|
||||
impl<'de> Visitor<'de> for Vis {
|
||||
type Value = Option<CompressionMethod>;
|
||||
|
||||
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "optional compression method index")
|
||||
}
|
||||
|
||||
fn visit_some<D>(self, de: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
de.deserialize_u64(VisC)
|
||||
}
|
||||
|
||||
fn visit_none<E>(self) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D>(de: D) -> Result<Option<CompressionMethod>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
de.deserialize_option(Vis)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_ser_00() {
|
||||
let h = Api1ChannelHeader {
|
||||
name: "Name".into(),
|
||||
ty: Api1ScalarType::F32,
|
||||
byte_order: Api1ByteOrder::Big,
|
||||
shape: Vec::new(),
|
||||
compression: None,
|
||||
};
|
||||
let js = serde_json::to_string(&h).unwrap();
|
||||
let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap();
|
||||
let x = vals.as_object().unwrap().get("compression");
|
||||
assert_eq!(x, None)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_ser_01() {
|
||||
let h = Api1ChannelHeader {
|
||||
name: "Name".into(),
|
||||
ty: Api1ScalarType::F32,
|
||||
byte_order: Api1ByteOrder::Big,
|
||||
shape: Vec::new(),
|
||||
compression: Some(CompressionMethod::BitshuffleLZ4),
|
||||
};
|
||||
let js = serde_json::to_string(&h).unwrap();
|
||||
let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap();
|
||||
let x = vals.as_object().unwrap().get("compression").unwrap().as_i64();
|
||||
assert_eq!(x, Some(1))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_deser_00() {
|
||||
let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN" }"#;
|
||||
let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
|
||||
assert!(h.compression.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_deser_01() {
|
||||
let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": null }"#;
|
||||
let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
|
||||
assert!(h.compression.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_deser_02() {
|
||||
let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 0 }"#;
|
||||
let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
|
||||
assert!(h.compression.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_deser_03() {
|
||||
let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 1 }"#;
|
||||
let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
|
||||
assert!(h.compression.is_some());
|
||||
assert_eq!(h.compression, Some(CompressionMethod::BitshuffleLZ4));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_header_deser_04() {
|
||||
let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 2 }"#;
|
||||
let res = serde_json::from_str::<Api1ChannelHeader>(js);
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
// u32be length_1.
|
||||
// there is exactly length_1 more bytes in this message.
|
||||
// u8 mtype: 0: channel-header, 1: data
|
||||
|
||||
// for mtype == 0:
|
||||
// The rest is a JSON with the channel header.
|
||||
|
||||
// for mtype == 1:
|
||||
// u64be timestamp
|
||||
// u64be pulse
|
||||
// After that comes exactly (length_1 - 17) bytes of data.
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Header {
|
||||
header: Api1ChannelHeader,
|
||||
}
|
||||
|
||||
impl Header {
|
||||
pub fn header(&self) -> &Api1ChannelHeader {
|
||||
&self.header
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Data {
|
||||
ts: u64,
|
||||
pulse: u64,
|
||||
data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Data {
|
||||
pub fn ts(&self) -> u64 {
|
||||
self.ts
|
||||
}
|
||||
|
||||
pub fn pulse(&self) -> u64 {
|
||||
self.pulse
|
||||
}
|
||||
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.data
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Api1Frame {
|
||||
Header(Header),
|
||||
Data(Data),
|
||||
}
|
||||
|
||||
fn fail_on_input<'a, T, E>(inp: &'a [u8]) -> Nres<T, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]>,
|
||||
{
|
||||
let e = nom::error::ParseError::from_error_kind(inp, ErrorKind::Fail);
|
||||
IResult::Err(Err::Failure(e))
|
||||
}
|
||||
|
||||
fn header<'a, E>(inp: &'a [u8]) -> Nres<Header, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]> + ContextError<&'a [u8]>,
|
||||
{
|
||||
match serde_json::from_slice(inp) {
|
||||
Ok(k) => {
|
||||
let k: Api1ChannelHeader = k;
|
||||
eprintln!("Parsed header OK: {k:?}");
|
||||
IResult::Ok((&inp[inp.len()..], Header { header: k }))
|
||||
}
|
||||
Err(e) => {
|
||||
let s = String::from_utf8_lossy(inp);
|
||||
error!("can not parse json: {e}\n{s:?}");
|
||||
context("json parse", fail_on_input)(inp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn data<'a, E>(inp: &'a [u8]) -> Nres<Data, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]>,
|
||||
{
|
||||
if inp.len() < 16 {
|
||||
IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap())))
|
||||
} else {
|
||||
let (inp, ts) = be_u64(inp)?;
|
||||
let (inp, pulse) = be_u64(inp)?;
|
||||
let (inp, data) = take(inp.len())(inp)?;
|
||||
let data = data.into();
|
||||
let res = Data { ts, pulse, data };
|
||||
IResult::Ok((inp, res))
|
||||
}
|
||||
}
|
||||
|
||||
fn api1_frame_complete<'a, E>(inp: &'a [u8]) -> Nres<Api1Frame, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]> + ContextError<&'a [u8]>,
|
||||
{
|
||||
let (inp, mtype) = be_u8(inp)?;
|
||||
if mtype == 0 {
|
||||
let (inp, val) = header(inp)?;
|
||||
if inp.len() != 0 {
|
||||
context("header did not consume all bytes", fail_on_input)(inp)
|
||||
} else {
|
||||
let res = Api1Frame::Header(val);
|
||||
IResult::Ok((inp, res))
|
||||
}
|
||||
} else if mtype == 1 {
|
||||
let (inp, val) = data(inp)?;
|
||||
if inp.len() != 0 {
|
||||
context("data did not consume all bytes", fail_on_input)(inp)
|
||||
} else {
|
||||
let res = Api1Frame::Data(val);
|
||||
IResult::Ok((inp, res))
|
||||
}
|
||||
} else {
|
||||
let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(1).unwrap()));
|
||||
IResult::Err(e)
|
||||
}
|
||||
}
|
||||
|
||||
fn api1_frame<'a, E>(inp: &'a [u8]) -> Nres<Api1Frame, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]> + ContextError<&'a [u8]>,
|
||||
{
|
||||
let inp_orig = inp;
|
||||
let (inp, len) = be_u32(inp)?;
|
||||
if len < 1 {
|
||||
IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail)))
|
||||
} else {
|
||||
if inp.len() < len as usize + 4 {
|
||||
let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(len as _).unwrap()));
|
||||
IResult::Err(e)
|
||||
} else {
|
||||
let (inp, payload) = nom::bytes::complete::take(len)(inp)?;
|
||||
let (inp, len2) = be_u32(inp)?;
|
||||
if len != len2 {
|
||||
IResult::Err(Err::Failure(ParseError::from_error_kind(inp_orig, ErrorKind::Fail)))
|
||||
} else {
|
||||
let (left, res) = api1_frame_complete(payload)?;
|
||||
if left.len() != 0 {
|
||||
context("frame did not consume all bytes", fail_on_input)(inp_orig)
|
||||
} else {
|
||||
IResult::Ok((inp, res))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn api1_frames<'a, E>(inp: &'a [u8]) -> Nres<Vec<Api1Frame>, E>
|
||||
where
|
||||
E: ParseError<&'a [u8]> + ContextError<&'a [u8]>,
|
||||
{
|
||||
many0(api1_frame)(inp)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn verbose_err(inp: &[u8]) -> Nres<u32> {
|
||||
use nom::error::ErrorKind;
|
||||
use nom::error::ParseError;
|
||||
use nom::error::VerboseError;
|
||||
use nom::Err;
|
||||
let e = ParseError::from_error_kind(inp, ErrorKind::Fail);
|
||||
IResult::Err(Err::Failure(e))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn combinator_default_err() {
|
||||
be_u32::<_, nom::error::Error<_>>([1, 2, 3, 4].as_slice()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_frames() -> Result<(), err::Error> {
|
||||
use std::io::Write;
|
||||
let mut buf = Vec::new();
|
||||
let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#;
|
||||
buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?;
|
||||
buf.write(&[0])?;
|
||||
buf.write(js.as_bytes())?;
|
||||
|
||||
buf.write(&25u32.to_be_bytes())?;
|
||||
buf.write(&[1])?;
|
||||
buf.write(&20u64.to_be_bytes())?;
|
||||
buf.write(&21u64.to_be_bytes())?;
|
||||
buf.write(&5.123f64.to_be_bytes())?;
|
||||
|
||||
buf.write(&25u32.to_be_bytes())?;
|
||||
buf.write(&[1])?;
|
||||
buf.write(&22u64.to_be_bytes())?;
|
||||
buf.write(&23u64.to_be_bytes())?;
|
||||
buf.write(&7.88f64.to_be_bytes())?;
|
||||
|
||||
match api1_frames::<nom::error::Error<_>>(&buf) {
|
||||
Ok((_, frames)) => {
|
||||
assert_eq!(frames.len(), 3);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("can not parse result: {e}")
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
457
src/channelconfig.rs
Normal file
457
src/channelconfig.rs
Normal file
@@ -0,0 +1,457 @@
|
||||
use daqbuf_err as err;
|
||||
use err::*;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::timeunits::DAY;
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::ByteOrder;
|
||||
use netpod::DtNano;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ScalarType;
|
||||
use netpod::SfDbChannel;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use nom::bytes::complete::take;
|
||||
use nom::number::complete::be_i16;
|
||||
use nom::number::complete::be_i32;
|
||||
use nom::number::complete::be_i64;
|
||||
use nom::number::complete::be_i8;
|
||||
use nom::number::complete::be_u8;
|
||||
use nom::Needed;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ConfigParse")]
|
||||
pub enum ConfigParseError {
|
||||
NotSupportedOnNode,
|
||||
FileNotFound,
|
||||
PermissionDenied,
|
||||
IO,
|
||||
ParseError(String),
|
||||
NotSupported,
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug> From<nom::Err<T>> for ConfigParseError {
|
||||
fn from(k: nom::Err<T>) -> Self {
|
||||
let msg = format!("nom::Err<T> {:?}", k);
|
||||
Self::ParseError(msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I> nom::error::ParseError<I> for ConfigParseError {
|
||||
fn from_error_kind(_input: I, kind: nom::error::ErrorKind) -> Self {
|
||||
let msg = format!("ParseError kind {:?}", kind);
|
||||
Self::ParseError(msg)
|
||||
}
|
||||
|
||||
fn append(_input: I, kind: nom::error::ErrorKind, other: Self) -> Self {
|
||||
let msg = format!("ParseError kind {:?} other {:?}", kind, other);
|
||||
Self::ParseError(msg)
|
||||
}
|
||||
}
|
||||
|
||||
type NRes<'a, O> = nom::IResult<&'a [u8], O, ConfigParseError>;
|
||||
|
||||
fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O>
|
||||
where
|
||||
S: Into<String>,
|
||||
{
|
||||
let e = ConfigParseError::ParseError(msg.into());
|
||||
Err(nom::Err::Error(e))
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub enum CompressionMethod {
|
||||
BitshuffleLZ4,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ConfigEntry {
|
||||
pub ts: TsNano,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub ts_human: SystemTime,
|
||||
pub pulse: i64,
|
||||
pub ks: i32,
|
||||
pub bs: DtNano,
|
||||
pub split_count: i32,
|
||||
pub status: i32,
|
||||
pub bb: i8,
|
||||
pub modulo: i32,
|
||||
pub offset: i32,
|
||||
/*
|
||||
Precision:
|
||||
0 'default' whatever that is
|
||||
-7 f32
|
||||
-16 f64
|
||||
*/
|
||||
pub precision: i16,
|
||||
pub scalar_type: ScalarType,
|
||||
pub is_compressed: bool,
|
||||
pub is_shaped: bool,
|
||||
pub is_array: bool,
|
||||
pub byte_order: ByteOrder,
|
||||
pub compression_method: Option<CompressionMethod>,
|
||||
pub shape: Option<Vec<u32>>,
|
||||
pub source_name: Option<String>,
|
||||
pub unit: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub optional_fields: Option<String>,
|
||||
pub value_converter: Option<String>,
|
||||
}
|
||||
|
||||
impl ConfigEntry {
|
||||
pub fn to_shape(&self) -> Result<Shape, Error> {
|
||||
let ret = match &self.shape {
|
||||
Some(lens) => {
|
||||
if lens.len() == 1 {
|
||||
Shape::Wave(lens[0])
|
||||
} else if lens.len() == 2 {
|
||||
Shape::Image(lens[0], lens[1])
|
||||
} else {
|
||||
// TODO
|
||||
// Need a new Shape variant for images.
|
||||
return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?;
|
||||
}
|
||||
}
|
||||
None => Shape::Scalar,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ChannelConfigs {
|
||||
pub format_version: i16,
|
||||
pub channel_name: String,
|
||||
pub entries: Vec<ConfigEntry>,
|
||||
}
|
||||
|
||||
fn parse_short_string(inp: &[u8]) -> NRes<Option<String>> {
|
||||
let (inp, len1) = be_i32(inp)?;
|
||||
if len1 == -1 {
|
||||
return Ok((inp, None));
|
||||
}
|
||||
if len1 < 4 {
|
||||
return mkerr(format!("bad string len {}", len1));
|
||||
}
|
||||
if len1 > 500 {
|
||||
return mkerr(format!("large string len {}", len1));
|
||||
}
|
||||
let (inp, snb) = take((len1 - 4) as usize)(inp)?;
|
||||
match String::from_utf8(snb.to_vec()) {
|
||||
Ok(s1) => Ok((inp, Some(s1))),
|
||||
Err(e) => mkerr(format!("{:?}", e)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
|
||||
let (inp, len1) = be_i32(inp)?;
|
||||
if len1 < 0 || len1 > 4000 {
|
||||
return mkerr(format!("ConfigEntry bad len1 {}", len1));
|
||||
}
|
||||
if inp.len() == 0 {
|
||||
return Ok((inp, None));
|
||||
}
|
||||
if inp.len() < len1 as usize - 4 {
|
||||
return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4)));
|
||||
}
|
||||
let inp_e = &inp[(len1 - 8) as usize..];
|
||||
let (inp, ts) = be_i64(inp)?;
|
||||
let (inp, pulse) = be_i64(inp)?;
|
||||
let (inp, ks) = be_i32(inp)?;
|
||||
let (inp, bs) = be_i64(inp)?;
|
||||
let bs = DtNano::from_ns(bs as u64 * MS);
|
||||
let (inp, split_count) = be_i32(inp)?;
|
||||
let (inp, status) = be_i32(inp)?;
|
||||
let (inp, bb) = be_i8(inp)?;
|
||||
let (inp, modulo) = be_i32(inp)?;
|
||||
let (inp, offset) = be_i32(inp)?;
|
||||
let (inp, precision) = be_i16(inp)?;
|
||||
let (inp, dtlen) = be_i32(inp)?;
|
||||
if dtlen > 100 {
|
||||
return mkerr(format!("unexpected data type len {}", dtlen));
|
||||
}
|
||||
let (inp, dtmask) = be_u8(inp)?;
|
||||
let is_compressed = dtmask & 0x80 != 0;
|
||||
let is_array = dtmask & 0x40 != 0;
|
||||
let byte_order = ByteOrder::from_dtype_flags(dtmask);
|
||||
let is_shaped = dtmask & 0x10 != 0;
|
||||
let (inp, dtype) = be_u8(inp)?;
|
||||
if dtype > 13 {
|
||||
return mkerr(format!("unexpected data type {}", dtype));
|
||||
}
|
||||
let scalar_type = match ScalarType::from_dtype_index(dtype) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
return mkerr(format!("Can not convert {} to DType {:?}", dtype, e));
|
||||
}
|
||||
};
|
||||
let (inp, compression_method) = match is_compressed {
|
||||
false => (inp, None),
|
||||
true => {
|
||||
let (inp, cm) = be_u8(inp)?;
|
||||
match cm {
|
||||
0 => (inp, Some(CompressionMethod::BitshuffleLZ4)),
|
||||
_ => return mkerr(format!("unknown compression")),
|
||||
}
|
||||
}
|
||||
};
|
||||
let (inp, shape) = match is_shaped {
|
||||
false => (inp, None),
|
||||
true => {
|
||||
let (mut inp, dim) = be_u8(inp)?;
|
||||
if dim > 4 {
|
||||
return mkerr(format!("unexpected number of dimensions: {}", dim));
|
||||
}
|
||||
let mut shape = vec![];
|
||||
for _ in 0..dim {
|
||||
let t1 = be_i32(inp)?;
|
||||
inp = t1.0;
|
||||
shape.push(t1.1 as u32);
|
||||
}
|
||||
(inp, Some(shape))
|
||||
}
|
||||
};
|
||||
let (inp, source_name) = parse_short_string(inp)?;
|
||||
let (inp, unit) = parse_short_string(inp)?;
|
||||
let (inp, description) = parse_short_string(inp)?;
|
||||
let (inp, optional_fields) = parse_short_string(inp)?;
|
||||
let (inp, value_converter) = parse_short_string(inp)?;
|
||||
assert_eq!(inp.len(), inp_e.len());
|
||||
let (inp_e, len2) = be_i32(inp_e)?;
|
||||
if len1 != len2 {
|
||||
return mkerr(format!("mismatch len1 {} len2 {}", len1, len2));
|
||||
}
|
||||
Ok((
|
||||
inp_e,
|
||||
Some(ConfigEntry {
|
||||
ts: TsNano::from_ns(ts as u64),
|
||||
ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64),
|
||||
pulse,
|
||||
ks,
|
||||
bs,
|
||||
split_count: split_count,
|
||||
status,
|
||||
bb,
|
||||
modulo,
|
||||
offset,
|
||||
precision,
|
||||
scalar_type,
|
||||
is_compressed: is_compressed,
|
||||
is_array: is_array,
|
||||
is_shaped: is_shaped,
|
||||
byte_order,
|
||||
compression_method: compression_method,
|
||||
shape,
|
||||
source_name: source_name,
|
||||
unit,
|
||||
description,
|
||||
optional_fields: optional_fields,
|
||||
value_converter: value_converter,
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
/// Parse a complete configuration file from given in-memory input buffer.
|
||||
fn parse_config_inner(inp: &[u8]) -> NRes<ChannelConfigs> {
|
||||
let (inp, ver) = be_i16(inp)?;
|
||||
let (inp, len1) = be_i32(inp)?;
|
||||
if len1 <= 8 || len1 > 500 {
|
||||
return mkerr(format!("no channel name. len1 {}", len1));
|
||||
}
|
||||
let (inp, chn) = take((len1 - 8) as usize)(inp)?;
|
||||
let channel_name = match String::from_utf8(chn.to_vec()) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
return mkerr(format!("channelName utf8 error {:?}", e));
|
||||
}
|
||||
};
|
||||
let (inp, len2) = be_i32(inp)?;
|
||||
if len1 != len2 {
|
||||
return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2));
|
||||
}
|
||||
let mut entries = Vec::new();
|
||||
let mut inp_a = inp;
|
||||
while inp_a.len() > 0 {
|
||||
let inp = inp_a;
|
||||
let (inp, e) = parse_entry(inp)?;
|
||||
if let Some(e) = e {
|
||||
entries.push(e);
|
||||
}
|
||||
inp_a = inp;
|
||||
}
|
||||
// Do not sort the parsed config entries.
|
||||
// We want to deliver the actual order which is found on disk.
|
||||
// Important for troubleshooting.
|
||||
let ret = ChannelConfigs {
|
||||
format_version: ver,
|
||||
channel_name,
|
||||
entries,
|
||||
};
|
||||
Ok((inp, ret))
|
||||
}
|
||||
|
||||
pub fn parse_config(inp: &[u8]) -> Result<ChannelConfigs, ConfigParseError> {
|
||||
let (_inp, ret) = parse_config_inner(inp).map_err(|e| ConfigParseError::ParseError(e.to_string()))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum MatchingConfigEntry<'a> {
|
||||
None,
|
||||
Single(&'a ConfigEntry),
|
||||
// In this case, we only return the entry which best matches to the time range
|
||||
Multiple(&'a ConfigEntry),
|
||||
}
|
||||
|
||||
impl<'a> MatchingConfigEntry<'a> {
|
||||
pub fn best(&self) -> Option<&ConfigEntry> {
|
||||
match self {
|
||||
MatchingConfigEntry::None => None,
|
||||
MatchingConfigEntry::Single(e) => Some(e),
|
||||
MatchingConfigEntry::Multiple(e) => Some(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extract_matching_config_entry<'a>(
|
||||
range: &NanoRange,
|
||||
channel_config: &'a ChannelConfigs,
|
||||
) -> Result<MatchingConfigEntry<'a>, ConfigParseError> {
|
||||
const DO_DEBUG: bool = false;
|
||||
if DO_DEBUG {
|
||||
debug!("extract_matching_config_entry range {range:?}");
|
||||
}
|
||||
let mut a: Vec<_> = channel_config.entries.iter().enumerate().map(|(i, x)| (i, x)).collect();
|
||||
a.sort_unstable_by_key(|(_, x)| x.ts.ns());
|
||||
let a = a;
|
||||
|
||||
if DO_DEBUG {
|
||||
debug!("------------------------------------------------------------------");
|
||||
for x in &a {
|
||||
debug!("SORTED {:3} {:?}", x.0, x.1.ks);
|
||||
}
|
||||
}
|
||||
|
||||
let b: Vec<_> = a
|
||||
.into_iter()
|
||||
.rev()
|
||||
.map({
|
||||
let mut last = None;
|
||||
move |(i, x)| {
|
||||
let k = last.clone();
|
||||
last = Some(x.ts.clone());
|
||||
(i, x, k)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if DO_DEBUG {
|
||||
debug!("------------------------------------------------------------------");
|
||||
for x in &b {
|
||||
debug!("NEIGHB {:3} {:?} {:?}", x.0, x.1.ks, x.2);
|
||||
}
|
||||
}
|
||||
let c: Vec<_> = b
|
||||
.into_iter()
|
||||
.rev()
|
||||
.map(|(i, e, tsn)| {
|
||||
if let Some(ts2) = tsn.clone() {
|
||||
if e.ts.ns() < range.end() {
|
||||
let p = if e.ts.ns() < range.beg() {
|
||||
range.beg()
|
||||
} else {
|
||||
e.ts.ns()
|
||||
};
|
||||
let q = if ts2.ns() < range.beg() {
|
||||
range.beg()
|
||||
} else {
|
||||
if ts2.ns() < range.end() {
|
||||
ts2.ns()
|
||||
} else {
|
||||
range.end()
|
||||
}
|
||||
};
|
||||
(i, DtNano::from_ns(q - p), e)
|
||||
} else {
|
||||
(i, DtNano::from_ns(0), e)
|
||||
}
|
||||
} else {
|
||||
if e.ts.ns() < range.end() {
|
||||
if e.ts.ns() < range.beg() {
|
||||
(i, DtNano::from_ns(range.delta()), e)
|
||||
} else {
|
||||
(i, DtNano::from_ns(range.end() - e.ts.ns()), e)
|
||||
}
|
||||
} else {
|
||||
(i, DtNano::from_ns(0), e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if DO_DEBUG {
|
||||
debug!("------------------------------------------------------------------");
|
||||
for (i, dt, e) in &c {
|
||||
debug!("WEIGHT {:3} {:?} {:?} {:?}", i, dt, e.ks, e.ts);
|
||||
}
|
||||
}
|
||||
|
||||
let mut c = c;
|
||||
c.sort_unstable_by_key(|(_, dt, _)| u64::MAX - dt.ns());
|
||||
let c = c;
|
||||
|
||||
if DO_DEBUG {
|
||||
debug!("------------------------------------------------------------------");
|
||||
for (i, dt, e) in &c {
|
||||
debug!("WEISOR {:3} {:?} {:?} {:?}", i, dt, e.ks, e.ts);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(&(i, _, _)) = c.first() {
|
||||
Ok(MatchingConfigEntry::Single(&channel_config.entries[i]))
|
||||
} else {
|
||||
Ok(MatchingConfigEntry::None)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::parse_config;
|
||||
|
||||
fn read_data() -> Vec<u8> {
|
||||
use std::io::Read;
|
||||
//let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
|
||||
let cwd = std::env::current_dir();
|
||||
netpod::log::info!("CWD: {:?}", cwd);
|
||||
let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config";
|
||||
//let path = "../resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config";
|
||||
let mut f1 = std::fs::File::open(path).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
f1.read_to_end(&mut buf).unwrap();
|
||||
buf
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_dummy() {
|
||||
let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
|
||||
assert_eq!(0, config.format_version);
|
||||
assert_eq!("abc", config.channel_name);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_file() {
|
||||
let config = parse_config(&read_data()).unwrap();
|
||||
assert_eq!(config.format_version, 0);
|
||||
assert_eq!(config.entries.len(), 18);
|
||||
for e in &config.entries {
|
||||
assert!(e.ts.ns() >= 631152000000000000);
|
||||
assert!(e.ts.ns() <= 1613640673424172164);
|
||||
assert!(e.shape.is_some());
|
||||
}
|
||||
}
|
||||
}
|
||||
32
src/jsonconf.rs
Normal file
32
src/jsonconf.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
#[test]
|
||||
fn test_json_trailing() {
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::io::Cursor;
|
||||
if serde_json::from_str::<JsonValue>(r#"{}."#).is_ok() {
|
||||
panic!("Should fail because of trailing character");
|
||||
}
|
||||
let cur = Cursor::new(r#"{}..."#);
|
||||
let mut de = serde_json::Deserializer::from_reader(cur);
|
||||
if JsonValue::deserialize(&mut de).is_err() {
|
||||
panic!("Should allow trailing characters")
|
||||
}
|
||||
let cur = Cursor::new(r#"nullA"#);
|
||||
let mut de = serde_json::Deserializer::from_reader(cur);
|
||||
if let Ok(val) = JsonValue::deserialize(&mut de) {
|
||||
if val != serde_json::json!(null) {
|
||||
panic!("Bad parse")
|
||||
}
|
||||
} else {
|
||||
panic!("Should allow trailing characters")
|
||||
}
|
||||
let cur = Cursor::new(r#" {}AA"#);
|
||||
let mut de = serde_json::Deserializer::from_reader(cur);
|
||||
if let Ok(val) = JsonValue::deserialize(&mut de) {
|
||||
if val != serde_json::json!({}) {
|
||||
panic!("Bad parse")
|
||||
}
|
||||
} else {
|
||||
panic!("Should allow trailing characters")
|
||||
}
|
||||
}
|
||||
5
src/lib.rs
Normal file
5
src/lib.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod api1_parse;
|
||||
pub mod channelconfig;
|
||||
pub use nom;
|
||||
|
||||
mod jsonconf;
|
||||
Reference in New Issue
Block a user