Improve network transport safety
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
Aggregation and binning support.
|
||||
*/
|
||||
|
||||
use crate::raw::Frameable;
|
||||
use crate::EventFull;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use err::Error;
|
||||
@@ -305,8 +304,9 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
|
||||
}
|
||||
}
|
||||
|
||||
impl Frameable for MinMaxAvgScalarEventBatch {
|
||||
fn serialized(&self) -> Bytes {
|
||||
impl MinMaxAvgScalarEventBatch {
|
||||
#[allow(dead_code)]
|
||||
fn old_serialized(&self) -> Bytes {
|
||||
let n1 = self.tss.len();
|
||||
let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
|
||||
g.put_u32_le(n1 as u32);
|
||||
@@ -523,8 +523,9 @@ impl MinMaxAvgScalarBinBatch {
|
||||
}
|
||||
}
|
||||
|
||||
impl Frameable for MinMaxAvgScalarBinBatch {
|
||||
fn serialized(&self) -> Bytes {
|
||||
impl MinMaxAvgScalarBinBatch {
|
||||
#[allow(dead_code)]
|
||||
fn old_serialized(&self) -> Bytes {
|
||||
let n1 = self.ts1s.len();
|
||||
let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4));
|
||||
g.put_u32_le(n1 as u32);
|
||||
@@ -936,7 +937,6 @@ where
|
||||
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
trace!("IntoBinnedTDefaultStream poll_next");
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed");
|
||||
@@ -950,12 +950,11 @@ where
|
||||
trace!("IntoBinnedTDefaultStream curbin out of spec, END");
|
||||
Ready(None)
|
||||
} else if let Some(k) = self.left.take() {
|
||||
trace!("IntoBinnedTDefaultStream GIVE LEFT");
|
||||
trace!("IntoBinnedTDefaultStream USE LEFTOVER");
|
||||
k
|
||||
} else if self.inp_completed {
|
||||
Ready(None)
|
||||
} else {
|
||||
trace!("IntoBinnedTDefaultStream POLL OUR INPUT");
|
||||
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
|
||||
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
|
||||
};
|
||||
|
||||
@@ -9,7 +9,7 @@ use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt};
|
||||
use hyper::Response;
|
||||
use netpod::{
|
||||
AggKind, BinSpecDimT, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
|
||||
PreBinnedPatchRange, ToNanos,
|
||||
PreBinnedPatchRange, RetStreamExt, ToNanos,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::future::{ready, Future};
|
||||
@@ -145,7 +145,7 @@ impl Stream for BinnedBytesForHttpStream {
|
||||
}
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(item)) => {
|
||||
match bincode::serialize(&item) {
|
||||
match bincode::serialize::<BinnedBytesForHttpStreamFrame>(&item) {
|
||||
Ok(enc) => {
|
||||
// TODO optimize this...
|
||||
const HEAD: usize = super::raw::INMEM_FRAME_HEAD;
|
||||
@@ -699,7 +699,6 @@ impl BinnedStream {
|
||||
agg_kind: AggKind,
|
||||
node_config: Arc<NodeConfig>,
|
||||
) -> Self {
|
||||
use netpod::RetStreamExt;
|
||||
warn!("BinnedStream will open a PreBinnedValueStream");
|
||||
let inp = futures_util::stream::iter(patch_it)
|
||||
.map(move |coord| {
|
||||
|
||||
@@ -43,7 +43,7 @@ pub async fn x_processed_stream_from_node(
|
||||
|
||||
// TODO this incorrect magic MUST bubble up into the final result and be reported.
|
||||
|
||||
netout.write_u32_le(INMEM_FRAME_MAGIC - 1).await?;
|
||||
netout.write_u32_le(INMEM_FRAME_MAGIC).await?;
|
||||
netout.write_u32_le(qjs.len() as u32).await?;
|
||||
netout.write_u32_le(0).await?;
|
||||
netout.write_all(&qjs).await?;
|
||||
@@ -52,12 +52,9 @@ pub async fn x_processed_stream_from_node(
|
||||
netout.write_u32_le(0).await?;
|
||||
netout.flush().await?;
|
||||
netout.forget();
|
||||
debug!("x_processed_stream_from_node WRITTEN");
|
||||
let frames = InMemoryFrameAsyncReadStream::new(netin);
|
||||
let s2 = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames);
|
||||
debug!("x_processed_stream_from_node HAVE STREAM INSTANCE");
|
||||
let s3: Pin<Box<dyn Stream<Item = Result<_, Error>> + Send>> = Box::pin(s2);
|
||||
debug!("x_processed_stream_from_node RETURN");
|
||||
Ok(s3)
|
||||
}
|
||||
|
||||
@@ -94,13 +91,19 @@ where
|
||||
"MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}",
|
||||
buf.len()
|
||||
);
|
||||
//let item = MinMaxAvgScalarEventBatch::from_full_frame(&buf);
|
||||
match bincode::deserialize::<RawConnOut>(buf.as_ref()) {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => Ready(Some(Ok(item))),
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
},
|
||||
Err(e) => Ready(Some(Err(e.into()))),
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {} {}",
|
||||
buf.len(),
|
||||
crchex(buf)
|
||||
);
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
@@ -303,13 +306,6 @@ async fn local_unpacked_test() {
|
||||
let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream();
|
||||
}
|
||||
|
||||
/**
|
||||
Can be serialized as a length-delimited frame.
|
||||
*/
|
||||
pub trait Frameable {
|
||||
fn serialized(&self) -> Bytes;
|
||||
}
|
||||
|
||||
pub async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
|
||||
let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
|
||||
let lis = tokio::net::TcpListener::bind(addr).await?;
|
||||
@@ -335,14 +331,18 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(
|
||||
match raw_conn_handler_inner_try(stream, addr).await {
|
||||
Ok(_) => (),
|
||||
Err(mut ce) => {
|
||||
let ret: RawConnOut = Err(ce.err);
|
||||
let enc = bincode::serialize(&ret)?;
|
||||
error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP");
|
||||
let enc = bincode::serialize::<RawConnOut>(&Err(ce.err))?;
|
||||
// TODO optimize
|
||||
let mut buf = BytesMut::with_capacity(enc.len() + 32);
|
||||
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
|
||||
buf.put_u32_le(INMEM_FRAME_MAGIC);
|
||||
buf.put_u32_le(enc.len() as u32);
|
||||
buf.put_u32_le(0);
|
||||
buf.put(enc.as_ref());
|
||||
trace!(
|
||||
"raw_conn_handler_inner ~~~~~~~~~~~~ EMIT FRAME PAYLOAD CRC {}",
|
||||
crchex(&enc)
|
||||
);
|
||||
match ce.netout.write(&buf).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e)?,
|
||||
@@ -357,14 +357,8 @@ struct ConnErr {
|
||||
netout: OwnedWriteHalf,
|
||||
}
|
||||
|
||||
impl From<(Error, OwnedWriteHalf)> for ConnErr {
|
||||
fn from((err, netout): (Error, OwnedWriteHalf)) -> Self {
|
||||
Self { err, netout }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(std::io::Error, OwnedWriteHalf)> for ConnErr {
|
||||
fn from((err, netout): (std::io::Error, OwnedWriteHalf)) -> Self {
|
||||
impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
|
||||
fn from((err, netout): (E, OwnedWriteHalf)) -> Self {
|
||||
Self {
|
||||
err: err.into(),
|
||||
netout,
|
||||
@@ -406,17 +400,28 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
|
||||
batch.maxs.push(8.4);
|
||||
batch.avgs.push(9.5);
|
||||
batch.avgs.push(9.6);
|
||||
let mut s1 = futures_util::stream::iter(vec![batch]);
|
||||
let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok);
|
||||
while let Some(item) = s1.next().await {
|
||||
let fr = item.serialized();
|
||||
let mut buf = BytesMut::with_capacity(fr.len() + 32);
|
||||
buf.put_u32_le(INMEM_FRAME_MAGIC);
|
||||
buf.put_u32_le(fr.len() as u32);
|
||||
buf.put_u32_le(0);
|
||||
buf.put(fr.as_ref());
|
||||
match netout.write(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err((e, netout))?,
|
||||
match bincode::serialize::<RawConnOut>(&item) {
|
||||
Ok(enc) => {
|
||||
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
|
||||
buf.put_u32_le(INMEM_FRAME_MAGIC);
|
||||
buf.put_u32_le(enc.len() as u32);
|
||||
buf.put_u32_le(0);
|
||||
buf.put(enc.as_ref());
|
||||
trace!(
|
||||
"raw_conn_handler_inner_try ~~~~~~~~~~~~ EMIT FRAME PAYLOAD {} {}",
|
||||
enc.len(),
|
||||
crchex(enc)
|
||||
);
|
||||
match netout.write(&buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => return Err((e, netout))?,
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err((e, netout))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut buf = BytesMut::with_capacity(32);
|
||||
@@ -433,3 +438,13 @@ async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Resu
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn crchex<T>(t: T) -> String
|
||||
where
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
let mut h = crc32fast::Hasher::new();
|
||||
h.update(t.as_ref());
|
||||
let crc = h.finalize();
|
||||
format!("{:08x}", crc)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user