From ce1f32234649cb028d5cd8745205d49088b6daa8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 21 Jun 2023 11:36:36 +0200 Subject: [PATCH] Assert timestamps in test --- daqbufp2/src/test/api1.rs | 2 +- daqbufp2/src/test/api1/data_api_python.rs | 32 ++++++++++-- disk/src/raw/generated.rs | 12 +++-- parse/src/api1_parse.rs | 61 +++++++++++++++-------- 4 files changed, 76 insertions(+), 31 deletions(-) diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs index 21c160b..2ea6b91 100644 --- a/daqbufp2/src/test/api1.rs +++ b/daqbufp2/src/test/api1.rs @@ -66,7 +66,7 @@ fn events_f64_plain() -> Result<(), Error> { let body = serde_json::to_string(&qu)?; let buf = http_post(url, accept, body.into()).await?; eprintln!("body received: {}", buf.len()); - match api1_parse::api1_frames(&buf) { + match api1_parse::api1_frames::>(&buf) { Ok((_, frames)) => { debug!("FRAMES LEN: {}", frames.len()); assert_eq!(frames.len(), 121); diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs index 45cbc91..a90b090 100644 --- a/daqbufp2/src/test/api1/data_api_python.rs +++ b/daqbufp2/src/test/api1/data_api_python.rs @@ -6,12 +6,15 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::range::evrange::NanoRange; +use netpod::timeunits::MS; use netpod::Cluster; use netpod::HostPort; use netpod::SfDbChannel; use netpod::APP_JSON; use netpod::DATETIME_FMT_3MS; use parse::api1_parse::api1_frames; +use parse::api1_parse::Api1Frame; +use parse::api1_parse::Api1ScalarType; use url::Url; const TEST_BACKEND: &str = "testbackend-00"; @@ -75,12 +78,31 @@ fn api3_hdf_dim0_00() -> Result<(), Error> { cluster, ) .await?; + use nom::error::VerboseError; use parse::nom; - let x: Result<(&[u8], Vec), nom::Err>> = - api1_frames(&jsv); - let res = x.unwrap(); - eprintln!("{res:?}"); - panic!(); + if false { + // Uses the default error type, but not unwrapped: + let _x: nom::IResult<_, _> = api1_frames(&jsv); + } + if false { + // Default error and unwrapped: + let (_, _) = api1_frames::>(&jsv).unwrap(); + } + let (_, frames) = api1_frames::>(&jsv).unwrap(); + if let Api1Frame::Header(header) = frames.get(0).expect("frame") { + assert_eq!(header.header().ty(), Api1ScalarType::I32); + } else { + panic!("expect header frame"); + } + for (i, frame) in frames[1..].iter().enumerate() { + if let Api1Frame::Data(data) = frame { + assert_eq!(data.ts() / MS, 1000 * (60 * 20 + 4 + i as u64)); + eprintln!("ts {}", data.ts() / MS); + } else { + panic!("expect data frame"); + } + } + assert_eq!(frames.len(), 67); Ok(()) }; taskrun::run(fut) diff --git a/disk/src/raw/generated.rs b/disk/src/raw/generated.rs index e79c923..81e9ef5 100644 --- a/disk/src/raw/generated.rs +++ b/disk/src/raw/generated.rs @@ -45,8 +45,9 @@ impl EventBlobsGeneratorI32Test00 { SeriesRange::TimeRange(k) => k, SeriesRange::PulseRange(_) => todo!(), }; - let dts = MS * 1000 * node_count as u64; - let ts = (range.beg / dts + node_ix) * dts; + let dt = MS * 1000; + let ts = (range.beg / dt + node_ix) * dt; + let dts = dt * node_count as u64; let tsend = range.end; Self { ts, @@ -68,7 +69,7 @@ impl EventBlobsGeneratorI32Test00 { let mut item = EventFull::empty(); let mut ts = self.ts; loop { - if self.ts >= self.tsend || item.byte_estimate() > 200 { + if ts >= self.tsend || item.byte_estimate() > 200 { break; } let pulse = ts; @@ -146,8 +147,9 @@ impl EventBlobsGeneratorI32Test01 { SeriesRange::TimeRange(k) => k, SeriesRange::PulseRange(_) => todo!(), }; - let dts = MS * 500 * node_count as u64; - let ts = (range.beg / dts + node_ix) * dts; + let dt = MS * 500; + let ts = (range.beg / dt + node_ix) * dt; + let dts = dt * node_count as u64; let tsend = range.end; Self { ts, diff --git a/parse/src/api1_parse.rs b/parse/src/api1_parse.rs index 843fea9..e933f11 100644 --- a/parse/src/api1_parse.rs +++ b/parse/src/api1_parse.rs @@ -3,6 +3,7 @@ use crate::nom; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; +use nom::bytes::complete::take; use nom::error::context; use nom::error::ContextError; use nom::error::ErrorKind; @@ -19,8 +20,6 @@ use serde::Serialize; use std::fmt; use std::num::NonZeroUsize; -//type Nres<'a, T> = IResult<&'a [u8], T, nom::error::VerboseError<&'a [u8]>>; - type Nres<'a, O, E = nom::error::Error<&'a [u8]>> = Result<(&'a [u8], O), nom::Err>; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -210,43 +209,52 @@ pub enum Api1Frame { Data(Data), } -fn fail_on_input<'a, T, E>(inp: &'a [u8]) -> Nres<'a, T, E> +fn fail_on_input<'a, T, E>(inp: &'a [u8]) -> Nres where E: ParseError<&'a [u8]>, { - // IResult::Err(Err::Failure(make_error(inp, ErrorKind::Fail))) let e = nom::error::ParseError::from_error_kind(inp, ErrorKind::Fail); IResult::Err(Err::Failure(e)) } -fn header(inp: &[u8]) -> Nres
{ +fn header<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ match serde_json::from_slice(inp) { Ok(k) => { let k: Api1ChannelHeader = k; + eprintln!("Parsed header OK: {k:?}"); IResult::Ok((&inp[inp.len()..], Header { header: k })) } Err(e) => { - let s = std::str::from_utf8(inp); + let s = String::from_utf8_lossy(inp); error!("can not parse json: {e}\n{s:?}"); context("json parse", fail_on_input)(inp) } } } -fn data(inp: &[u8]) -> Nres { +fn data<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]>, +{ if inp.len() < 16 { IResult::Err(Err::Incomplete(Needed::Size(NonZeroUsize::new(16).unwrap()))) } else { let (inp, ts) = be_u64(inp)?; let (inp, pulse) = be_u64(inp)?; - let data = inp.into(); - let inp = &inp[inp.len()..]; + let (inp, data) = take(inp.len())(inp)?; + let data = data.into(); let res = Data { ts, pulse, data }; IResult::Ok((inp, res)) } } -fn api1_frame_complete(inp: &[u8]) -> Nres { +fn api1_frame_complete<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ let (inp, mtype) = be_u8(inp)?; if mtype == 0 { let (inp, val) = header(inp)?; @@ -270,32 +278,40 @@ fn api1_frame_complete(inp: &[u8]) -> Nres { } } -fn api1_frame(inp: &[u8]) -> Nres { +fn api1_frame<'a, E>(inp: &'a [u8]) -> Nres +where + E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, +{ + let inp_orig = inp; let (inp, len) = be_u32(inp)?; if len < 1 { IResult::Err(Err::Failure(ParseError::from_error_kind(inp, ErrorKind::Fail))) } else { - if inp.len() < len as usize { + if inp.len() < len as usize + 4 { let e = Err::Incomplete(Needed::Size(NonZeroUsize::new(len as _).unwrap())); IResult::Err(e) } else { - let (inp2, inp) = inp.split_at(len as _); - let (inp2, res) = api1_frame_complete(inp2)?; - if inp2.len() != 0 { - context("frame did not consume all bytes", fail_on_input)(inp) + let (inp, payload) = nom::bytes::complete::take(len)(inp)?; + let (inp, len2) = be_u32(inp)?; + if len != len2 { + IResult::Err(Err::Failure(ParseError::from_error_kind(inp_orig, ErrorKind::Fail))) } else { - IResult::Ok((inp, res)) + let (left, res) = api1_frame_complete(payload)?; + if left.len() != 0 { + context("frame did not consume all bytes", fail_on_input)(inp_orig) + } else { + IResult::Ok((inp, res)) + } } } } } -pub fn api1_frames<'a, E>(inp: &[u8]) -> Nres, E> +pub fn api1_frames<'a, E>(inp: &'a [u8]) -> Nres, E> where E: ParseError<&'a [u8]> + ContextError<&'a [u8]>, { - // TODO remove unwrap - IResult::Ok(many0(api1_frame)(inp).unwrap()) + many0(api1_frame)(inp) } #[allow(unused)] @@ -308,6 +324,11 @@ fn verbose_err(inp: &[u8]) -> Nres { IResult::Err(Err::Failure(e)) } +#[test] +fn combinator_default_err() { + be_u32::<_, nom::error::Error<_>>([1, 2, 3, 4].as_slice()).unwrap(); +} + #[test] fn test_basic_frames() -> Result<(), err::Error> { use std::io::Write;