This commit is contained in:
Dominik Werder
2021-04-22 11:56:28 +02:00
parent caf91b460e
commit bd87323bf1
6 changed files with 251 additions and 148 deletions

View File

@@ -17,10 +17,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, span, trace, warn, Level};
/**
Query parameters to request (optionally) X-processed, but not T-processed events.
@@ -37,15 +37,15 @@ pub async fn x_processed_stream_from_node(
node: Arc<Node>,
) -> Result<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>>, Error> {
debug!("x_processed_stream_from_node ENTER");
let mut net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
debug!("x_processed_stream_from_node CONNECTED");
let qjs = serde_json::to_vec(query.as_ref())?;
debug!("x_processed_stream_from_node qjs len {}", qjs.len());
net.write_u32_le(qjs.len() as u32).await?;
net.write_all(&qjs).await?;
net.write_u32_le(0).await?;
net.flush().await?;
let (netin, netout) = net.into_split();
let (netin, mut netout) = net.into_split();
netout.write_u32_le(qjs.len() as u32).await?;
netout.write_all(&qjs).await?;
netout.write_u32_le(0).await?;
netout.flush().await?;
netout.forget();
debug!("x_processed_stream_from_node WRITTEN");
let frames = InMemoryFrameAsyncReadStream::new(netin);
@@ -112,6 +112,8 @@ where
inp: T,
buf: BytesMut,
wp: usize,
tryparse: bool,
stopped: bool,
}
impl<T> InMemoryFrameAsyncReadStream<T>
@@ -122,7 +124,57 @@ where
// TODO make start cap adjustable
let mut buf = BytesMut::with_capacity(1024);
buf.resize(buf.capacity(), 0);
Self { inp, buf, wp: 0 }
Self {
inp,
buf,
wp: 0,
tryparse: false,
stopped: false,
}
}
fn tryparse(&mut self) -> Option<Option<Bytes>> {
info!("InMemoryFrameAsyncReadStream tryparse");
let mut buf = std::mem::replace(&mut self.buf, BytesMut::new());
if self.wp >= 4 {
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
info!("InMemoryFrameAsyncReadStream tryparse len: {}", len);
if len == 0 {
warn!("InMemoryFrameAsyncReadStream tryparse ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ STOP FRAME");
assert!(self.wp == 4);
self.buf = buf;
Some(None)
} else {
assert!(len > 0 && len < 1024 * 512);
let nl = len as usize + 4;
if buf.capacity() < nl {
buf.resize(nl, 0);
} else {
// nothing to do
}
if self.wp >= nl {
info!("InMemoryFrameAsyncReadStream tryparse Have whole frame");
let mut buf3 = BytesMut::with_capacity(buf.capacity());
// TODO make stats of copied bytes and warn if ratio is too bad.
buf3.put(buf[nl..self.wp].as_ref());
buf3.resize(buf3.capacity(), 0);
use bytes::Buf;
buf.truncate(nl);
buf.advance(4);
self.wp = self.wp - nl;
self.buf = buf3;
Some(Some(buf.freeze()))
} else {
trace!("InMemoryFrameAsyncReadStream tryparse less than length + 4 bytes");
self.buf = buf;
None
}
}
} else {
trace!("InMemoryFrameAsyncReadStream tryparse less than 4 bytes");
self.buf = buf;
None
}
}
}
@@ -133,77 +185,71 @@ where
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
info!("InMemoryFrameAsyncReadStream MAIN POLL");
use Poll::*;
assert!(!self.stopped);
'outer: loop {
info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING");
let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new());
if buf0.as_mut().len() != buf0.capacity() {
error!("------- {} {}", buf0.as_mut().len(), buf0.capacity());
panic!();
}
let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut());
assert!(buf2.filled().len() == 0);
assert!(buf2.capacity() > 0);
assert!(buf2.remaining() > 0);
let r1 = buf2.remaining();
let j = &mut self.inp;
pin_mut!(j);
break match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(())) => {
let n1 = buf2.filled().len();
info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1);
let r2 = buf2.remaining();
if r2 == r1 {
info!("InMemoryFrameAsyncReadStream END OF INPUT");
if self.wp != 0 {
error!("self.wp != 0 {}", self.wp);
}
assert!(self.wp == 0);
if self.tryparse {
info!("InMemoryFrameAsyncReadStream TRYPARSE");
break match self.tryparse() {
None => {
self.tryparse = false;
continue 'outer;
}
Some(None) => {
self.tryparse = false;
self.stopped = true;
Ready(None)
} else {
let n = buf2.filled().len();
self.wp += n;
info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp);
if self.wp >= 4 {
let len = u32::from_le_bytes(*arrayref::array_ref![buf0.as_mut(), 0, 4]);
info!("InMemoryFrameAsyncReadStream len: {}", len);
assert!(len > 0 && len < 1024 * 512);
let nl = len as usize + 4;
if buf0.capacity() < nl {
buf0.resize(nl, 0);
} else {
// nothing to do
}
Some(Some(k)) => Ready(Some(Ok(k))),
};
} else {
info!("InMemoryFrameAsyncReadStream PREPARE BUFFER FOR READING");
let mut buf0 = std::mem::replace(&mut self.buf, BytesMut::new());
if buf0.as_mut().len() != buf0.capacity() {
error!("------- {} {}", buf0.as_mut().len(), buf0.capacity());
panic!();
}
let mut buf2 = ReadBuf::new(buf0.as_mut()[self.wp..].as_mut());
assert!(buf2.filled().len() == 0);
assert!(buf2.capacity() > 0);
assert!(buf2.remaining() > 0);
let r1 = buf2.remaining();
let j = &mut self.inp;
pin_mut!(j);
info!(
"InMemoryFrameAsyncReadStream POLL READ remaining {}",
buf2.remaining()
);
break match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(())) => {
let n1 = buf2.filled().len();
info!("InMemoryFrameAsyncReadStream read Ok n1 {}", n1);
let r2 = buf2.remaining();
if r2 == r1 {
info!("InMemoryFrameAsyncReadStream END OF INPUT");
if self.wp != 0 {
error!("InMemoryFrameAsyncReadStream self.wp != 0 {}", self.wp);
}
if self.wp >= nl {
info!("InMemoryFrameAsyncReadStream Have whole frame");
let mut buf3 = BytesMut::with_capacity(buf0.capacity());
// TODO make stats of copied bytes and warn if ratio is too bad.
buf3.put(buf0.as_ref()[nl..self.wp].as_ref());
buf3.resize(buf3.capacity(), 0);
self.wp = self.wp - nl;
self.buf = buf3;
use bytes::Buf;
buf0.truncate(nl);
buf0.advance(4);
Ready(Some(Ok(buf0.freeze())))
} else {
self.buf = buf0;
continue 'outer;
}
} else {
info!("InMemoryFrameAsyncReadStream not yet enough for len");
self.buf = buf0;
Ready(None)
} else {
let n = buf2.filled().len();
self.wp += n;
info!("InMemoryFrameAsyncReadStream read n {} wp {}", n, self.wp);
self.buf = buf0;
self.tryparse = true;
continue 'outer;
}
}
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => {
info!("InMemoryFrameAsyncReadStream Pending");
self.buf = buf0;
Pending
}
};
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => {
info!("InMemoryFrameAsyncReadStream Pending");
self.buf = buf0;
Pending
}
};
}
}
}
}
@@ -240,10 +286,17 @@ pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
}
async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
//use tracing_futures::Instrument;
let span1 = span!(Level::INFO, "raw::raw_conn_handler");
raw_conn_handler_inner(stream, addr).instrument(span1).await
}
async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
info!("raw_conn_handler SPAWNED for {:?}", addr);
let (netin, mut netout) = stream.into_split();
let mut h = InMemoryFrameAsyncReadStream::new(netin);
while let Some(k) = h.next().await {
let inp_read_span = span!(Level::INFO, "raw_conn_handler INPUT STREAM READ");
while let Some(k) = h.next().instrument(inp_read_span).await {
warn!("raw_conn_handler FRAME RECV {}", k.is_ok());
break;
}
@@ -266,6 +319,7 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err
netout.write_u32_le(fr.len() as u32).await?;
netout.write(fr.as_ref()).await?;
}
netout.write_u32_le(0).await?;
netout.flush().await?;
netout.forget();
warn!("raw_conn_handler DONE");