diff --git a/disk/src/cache.rs b/disk/src/cache.rs index af0b8d8..6e08338 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -131,14 +131,17 @@ impl Stream for BinnedBytesForHttpStream { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { // TODO optimize this... - let mut buf = BytesMut::with_capacity(4); - buf.resize(4, 0); + const HEAD: usize = super::raw::INMEM_FRAME_HEAD; + let mut buf = BytesMut::with_capacity(HEAD); + buf.resize(HEAD, 0); let k = k.serialized(); info!("BinnedBytesForHttpStream serialized slice has len {}", k.len()); let n1 = k.len(); buf.put_slice(&k); - assert!(buf.len() == k.len() + 4); - buf.as_mut().put_u32_le(n1 as u32); + assert!(buf.len() == n1 + HEAD); + buf[0..4].as_mut().put_u32_le(super::raw::INMEM_FRAME_MAGIC); + buf[4..8].as_mut().put_u32_le(n1 as u32); + buf[8..12].as_mut().put_u32_le(0); info!("BinnedBytesForHttpStream emit buf len {}", buf.len()); Ready(Some(Ok(buf.freeze()))) } @@ -214,11 +217,15 @@ impl Stream for PreBinnedValueByteStream { } match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { + // TODO optimize this + const HEAD: usize = super::raw::INMEM_FRAME_HEAD; let buf = k.serialized(); let n1 = buf.len(); self.left = Some(buf); - let mut buf2 = BytesMut::with_capacity(4); + let mut buf2 = BytesMut::with_capacity(HEAD); + buf2.put_u32_le(super::raw::INMEM_FRAME_MAGIC); buf2.put_u32_le(n1 as u32); + buf2.put_u32_le(0); Ready(Some(Ok(buf2.freeze()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), diff --git a/disk/src/lib.rs b/disk/src/lib.rs index a24d503..d223d28 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -13,7 +13,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncRead; - #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 61a8828..e189cf2 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -42,8 +42,15 @@ pub async fn x_processed_stream_from_node( let qjs = serde_json::to_vec(query.as_ref())?; debug!("x_processed_stream_from_node qjs len {}", qjs.len()); let (netin, mut netout) = net.into_split(); + + // TODO this incorrect magic MUST bubble up into the final result and be reported. + + netout.write_u32_le(INMEM_FRAME_MAGIC - 1).await?; netout.write_u32_le(qjs.len() as u32).await?; + netout.write_u32_le(0).await?; netout.write_all(&qjs).await?; + netout.write_u32_le(INMEM_FRAME_MAGIC).await?; + netout.write_u32_le(0).await?; netout.write_u32_le(0).await?; netout.flush().await?; netout.forget(); @@ -100,6 +107,9 @@ where } } +pub const INMEM_FRAME_HEAD: usize = 12; +pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; + /** Interprets a byte stream as length-delimited frames. @@ -138,12 +148,25 @@ where } fn tryparse(&mut self) -> Option>> { + const HEAD: usize = INMEM_FRAME_HEAD; let mut buf = std::mem::replace(&mut self.buf, BytesMut::new()); - if self.wp >= 4 { - let len = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); + if self.wp >= HEAD { + let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); + let len = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); + let _tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); + if magic != INMEM_FRAME_MAGIC { + error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic); + return Some(Some(Err(Error::with_msg(format!( + "InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", + magic + ))))); + } if len == 0 { - warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME self.wp {}", self.wp); - if self.wp != 4 { + warn!( + "InMemoryFrameAsyncReadStream tryparse STOP FRAME self.wp {}", + self.wp + ); + if self.wp != HEAD { return Some(Some(Err(Error::with_msg(format!( "InMemoryFrameAsyncReadStream tryparse unexpected amount left {}", self.wp @@ -156,17 +179,14 @@ where warn!("InMemoryFrameAsyncReadStream big len received {}", len); } if len > 1024 * 1024 * 2 { - error!( - "??????????????????????????? {} ????????????????????????????????????????????", - len - ); + error!("InMemoryFrameAsyncReadStream too long len {}", len); return Some(Some(Err(Error::with_msg(format!( "InMemoryFrameAsyncReadStream tryparse hug buffer len {} self.inp_bytes_consumed {}", len, self.inp_bytes_consumed ))))); } assert!(len > 0 && len < 1024 * 512); - let nl = len as usize + 4; + let nl = len as usize + HEAD; if buf.capacity() < nl { buf.resize(nl, 0); } else { @@ -179,7 +199,7 @@ where buf3.resize(buf3.capacity(), 0); use bytes::Buf; buf.truncate(nl); - buf.advance(4); + buf.advance(HEAD); self.wp = self.wp - nl; self.buf = buf3; self.inp_bytes_consumed += buf.len() as u64 + 4; @@ -331,9 +351,13 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<( let mut s1 = futures_util::stream::iter(vec![batch]); while let Some(item) = s1.next().await { let fr = item.serialized(); + netout.write_u32_le(INMEM_FRAME_MAGIC).await?; netout.write_u32_le(fr.len() as u32).await?; + netout.write_u32_le(0).await?; netout.write(fr.as_ref()).await?; } + netout.write_u32_le(INMEM_FRAME_MAGIC).await?; + netout.write_u32_le(0).await?; netout.write_u32_le(0).await?; netout.flush().await?; netout.forget(); diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 03ce887..45c9f8e 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -1,3 +1,6 @@ +#[doc(inline)] +pub use std; + use chrono::{DateTime, TimeZone, Utc}; use err::Error; use serde::{Deserialize, Serialize};