|
|
|
|
@@ -2,6 +2,10 @@ use crate::inmem::InMemoryFrame;
|
|
|
|
|
use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem};
|
|
|
|
|
use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
|
|
|
|
|
use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID};
|
|
|
|
|
use bincode::config::{
|
|
|
|
|
FixintEncoding, LittleEndian, RejectTrailing, WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing,
|
|
|
|
|
};
|
|
|
|
|
use bincode::DefaultOptions;
|
|
|
|
|
use bytes::{BufMut, BytesMut};
|
|
|
|
|
use err::Error;
|
|
|
|
|
#[allow(unused)]
|
|
|
|
|
@@ -35,15 +39,60 @@ where
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn bincode_ser<W>(
|
|
|
|
|
w: W,
|
|
|
|
|
) -> bincode::Serializer<
|
|
|
|
|
W,
|
|
|
|
|
WithOtherTrailing<
|
|
|
|
|
WithOtherIntEncoding<WithOtherEndian<DefaultOptions, LittleEndian>, FixintEncoding>,
|
|
|
|
|
RejectTrailing,
|
|
|
|
|
>,
|
|
|
|
|
>
|
|
|
|
|
where
|
|
|
|
|
W: std::io::Write,
|
|
|
|
|
{
|
|
|
|
|
use bincode::Options;
|
|
|
|
|
let opts = DefaultOptions::new()
|
|
|
|
|
.with_little_endian()
|
|
|
|
|
.with_fixint_encoding()
|
|
|
|
|
.reject_trailing_bytes();
|
|
|
|
|
let ser = bincode::Serializer::new(w, opts);
|
|
|
|
|
ser
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn bincode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
|
|
|
|
|
where
|
|
|
|
|
S: Serialize,
|
|
|
|
|
{
|
|
|
|
|
let mut out = Vec::new();
|
|
|
|
|
let mut ser = bincode_ser(&mut out);
|
|
|
|
|
item.serialize(&mut ser).map_err(|e| format!("{e}"))?;
|
|
|
|
|
Ok(out)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn bincode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
|
|
|
|
|
where
|
|
|
|
|
T: for<'de> serde::Deserialize<'de>,
|
|
|
|
|
{
|
|
|
|
|
use bincode::Options;
|
|
|
|
|
let opts = DefaultOptions::new()
|
|
|
|
|
.with_little_endian()
|
|
|
|
|
.with_fixint_encoding()
|
|
|
|
|
.reject_trailing_bytes();
|
|
|
|
|
let mut de = bincode::Deserializer::from_slice(buf, opts);
|
|
|
|
|
<T as serde::Deserialize>::deserialize(&mut de).map_err(|e| format!("{e}").into())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn make_frame_2<T>(item: &T, fty: u32) -> Result<BytesMut, Error>
|
|
|
|
|
where
|
|
|
|
|
T: erased_serde::Serialize,
|
|
|
|
|
{
|
|
|
|
|
trace!("make_frame_2 fty {:x}", fty);
|
|
|
|
|
let mut out = Vec::new();
|
|
|
|
|
let mut ser = rmp_serde::Serializer::new(&mut out);
|
|
|
|
|
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
|
|
|
|
|
let mut ser = bincode_ser(&mut out);
|
|
|
|
|
//let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
|
|
|
|
|
//let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
|
|
|
|
|
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
|
|
|
|
|
match item.erased_serialize(&mut ser2) {
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
let enc = out;
|
|
|
|
|
@@ -77,7 +126,7 @@ where
|
|
|
|
|
// TODO remove duplication for these similar `make_*_frame` functions:
|
|
|
|
|
|
|
|
|
|
pub fn make_error_frame(error: &::err::Error) -> Result<BytesMut, Error> {
|
|
|
|
|
match rmp_serde::to_vec(error) {
|
|
|
|
|
match bincode_to_vec(error) {
|
|
|
|
|
Ok(enc) => {
|
|
|
|
|
let mut h = crc32fast::Hasher::new();
|
|
|
|
|
h.update(&enc);
|
|
|
|
|
@@ -99,12 +148,14 @@ pub fn make_error_frame(error: &::err::Error) -> Result<BytesMut, Error> {
|
|
|
|
|
//trace!("frame_crc {}", frame_crc);
|
|
|
|
|
Ok(buf)
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e.ec())?,
|
|
|
|
|
Err(e) => Err(e)?,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO can I remove this usage?
|
|
|
|
|
pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
|
|
|
|
|
match rmp_serde::to_vec(item) {
|
|
|
|
|
warn!("make_log_frame {item:?}");
|
|
|
|
|
match bincode_to_vec(item) {
|
|
|
|
|
Ok(enc) => {
|
|
|
|
|
let mut h = crc32fast::Hasher::new();
|
|
|
|
|
h.update(&enc);
|
|
|
|
|
@@ -113,6 +164,7 @@ pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
|
|
|
|
|
buf.put_u32_le(INMEM_FRAME_MAGIC);
|
|
|
|
|
buf.put_u32_le(INMEM_FRAME_ENCID);
|
|
|
|
|
buf.put_u32_le(LOG_FRAME_TYPE_ID);
|
|
|
|
|
warn!("make_log_frame payload len {}", enc.len());
|
|
|
|
|
buf.put_u32_le(enc.len() as u32);
|
|
|
|
|
buf.put_u32_le(payload_crc);
|
|
|
|
|
// TODO add padding to align to 8 bytes.
|
|
|
|
|
@@ -123,12 +175,12 @@ pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
|
|
|
|
|
buf.put_u32_le(frame_crc);
|
|
|
|
|
Ok(buf)
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e.ec())?,
|
|
|
|
|
Err(e) => Err(e)?,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn make_stats_frame(item: &StatsItem) -> Result<BytesMut, Error> {
|
|
|
|
|
match rmp_serde::to_vec(item) {
|
|
|
|
|
match bincode_to_vec(item) {
|
|
|
|
|
Ok(enc) => {
|
|
|
|
|
let mut h = crc32fast::Hasher::new();
|
|
|
|
|
h.update(&enc);
|
|
|
|
|
@@ -147,7 +199,7 @@ pub fn make_stats_frame(item: &StatsItem) -> Result<BytesMut, Error> {
|
|
|
|
|
buf.put_u32_le(frame_crc);
|
|
|
|
|
Ok(buf)
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e.ec())?,
|
|
|
|
|
Err(e) => Err(e)?,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -207,38 +259,53 @@ where
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
if frame.tyid() == ERROR_FRAME_TYPE_ID {
|
|
|
|
|
let k: ::err::Error = match rmp_serde::from_slice(frame.buf()) {
|
|
|
|
|
let k: ::err::Error = match bincode_from_slice(frame.buf()) {
|
|
|
|
|
Ok(item) => item,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len());
|
|
|
|
|
let n = frame.buf().len().min(128);
|
|
|
|
|
let s = String::from_utf8_lossy(&frame.buf()[..n]);
|
|
|
|
|
error!("frame.buf as string: {:?}", s);
|
|
|
|
|
Err(e.ec())?
|
|
|
|
|
Err(e)?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok(T::from_error(k))
|
|
|
|
|
} else if frame.tyid() == LOG_FRAME_TYPE_ID {
|
|
|
|
|
let k: LogItem = match rmp_serde::from_slice(frame.buf()) {
|
|
|
|
|
let k: LogItem = match bincode_from_slice(frame.buf()) {
|
|
|
|
|
Ok(item) => item,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len());
|
|
|
|
|
let n = frame.buf().len().min(128);
|
|
|
|
|
let s = String::from_utf8_lossy(&frame.buf()[..n]);
|
|
|
|
|
error!("frame.buf as string: {:?}", s);
|
|
|
|
|
Err(e.ec())?
|
|
|
|
|
Err(e)?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok(T::from_log(k))
|
|
|
|
|
} else if frame.tyid() == LOG_FRAME_TYPE_ID {
|
|
|
|
|
let _: crate::Sitemty<()> = match bincode_from_slice(frame.buf()) {
|
|
|
|
|
Ok(item) => {
|
|
|
|
|
error!("GOOD DECODE OF A FULL LOG FRAME SITEMTY {item:?}");
|
|
|
|
|
item
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len());
|
|
|
|
|
let n = frame.buf().len().min(128);
|
|
|
|
|
let s = String::from_utf8_lossy(&frame.buf()[..n]);
|
|
|
|
|
error!("frame.buf as string: {:?}", s);
|
|
|
|
|
Err(e)?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Err(Error::with_msg_no_trace("BAD"))
|
|
|
|
|
} else if frame.tyid() == STATS_FRAME_TYPE_ID {
|
|
|
|
|
let k: StatsItem = match rmp_serde::from_slice(frame.buf()) {
|
|
|
|
|
let k: StatsItem = match bincode_from_slice(frame.buf()) {
|
|
|
|
|
Ok(item) => item,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len());
|
|
|
|
|
let n = frame.buf().len().min(128);
|
|
|
|
|
let s = String::from_utf8_lossy(&frame.buf()[..n]);
|
|
|
|
|
error!("frame.buf as string: {:?}", s);
|
|
|
|
|
Err(e.ec())?
|
|
|
|
|
Err(e)?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok(T::from_stats(k))
|
|
|
|
|
@@ -255,14 +322,14 @@ where
|
|
|
|
|
frame
|
|
|
|
|
)))
|
|
|
|
|
} else {
|
|
|
|
|
match rmp_serde::from_slice(frame.buf()) {
|
|
|
|
|
match bincode_from_slice(frame.buf()) {
|
|
|
|
|
Ok(item) => Ok(item),
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
|
|
|
|
|
let n = frame.buf().len().min(64);
|
|
|
|
|
let s = String::from_utf8_lossy(&frame.buf()[..n]);
|
|
|
|
|
error!("frame.buf as string: {:?}", s);
|
|
|
|
|
Err(e.ec())?
|
|
|
|
|
Err(e)?
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|