From 8eedf53f3912e2d25ea95cc787a7593a36a3dd1a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Nov 2022 15:10:00 +0100 Subject: [PATCH] Make serde Shape compatible with bincode --- items/Cargo.toml | 2 + items/src/binsdim0.rs | 3 +- items/src/binsdim1.rs | 3 +- items/src/frame.rs | 99 ++++++++++++--- netpod/src/netpod.rs | 279 ++++++++++++++++++++++++------------------ 5 files changed, 246 insertions(+), 140 deletions(-) diff --git a/items/Cargo.toml b/items/Cargo.toml index 6e485a1..b58c420 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -14,6 +14,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" ciborium = "0.2" rmp-serde = "1.1.1" +bson = "2.4.0" +bincode = "1.3.3" erased-serde = "0.3" bytes = "1.2.1" num-traits = "0.2.15" diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index 9359344..440f002 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -1,3 +1,4 @@ +use crate::frame::bincode_from_slice; use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::Appendable; @@ -211,7 +212,7 @@ where } fn from_buf(buf: &[u8]) -> Result { - let dec = rmp_serde::from_slice(&buf)?; + let dec = bincode_from_slice(buf)?; Ok(dec) } } diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index 98ebcd1..3ceeff6 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -1,3 +1,4 @@ +use crate::frame::bincode_from_slice; use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::ts_offs_from_abs; @@ -208,7 +209,7 @@ where } fn from_buf(buf: &[u8]) -> Result { - let dec = rmp_serde::from_slice(&buf)?; + let dec = bincode_from_slice(buf)?; Ok(dec) } } diff --git a/items/src/frame.rs b/items/src/frame.rs index 81ef444..f820f64 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -2,6 +2,10 @@ use crate::inmem::InMemoryFrame; use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem}; use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID}; +use bincode::config::{ + FixintEncoding, LittleEndian, RejectTrailing, WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing, +}; +use bincode::DefaultOptions; use bytes::{BufMut, BytesMut}; use err::Error; #[allow(unused)] @@ -35,15 +39,60 @@ where } } +pub fn bincode_ser( + w: W, +) -> bincode::Serializer< + W, + WithOtherTrailing< + WithOtherIntEncoding, FixintEncoding>, + RejectTrailing, + >, +> +where + W: std::io::Write, +{ + use bincode::Options; + let opts = DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .reject_trailing_bytes(); + let ser = bincode::Serializer::new(w, opts); + ser +} + +pub fn bincode_to_vec(item: S) -> Result, Error> +where + S: Serialize, +{ + let mut out = Vec::new(); + let mut ser = bincode_ser(&mut out); + item.serialize(&mut ser).map_err(|e| format!("{e}"))?; + Ok(out) +} + +pub fn bincode_from_slice(buf: &[u8]) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + use bincode::Options; + let opts = DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .reject_trailing_bytes(); + let mut de = bincode::Deserializer::from_slice(buf, opts); + ::deserialize(&mut de).map_err(|e| format!("{e}").into()) +} + pub fn make_frame_2(item: &T, fty: u32) -> Result where T: erased_serde::Serialize, { trace!("make_frame_2 fty {:x}", fty); let mut out = Vec::new(); - let mut ser = rmp_serde::Serializer::new(&mut out); - let mut ser2 = ::erase(&mut ser); + let mut ser = bincode_ser(&mut out); + //let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map(); //let writer = ciborium::ser::into_writer(&item, &mut out).unwrap(); + let mut ser2 = ::erase(&mut ser); match item.erased_serialize(&mut ser2) { Ok(_) => { let enc = out; @@ -77,7 +126,7 @@ where // TODO remove duplication for these similar `make_*_frame` functions: pub fn make_error_frame(error: &::err::Error) -> Result { - match rmp_serde::to_vec(error) { + match bincode_to_vec(error) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -99,12 +148,14 @@ pub fn make_error_frame(error: &::err::Error) -> Result { //trace!("frame_crc {}", frame_crc); Ok(buf) } - Err(e) => Err(e.ec())?, + Err(e) => Err(e)?, } } +// TODO can I remove this usage? pub fn make_log_frame(item: &LogItem) -> Result { - match rmp_serde::to_vec(item) { + warn!("make_log_frame {item:?}"); + match bincode_to_vec(item) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -113,6 +164,7 @@ pub fn make_log_frame(item: &LogItem) -> Result { buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(LOG_FRAME_TYPE_ID); + warn!("make_log_frame payload len {}", enc.len()); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); // TODO add padding to align to 8 bytes. @@ -123,12 +175,12 @@ pub fn make_log_frame(item: &LogItem) -> Result { buf.put_u32_le(frame_crc); Ok(buf) } - Err(e) => Err(e.ec())?, + Err(e) => Err(e)?, } } pub fn make_stats_frame(item: &StatsItem) -> Result { - match rmp_serde::to_vec(item) { + match bincode_to_vec(item) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -147,7 +199,7 @@ pub fn make_stats_frame(item: &StatsItem) -> Result { buf.put_u32_le(frame_crc); Ok(buf) } - Err(e) => Err(e.ec())?, + Err(e) => Err(e)?, } } @@ -207,38 +259,53 @@ where ))); } if frame.tyid() == ERROR_FRAME_TYPE_ID { - let k: ::err::Error = match rmp_serde::from_slice(frame.buf()) { + let k: ::err::Error = match bincode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len()); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); - Err(e.ec())? + Err(e)? } }; Ok(T::from_error(k)) } else if frame.tyid() == LOG_FRAME_TYPE_ID { - let k: LogItem = match rmp_serde::from_slice(frame.buf()) { + let k: LogItem = match bincode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len()); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); - Err(e.ec())? + Err(e)? } }; Ok(T::from_log(k)) + } else if frame.tyid() == LOG_FRAME_TYPE_ID { + let _: crate::Sitemty<()> = match bincode_from_slice(frame.buf()) { + Ok(item) => { + error!("GOOD DECODE OF A FULL LOG FRAME SITEMTY {item:?}"); + item + } + Err(e) => { + error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len()); + let n = frame.buf().len().min(128); + let s = String::from_utf8_lossy(&frame.buf()[..n]); + error!("frame.buf as string: {:?}", s); + Err(e)? + } + }; + Err(Error::with_msg_no_trace("BAD")) } else if frame.tyid() == STATS_FRAME_TYPE_ID { - let k: StatsItem = match rmp_serde::from_slice(frame.buf()) { + let k: StatsItem = match bincode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len()); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); - Err(e.ec())? + Err(e)? } }; Ok(T::from_stats(k)) @@ -255,14 +322,14 @@ where frame ))) } else { - match rmp_serde::from_slice(frame.buf()) { + match bincode_from_slice(frame.buf()) { Ok(item) => Ok(item), Err(e) => { error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid()); let n = frame.buf().len().min(64); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); - Err(e.ec())? + Err(e)? } } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index fac071b..89a40db 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -40,7 +40,7 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum ScalarType { U8, U16, @@ -56,6 +56,18 @@ pub enum ScalarType { STRING, } +impl fmt::Debug for ScalarType { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.to_variant_str()) + } +} + +impl fmt::Display for ScalarType { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.to_variant_str()) + } +} + impl Serialize for ScalarType { fn serialize(&self, ser: S) -> Result where @@ -347,9 +359,9 @@ pub struct Node { pub host: String, // TODO for `listen` and the ports, would be great to allow a default on Cluster level. pub listen: String, - #[serde(deserialize_with = "port_from_any")] + #[serde(deserialize_with = "serde_port::port_from_any")] pub port: u16, - #[serde(deserialize_with = "port_from_any")] + #[serde(deserialize_with = "serde_port::port_from_any")] pub port_raw: u16, pub cache_base_path: PathBuf, pub sf_databuffer: Option, @@ -358,58 +370,78 @@ pub struct Node { pub prometheus_api_bind: Option, } -struct Visit1 {} +mod serde_port { + use super::*; -impl<'de> serde::de::Visitor<'de> for Visit1 { - type Value = u16; + struct Vis; - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "a tcp port number, in numeric or string form.") - } + impl<'de> serde::de::Visitor<'de> for Vis { + type Value = u16; - fn visit_u64(self, v: u64) -> Result - where - E: serde::de::Error, - { - if v > u16::MAX as u64 { - Err(serde::de::Error::invalid_type( - serde::de::Unexpected::Unsigned(v), - &self, - )) - } else { - self.visit_i64(v as i64) + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "a tcp port number, in numeric or string form.") + } + + fn visit_u64(self, val: u64) -> Result + where + E: serde::de::Error, + { + if val > u16::MAX as u64 { + Err(serde::de::Error::invalid_type( + serde::de::Unexpected::Unsigned(val), + &self, + )) + } else { + self.visit_i64(val as i64) + } + } + + fn visit_i64(self, val: i64) -> Result + where + E: serde::de::Error, + { + if val < 1 || val > u16::MAX as i64 { + Err(serde::de::Error::invalid_type( + serde::de::Unexpected::Signed(val), + &self, + )) + } else { + Ok(val as u16) + } + } + + fn visit_str(self, val: &str) -> Result + where + E: serde::de::Error, + { + match val.parse::() { + Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(val), &self)), + Ok(v) => Ok(v), + } } } - fn visit_i64(self, v: i64) -> Result + pub fn port_from_any<'de, D>(de: D) -> Result where - E: serde::de::Error, + D: serde::Deserializer<'de>, { - if v < 1 || v > u16::MAX as i64 { - Err(serde::de::Error::invalid_type(serde::de::Unexpected::Signed(v), &self)) - } else { - Ok(v as u16) - } + de.deserialize_any(Vis) } - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - match v.parse::() { - Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(v), &self)), - Ok(v) => Ok(v), + #[test] + fn test_port_from_any() { + #[derive(Deserialize)] + struct Conf { + #[serde(deserialize_with = "port_from_any")] + port: u16, } + let conf: Conf = serde_json::from_str(r#"{"port":"9192"}"#).unwrap(); + assert_eq!(conf.port, 9192); + let conf: Conf = serde_json::from_str(r#"{"port":9194}"#).unwrap(); + assert_eq!(conf.port, 9194); } } -fn port_from_any<'de, D>(de: D) -> Result -where - D: serde::Deserializer<'de>, -{ - de.deserialize_any(Visit1 {}) -} - impl Node { // TODO needed? Could `sf_databuffer` be None? pub fn dummy() -> Self { @@ -775,88 +807,95 @@ pub enum Shape { Image(u32, u32), } -impl Serialize for Shape { - fn serialize(&self, ser: S) -> Result - where - S::Error: serde::ser::Error, - { - use Shape::*; - match self { - Scalar => ser.collect_seq([0u32; 0].iter()), - Wave(a) => ser.collect_seq([*a].iter()), - Image(a, b) => ser.collect_seq([*a, *b].iter()), - } - } -} +mod serde_shape { + use super::*; -struct ShapeVis; - -impl<'de> serde::de::Visitor<'de> for ShapeVis { - type Value = Shape; - - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.write_str("a string describing the Shape variant") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - if v == "Scalar" { - Ok(Shape::Scalar) - } else { - Err(E::custom(format!("unexpected value: {v:?}"))) + impl Serialize for Shape { + fn serialize(&self, ser: S) -> Result + where + S::Error: serde::ser::Error, + { + use Shape::*; + match self { + Scalar => ser.collect_seq([0u32; 0].iter()), + Wave(a) => ser.collect_seq([*a].iter()), + Image(a, b) => ser.collect_seq([*a, *b].iter()), + } } } - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - use serde::de::Error; - if let Some(key) = map.next_key::()? { - if key == "Wave" { - let n: u32 = map.next_value()?; - Ok(Shape::Wave(n)) - } else if key == "Image" { - let a = map.next_value::<[u32; 2]>()?; + struct ShapeVis; + + impl<'de> serde::de::Visitor<'de> for ShapeVis { + type Value = Shape; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("a vector describing the shape") + } + + // TODO unused, do not support deser from any for Shape + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + if v == "Scalar" { + Ok(Shape::Scalar) + } else { + Err(E::custom(format!("unexpected value: {v:?}"))) + } + } + + // TODO unused, do not support deser from any for Shape + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + use serde::de::Error; + if let Some(key) = map.next_key::()? { + if key == "Wave" { + let n: u32 = map.next_value()?; + Ok(Shape::Wave(n)) + } else if key == "Image" { + let a = map.next_value::<[u32; 2]>()?; + Ok(Shape::Image(a[0], a[1])) + } else { + Err(A::Error::custom(format!("unexpected key {key:?}"))) + } + } else { + Err(A::Error::custom(format!("invalid shape format"))) + } + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut a = vec![]; + while let Some(item) = seq.next_element()? { + let n: u32 = item; + a.push(n); + } + if a.len() == 0 { + Ok(Shape::Scalar) + } else if a.len() == 1 { + Ok(Shape::Wave(a[0])) + } else if a.len() == 2 { Ok(Shape::Image(a[0], a[1])) } else { - Err(A::Error::custom(format!("unexpected key {key:?}"))) + use serde::de::Error; + Err(A::Error::custom(format!("bad shape"))) } - } else { - Err(A::Error::custom(format!("invalid shape format"))) } } - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { - let mut a = vec![]; - while let Some(item) = seq.next_element()? { - let n: u32 = item; - a.push(n); + impl<'de> Deserialize<'de> for Shape { + fn deserialize(de: D) -> Result + where + D: serde::Deserializer<'de>, + { + let res = de.deserialize_seq(ShapeVis); + res } - if a.len() == 0 { - Ok(Shape::Scalar) - } else if a.len() == 1 { - Ok(Shape::Wave(a[0])) - } else if a.len() == 2 { - Ok(Shape::Image(a[0], a[1])) - } else { - use serde::de::Error; - Err(A::Error::custom(format!("bad shape"))) - } - } -} - -impl<'de> Deserialize<'de> for Shape { - fn deserialize(de: D) -> Result - where - D: serde::Deserializer<'de>, - { - de.deserialize_any(ShapeVis) } } @@ -994,22 +1033,18 @@ fn test_shape_serde() { assert_eq!(s, r#"{"Wave":8}"#); let s = serde_json::to_string(&ShapeOld::Image(42, 43)).unwrap(); assert_eq!(s, r#"{"Image":[42,43]}"#); - let s = serde_json::from_str::(r#""Scalar""#).unwrap(); + let s: ShapeOld = serde_json::from_str(r#""Scalar""#).unwrap(); assert_eq!(s, ShapeOld::Scalar); - let s = serde_json::from_str::(r#"{"Wave": 123}"#).unwrap(); + let s: ShapeOld = serde_json::from_str(r#"{"Wave": 123}"#).unwrap(); assert_eq!(s, ShapeOld::Wave(123)); - let s = serde_json::from_str::(r#"{"Image":[77, 78]}"#).unwrap(); + let s: ShapeOld = serde_json::from_str(r#"{"Image":[77, 78]}"#).unwrap(); assert_eq!(s, ShapeOld::Image(77, 78)); - let s = serde_json::from_str::(r#"[]"#).unwrap(); + let s: Shape = serde_json::from_str(r#"[]"#).unwrap(); assert_eq!(s, Shape::Scalar); - let s = serde_json::from_str::(r#"[12]"#).unwrap(); + let s: Shape = serde_json::from_str(r#"[12]"#).unwrap(); assert_eq!(s, Shape::Wave(12)); - let s = serde_json::from_str::(r#"[12, 13]"#).unwrap(); + let s: Shape = serde_json::from_str(r#"[12, 13]"#).unwrap(); assert_eq!(s, Shape::Image(12, 13)); - let s = serde_json::from_str::(r#""Scalar""#).unwrap(); - assert_eq!(s, Shape::Scalar); - let s = serde_json::from_str::(r#"{"Wave":55}"#).unwrap(); - assert_eq!(s, Shape::Wave(55)); } pub trait HasShape {