Factor transport frame handling more

This commit is contained in:
Dominik Werder
2021-04-30 16:54:08 +02:00
parent d5ad917491
commit 40c27450c4
5 changed files with 65 additions and 20 deletions

View File

@@ -1,7 +1,7 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::FrameType;
use crate::frame::makeframe::decode_frame;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt};
@@ -73,15 +73,21 @@ impl Stream for PreBinnedValueFetchedStream {
break if let Some(res) = self.res.as_mut() {
pin_mut!(res);
match res.poll_next(cx) {
Ready(Some(Ok(frame))) => {
assert!(frame.tyid() == <PreBinnedHttpFrame as FrameType>::FRAME_TYPE_ID);
match bincode::deserialize::<PreBinnedHttpFrame>(frame.buf()) {
Ok(item) => Ready(Some(item)),
Err(e) => Ready(Some(Err(e.into()))),
Ready(Some(Ok(frame))) => match decode_frame::<PreBinnedHttpFrame>(&frame) {
Ok(item) => Ready(Some(item)),
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
},
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
Ready(None) => {
self.completed = true;
Ready(None)
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => Ready(None),
Pending => Pending,
}
} else if let Some(resfut) = self.resfut.as_mut() {
@@ -96,6 +102,7 @@ impl Stream for PreBinnedValueFetchedStream {
}
Err(e) => {
error!("PreBinnedValueStream error in stream {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}
},

View File

@@ -216,6 +216,16 @@ impl InMemoryFrame {
}
}
impl std::fmt::Debug for InMemoryFrame {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"InMemoryFrame {{ encid: {:x} tyid: {:x} len {} }}",
self.encid, self.tyid, self.len
)
}
}
impl<T> Stream for InMemoryFrameAsyncReadStream<T>
where
T: AsyncRead + Unpin,

View File

@@ -1,9 +1,10 @@
use crate::cache::BinnedBytesForHttpStreamFrame;
use crate::frame::inmem::InMemoryFrame;
use crate::raw::conn::RawConnOut;
use crate::raw::EventQueryJsonStringFrame;
use bytes::{BufMut, BytesMut};
use err::Error;
use serde::Serialize;
use serde::{Deserialize, Serialize};
pub const INMEM_FRAME_HEAD: usize = 16;
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
@@ -55,3 +56,31 @@ pub fn make_term_frame() -> BytesMut {
buf.put_u32_le(0);
buf
}
pub fn decode_frame<'a, FT>(frame: &'a InMemoryFrame) -> Result<FT, Error>
where
FT: FrameType + Deserialize<'a>,
{
if frame.encid() != 0x12121212 {
return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
}
if frame.tyid() != FT::FRAME_TYPE_ID {
return Err(Error::with_msg(format!(
"type id mismatch expect {} found {:?}",
FT::FRAME_TYPE_ID,
frame
)));
}
if frame.len() as usize != frame.buf().len() {
return Err(Error::with_msg(format!(
"buf mismatch {} vs {} in {:?}",
frame.len(),
frame.buf().len(),
frame
)));
}
match bincode::deserialize(frame.buf()) {
Ok(item) => Ok(item),
Err(e) => Err(e.into()),
}
}

View File

@@ -1,6 +1,6 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::FrameType;
use crate::frame::makeframe::decode_frame;
use crate::raw::conn::RawConnOut;
use err::Error;
use futures_core::Stream;
@@ -58,8 +58,7 @@ where
"MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}",
frame.buf().len()
);
assert!(frame.tyid() == <ExpectedType as FrameType>::FRAME_TYPE_ID);
match bincode::deserialize::<ExpectedType>(frame.buf()) {
match decode_frame::<ExpectedType>(&frame) {
Ok(item) => match item {
Ok(item) => Ready(Some(Ok(item))),
Err(e) => {
@@ -73,7 +72,7 @@ where
frame.buf().len(),
);
self.errored = true;
Ready(Some(Err(e.into())))
Ready(Some(Err(e)))
}
}
}

View File

@@ -4,7 +4,7 @@ use crate::agg::IntoDim1F32Stream;
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::eventblobs::EventBlobsComplete;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{make_frame, make_term_frame};
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame};
use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use err::Error;
use futures_util::StreamExt;
@@ -68,7 +68,7 @@ async fn raw_conn_handler_inner(
ce.err
);*/
let buf = make_frame::<RawConnOut>(&Err(ce.err))?;
match ce.netout.write(&buf).await {
match ce.netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err(e)?,
}
@@ -120,9 +120,9 @@ async fn raw_conn_handler_inner_try(
error!("expect a command frame");
return Err((Error::with_msg("expect a command frame"), netout))?;
}
let qitem = match bincode::deserialize::<EventQueryJsonStringFrame>(frames[0].buf()) {
let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
Err(e) => return Err((e, netout).into()),
};
trace!("json: {}", qitem.0);
let res: Result<EventsQuery, _> = serde_json::from_str(&qitem.0);
@@ -197,7 +197,7 @@ async fn raw_conn_handler_inner_try(
);
}
match make_frame::<RawConnOut>(&item) {
Ok(buf) => match netout.write(&buf).await {
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
@@ -220,7 +220,7 @@ async fn raw_conn_handler_inner_try(
let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok);
while let Some(item) = s1.next().await {
match make_frame::<RawConnOut>(&item) {
Ok(buf) => match netout.write(&buf).await {
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
@@ -231,7 +231,7 @@ async fn raw_conn_handler_inner_try(
}
}
let buf = make_term_frame();
match netout.write(&buf).await {
match netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}