From 4a8809c2fd1b5f01e6f813ac43a3148e14f61872 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Nov 2024 21:09:29 +0100 Subject: [PATCH] Factored into separate crate --- .gitignore | 2 + Cargo.toml | 20 ++ src/api1_parse.rs | 507 +++++++++++++++++++++++++++++++++++++++++++ src/channelconfig.rs | 457 ++++++++++++++++++++++++++++++++++++++ src/jsonconf.rs | 32 +++ src/lib.rs | 5 + 6 files changed, 1023 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/api1_parse.rs create mode 100644 src/channelconfig.rs create mode 100644 src/jsonconf.rs create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9c110f6 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "daqbuf-parse" +version = "0.0.3" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" +humantime-serde = "1.1" +chrono = { version = "0.4.26", features = ["serde"] } +bytes = "1.8" +byteorder = "1.4" +hex = "0.4.3" +nom = "7.1.3" +daqbuf-err = { path = "../daqbuf-err" } +netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/src/api1_parse.rs b/src/api1_parse.rs new file mode 100644 index 0000000..99fcfa2 --- /dev/null +++ b/src/api1_parse.rs @@ -0,0 +1,507 @@ +use crate::channelconfig::CompressionMethod; +use crate::nom; +use daqbuf_err as err; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; +use nom::bytes::complete::take; +use nom::error::context; +use nom::error::ContextError; +use nom::error::ErrorKind; +use nom::error::ParseError; +use nom::multi::many0; +use nom::number::complete::be_u32; +use nom::number::complete::be_u64; +use nom::number::complete::be_u8; +use nom::Err; +use nom::IResult; +use nom::Needed; +use serde::Deserialize; +use serde::Serialize; +use std::fmt; +use std::num::NonZeroUsize; + +type Nres<'a, O, E = nom::error::Error<&'a [u8]>> = Result<(&'a [u8], O), nom::Err>; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum Api1ByteOrder { + #[serde(rename = "LITTLE_ENDIAN")] + Little, + #[serde(rename = "BIG_ENDIAN")] + Big, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum Api1ScalarType { + #[serde(rename = "uint8")] + U8, + #[serde(rename = "uint16")] + U16, + #[serde(rename = "uint32")] + U32, + #[serde(rename = "uint64")] + U64, + #[serde(rename = "int8")] + I8, + #[serde(rename = "int16")] + I16, + #[serde(rename = "int32")] + I32, + #[serde(rename = "int64")] + I64, + #[serde(rename = "float32")] + F32, + #[serde(rename = "float64")] + F64, + #[serde(rename = "bool")] + BOOL, + #[serde(rename = "string")] + STRING, +} + +impl Api1ScalarType { + pub fn to_str(&self) -> &'static str { + use Api1ScalarType as A; + match self { + A::U8 => "uint8", + A::U16 => "uint16", + A::U32 => "uint32", + A::U64 => "uint64", + A::I8 => "int8", + A::I16 => "int16", + A::I32 => "int32", + A::I64 => "int64", + A::F32 => "float32", + A::F64 => "float64", + A::BOOL => "bool", + A::STRING => "string", + } + } +} + +#[test] +fn test_custom_variant_name() { + let val = Api1ScalarType::F32; + assert_eq!(format!("{val:?}"), "F32"); + assert_eq!(format!("{val}"), "float32"); + let s = serde_json::to_string(&val).unwrap(); + assert_eq!(s, "\"float32\""); +} + +impl fmt::Display for Api1ScalarType { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.to_str()) + } +} + +impl From<&ScalarType> for Api1ScalarType { + fn from(k: &ScalarType) -> Self { + use Api1ScalarType as B; + use ScalarType as A; + match k { + A::U8 => B::U8, + A::U16 => B::U16, + A::U32 => B::U32, + A::U64 => B::U64, + A::I8 => B::I8, + A::I16 => B::I16, + A::I32 => B::I32, + A::I64 => B::I64, + A::F32 => B::F32, + A::F64 => B::F64, + A::BOOL => B::BOOL, + A::STRING => B::STRING, + // TODO treat enum as number only + A::Enum => B::U16, + } + } +} + +impl From for Api1ScalarType { + fn from(x: ScalarType) -> Self { + (&x).into() + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Api1ChannelHeader { + name: String, + #[serde(rename = "type")] + ty: Api1ScalarType, + #[serde(rename = "byteOrder")] + byte_order: Api1ByteOrder, + #[serde(default)] + shape: Vec, + #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_compression_method")] + compression: Option, +} + +impl Api1ChannelHeader { + pub fn new( + name: String, + ty: Api1ScalarType, + byte_order: Api1ByteOrder, + shape: Shape, + compression: Option, + ) -> Self { + Self { + name, + ty, + byte_order, + shape: shape.to_u32_vec(), + compression, + } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn ty(&self) -> Api1ScalarType { + self.ty.clone() + } +} + +mod serde_compression_method { + use super::CompressionMethod; + use serde::de; + use serde::de::Visitor; + use serde::Deserializer; + use serde::Serializer; + use std::fmt; + + pub fn serialize(v: &Option, ser: S) -> Result + where + S: Serializer, + { + match v { + Some(v) => { + let n = match v { + CompressionMethod::BitshuffleLZ4 => 1, + }; + ser.serialize_some(&n) + } + None => ser.serialize_none(), + } + } + + struct VisC; + + impl<'de> Visitor<'de> for VisC { + type Value = Option; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "compression method index") + } + + fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + match v { + 0 => Ok(None), + 1 => Ok(Some(CompressionMethod::BitshuffleLZ4)), + _ => Err(de::Error::unknown_variant("compression variant index", &["0"])), + } + } + } + + struct Vis; + + impl<'de> Visitor<'de> for Vis { + type Value = Option; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "optional compression method index") + } + + fn visit_some(self, de: D) -> Result + where + D: Deserializer<'de>, + { + de.deserialize_u64(VisC) + } + + fn visit_none(self) -> Result + where + E: de::Error, + { + Ok(None) + } + } + + pub fn deserialize<'de, D>(de: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + de.deserialize_option(Vis) + } +} + +#[test] +fn basic_header_ser_00() { + let h = Api1ChannelHeader { + name: "Name".into(), + ty: Api1ScalarType::F32, + byte_order: Api1ByteOrder::Big, + shape: Vec::new(), + compression: None, + }; + let js = serde_json::to_string(&h).unwrap(); + let vals = serde_json::from_str::(&js).unwrap(); + let x = vals.as_object().unwrap().get("compression"); + assert_eq!(x, None) +} + +#[test] +fn basic_header_ser_01() { + let h = Api1ChannelHeader { + name: "Name".into(), + ty: Api1ScalarType::F32, + byte_order: Api1ByteOrder::Big, + shape: Vec::new(), + compression: Some(CompressionMethod::BitshuffleLZ4), + }; + let js = serde_json::to_string(&h).unwrap(); + let vals = serde_json::from_str::(&js).unwrap(); + let x = vals.as_object().unwrap().get("compression").unwrap().as_i64(); + assert_eq!(x, Some(1)) +} + +#[test] +fn basic_header_deser_00() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN" }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_01() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": null }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_02() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 0 }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_03() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 1 }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_some()); + assert_eq!(h.compression, Some(CompressionMethod::BitshuffleLZ4)); +} + +#[test] +fn basic_header_deser_04() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 2 }"#; + let res = serde_json::from_str::(js); + assert!(res.is_err()); +} + +// u32be length_1. +// there is exactly length_1 more bytes in this message. +// u8 mtype: 0: channel-header, 1: data + +// for mtype == 0: +// The rest is a JSON with the channel header. + +// for mtype == 1: +// u64be timestamp +// u64be pulse +// After that comes exactly (length_1 - 17) bytes of data. + +#[derive(Debug)] +pub struct Header { + header: Api1ChannelHeader, +} + +impl Header { + pub fn header(&self) -> &Api1ChannelHeader { + &self.header + } +} + +#[derive(Debug)] +pub struct Data { + ts: u64, + pulse: u64, + data: Vec, +} + +impl Data { + pub fn ts(&self) -> u64 { + self.ts + } + + pub fn pulse(&self) -> u64 { + self.pulse + } + + pub fn data(&self) -> &[u8] { + &self.data + } +} + +#[derive(Debug)] +pub enum Api1Frame { + Header(Header), + Data(Data), +} + +fn fail_on_input<'a, T, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]>, +{ + let e = nom::error::ParseError::from_error_kind(inp, ErrorKind::Fail); + IResult::Err(Err::Failure(e)) +} + +fn header<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ + match serde_json::from_slice(inp) { + Ok(k) => { + let k: Api1ChannelHeader = k; + eprintln!("Parsed header OK: {k:?}"); + IResult::Ok((&inp[inp.len()..], Header { header: k })) + } + Err(e) => { + let s = String::from_utf8_lossy(inp); + error!("can not parse json: {e}\n{s:?}"); + context("json parse", fail_on_input)(inp) + } + } +} + +fn data<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]>, +{ + if inp.len() < 16 { + IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap()))) + } else { + let (inp, ts) = be_u64(inp)?; + let (inp, pulse) = be_u64(inp)?; + let (inp, data) = take(inp.len())(inp)?; + let data = data.into(); + let res = Data { ts, pulse, data }; + IResult::Ok((inp, res)) + } +} + +fn api1_frame_complete<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ + let (inp, mtype) = be_u8(inp)?; + if mtype == 0 { + let (inp, val) = header(inp)?; + if inp.len() != 0 { + context("header did not consume all bytes", fail_on_input)(inp) + } else { + let res = Api1Frame::Header(val); + IResult::Ok((inp, res)) + } + } else if mtype == 1 { + let (inp, val) = data(inp)?; + if inp.len() != 0 { + context("data did not consume all bytes", fail_on_input)(inp) + } else { + let res = Api1Frame::Data(val); + IResult::Ok((inp, res)) + } + } else { + let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(1).unwrap())); + IResult::Err(e) + } +} + +fn api1_frame<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ + let inp_orig = inp; + let (inp, len) = be_u32(inp)?; + if len < 1 { + IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail))) + } else { + if inp.len() < len as usize + 4 { + let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(len as _).unwrap())); + IResult::Err(e) + } else { + let (inp, payload) = nom::bytes::complete::take(len)(inp)?; + let (inp, len2) = be_u32(inp)?; + if len != len2 { + IResult::Err(Err::Failure(ParseError::from_error_kind(inp_orig, ErrorKind::Fail))) + } else { + let (left, res) = api1_frame_complete(payload)?; + if left.len() != 0 { + context("frame did not consume all bytes", fail_on_input)(inp_orig) + } else { + IResult::Ok((inp, res)) + } + } + } + } +} + +pub fn api1_frames<'a, E>(inp: &'a [u8]) -> Nres, E> +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ + many0(api1_frame)(inp) +} + +#[allow(unused)] +fn verbose_err(inp: &[u8]) -> Nres { + use nom::error::ErrorKind; + use nom::error::ParseError; + use nom::error::VerboseError; + use nom::Err; + let e = ParseError::from_error_kind(inp, ErrorKind::Fail); + IResult::Err(Err::Failure(e)) +} + +#[test] +fn combinator_default_err() { + be_u32::<_, nom::error::Error<_>>([1, 2, 3, 4].as_slice()).unwrap(); +} + +#[test] +fn test_basic_frames() -> Result<(), err::Error> { + use std::io::Write; + let mut buf = Vec::new(); + let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#; + buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?; + buf.write(&[0])?; + buf.write(js.as_bytes())?; + + buf.write(&25u32.to_be_bytes())?; + buf.write(&[1])?; + buf.write(&20u64.to_be_bytes())?; + buf.write(&21u64.to_be_bytes())?; + buf.write(&5.123f64.to_be_bytes())?; + + buf.write(&25u32.to_be_bytes())?; + buf.write(&[1])?; + buf.write(&22u64.to_be_bytes())?; + buf.write(&23u64.to_be_bytes())?; + buf.write(&7.88f64.to_be_bytes())?; + + match api1_frames::>(&buf) { + Ok((_, frames)) => { + assert_eq!(frames.len(), 3); + } + Err(e) => { + panic!("can not parse result: {e}") + } + }; + Ok(()) +} diff --git a/src/channelconfig.rs b/src/channelconfig.rs new file mode 100644 index 0000000..73611f1 --- /dev/null +++ b/src/channelconfig.rs @@ -0,0 +1,457 @@ +use daqbuf_err as err; +use err::*; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::timeunits::DAY; +use netpod::timeunits::MS; +use netpod::ByteOrder; +use netpod::DtNano; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::SfDbChannel; +use netpod::Shape; +use netpod::TsNano; +use nom::bytes::complete::take; +use nom::number::complete::be_i16; +use nom::number::complete::be_i32; +use nom::number::complete::be_i64; +use nom::number::complete::be_i8; +use nom::number::complete::be_u8; +use nom::Needed; +use serde::Deserialize; +use serde::Serialize; +use std::fmt; +use std::time::Duration; +use std::time::SystemTime; + +#[derive(Debug, ThisError)] +#[cstm(name = "ConfigParse")] +pub enum ConfigParseError { + NotSupportedOnNode, + FileNotFound, + PermissionDenied, + IO, + ParseError(String), + NotSupported, +} + +impl From> for ConfigParseError { + fn from(k: nom::Err) -> Self { + let msg = format!("nom::Err {:?}", k); + Self::ParseError(msg) + } +} + +impl nom::error::ParseError for ConfigParseError { + fn from_error_kind(_input: I, kind: nom::error::ErrorKind) -> Self { + let msg = format!("ParseError kind {:?}", kind); + Self::ParseError(msg) + } + + fn append(_input: I, kind: nom::error::ErrorKind, other: Self) -> Self { + let msg = format!("ParseError kind {:?} other {:?}", kind, other); + Self::ParseError(msg) + } +} + +type NRes<'a, O> = nom::IResult<&'a [u8], O, ConfigParseError>; + +fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> +where + S: Into, +{ + let e = ConfigParseError::ParseError(msg.into()); + Err(nom::Err::Error(e)) +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum CompressionMethod { + BitshuffleLZ4, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ConfigEntry { + pub ts: TsNano, + #[serde(with = "humantime_serde")] + pub ts_human: SystemTime, + pub pulse: i64, + pub ks: i32, + pub bs: DtNano, + pub split_count: i32, + pub status: i32, + pub bb: i8, + pub modulo: i32, + pub offset: i32, + /* + Precision: + 0 'default' whatever that is + -7 f32 + -16 f64 + */ + pub precision: i16, + pub scalar_type: ScalarType, + pub is_compressed: bool, + pub is_shaped: bool, + pub is_array: bool, + pub byte_order: ByteOrder, + pub compression_method: Option, + pub shape: Option>, + pub source_name: Option, + pub unit: Option, + pub description: Option, + pub optional_fields: Option, + pub value_converter: Option, +} + +impl ConfigEntry { + pub fn to_shape(&self) -> Result { + let ret = match &self.shape { + Some(lens) => { + if lens.len() == 1 { + Shape::Wave(lens[0]) + } else if lens.len() == 2 { + Shape::Image(lens[0], lens[1]) + } else { + // TODO + // Need a new Shape variant for images. + return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?; + } + } + None => Shape::Scalar, + }; + Ok(ret) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelConfigs { + pub format_version: i16, + pub channel_name: String, + pub entries: Vec, +} + +fn parse_short_string(inp: &[u8]) -> NRes> { + let (inp, len1) = be_i32(inp)?; + if len1 == -1 { + return Ok((inp, None)); + } + if len1 < 4 { + return mkerr(format!("bad string len {}", len1)); + } + if len1 > 500 { + return mkerr(format!("large string len {}", len1)); + } + let (inp, snb) = take((len1 - 4) as usize)(inp)?; + match String::from_utf8(snb.to_vec()) { + Ok(s1) => Ok((inp, Some(s1))), + Err(e) => mkerr(format!("{:?}", e)), + } +} + +pub fn parse_entry(inp: &[u8]) -> NRes> { + let (inp, len1) = be_i32(inp)?; + if len1 < 0 || len1 > 4000 { + return mkerr(format!("ConfigEntry bad len1 {}", len1)); + } + if inp.len() == 0 { + return Ok((inp, None)); + } + if inp.len() < len1 as usize - 4 { + return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4))); + } + let inp_e = &inp[(len1 - 8) as usize..]; + let (inp, ts) = be_i64(inp)?; + let (inp, pulse) = be_i64(inp)?; + let (inp, ks) = be_i32(inp)?; + let (inp, bs) = be_i64(inp)?; + let bs = DtNano::from_ns(bs as u64 * MS); + let (inp, split_count) = be_i32(inp)?; + let (inp, status) = be_i32(inp)?; + let (inp, bb) = be_i8(inp)?; + let (inp, modulo) = be_i32(inp)?; + let (inp, offset) = be_i32(inp)?; + let (inp, precision) = be_i16(inp)?; + let (inp, dtlen) = be_i32(inp)?; + if dtlen > 100 { + return mkerr(format!("unexpected data type len {}", dtlen)); + } + let (inp, dtmask) = be_u8(inp)?; + let is_compressed = dtmask & 0x80 != 0; + let is_array = dtmask & 0x40 != 0; + let byte_order = ByteOrder::from_dtype_flags(dtmask); + let is_shaped = dtmask & 0x10 != 0; + let (inp, dtype) = be_u8(inp)?; + if dtype > 13 { + return mkerr(format!("unexpected data type {}", dtype)); + } + let scalar_type = match ScalarType::from_dtype_index(dtype) { + Ok(k) => k, + Err(e) => { + return mkerr(format!("Can not convert {} to DType {:?}", dtype, e)); + } + }; + let (inp, compression_method) = match is_compressed { + false => (inp, None), + true => { + let (inp, cm) = be_u8(inp)?; + match cm { + 0 => (inp, Some(CompressionMethod::BitshuffleLZ4)), + _ => return mkerr(format!("unknown compression")), + } + } + }; + let (inp, shape) = match is_shaped { + false => (inp, None), + true => { + let (mut inp, dim) = be_u8(inp)?; + if dim > 4 { + return mkerr(format!("unexpected number of dimensions: {}", dim)); + } + let mut shape = vec![]; + for _ in 0..dim { + let t1 = be_i32(inp)?; + inp = t1.0; + shape.push(t1.1 as u32); + } + (inp, Some(shape)) + } + }; + let (inp, source_name) = parse_short_string(inp)?; + let (inp, unit) = parse_short_string(inp)?; + let (inp, description) = parse_short_string(inp)?; + let (inp, optional_fields) = parse_short_string(inp)?; + let (inp, value_converter) = parse_short_string(inp)?; + assert_eq!(inp.len(), inp_e.len()); + let (inp_e, len2) = be_i32(inp_e)?; + if len1 != len2 { + return mkerr(format!("mismatch len1 {} len2 {}", len1, len2)); + } + Ok(( + inp_e, + Some(ConfigEntry { + ts: TsNano::from_ns(ts as u64), + ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64), + pulse, + ks, + bs, + split_count: split_count, + status, + bb, + modulo, + offset, + precision, + scalar_type, + is_compressed: is_compressed, + is_array: is_array, + is_shaped: is_shaped, + byte_order, + compression_method: compression_method, + shape, + source_name: source_name, + unit, + description, + optional_fields: optional_fields, + value_converter: value_converter, + }), + )) +} + +/// Parse a complete configuration file from given in-memory input buffer. +fn parse_config_inner(inp: &[u8]) -> NRes { + let (inp, ver) = be_i16(inp)?; + let (inp, len1) = be_i32(inp)?; + if len1 <= 8 || len1 > 500 { + return mkerr(format!("no channel name. len1 {}", len1)); + } + let (inp, chn) = take((len1 - 8) as usize)(inp)?; + let channel_name = match String::from_utf8(chn.to_vec()) { + Ok(k) => k, + Err(e) => { + return mkerr(format!("channelName utf8 error {:?}", e)); + } + }; + let (inp, len2) = be_i32(inp)?; + if len1 != len2 { + return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2)); + } + let mut entries = Vec::new(); + let mut inp_a = inp; + while inp_a.len() > 0 { + let inp = inp_a; + let (inp, e) = parse_entry(inp)?; + if let Some(e) = e { + entries.push(e); + } + inp_a = inp; + } + // Do not sort the parsed config entries. + // We want to deliver the actual order which is found on disk. + // Important for troubleshooting. + let ret = ChannelConfigs { + format_version: ver, + channel_name, + entries, + }; + Ok((inp, ret)) +} + +pub fn parse_config(inp: &[u8]) -> Result { + let (_inp, ret) = parse_config_inner(inp).map_err(|e| ConfigParseError::ParseError(e.to_string()))?; + Ok(ret) +} + +#[derive(Clone)] +pub enum MatchingConfigEntry<'a> { + None, + Single(&'a ConfigEntry), + // In this case, we only return the entry which best matches to the time range + Multiple(&'a ConfigEntry), +} + +impl<'a> MatchingConfigEntry<'a> { + pub fn best(&self) -> Option<&ConfigEntry> { + match self { + MatchingConfigEntry::None => None, + MatchingConfigEntry::Single(e) => Some(e), + MatchingConfigEntry::Multiple(e) => Some(e), + } + } +} + +pub fn extract_matching_config_entry<'a>( + range: &NanoRange, + channel_config: &'a ChannelConfigs, +) -> Result, ConfigParseError> { + const DO_DEBUG: bool = false; + if DO_DEBUG { + debug!("extract_matching_config_entry range {range:?}"); + } + let mut a: Vec<_> = channel_config.entries.iter().enumerate().map(|(i, x)| (i, x)).collect(); + a.sort_unstable_by_key(|(_, x)| x.ts.ns()); + let a = a; + + if DO_DEBUG { + debug!("------------------------------------------------------------------"); + for x in &a { + debug!("SORTED {:3} {:?}", x.0, x.1.ks); + } + } + + let b: Vec<_> = a + .into_iter() + .rev() + .map({ + let mut last = None; + move |(i, x)| { + let k = last.clone(); + last = Some(x.ts.clone()); + (i, x, k) + } + }) + .collect(); + + if DO_DEBUG { + debug!("------------------------------------------------------------------"); + for x in &b { + debug!("NEIGHB {:3} {:?} {:?}", x.0, x.1.ks, x.2); + } + } + let c: Vec<_> = b + .into_iter() + .rev() + .map(|(i, e, tsn)| { + if let Some(ts2) = tsn.clone() { + if e.ts.ns() < range.end() { + let p = if e.ts.ns() < range.beg() { + range.beg() + } else { + e.ts.ns() + }; + let q = if ts2.ns() < range.beg() { + range.beg() + } else { + if ts2.ns() < range.end() { + ts2.ns() + } else { + range.end() + } + }; + (i, DtNano::from_ns(q - p), e) + } else { + (i, DtNano::from_ns(0), e) + } + } else { + if e.ts.ns() < range.end() { + if e.ts.ns() < range.beg() { + (i, DtNano::from_ns(range.delta()), e) + } else { + (i, DtNano::from_ns(range.end() - e.ts.ns()), e) + } + } else { + (i, DtNano::from_ns(0), e) + } + } + }) + .collect(); + + if DO_DEBUG { + debug!("------------------------------------------------------------------"); + for (i, dt, e) in &c { + debug!("WEIGHT {:3} {:?} {:?} {:?}", i, dt, e.ks, e.ts); + } + } + + let mut c = c; + c.sort_unstable_by_key(|(_, dt, _)| u64::MAX - dt.ns()); + let c = c; + + if DO_DEBUG { + debug!("------------------------------------------------------------------"); + for (i, dt, e) in &c { + debug!("WEISOR {:3} {:?} {:?} {:?}", i, dt, e.ks, e.ts); + } + } + + if let Some(&(i, _, _)) = c.first() { + Ok(MatchingConfigEntry::Single(&channel_config.entries[i])) + } else { + Ok(MatchingConfigEntry::None) + } +} + +#[cfg(test)] +mod test { + use super::parse_config; + + fn read_data() -> Vec { + use std::io::Read; + //let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; + let cwd = std::env::current_dir(); + netpod::log::info!("CWD: {:?}", cwd); + let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config"; + //let path = "../resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config"; + let mut f1 = std::fs::File::open(path).unwrap(); + let mut buf = Vec::new(); + f1.read_to_end(&mut buf).unwrap(); + buf + } + + #[test] + fn parse_dummy() { + let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); + assert_eq!(0, config.format_version); + assert_eq!("abc", config.channel_name); + } + + #[test] + fn open_file() { + let config = parse_config(&read_data()).unwrap(); + assert_eq!(config.format_version, 0); + assert_eq!(config.entries.len(), 18); + for e in &config.entries { + assert!(e.ts.ns() >= 631152000000000000); + assert!(e.ts.ns() <= 1613640673424172164); + assert!(e.shape.is_some()); + } + } +} diff --git a/src/jsonconf.rs b/src/jsonconf.rs new file mode 100644 index 0000000..2e74e7a --- /dev/null +++ b/src/jsonconf.rs @@ -0,0 +1,32 @@ +#[test] +fn test_json_trailing() { + use serde::Deserialize; + use serde_json::Value as JsonValue; + use std::io::Cursor; + if serde_json::from_str::(r#"{}."#).is_ok() { + panic!("Should fail because of trailing character"); + } + let cur = Cursor::new(r#"{}..."#); + let mut de = serde_json::Deserializer::from_reader(cur); + if JsonValue::deserialize(&mut de).is_err() { + panic!("Should allow trailing characters") + } + let cur = Cursor::new(r#"nullA"#); + let mut de = serde_json::Deserializer::from_reader(cur); + if let Ok(val) = JsonValue::deserialize(&mut de) { + if val != serde_json::json!(null) { + panic!("Bad parse") + } + } else { + panic!("Should allow trailing characters") + } + let cur = Cursor::new(r#" {}AA"#); + let mut de = serde_json::Deserializer::from_reader(cur); + if let Ok(val) = JsonValue::deserialize(&mut de) { + if val != serde_json::json!({}) { + panic!("Bad parse") + } + } else { + panic!("Should allow trailing characters") + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..181d75b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,5 @@ +pub mod api1_parse; +pub mod channelconfig; +pub use nom; + +mod jsonconf;