WIP on parsing

This commit is contained in:
Dominik Werder
2021-04-26 12:22:36 +02:00
parent 832cc1747e
commit 01f5244c21
2 changed files with 148 additions and 67 deletions
+139 -67
View File
@@ -128,6 +128,7 @@ where
{ {
inp: T, inp: T,
buf: BytesMut, buf: BytesMut,
bufcap: usize,
wp: usize, wp: usize,
tryparse: bool, tryparse: bool,
errored: bool, errored: bool,
@@ -140,88 +141,158 @@ where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
pub fn new(inp: T) -> Self { pub fn new(inp: T) -> Self {
// TODO make start cap adjustable // TODO make capacity adjustable.
let mut buf = BytesMut::with_capacity(1024); let bufcap = 512;
buf.resize(buf.capacity(), 0); let mut t = Self {
Self {
inp, inp,
buf, buf: BytesMut::new(),
bufcap: bufcap,
wp: 0, wp: 0,
tryparse: false, tryparse: false,
errored: false, errored: false,
completed: false, completed: false,
inp_bytes_consumed: 0, inp_bytes_consumed: 0,
};
t.buf = t.empty_buf();
t
}
fn empty_buf(&self) -> BytesMut {
let mut buf = BytesMut::with_capacity(self.bufcap);
buf.resize(buf.capacity(), 0);
buf
}
fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
if self.wp > 0 {
// TODO copy only if we gain capacity in the current buffer.
// Also copy if the bufcap got increased: how to find out with BytesMut? Question about how capacity is defined exactly...
// Avoid copies after e.g. after a previous Pending.
let mut bnew = self.empty_buf();
assert!(self.buf.len() >= self.wp);
assert!(bnew.capacity() >= self.wp);
info!(
"InMemoryFrameAsyncReadStream re-use {} bytes from previous i/o",
self.wp
);
bnew.put(&self.buf[..self.wp]);
self.buf = bnew;
}
info!(
".............. PREPARE READ FROM wp {} self.buf.len() {}",
self.wp,
self.buf.len(),
);
let gg = self.buf.len() - self.wp;
let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
assert!(gg > 0);
assert!(buf2.remaining() == gg);
assert!(buf2.capacity() == gg);
assert!(buf2.filled().len() == 0);
let j = &mut self.inp;
pin_mut!(j);
use Poll::*;
match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(_)) => {
let n1 = buf2.filled().len();
info!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n1);
Ready(Ok(n1))
}
Ready(Err(e)) => Ready(Err(e.into())),
Pending => Pending,
} }
} }
fn tryparse(&mut self) -> Option<Option<Result<InMemoryFrame, Error>>> { fn tryparse(
&mut self,
buf: BytesMut,
wp: usize,
) -> (Option<Option<Result<InMemoryFrame, Error>>>, BytesMut, usize) {
info!(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tryparse with buf.len() {} wp {}",
buf.len(),
wp
);
const HEAD: usize = INMEM_FRAME_HEAD; const HEAD: usize = INMEM_FRAME_HEAD;
let mut buf = std::mem::replace(&mut self.buf, BytesMut::new()); let mut buf = buf;
if self.wp >= HEAD { let nb = wp;
if nb >= HEAD {
let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]);
let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]); let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]);
if magic != INMEM_FRAME_MAGIC { if magic != INMEM_FRAME_MAGIC {
error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic); error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic);
return Some(Some(Err(Error::with_msg(format!( return (
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", Some(Some(Err(Error::with_msg(format!(
magic "InMemoryFrameAsyncReadStream tryparse incorrect magic: {}",
))))); magic
))))),
buf,
wp,
);
} }
info!("\n\ntryparse len {}\n\n", len);
if len == 0 { if len == 0 {
if self.wp != HEAD { if nb != HEAD {
return Some(Some(Err(Error::with_msg(format!( return (
"InMemoryFrameAsyncReadStream tryparse unexpected amount left {}", Some(Some(Err(Error::with_msg(format!(
self.wp "InMemoryFrameAsyncReadStream tryparse unexpected amount left {}",
))))); nb
))))),
buf,
wp,
);
} }
self.buf = buf; (Some(None), buf, wp)
Some(None)
} else { } else {
if len > 1024 * 32 { if len > 1024 * 32 {
warn!("InMemoryFrameAsyncReadStream big len received {}", len); warn!("InMemoryFrameAsyncReadStream big len received {}", len);
} }
if len > 1024 * 1024 * 2 { if len > 1024 * 1024 * 2 {
error!("InMemoryFrameAsyncReadStream too long len {}", len); error!("InMemoryFrameAsyncReadStream too long len {}", len);
return Some(Some(Err(Error::with_msg(format!( return (
"InMemoryFrameAsyncReadStream tryparse hug buffer len {} self.inp_bytes_consumed {}", Some(Some(Err(Error::with_msg(format!(
len, self.inp_bytes_consumed "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
))))); len, self.inp_bytes_consumed
))))),
buf,
wp,
);
}
if len == 0 && len > 1024 * 512 {
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
))))),
buf,
wp,
);
} }
assert!(len > 0 && len < 1024 * 512);
let nl = len as usize + HEAD; let nl = len as usize + HEAD;
if buf.capacity() < nl { if self.bufcap < nl {
buf.resize(nl, 0); // TODO count cases in production
} else { self.bufcap += 2 * nl;
// nothing to do
} }
if self.wp >= nl { if nb >= nl {
let mut buf3 = BytesMut::with_capacity(buf.capacity());
// TODO make stats of copied bytes and warn if ratio is too bad.
buf3.put(buf[nl..self.wp].as_ref());
buf3.resize(buf3.capacity(), 0);
use bytes::Buf; use bytes::Buf;
buf.truncate(nl); let mut buf3 = buf.split_to(nl);
buf.advance(HEAD); buf3.advance(HEAD);
self.wp = self.wp - nl;
self.buf = buf3;
self.inp_bytes_consumed += nl as u64; self.inp_bytes_consumed += nl as u64;
let ret = InMemoryFrame { let ret = InMemoryFrame {
len, len,
tyid, tyid,
encid, encid,
buf: buf.freeze(), buf: buf3.freeze(),
}; };
Some(Some(Ok(ret))) (Some(Some(Ok(ret))), buf, wp - nl)
} else { } else {
self.buf = buf; (None, buf, wp)
None
} }
} }
} else { } else {
self.buf = buf; (None, buf, wp)
None
} }
} }
} }
@@ -255,6 +326,7 @@ where
type Item = Result<InMemoryFrame, Error>; type Item = Result<InMemoryFrame, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
info!("InMemoryFrameAsyncReadStream poll_next");
use Poll::*; use Poll::*;
assert!(!self.completed); assert!(!self.completed);
if self.errored { if self.errored {
@@ -263,7 +335,14 @@ where
} }
'outer: loop { 'outer: loop {
if self.tryparse { if self.tryparse {
let r = self.tryparse(); let r = {
let buf = std::mem::replace(&mut self.buf, BytesMut::new());
let wp = self.wp;
let (r, buf, wp) = self.tryparse(buf, wp);
self.buf = buf;
self.wp = wp;
r
};
break match r { break match r {
None => { None => {
self.tryparse = false; self.tryparse = false;
@@ -282,40 +361,33 @@ where
} }
}; };
} else { } else {
let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new()); let r = self.poll_upstream(cx);
if buf0.as_mut().len() != buf0.capacity() { break match r {
error!("------- {} {}", buf0.as_mut().len(), buf0.capacity()); Ready(Ok(n1)) => {
panic!(); info!("poll_upstream GIVES Ready {}", n1);
} self.wp += n1;
let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut());
assert!(buf2.filled().len() == 0);
assert!(buf2.capacity() > 0);
assert!(buf2.remaining() > 0);
let j = &mut self.inp;
pin_mut!(j);
break match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(())) => {
let n1 = buf2.filled().len();
if n1 == 0 { if n1 == 0 {
if self.wp != 0 { let n2 = self.buf.len();
error!( if n2 != 0 {
"InMemoryFrameAsyncReadStream self.wp != 0 wp {} consumed {}", warn!(
self.wp, self.inp_bytes_consumed "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {} ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~",
n2, self.inp_bytes_consumed
); );
} }
self.buf = buf0;
self.completed = true; self.completed = true;
Ready(None) Ready(None)
} else { } else {
self.wp += n1;
self.buf = buf0;
self.tryparse = true; self.tryparse = true;
continue 'outer; continue 'outer;
} }
} }
Ready(Err(e)) => Ready(Some(Err(e.into()))), Ready(Err(e)) => {
info!("poll_upstream GIVES Error");
self.errored = true;
Ready(Some(Err(e.into())))
}
Pending => { Pending => {
self.buf = buf0; info!("poll_upstream GIVES Pending");
Pending Pending
} }
}; };
+9
View File
@@ -1,4 +1,5 @@
use crate::spawn_test_hosts; use crate::spawn_test_hosts;
use bytes::{BufMut, BytesMut};
use chrono::Utc; use chrono::Utc;
use err::Error; use err::Error;
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
@@ -128,3 +129,11 @@ async fn get_cached_0_inner() -> Result<(), Error> {
//Err::<(), _>(format!("test error").into()) //Err::<(), _>(format!("test error").into())
Ok(()) Ok(())
} }
#[test]
fn bufs() {
let mut buf = BytesMut::with_capacity(1024);
assert!(buf.as_mut().len() == 0);
buf.put_u32_le(123);
assert!(buf.as_mut().len() == 4);
}