Discard zero len frames if any

This commit is contained in:
Dominik Werder
2025-03-21 14:45:01 +01:00
parent 7b644ed054
commit 17f2c87c34
4 changed files with 34 additions and 20 deletions

View File

@@ -75,7 +75,7 @@ where
RangeCompletableItem::Data(evs) => {
let val = evs.into_user_facing_api_type();
let val = val.into_serializable_normal();
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
@@ -87,7 +87,7 @@ where
"rangeFinal" => true,
})
.map_err(|e| Error::Msg(e.to_string()))?;
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
@@ -127,7 +127,7 @@ where
"error" => e.to_string(),
})
.map_err(|e| Error::Msg(e.to_string()))?;
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
@@ -142,7 +142,7 @@ fn make_keepalive() -> Result<CborBytes, Error> {
"type" => "keepalive",
})
.map_err(ErrMsg)?;
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(128);
ciborium::into_writer(&item, &mut buf).map_err(ErrMsg)?;
let bytes = Bytes::from(buf);
let item = Ok(CborBytes::new(bytes));

View File

@@ -7,6 +7,12 @@ use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
fn pad_to_8(buf: &mut Vec<u8>) {
let npadded = (7 + buf.len()) & (!0x7);
let npad = npadded - buf.len();
buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..npad]);
}
pub fn bytes_chunks_to_framed<S, T, E>(stream: S) -> impl Stream<Item = Result<Bytes, E>>
where
S: Stream<Item = Result<T, E>>,
@@ -18,14 +24,20 @@ where
.flat_map(|x| match x {
Ok(y) => {
let buf = y.into();
let adv = (buf.len() + 7) / 8 * 8;
let pad = adv - buf.len();
let mut b2 = BytesMut::with_capacity(16);
b2.put_u32_le(buf.len() as u32);
b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let mut b3 = BytesMut::with_capacity(16);
b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]);
stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())])
let n = buf.len();
if n == 0 {
info!("skip framing of zero sized chunk");
stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())])
} else {
let adv = (n + 7) & (!0x7);
let pad = adv - n;
let mut b2 = BytesMut::with_capacity(16);
b2.put_u32_le(n as u32);
b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let mut b3 = BytesMut::with_capacity(16);
b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]);
stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())])
}
}
Err(e) => {
error!("{}", e);

View File

@@ -12,12 +12,13 @@ use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "PlainEventsStream")]
pub enum Error {
Netpod(#[from] netpod::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
}
autoerr::create_error_v1!(
name(Error, "PlainEventsStream"),
enum variants {
Netpod(#[from] netpod::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
},
);
pub type ChannelEventsStream = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;

View File

@@ -371,9 +371,10 @@ fn take_collector_result_cbor(
) -> Option<CborBytes> {
match coll.result() {
Ok(collres) => {
trace!("take_collector_result_cbor len {}", collres.len());
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable_normal();
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&val, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
@@ -386,7 +387,7 @@ fn take_collector_result_cbor(
"ERROR" => true,
})
.unwrap();
let mut buf = Vec::with_capacity(64);
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&val, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);