Refactor remote stream decode
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use crate::slidebuf::SlideBuf;
|
||||
use bytes::Bytes;
|
||||
use err::Error;
|
||||
use futures_util::{pin_mut, Stream};
|
||||
use items::inmem::InMemoryFrame;
|
||||
@@ -9,23 +10,26 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::{AsyncRead, ReadBuf};
|
||||
|
||||
/**
|
||||
Interprets a byte stream as length-delimited frames.
|
||||
impl err::ToErr for crate::slidebuf::Error {
|
||||
fn to_err(self) -> Error {
|
||||
Error::with_msg_no_trace(format!("{self}"))
|
||||
}
|
||||
}
|
||||
|
||||
Emits each frame as a single item. Therefore, each item must fit easily into memory.
|
||||
*/
|
||||
/// Interprets a byte stream as length-delimited frames.
|
||||
///
|
||||
/// Emits each frame as a single item. Therefore, each item must fit easily into memory.
|
||||
pub struct InMemoryFrameAsyncReadStream<T>
|
||||
where
|
||||
T: AsyncRead + Unpin,
|
||||
{
|
||||
inp: T,
|
||||
buf: BytesMut,
|
||||
bufcap: usize,
|
||||
wp: usize,
|
||||
tryparse: bool,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
buf: SlideBuf,
|
||||
need_min: usize,
|
||||
done: bool,
|
||||
complete: bool,
|
||||
inp_bytes_consumed: u64,
|
||||
npoll: u64,
|
||||
}
|
||||
|
||||
impl<T> InMemoryFrameAsyncReadStream<T>
|
||||
@@ -33,182 +37,110 @@ where
|
||||
T: AsyncRead + Unpin,
|
||||
{
|
||||
pub fn new(inp: T, bufcap: usize) -> Self {
|
||||
let mut t = Self {
|
||||
Self {
|
||||
inp,
|
||||
buf: BytesMut::new(),
|
||||
bufcap,
|
||||
wp: 0,
|
||||
tryparse: false,
|
||||
errored: false,
|
||||
completed: false,
|
||||
buf: SlideBuf::new(bufcap),
|
||||
need_min: INMEM_FRAME_HEAD,
|
||||
done: false,
|
||||
complete: false,
|
||||
inp_bytes_consumed: 0,
|
||||
};
|
||||
t.buf = t.empty_buf();
|
||||
t
|
||||
}
|
||||
|
||||
fn empty_buf(&self) -> BytesMut {
|
||||
let mut buf = BytesMut::with_capacity(self.bufcap);
|
||||
buf.resize(buf.capacity(), 0);
|
||||
buf
|
||||
npoll: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
|
||||
if true || self.wp > 0 {
|
||||
let mut bnew = self.empty_buf();
|
||||
assert!(self.buf.len() >= self.wp);
|
||||
assert!(bnew.capacity() >= self.wp);
|
||||
trace!(
|
||||
"InMemoryFrameAsyncReadStream re-use {} bytes from previous i/o",
|
||||
self.wp,
|
||||
);
|
||||
bnew[..].as_mut().put_slice(&self.buf[..self.wp]);
|
||||
self.buf = bnew;
|
||||
}
|
||||
trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),);
|
||||
let gg = self.buf.len() - self.wp;
|
||||
let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
|
||||
if gg < 1 || gg > 1024 * 1024 * 40 {
|
||||
use bytes::Buf;
|
||||
panic!(
|
||||
"have gg {} len {} cap {} rem {} rem mut {} self.wp {}",
|
||||
gg,
|
||||
self.buf.len(),
|
||||
self.buf.capacity(),
|
||||
self.buf.remaining(),
|
||||
self.buf.remaining_mut(),
|
||||
self.wp,
|
||||
);
|
||||
}
|
||||
assert!(buf2.remaining() == gg);
|
||||
assert!(buf2.capacity() == gg);
|
||||
assert!(buf2.filled().len() == 0);
|
||||
let j = &mut self.inp;
|
||||
pin_mut!(j);
|
||||
trace!("poll_upstream");
|
||||
use Poll::*;
|
||||
match AsyncRead::poll_read(j, cx, &mut buf2) {
|
||||
Ready(Ok(_)) => {
|
||||
let n1 = buf2.filled().len();
|
||||
trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n1);
|
||||
Ready(Ok(n1))
|
||||
let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?);
|
||||
let inp = &mut self.inp;
|
||||
pin_mut!(inp);
|
||||
match AsyncRead::poll_read(inp, cx, &mut buf) {
|
||||
Ready(Ok(())) => {
|
||||
let n = buf.filled().len();
|
||||
self.buf.wadv(n)?;
|
||||
trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n);
|
||||
Ready(Ok(n))
|
||||
}
|
||||
Ready(Err(e)) => Ready(Err(e.into())),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn tryparse(
|
||||
&mut self,
|
||||
buf: BytesMut,
|
||||
wp: usize,
|
||||
) -> (Option<Option<Result<InMemoryFrame, Error>>>, BytesMut, usize) {
|
||||
let nb = wp;
|
||||
if nb >= INMEM_FRAME_HEAD {
|
||||
let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
|
||||
let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]);
|
||||
let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
|
||||
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]);
|
||||
if magic != INMEM_FRAME_MAGIC {
|
||||
let z = nb.min(256);
|
||||
let u = String::from_utf8_lossy(&buf[0..z]);
|
||||
let e = Error::with_msg("INCORRECT MAGIC");
|
||||
error!("incorrect magic buf as utf8: {:?} error: {:?}", u, e);
|
||||
let msg = format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
|
||||
magic, u
|
||||
);
|
||||
error!("{}", msg);
|
||||
return (Some(Some(Err(Error::with_msg(format!("{}", msg))))), buf, wp);
|
||||
}
|
||||
if len == 0 {
|
||||
if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT {
|
||||
warn!("stop-frame with nb {}", nb);
|
||||
}
|
||||
(Some(None), buf, wp)
|
||||
} else {
|
||||
if len > 1024 * 1024 * 50 {
|
||||
let msg = format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
|
||||
len, self.inp_bytes_consumed
|
||||
);
|
||||
error!("{}", msg);
|
||||
return (Some(Some(Err(Error::with_msg(msg)))), buf, wp);
|
||||
} else if len > 1024 * 1024 * 1 {
|
||||
// TODO
|
||||
//warn!("InMemoryFrameAsyncReadStream big len received {}", len);
|
||||
}
|
||||
let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT;
|
||||
if self.bufcap < nl {
|
||||
// TODO count cases in production
|
||||
let n = 2 * nl;
|
||||
debug!("Adjust bufcap old {} new {}", self.bufcap, n);
|
||||
self.bufcap = n;
|
||||
}
|
||||
if nb < nl {
|
||||
(None, buf, wp)
|
||||
} else {
|
||||
use bytes::Buf;
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf[..(nl - INMEM_FRAME_FOOT)]);
|
||||
let frame_crc = h.finalize();
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf[INMEM_FRAME_HEAD..(nl - INMEM_FRAME_FOOT)]);
|
||||
let payload_crc = h.finalize();
|
||||
let frame_crc_ind =
|
||||
u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]);
|
||||
let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]);
|
||||
//info!("len {}", len);
|
||||
//info!("payload_crc_ind {}", payload_crc_ind);
|
||||
//info!("frame_crc_ind {}", frame_crc_ind);
|
||||
let payload_crc_match = payload_crc_ind == payload_crc;
|
||||
let frame_crc_match = frame_crc_ind == frame_crc;
|
||||
if !frame_crc_match || !payload_crc_match {
|
||||
let ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
|
||||
warn!("CRC mismatch A frame_crc_match {frame_crc_match} payload_crc_match {payload_crc_match}\n{ss:?}");
|
||||
return (
|
||||
Some(Some(Err(Error::with_msg(format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
|
||||
payload_crc_match, frame_crc_match,
|
||||
))))),
|
||||
buf,
|
||||
wp,
|
||||
);
|
||||
}
|
||||
let mut buf = buf;
|
||||
let mut buf3 = buf.split_to(nl);
|
||||
let buf = buf;
|
||||
buf3.advance(INMEM_FRAME_HEAD);
|
||||
buf3.truncate(len as usize);
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf3);
|
||||
let payload_crc_2 = h.finalize();
|
||||
let payload_crc_2_match = payload_crc_2 == payload_crc_ind;
|
||||
if !payload_crc_2_match {
|
||||
let sa = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
|
||||
let sb = String::from_utf8_lossy(&buf3[..buf3.len().min(256)]);
|
||||
warn!("CRC mismatch B\n{sa:?}\n{sb:?}");
|
||||
return (
|
||||
Some(Some(Err(Error::with_msg(format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse crc mismatch B {} {} {}",
|
||||
payload_crc_match, frame_crc_match, payload_crc_2_match,
|
||||
))))),
|
||||
buf,
|
||||
wp,
|
||||
);
|
||||
}
|
||||
self.inp_bytes_consumed += nl as u64;
|
||||
let ret = InMemoryFrame {
|
||||
len,
|
||||
tyid,
|
||||
encid,
|
||||
buf: buf3.freeze(),
|
||||
};
|
||||
(Some(Some(Ok(ret))), buf, wp - nl)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(None, buf, wp)
|
||||
// Try to parse a frame.
|
||||
// Update the need_min to the most current state.
|
||||
// If successful, return item and number of bytes consumed.
|
||||
// Must only be called when at least `need_min` bytes are available.
|
||||
fn parse(&mut self) -> Result<Option<InMemoryFrame>, Error> {
|
||||
trace!("parse");
|
||||
let buf = self.buf.data();
|
||||
if buf.len() < self.need_min {
|
||||
return Err(Error::with_msg_no_trace("expect at least need_min"));
|
||||
}
|
||||
if buf.len() < INMEM_FRAME_HEAD {
|
||||
return Err(Error::with_msg_no_trace("expect at least enough bytes for the header"));
|
||||
}
|
||||
let magic = u32::from_le_bytes(buf[0..4].try_into()?);
|
||||
let encid = u32::from_le_bytes(buf[4..8].try_into()?);
|
||||
let tyid = u32::from_le_bytes(buf[8..12].try_into()?);
|
||||
let len = u32::from_le_bytes(buf[12..16].try_into()?);
|
||||
let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?);
|
||||
if magic != INMEM_FRAME_MAGIC {
|
||||
let n = buf.len().min(64);
|
||||
let u = String::from_utf8_lossy(&buf[0..n]);
|
||||
let msg = format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
|
||||
magic, u
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
if len > 1024 * 1024 * 50 {
|
||||
let msg = format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
|
||||
len, self.inp_bytes_consumed
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize;
|
||||
if buf.len() < lentot {
|
||||
// TODO count cases in production
|
||||
self.need_min = lentot;
|
||||
return Ok(None);
|
||||
}
|
||||
let p1 = INMEM_FRAME_HEAD + len as usize;
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf[..p1]);
|
||||
let frame_crc = h.finalize();
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(&buf[INMEM_FRAME_HEAD..p1]);
|
||||
let payload_crc = h.finalize();
|
||||
let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?);
|
||||
//info!("len {}", len);
|
||||
//info!("payload_crc_ind {}", payload_crc_ind);
|
||||
//info!("frame_crc_ind {}", frame_crc_ind);
|
||||
let payload_crc_match = payload_crc_exp == payload_crc;
|
||||
let frame_crc_match = frame_crc_ind == frame_crc;
|
||||
if !frame_crc_match || !payload_crc_match {
|
||||
let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
|
||||
let msg = format!(
|
||||
"InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
|
||||
payload_crc_match, frame_crc_match,
|
||||
);
|
||||
error!("{msg}");
|
||||
let e = Error::with_msg_no_trace(msg);
|
||||
return Err(e);
|
||||
}
|
||||
self.inp_bytes_consumed += lentot as u64;
|
||||
let ret = InMemoryFrame {
|
||||
len,
|
||||
tyid,
|
||||
encid,
|
||||
buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()),
|
||||
};
|
||||
self.buf.adv(lentot)?;
|
||||
self.need_min = INMEM_FRAME_HEAD;
|
||||
Ok(Some(ret))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,67 +152,57 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
assert!(!self.completed);
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
trace!("poll");
|
||||
self.npoll += 1;
|
||||
if self.npoll > 2000 {
|
||||
panic!()
|
||||
}
|
||||
'outer: loop {
|
||||
if self.tryparse {
|
||||
let r = {
|
||||
let buf = std::mem::replace(&mut self.buf, BytesMut::new());
|
||||
let wp = self.wp;
|
||||
let (r, buf, wp) = self.tryparse(buf, wp);
|
||||
self.buf = buf;
|
||||
self.wp = wp;
|
||||
r
|
||||
};
|
||||
break match r {
|
||||
None => {
|
||||
self.tryparse = false;
|
||||
continue 'outer;
|
||||
loop {
|
||||
break if self.complete {
|
||||
panic!("poll_next on complete")
|
||||
} else if self.done {
|
||||
self.complete = true;
|
||||
Ready(None)
|
||||
} else if self.buf.len() >= self.need_min {
|
||||
match self.parse() {
|
||||
Ok(None) => {
|
||||
if self.buf.len() >= self.need_min {
|
||||
self.done = true;
|
||||
let e = Error::with_msg_no_trace("enough bytes but nothing parsed");
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
debug!("not enouh for parse, need to wait for more");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Some(None) => {
|
||||
self.tryparse = false;
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
Some(Some(Ok(item))) => Ready(Some(Ok(StreamItem::DataItem(item)))),
|
||||
Some(Some(Err(e))) => {
|
||||
self.tryparse = false;
|
||||
self.errored = true;
|
||||
Ok(Some(item)) => Ready(Some(Ok(StreamItem::DataItem(item)))),
|
||||
Err(e) => {
|
||||
self.done = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
} else {
|
||||
let r = self.poll_upstream(cx);
|
||||
break match r {
|
||||
match self.poll_upstream(cx) {
|
||||
Ready(Ok(n1)) => {
|
||||
self.wp += n1;
|
||||
debug!("read {n1}");
|
||||
if n1 == 0 {
|
||||
let n2 = self.buf.len();
|
||||
if n2 != 0 {
|
||||
// TODO anything more to handle here?
|
||||
debug!(
|
||||
"InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}",
|
||||
n2, self.inp_bytes_consumed
|
||||
);
|
||||
}
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
self.done = true;
|
||||
continue;
|
||||
} else {
|
||||
self.tryparse = true;
|
||||
continue 'outer;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
trace!("poll_upstream GIVES Error");
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e.into())))
|
||||
error!("poll_upstream {e:?}");
|
||||
self.done = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
};
|
||||
}
|
||||
Pending => {
|
||||
debug!("PENDING");
|
||||
Pending
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user