diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 245bf77..67cad49 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -75,7 +75,7 @@ where RangeCompletableItem::Data(evs) => { let val = evs.into_user_facing_api_type(); let val = val.into_serializable_normal(); - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes::new(bytes); @@ -87,7 +87,7 @@ where "rangeFinal" => true, }) .map_err(|e| Error::Msg(e.to_string()))?; - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes::new(bytes); @@ -127,7 +127,7 @@ where "error" => e.to_string(), }) .map_err(|e| Error::Msg(e.to_string()))?; - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes::new(bytes); @@ -142,7 +142,7 @@ fn make_keepalive() -> Result { "type" => "keepalive", }) .map_err(ErrMsg)?; - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(128); ciborium::into_writer(&item, &mut buf).map_err(ErrMsg)?; let bytes = Bytes::from(buf); let item = Ok(CborBytes::new(bytes)); diff --git a/src/lenframe.rs b/src/lenframe.rs index 2e1bdea..751654b 100644 --- a/src/lenframe.rs +++ b/src/lenframe.rs @@ -7,6 +7,12 @@ use futures_util::stream; use futures_util::Stream; use futures_util::StreamExt; +fn pad_to_8(buf: &mut Vec) { + let npadded = (7 + buf.len()) & (!0x7); + let npad = npadded - buf.len(); + buf.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..npad]); +} + pub fn bytes_chunks_to_framed(stream: S) -> impl Stream> where S: Stream>, @@ -18,14 +24,20 @@ where .flat_map(|x| match x { Ok(y) => { let buf = y.into(); - let adv = (buf.len() + 7) / 8 * 8; - let pad = adv - buf.len(); - let mut b2 = BytesMut::with_capacity(16); - b2.put_u32_le(buf.len() as u32); - b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); - let mut b3 = BytesMut::with_capacity(16); - b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); - stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + let n = buf.len(); + if n == 0 { + info!("skip framing of zero sized chunk"); + stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())]) + } else { + let adv = (n + 7) & (!0x7); + let pad = adv - n; + let mut b2 = BytesMut::with_capacity(16); + b2.put_u32_le(n as u32); + b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let mut b3 = BytesMut::with_capacity(16); + b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); + stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + } } Err(e) => { error!("{}", e); diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index 260e921..5b87938 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -12,12 +12,13 @@ use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; use std::pin::Pin; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "PlainEventsStream")] -pub enum Error { - Netpod(#[from] netpod::Error), - TcpRawClient(#[from] crate::tcprawclient::Error), -} +autoerr::create_error_v1!( + name(Error, "PlainEventsStream"), + enum variants { + Netpod(#[from] netpod::Error), + TcpRawClient(#[from] crate::tcprawclient::Error), + }, +); pub type ChannelEventsStream = Pin> + Send>>; diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 47cfe17..27ff3fa 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -371,9 +371,10 @@ fn take_collector_result_cbor( ) -> Option { match coll.result() { Ok(collres) => { + trace!("take_collector_result_cbor len {}", collres.len()); let x = collres.into_user_facing_api_type_box(); let val = x.into_serializable_normal(); - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&val, &mut buf).expect("cbor serialize"); let bytes = Bytes::from(buf); let item = CborBytes::new(bytes); @@ -386,7 +387,7 @@ fn take_collector_result_cbor( "ERROR" => true, }) .unwrap(); - let mut buf = Vec::with_capacity(64); + let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&val, &mut buf).expect("cbor serialize"); let bytes = Bytes::from(buf); let item = CborBytes::new(bytes);