Insert frame head magic

This commit is contained in:
Dominik Werder
2021-04-23 07:26:10 +02:00
parent 2b28b99f90
commit 70a4cb8d42
4 changed files with 49 additions and 16 deletions

View File

@@ -131,14 +131,17 @@ impl Stream for BinnedBytesForHttpStream {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
// TODO optimize this...
let mut buf = BytesMut::with_capacity(4);
buf.resize(4, 0);
const HEAD: usize = super::raw::INMEM_FRAME_HEAD;
let mut buf = BytesMut::with_capacity(HEAD);
buf.resize(HEAD, 0);
let k = k.serialized();
info!("BinnedBytesForHttpStream serialized slice has len {}", k.len());
let n1 = k.len();
buf.put_slice(&k);
assert!(buf.len() == k.len() + 4);
buf.as_mut().put_u32_le(n1 as u32);
assert!(buf.len() == n1 + HEAD);
buf[0..4].as_mut().put_u32_le(super::raw::INMEM_FRAME_MAGIC);
buf[4..8].as_mut().put_u32_le(n1 as u32);
buf[8..12].as_mut().put_u32_le(0);
info!("BinnedBytesForHttpStream emit buf len {}", buf.len());
Ready(Some(Ok(buf.freeze())))
}
@@ -214,11 +217,15 @@ impl Stream for PreBinnedValueByteStream {
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
// TODO optimize this
const HEAD: usize = super::raw::INMEM_FRAME_HEAD;
let buf = k.serialized();
let n1 = buf.len();
self.left = Some(buf);
let mut buf2 = BytesMut::with_capacity(4);
let mut buf2 = BytesMut::with_capacity(HEAD);
buf2.put_u32_le(super::raw::INMEM_FRAME_MAGIC);
buf2.put_u32_le(n1 as u32);
buf2.put_u32_le(0);
Ready(Some(Ok(buf2.freeze())))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),

View File

@@ -13,7 +13,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncRead;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

View File

@@ -42,8 +42,15 @@ pub async fn x_processed_stream_from_node(
let qjs = serde_json::to_vec(query.as_ref())?;
debug!("x_processed_stream_from_node qjs len {}", qjs.len());
let (netin, mut netout) = net.into_split();
// TODO this incorrect magic MUST bubble up into the final result and be reported.
netout.write_u32_le(INMEM_FRAME_MAGIC - 1).await?;
netout.write_u32_le(qjs.len() as u32).await?;
netout.write_u32_le(0).await?;
netout.write_all(&qjs).await?;
netout.write_u32_le(INMEM_FRAME_MAGIC).await?;
netout.write_u32_le(0).await?;
netout.write_u32_le(0).await?;
netout.flush().await?;
netout.forget();
@@ -100,6 +107,9 @@ where
}
}
pub const INMEM_FRAME_HEAD: usize = 12;
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
/**
Interprets a byte stream as length-delimited frames.
@@ -138,12 +148,25 @@ where
}
fn tryparse(&mut self) -> Option<Option<Result<Bytes, Error>>> {
const HEAD: usize = INMEM_FRAME_HEAD;
let mut buf = std::mem::replace(&mut self.buf, BytesMut::new());
if self.wp >= 4 {
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
if self.wp >= HEAD {
let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]);
let _tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
if magic != INMEM_FRAME_MAGIC {
error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic);
return Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {}",
magic
)))));
}
if len == 0 {
warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME self.wp {}", self.wp);
if self.wp != 4 {
warn!(
"InMemoryFrameAsyncReadStream tryparse STOP FRAME self.wp {}",
self.wp
);
if self.wp != HEAD {
return Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse unexpected amount left {}",
self.wp
@@ -156,17 +179,14 @@ where
warn!("InMemoryFrameAsyncReadStream big len received {}", len);
}
if len > 1024 * 1024 * 2 {
error!(
"??????????????????????????? {} ????????????????????????????????????????????",
len
);
error!("InMemoryFrameAsyncReadStream too long len {}", len);
return Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse hug buffer len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
)))));
}
assert!(len > 0 && len < 1024 * 512);
let nl = len as usize + 4;
let nl = len as usize + HEAD;
if buf.capacity() < nl {
buf.resize(nl, 0);
} else {
@@ -179,7 +199,7 @@ where
buf3.resize(buf3.capacity(), 0);
use bytes::Buf;
buf.truncate(nl);
buf.advance(4);
buf.advance(HEAD);
self.wp = self.wp - nl;
self.buf = buf3;
self.inp_bytes_consumed += buf.len() as u64 + 4;
@@ -331,9 +351,13 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(
let mut s1 = futures_util::stream::iter(vec![batch]);
while let Some(item) = s1.next().await {
let fr = item.serialized();
netout.write_u32_le(INMEM_FRAME_MAGIC).await?;
netout.write_u32_le(fr.len() as u32).await?;
netout.write_u32_le(0).await?;
netout.write(fr.as_ref()).await?;
}
netout.write_u32_le(INMEM_FRAME_MAGIC).await?;
netout.write_u32_le(0).await?;
netout.write_u32_le(0).await?;
netout.flush().await?;
netout.forget();