From 2d566cc9ca7cad97373e87a5cd6599af1d46342a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 20 Jun 2023 19:00:08 +0200
Subject: [PATCH] WIP

---
 daqbufp2/Cargo.toml                       |   4 +-
 daqbufp2/src/test/api1.rs                 |  21 +-
 daqbufp2/src/test/api1/api1_parse.rs      | 184 ------------
 daqbufp2/src/test/api1/data_api_python.rs |   7 +
 httpret/src/api1.rs                       | 156 +---------
 netpod/src/netpod.rs                      |   1 +
 netpod/src/query/datetime.rs              |  75 ++++-
 parse/Cargo.toml                          |  12 +-
 parse/src/api1_parse.rs                   | 341 ++++++++++++++++++++++
 parse/src/lib.rs                          |   3 +
 parse/src/nom2.rs                         |   3 +
 11 files changed, 452 insertions(+), 355 deletions(-)
 delete mode 100644 daqbufp2/src/test/api1/api1_parse.rs
 create mode 100644 parse/src/api1_parse.rs
 create mode 100644 parse/src/nom2.rs

diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml
index 4d35d9e..b82846c 100644
--- a/daqbufp2/Cargo.toml
+++ b/daqbufp2/Cargo.toml
@@ -32,6 +32,4 @@ disk = { path = "../disk" }
 items_0 = { path = "../items_0" }
 items_2 = { path = "../items_2" }
 streams = { path = "../streams" }
-
-[dev-dependencies]
-nom = "7.1.1"
+parse = { path = "../parse" }

diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs
index 46510e4..21c160b 100644
--- a/daqbufp2/src/test/api1.rs
+++ b/daqbufp2/src/test/api1.rs
@@ -1,16 +1,17 @@
-mod api1_parse;
 mod data_api_python;
 
 use crate::nodes::require_test_hosts_running;
-use crate::test::api1::api1_parse::Api1Frame;
 use err::Error;
 use futures_util::Future;
 use httpclient::http_post;
-use httpret::api1::Api1ScalarType;
 use netpod::log::*;
 use netpod::query::api1::Api1Query;
 use netpod::query::api1::Api1Range;
 use netpod::query::api1::ChannelTuple;
+use netpod::APP_OCTET;
+use parse::api1_parse;
+use parse::api1_parse::Api1Frame;
+use parse::api1_parse::Api1ScalarType;
 use std::fmt;
 use url::Url;
 
@@ -23,7 +24,7 @@ where
     taskrun::run(fut)
 }
 
-fn is_monitonic_strict<I>(it: I) -> bool
+fn is_monotonic_strict<I>(it: I) -> bool
 where
     I: Iterator,
     <I as Iterator>::Item: PartialOrd + fmt::Debug,
@@ -42,8 +43,8 @@ where
 
 #[test]
 fn test_is_monitonic_strict() {
-    assert_eq!(is_monitonic_strict([1, 2, 3].iter()), true);
-    assert_eq!(is_monitonic_strict([1, 2, 2].iter()), false);
+    assert_eq!(is_monotonic_strict([1, 2, 3].iter()), true);
+    assert_eq!(is_monotonic_strict([1, 2, 2].iter()), false);
 }
 
 #[test]
@@ -57,7 +58,7 @@ fn events_f64_plain() -> Result<(), Error> {
     let cluster = &rh.cluster;
     let node = &cluster.nodes[0];
    let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?;
-    let accept = "application/octet-stream";
+    let accept = APP_OCTET;
     let range = Api1Range::new("1970-01-01T00:00:00Z".try_into()?, "1970-01-01T00:01:00Z".try_into()?)?;
     // TODO the channel list needs to get pre-processed to check for backend prefix!
     let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into());
@@ -101,9 +102,9 @@ fn events_f64_plain() -> Result<(), Error> {
             _ => None,
         })
         .collect();
-    assert_eq!(is_monitonic_strict(tss.iter()), true);
-    assert_eq!(is_monitonic_strict(pulses.iter()), true);
-    assert_eq!(is_monitonic_strict(values.iter()), true);
+    assert_eq!(is_monotonic_strict(tss.iter()), true);
+    assert_eq!(is_monotonic_strict(pulses.iter()), true);
+    assert_eq!(is_monotonic_strict(values.iter()), true);
     for &val in &values {
         assert!(val >= 0);
         assert!(val < 120);
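Note: the elided middle of events_f64_plain decodes the response with parse::api1_parse::api1_frames and then projects the Data frames into plain vectors before the monotonicity checks. A sketch of that projection step, assuming a `frames: Vec<Api1Frame>` already parsed from the response body (variable names illustrative, not necessarily the test's own):

    let tss: Vec<u64> = frames
        .iter()
        .filter_map(|fr| match fr {
            // Only data frames carry a timestamp; headers are skipped.
            Api1Frame::Data(d) => Some(d.ts()),
            _ => None,
        })
        .collect();
    assert_eq!(is_monotonic_strict(tss.iter()), true);
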
diff --git a/daqbufp2/src/test/api1/api1_parse.rs b/daqbufp2/src/test/api1/api1_parse.rs
deleted file mode 100644
index c6e112f..0000000
--- a/daqbufp2/src/test/api1/api1_parse.rs
+++ /dev/null
@@ -1,184 +0,0 @@
-use httpret::api1::Api1ChannelHeader;
-use netpod::log::*;
-use nom::multi::many0;
-use nom::number::complete::{be_u32, be_u64, be_u8};
-use nom::IResult;
-use std::num::NonZeroUsize;
-
-// u32be length_1.
-// there is exactly length_1 more bytes in this message.
-// u8 mtype: 0: channel-header, 1: data
-
-// for mtype == 0:
-// The rest is a JSON with the channel header.
-
-// for mtype == 1:
-// u64be timestamp
-// u64be pulse
-// After that comes exactly (length_1 - 17) bytes of data.
-
-#[derive(Debug)]
-pub struct Header {
-    header: Api1ChannelHeader,
-}
-
-impl Header {
-    pub fn header(&self) -> &Api1ChannelHeader {
-        &self.header
-    }
-}
-
-#[derive(Debug)]
-pub struct Data {
-    ts: u64,
-    pulse: u64,
-    data: Vec<u8>,
-}
-
-impl Data {
-    pub fn ts(&self) -> u64 {
-        self.ts
-    }
-
-    pub fn pulse(&self) -> u64 {
-        self.pulse
-    }
-
-    pub fn data(&self) -> &[u8] {
-        &self.data
-    }
-}
-
-#[derive(Debug)]
-pub enum Api1Frame {
-    Header(Header),
-    Data(Data),
-}
-
-fn header(inp: &[u8]) -> IResult<&[u8], Header> {
-    match serde_json::from_slice(inp) {
-        Ok(k) => {
-            let k: Api1ChannelHeader = k;
-            IResult::Ok((&inp[inp.len()..], Header { header: k }))
-        }
-        Err(e) => {
-            error!("json header parse error: {e}");
-            let e = nom::Err::Failure(nom::error::make_error(inp, nom::error::ErrorKind::Fail));
-            IResult::Err(e)
-        }
-    }
-}
-
-fn data(inp: &[u8]) -> IResult<&[u8], Data> {
-    if inp.len() < 16 {
-        use nom::{Err, Needed};
-        return IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap())));
-    }
-    let (inp, ts) = be_u64(inp)?;
-    let (inp, pulse) = be_u64(inp)?;
-    let data = inp.into();
-    let inp = &inp[inp.len()..];
-    let res = Data { ts, pulse, data };
-    IResult::Ok((inp, res))
-}
-
-fn api1_frame_complete(inp: &[u8]) -> IResult<&[u8], Api1Frame> {
-    let (inp, mtype) = be_u8(inp)?;
-    if mtype == 0 {
-        let (inp, val) = header(inp)?;
-        if inp.len() != 0 {
-            // We did not consume the exact number of bytes
-            let kind = nom::error::ErrorKind::Verify;
-            let e = nom::error::Error::new(inp, kind);
-            Err(nom::Err::Failure(e))
-        } else {
-            let res = Api1Frame::Header(val);
-            IResult::Ok((inp, res))
-        }
-    } else if mtype == 1 {
-        let (inp, val) = data(inp)?;
-        if inp.len() != 0 {
-            // We did not consume the exact number of bytes
-            let kind = nom::error::ErrorKind::Verify;
-            let e = nom::error::Error::new(inp, kind);
-            Err(nom::Err::Failure(e))
-        } else {
-            let res = Api1Frame::Data(val);
-            IResult::Ok((inp, res))
-        }
-    } else {
-        let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(1).unwrap()));
-        IResult::Err(e)
-    }
-}
-
-fn api1_frame(inp: &[u8]) -> IResult<&[u8], Api1Frame> {
-    let (inp, len) = be_u32(inp)?;
-    if len < 1 {
-        use nom::error::{ErrorKind, ParseError};
-        use nom::Err;
-        return IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail)));
-    }
-    if inp.len() < len as usize {
-        let e = nom::Err::Incomplete(nom::Needed::Size(NonZeroUsize::new(len as _).unwrap()));
-        IResult::Err(e)
-    } else {
-        let (inp2, inp) = inp.split_at(len as _);
-        let (inp2, res) = api1_frame_complete(inp2)?;
-        if inp2.len() != 0 {
-            let kind = nom::error::ErrorKind::Fail;
-            let e = nom::error::Error::new(inp, kind);
-            IResult::Err(nom::Err::Failure(e))
-        } else {
-            IResult::Ok((inp, res))
-        }
-    }
-}
-
-type Nres<'a, T> = IResult<&'a [u8], T, nom::error::VerboseError<&'a [u8]>>;
-
-#[allow(unused)]
-fn verbose_err<T>(inp: &[u8]) -> Nres<T> {
-    use nom::error::{ErrorKind, ParseError, VerboseError};
-    use nom::Err;
-    let e = VerboseError::from_error_kind(inp, ErrorKind::Fail);
-    return IResult::Err(Err::Failure(e));
-}
-
-pub fn api1_frames(inp: &[u8]) -> IResult<&[u8], Vec<Api1Frame>> {
-    let (inp, res) = many0(api1_frame)(inp)?;
-    IResult::Ok((inp, res))
-}
-
-#[test]
-fn test_basic_frames() -> Result<(), err::Error> {
-    use std::io::Write;
-    let mut buf = Vec::new();
-    let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#;
-    buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?;
-    buf.write(&[0])?;
-    buf.write(js.as_bytes())?;
-
-    buf.write(&25u32.to_be_bytes())?;
-    buf.write(&[1])?;
-    buf.write(&20u64.to_be_bytes())?;
-    buf.write(&21u64.to_be_bytes())?;
-    buf.write(&5.123f64.to_be_bytes())?;
-
-    buf.write(&25u32.to_be_bytes())?;
-    buf.write(&[1])?;
-    buf.write(&22u64.to_be_bytes())?;
-    buf.write(&23u64.to_be_bytes())?;
-    buf.write(&7.88f64.to_be_bytes())?;
-
-    match api1_frames(&buf) {
-        Ok((_, frames)) => {
-            assert_eq!(frames.len(), 3);
-        }
-        Err(e) => {
-            error!("can not parse result: {e}");
-            panic!()
-        }
-    };
-    Ok(())
-}
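The framing described in the comments above (and carried over unchanged into the new parse::api1_parse module) can be exercised with a small encoder. A sketch under that stated wire format; encode_data_frame is a made-up helper, not part of this patch:

    // One API-1 data frame (mtype == 1). The u32be length prefix counts
    // the tag byte plus the two u64be fields plus the payload: 1 + 8 + 8 + n.
    fn encode_data_frame(ts: u64, pulse: u64, payload: &[u8]) -> Vec<u8> {
        let len = (1 + 8 + 8 + payload.len()) as u32;
        let mut out = Vec::with_capacity(4 + len as usize);
        out.extend_from_slice(&len.to_be_bytes());
        out.push(1); // mtype: data
        out.extend_from_slice(&ts.to_be_bytes());
        out.extend_from_slice(&pulse.to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

An 8-byte float payload gives len == 25, matching the hand-built frames in the tests.
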
diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs
index fc3e6f3..45cbc91 100644
--- a/daqbufp2/src/test/api1/data_api_python.rs
+++ b/daqbufp2/src/test/api1/data_api_python.rs
@@ -11,6 +11,7 @@ use netpod::HostPort;
 use netpod::SfDbChannel;
 use netpod::APP_JSON;
 use netpod::DATETIME_FMT_3MS;
+use parse::api1_parse::api1_frames;
 use url::Url;
 
 const TEST_BACKEND: &str = "testbackend-00";
@@ -74,6 +75,12 @@ fn api3_hdf_dim0_00() -> Result<(), Error> {
             cluster,
         )
        .await?;
+        use parse::nom;
+        let x: Result<(&[u8], Vec<parse::api1_parse::Api1Frame>), nom::Err<nom::error::VerboseError<&[u8]>>> =
+            api1_frames(&jsv);
+        let res = x.unwrap();
+        eprintln!("{res:?}");
+        panic!();
         Ok(())
     };
     taskrun::run(fut)
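The explicit type annotation on x is what pins the generic nom error parameter of api1_frames; the turbofish form used by the unit test in parse/src/api1_parse.rs is equivalent:

    let (rest, frames) = api1_frames::<nom::error::VerboseError<&[u8]>>(&jsv).unwrap();
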
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index 848b394..3de3703 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -9,11 +9,9 @@ use bytes::BytesMut;
 use disk::eventchunker::EventChunkerConf;
 use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
 use disk::raw::conn::make_local_event_blobs_stream;
-use futures_util::stream;
 use futures_util::FutureExt;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use futures_util::TryFutureExt;
 use futures_util::TryStreamExt;
 use http::Method;
 use http::StatusCode;
@@ -32,7 +30,6 @@ use netpod::query::api1::Api1Query;
 use netpod::range::evrange::NanoRange;
 use netpod::timeunits::SEC;
 use netpod::ByteSize;
-use netpod::ChConf;
 use netpod::ChannelSearchQuery;
 use netpod::ChannelSearchResult;
 use netpod::ChannelTypeConfigGen;
@@ -40,23 +37,19 @@ use netpod::DiskIoTune;
 use netpod::NodeConfigCached;
 use netpod::PerfOpts;
 use netpod::ProxyConfig;
-use netpod::ScalarType;
 use netpod::SfChFetchInfo;
 use netpod::SfDbChannel;
 use netpod::Shape;
 use netpod::ACCEPT_ALL;
 use netpod::APP_JSON;
 use netpod::APP_OCTET;
-use parse::channelconfig::extract_matching_config_entry;
-use parse::channelconfig::read_local_config;
-use parse::channelconfig::ChannelConfigs;
-use parse::channelconfig::ConfigEntry;
+use parse::api1_parse::Api1ByteOrder;
+use parse::api1_parse::Api1ChannelHeader;
 use query::api4::events::PlainEventsQuery;
 use serde::Deserialize;
 use serde::Serialize;
 use serde_json::Value as JsonValue;
 use std::collections::VecDeque;
-use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
 use std::task::Context;
@@ -519,127 +512,6 @@ async fn process_answer(res: Response) -> Result {
     }
 }
 
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum Api1ScalarType {
-    #[serde(rename = "uint8")]
-    U8,
-    #[serde(rename = "uint16")]
-    U16,
-    #[serde(rename = "uint32")]
-    U32,
-    #[serde(rename = "uint64")]
-    U64,
-    #[serde(rename = "int8")]
-    I8,
-    #[serde(rename = "int16")]
-    I16,
-    #[serde(rename = "int32")]
-    I32,
-    #[serde(rename = "int64")]
-    I64,
-    #[serde(rename = "float32")]
-    F32,
-    #[serde(rename = "float64")]
-    F64,
-    #[serde(rename = "bool")]
-    BOOL,
-    #[serde(rename = "string")]
-    STRING,
-}
-
-impl Api1ScalarType {
-    pub fn to_str(&self) -> &'static str {
-        use Api1ScalarType as A;
-        match self {
-            A::U8 => "uint8",
-            A::U16 => "uint16",
-            A::U32 => "uint32",
-            A::U64 => "uint64",
-            A::I8 => "int8",
-            A::I16 => "int16",
-            A::I32 => "int32",
-            A::I64 => "int64",
-            A::F32 => "float32",
-            A::F64 => "float64",
-            A::BOOL => "bool",
-            A::STRING => "string",
-        }
-    }
-}
-
-impl fmt::Display for Api1ScalarType {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        write!(fmt, "{}", self.to_str())
-    }
-}
-
-impl From<&ScalarType> for Api1ScalarType {
-    fn from(k: &ScalarType) -> Self {
-        use Api1ScalarType as B;
-        use ScalarType as A;
-        match k {
-            A::U8 => B::U8,
-            A::U16 => B::U16,
-            A::U32 => B::U32,
-            A::U64 => B::U64,
-            A::I8 => B::I8,
-            A::I16 => B::I16,
-            A::I32 => B::I32,
-            A::I64 => B::I64,
-            A::F32 => B::F32,
-            A::F64 => B::F64,
-            A::BOOL => B::BOOL,
-            A::STRING => B::STRING,
-        }
-    }
-}
-
-impl From<ScalarType> for Api1ScalarType {
-    fn from(x: ScalarType) -> Self {
-        (&x).into()
-    }
-}
-
-#[test]
-fn test_custom_variant_name() {
-    let val = Api1ScalarType::F32;
-    assert_eq!(format!("{val:?}"), "F32");
-    assert_eq!(format!("{val}"), "float32");
-    let s = serde_json::to_string(&val).unwrap();
-    assert_eq!(s, "\"float32\"");
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum Api1ByteOrder {
-    #[serde(rename = "LITTLE_ENDIAN")]
-    Little,
-    #[serde(rename = "BIG_ENDIAN")]
-    Big,
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub struct Api1ChannelHeader {
-    name: String,
-    #[serde(rename = "type")]
-    ty: Api1ScalarType,
-    #[serde(rename = "byteOrder")]
-    byte_order: Api1ByteOrder,
-    #[serde(default)]
-    shape: Vec<u32>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    compression: Option<usize>,
-}
-
-impl Api1ChannelHeader {
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    pub fn ty(&self) -> Api1ScalarType {
-        self.ty.clone()
-    }
-}
-
 async fn find_ch_conf(
     range: NanoRange,
     channel: SfDbChannel,
@@ -728,21 +600,19 @@ impl DataApiPython3DataStream {
                 None
             };
         };
-        let compression = if let Some(_) = &b.comps[i1] { Some(1) } else { None };
         if !*header_out {
-            let head = Api1ChannelHeader {
-                name: channel.name().into(),
-                ty: (&b.scalar_types[i1]).into(),
-                byte_order: if b.be[i1] {
-                    Api1ByteOrder::Big
-                } else {
-                    Api1ByteOrder::Little
-                },
-                // The shape is inconsistent on the events.
-                // Seems like the config is to be trusted in this case.
-                shape: shape.to_u32_vec(),
-                compression,
-            };
+            let byte_order = if b.be[i1] {
+                Api1ByteOrder::Big
+            } else {
+                Api1ByteOrder::Little
+            };
+            let head = Api1ChannelHeader::new(
+                channel.name().into(),
+                b.scalar_types.get(i1).unwrap().into(),
+                byte_order,
+                shape.clone(),
+                b.comps.get(i1).map(|x| x.clone()).unwrap(),
+            );
             let h = serde_json::to_string(&head)?;
             info!("sending channel header {}", h);
             let l1 = 1 + h.as_bytes().len() as u32;
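With the serde renames on Api1ChannelHeader, the header message built here serializes with JSON keys name, type, byteOrder and shape, while compression is skipped when None. A sketch of the constructor and the resulting JSON; it assumes netpod's Shape has a scalar variant whose to_u32_vec() is empty:

    let head = Api1ChannelHeader::new(
        "ch1".into(),
        Api1ScalarType::F64,
        Api1ByteOrder::Little,
        Shape::Scalar, // assumed variant name, for illustration only
        None,
    );
    assert_eq!(
        serde_json::to_string(&head).unwrap(),
        r#"{"name":"ch1","type":"float64","byteOrder":"LITTLE_ENDIAN","shape":[]}"#
    );
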
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 7363a2f..588c05a 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -47,6 +47,7 @@ pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY;
 pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10;
 pub const TS_MSP_GRID_SPACING: u64 = 6 * 2;
 
+pub const DATETIME_FMT_0MS: &str = "%Y-%m-%dT%H:%M:%SZ";
 pub const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ";
 pub const DATETIME_FMT_6MS: &str = "%Y-%m-%dT%H:%M:%S.%6fZ";
 pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ";

diff --git a/netpod/src/query/datetime.rs b/netpod/src/query/datetime.rs
index c0c0578..3039098 100644
--- a/netpod/src/query/datetime.rs
+++ b/netpod/src/query/datetime.rs
@@ -6,7 +6,6 @@ use serde::Deserialize;
 use serde::Serialize;
 use std::fmt;
 use std::ops;
-use std::time::Duration;
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct Datetime(DateTime<FixedOffset>);
@@ -43,24 +42,61 @@ impl Serialize for Datetime {
         S: serde::Serializer,
     {
         use fmt::Write;
+        use serde::ser::Error;
         let val = &self.0;
         let mut s = String::with_capacity(64);
-        write!(&mut s, "{}", val.format("%Y-%m-%dT%H:%M:%S")).map_err(|_| serde::ser::Error::custom("fmt"))?;
+        write!(&mut s, "{}", val.format("%Y-%m-%dT%H:%M:%S")).map_err(|_| Error::custom("fmt"))?;
+        let ns = val.timestamp_subsec_nanos();
         let mus = val.timestamp_subsec_micros();
-        if mus % 1000 != 0 {
-            write!(&mut s, "{}", val.format(".%6f")).map_err(|_| serde::ser::Error::custom("fmt"))?;
+        if ns % 1000 != 0 {
+            write!(&mut s, "{}", val.format(".%9f")).map_err(|_| Error::custom("fmt"))?;
+        } else if mus % 1000 != 0 {
+            write!(&mut s, "{}", val.format(".%6f")).map_err(|_| Error::custom("fmt"))?;
         } else if mus != 0 {
-            write!(&mut s, "{}", val.format(".%3f")).map_err(|_| serde::ser::Error::custom("fmt"))?;
+            write!(&mut s, "{}", val.format(".%3f")).map_err(|_| Error::custom("fmt"))?;
         }
         if val.offset().local_minus_utc() == 0 {
-            write!(&mut s, "Z").map_err(|_| serde::ser::Error::custom("fmt"))?;
+            write!(&mut s, "Z").map_err(|_| Error::custom("fmt"))?;
         } else {
-            write!(&mut s, "{}", val.format("%:z")).map_err(|_| serde::ser::Error::custom("fmt"))?;
+            write!(&mut s, "{}", val.format("%:z")).map_err(|_| Error::custom("fmt"))?;
         }
         serializer.collect_str(&s)
     }
 }
 
+mod ser_impl_2 {
+    use super::Datetime;
+    use crate::DATETIME_FMT_0MS;
+    use crate::DATETIME_FMT_3MS;
+    use crate::DATETIME_FMT_6MS;
+    use crate::DATETIME_FMT_9MS;
+
+    #[allow(unused)]
+    fn serialize<S>(obj: &Datetime, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let val = &obj.0;
+        let ns = val.timestamp_subsec_nanos();
+        let s = if ns % 1000 != 0 {
+            val.format(DATETIME_FMT_9MS)
+        } else {
+            let mus = val.timestamp_subsec_micros();
+            if mus % 1000 != 0 {
+                val.format(DATETIME_FMT_6MS)
+            } else {
+                let ms = val.timestamp_subsec_millis();
+                if ms != 0 {
+                    val.format(DATETIME_FMT_3MS)
+                } else {
+                    val.format(DATETIME_FMT_0MS)
+                }
+            }
+        };
+        serializer.collect_str(&s)
+    }
+}
+
 struct Vis1;
 
 impl<'de> Visitor<'de> for Vis1 {
@@ -125,10 +161,26 @@ fn ser_02() {
         .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
         .earliest()
         .unwrap()
-        .checked_add_signed(chrono::Duration::nanoseconds(543432120))
+        .checked_add_signed(chrono::Duration::nanoseconds(543430000))
         .unwrap();
     let x = Datetime(x);
     let s = serde_json::to_string(&x).unwrap();
-    assert_eq!(s, r#""2023-02-03T15:12:40.543432Z""#);
+    assert_eq!(s, r#""2023-02-03T15:12:40.543430Z""#);
+}
+
+#[test]
+fn ser_03() {
+    use chrono::TimeZone;
+    let x = FixedOffset::east_opt(0)
+        .unwrap()
+        .with_ymd_and_hms(2023, 2, 3, 15, 12, 40)
+        .earliest()
+        .unwrap()
+        .checked_add_signed(chrono::Duration::nanoseconds(543432321))
+        .unwrap();
+    let x = Datetime(x);
+    let s = serde_json::to_string(&x).unwrap();
+
+    assert_eq!(s, r#""2023-02-03T15:12:40.543432321Z""#);
 }
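The serializer now picks the shortest fraction that is still lossless: a nanosecond remainder forces %9f, else a microsecond remainder forces %6f, else nonzero subseconds give %3f, else none. The same decision, extracted as a standalone sketch against the new netpod constants (pick_fmt is a hypothetical helper, not in the patch):

    use chrono::DateTime;
    use chrono::FixedOffset;
    use netpod::DATETIME_FMT_0MS;
    use netpod::DATETIME_FMT_3MS;
    use netpod::DATETIME_FMT_6MS;
    use netpod::DATETIME_FMT_9MS;

    // Choose the least-precise format string that still round-trips.
    fn pick_fmt(val: &DateTime<FixedOffset>) -> &'static str {
        let ns = val.timestamp_subsec_nanos();
        if ns % 1_000 != 0 {
            DATETIME_FMT_9MS
        } else if val.timestamp_subsec_micros() % 1_000 != 0 {
            DATETIME_FMT_6MS
        } else if val.timestamp_subsec_millis() != 0 {
            DATETIME_FMT_3MS
        } else {
            DATETIME_FMT_0MS
        }
    }

ser_02 (543430000 ns) lands in the %6f branch, ser_03 (543432321 ns) in the %9f branch.
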
diff --git a/parse/Cargo.toml b/parse/Cargo.toml
index 5ad7f5d..f11065d 100644
--- a/parse/Cargo.toml
+++ b/parse/Cargo.toml
@@ -6,13 +6,13 @@ edition = "2021"
 
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
-serde_json = "1.0.89"
-tokio = { version = "1.22.0", features = ["fs"] }
-chrono = { version = "0.4.19", features = ["serde"] }
-bytes = "1.0.1"
-byteorder = "1.4.3"
+serde_json = "1.0"
+tokio = { version = "1.28.2", features = ["fs"] }
+chrono = { version = "0.4.26", features = ["serde"] }
+bytes = "1.4"
+byteorder = "1.4"
 hex = "0.4.3"
-nom = "7.1.1"
+nom = "7.1.3"
 num-traits = "0.2"
 num-derive = "0.3"
 err = { path = "../err" }
diff --git a/parse/src/api1_parse.rs b/parse/src/api1_parse.rs
new file mode 100644
index 0000000..843fea9
--- /dev/null
+++ b/parse/src/api1_parse.rs
@@ -0,0 +1,341 @@
+use crate::channelconfig::CompressionMethod;
+use crate::nom;
+use netpod::log::*;
+use netpod::ScalarType;
+use netpod::Shape;
+use nom::error::context;
+use nom::error::ContextError;
+use nom::error::ErrorKind;
+use nom::error::ParseError;
+use nom::multi::many0;
+use nom::number::complete::be_u32;
+use nom::number::complete::be_u64;
+use nom::number::complete::be_u8;
+use nom::Err;
+use nom::IResult;
+use nom::Needed;
+use serde::Deserialize;
+use serde::Serialize;
+use std::fmt;
+use std::num::NonZeroUsize;
+
+//type Nres<'a, T> = IResult<&'a [u8], T, nom::error::VerboseError<&'a [u8]>>;
+
+type Nres<'a, O, E = nom::error::Error<&'a [u8]>> = Result<(&'a [u8], O), nom::Err<E>>;
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum Api1ByteOrder {
+    #[serde(rename = "LITTLE_ENDIAN")]
+    Little,
+    #[serde(rename = "BIG_ENDIAN")]
+    Big,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum Api1ScalarType {
+    #[serde(rename = "uint8")]
+    U8,
+    #[serde(rename = "uint16")]
+    U16,
+    #[serde(rename = "uint32")]
+    U32,
+    #[serde(rename = "uint64")]
+    U64,
+    #[serde(rename = "int8")]
+    I8,
+    #[serde(rename = "int16")]
+    I16,
+    #[serde(rename = "int32")]
+    I32,
+    #[serde(rename = "int64")]
+    I64,
+    #[serde(rename = "float32")]
+    F32,
+    #[serde(rename = "float64")]
+    F64,
+    #[serde(rename = "bool")]
+    BOOL,
+    #[serde(rename = "string")]
+    STRING,
+}
+
+impl Api1ScalarType {
+    pub fn to_str(&self) -> &'static str {
+        use Api1ScalarType as A;
+        match self {
+            A::U8 => "uint8",
+            A::U16 => "uint16",
+            A::U32 => "uint32",
+            A::U64 => "uint64",
+            A::I8 => "int8",
+            A::I16 => "int16",
+            A::I32 => "int32",
+            A::I64 => "int64",
+            A::F32 => "float32",
+            A::F64 => "float64",
+            A::BOOL => "bool",
+            A::STRING => "string",
+        }
+    }
+}
+
+#[test]
+fn test_custom_variant_name() {
+    let val = Api1ScalarType::F32;
+    assert_eq!(format!("{val:?}"), "F32");
+    assert_eq!(format!("{val}"), "float32");
+    let s = serde_json::to_string(&val).unwrap();
+    assert_eq!(s, "\"float32\"");
+}
+
+impl fmt::Display for Api1ScalarType {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "{}", self.to_str())
+    }
+}
+
+impl From<&ScalarType> for Api1ScalarType {
+    fn from(k: &ScalarType) -> Self {
+        use Api1ScalarType as B;
+        use ScalarType as A;
+        match k {
+            A::U8 => B::U8,
+            A::U16 => B::U16,
+            A::U32 => B::U32,
+            A::U64 => B::U64,
+            A::I8 => B::I8,
+            A::I16 => B::I16,
+            A::I32 => B::I32,
+            A::I64 => B::I64,
+            A::F32 => B::F32,
+            A::F64 => B::F64,
+            A::BOOL => B::BOOL,
+            A::STRING => B::STRING,
+        }
+    }
+}
+
+impl From<ScalarType> for Api1ScalarType {
+    fn from(x: ScalarType) -> Self {
+        (&x).into()
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct Api1ChannelHeader {
+    name: String,
+    #[serde(rename = "type")]
+    ty: Api1ScalarType,
+    #[serde(rename = "byteOrder")]
+    byte_order: Api1ByteOrder,
+    #[serde(default)]
+    shape: Vec<u32>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    compression: Option<usize>,
+}
+
+impl Api1ChannelHeader {
+    pub fn new(
+        name: String,
+        ty: Api1ScalarType,
+        byte_order: Api1ByteOrder,
+        shape: Shape,
+        compression: Option<CompressionMethod>,
+    ) -> Self {
+        Self {
+            name,
+            ty,
+            byte_order,
+            shape: shape.to_u32_vec(),
+            compression: compression.map(|x| x.to_i16() as usize),
+        }
+    }
+
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn ty(&self) -> Api1ScalarType {
+        self.ty.clone()
+    }
+}
+
+// u32be length_1.
+// there are exactly length_1 more bytes in this message.
+// u8 mtype: 0: channel-header, 1: data
+
+// for mtype == 0:
+// The rest is a JSON with the channel header.
+
+// for mtype == 1:
+// u64be timestamp
+// u64be pulse
+// After that comes exactly (length_1 - 17) bytes of data.
+
+#[derive(Debug)]
+pub struct Header {
+    header: Api1ChannelHeader,
+}
+
+impl Header {
+    pub fn header(&self) -> &Api1ChannelHeader {
+        &self.header
+    }
+}
+
+#[derive(Debug)]
+pub struct Data {
+    ts: u64,
+    pulse: u64,
+    data: Vec<u8>,
+}
+
+impl Data {
+    pub fn ts(&self) -> u64 {
+        self.ts
+    }
+
+    pub fn pulse(&self) -> u64 {
+        self.pulse
+    }
+
+    pub fn data(&self) -> &[u8] {
+        &self.data
+    }
+}
+
+#[derive(Debug)]
+pub enum Api1Frame {
+    Header(Header),
+    Data(Data),
+}
+
+fn fail_on_input<'a, T, E>(inp: &'a [u8]) -> Nres<'a, T, E>
+where
+    E: ParseError<&'a [u8]>,
+{
+    // IResult::Err(Err::Failure(make_error(inp, ErrorKind::Fail)))
+    let e = nom::error::ParseError::from_error_kind(inp, ErrorKind::Fail);
+    IResult::Err(Err::Failure(e))
+}
+
+fn header(inp: &[u8]) -> Nres<Header> {
+    match serde_json::from_slice(inp) {
+        Ok(k) => {
+            let k: Api1ChannelHeader = k;
+            IResult::Ok((&inp[inp.len()..], Header { header: k }))
+        }
+        Err(e) => {
+            let s = std::str::from_utf8(inp);
+            error!("can not parse json: {e}\n{s:?}");
+            context("json parse", fail_on_input)(inp)
+        }
+    }
+}
+
+fn data(inp: &[u8]) -> Nres<Data> {
+    if inp.len() < 16 {
+        IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap())))
+    } else {
+        let (inp, ts) = be_u64(inp)?;
+        let (inp, pulse) = be_u64(inp)?;
+        let data = inp.into();
+        let inp = &inp[inp.len()..];
+        let res = Data { ts, pulse, data };
+        IResult::Ok((inp, res))
+    }
+}
+
+fn api1_frame_complete(inp: &[u8]) -> Nres<Api1Frame> {
+    let (inp, mtype) = be_u8(inp)?;
+    if mtype == 0 {
+        let (inp, val) = header(inp)?;
+        if inp.len() != 0 {
+            context("header did not consume all bytes", fail_on_input)(inp)
+        } else {
+            let res = Api1Frame::Header(val);
+            IResult::Ok((inp, res))
+        }
+    } else if mtype == 1 {
+        let (inp, val) = data(inp)?;
+        if inp.len() != 0 {
+            context("data did not consume all bytes", fail_on_input)(inp)
+        } else {
+            let res = Api1Frame::Data(val);
+            IResult::Ok((inp, res))
+        }
+    } else {
+        let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(1).unwrap()));
+        IResult::Err(e)
+    }
+}
+
+fn api1_frame(inp: &[u8]) -> Nres<Api1Frame> {
+    let (inp, len) = be_u32(inp)?;
+    if len < 1 {
+        IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail)))
+    } else {
+        if inp.len() < len as usize {
+            let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(len as _).unwrap()));
+            IResult::Err(e)
+        } else {
+            let (inp2, inp) = inp.split_at(len as _);
+            let (inp2, res) = api1_frame_complete(inp2)?;
+            if inp2.len() != 0 {
+                context("frame did not consume all bytes", fail_on_input)(inp)
+            } else {
+                IResult::Ok((inp, res))
+            }
+        }
+    }
+}
+
+pub fn api1_frames<'a, E>(inp: &[u8]) -> Nres<Vec<Api1Frame>, E>
+where
+    E: ParseError<&'a [u8]> + ContextError<&'a [u8]>,
+{
+    // TODO remove unwrap
+    IResult::Ok(many0(api1_frame)(inp).unwrap())
+}
+
+#[allow(unused)]
+fn verbose_err<'a, T>(inp: &'a [u8]) -> Nres<'a, T, nom::error::VerboseError<&'a [u8]>> {
+    use nom::error::ErrorKind;
+    use nom::error::ParseError;
+    use nom::error::VerboseError;
+    use nom::Err;
+    let e = ParseError::from_error_kind(inp, ErrorKind::Fail);
+    IResult::Err(Err::Failure(e))
+}
+
+#[test]
+fn test_basic_frames() -> Result<(), err::Error> {
+    use std::io::Write;
+    let mut buf = Vec::new();
+    let js = r#"{"name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN"}"#;
+    buf.write(&(1 + js.as_bytes().len() as u32).to_be_bytes())?;
+    buf.write(&[0])?;
+    buf.write(js.as_bytes())?;
+
+    buf.write(&25u32.to_be_bytes())?;
+    buf.write(&[1])?;
+    buf.write(&20u64.to_be_bytes())?;
+    buf.write(&21u64.to_be_bytes())?;
+    buf.write(&5.123f64.to_be_bytes())?;
+
+    buf.write(&25u32.to_be_bytes())?;
+    buf.write(&[1])?;
+    buf.write(&22u64.to_be_bytes())?;
+    buf.write(&23u64.to_be_bytes())?;
+    buf.write(&7.88f64.to_be_bytes())?;
+
+    match api1_frames::<nom::error::VerboseError<&[u8]>>(&buf) {
+        Ok((_, frames)) => {
+            assert_eq!(frames.len(), 3);
+        }
+        Err(e) => {
+            panic!("can not parse result: {e}")
+        }
+    };
+    Ok(())
+}
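A round-trip of the new parser against the encoder sketched earlier (after the deleted test module); hypothetical usage, mirroring test_basic_frames:

    let buf = encode_data_frame(20, 21, &5.123f64.to_be_bytes());
    let (rest, frames) = api1_frames::<nom::error::VerboseError<&[u8]>>(&buf).unwrap();
    assert!(rest.is_empty());
    match &frames[0] {
        Api1Frame::Data(d) => {
            assert_eq!(d.ts(), 20);
            assert_eq!(d.pulse(), 21);
            assert_eq!(d.data().len(), 8);
        }
        _ => panic!("expected a data frame"),
    }

Note that api1_frames still unwraps internally (the TODO above), so malformed or truncated input currently panics instead of returning the nom error to the caller.
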
diff --git a/parse/src/lib.rs b/parse/src/lib.rs
index 7244ccf..10ea298 100644
--- a/parse/src/lib.rs
+++ b/parse/src/lib.rs
@@ -1,2 +1,5 @@
+pub mod api1_parse;
 pub mod channelconfig;
 mod jsonconf;
+pub mod nom2;
+pub use nom;

diff --git a/parse/src/nom2.rs b/parse/src/nom2.rs
new file mode 100644
index 0000000..3838425
--- /dev/null
+++ b/parse/src/nom2.rs
@@ -0,0 +1,3 @@
+// pub use nom::error::VerboseError;
+// pub use nom::Err;
+// pub use nom::IResult;
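The pub use nom re-export lets downstream crates such as daqbufp2 name nom's error types for the generic parameter of api1_frames without declaring their own nom dependency (the daqbufp2 test does this via use parse::nom above), keeping both crates on the same nom version. A minimal consumer sketch; parse_all is a made-up name:

    use parse::api1_parse::api1_frames;
    use parse::api1_parse::Api1Frame;
    use parse::nom; // re-exported by the parse crate

    fn parse_all(buf: &[u8]) -> Vec<Api1Frame> {
        // Panics on malformed input for now, same as the TODO'd unwrap inside.
        let (_rest, frames) = api1_frames::<nom::error::VerboseError<&[u8]>>(buf).unwrap();
        frames
    }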