From a23400aaf8b088dac9d33d0fd9d954704c05879d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 30 Apr 2021 20:20:30 +0200 Subject: [PATCH] Try to wrap stream into SCC to factor the errored and completed checks --- disk/src/cache.rs | 2 +- disk/src/cache/pbv.rs | 38 +++++++++------------- netpod/src/lib.rs | 2 ++ netpod/src/streamext.rs | 72 +++++++++++++++++++++++++++++++++++++++++ retrieval/src/client.rs | 8 ++--- 5 files changed, 94 insertions(+), 28 deletions(-) create mode 100644 netpod/src/streamext.rs diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 830f7ed..60476b4 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -185,7 +185,7 @@ pub fn pre_binned_bytes_for_http( query: &PreBinnedQuery, ) -> Result { info!("pre_binned_bytes_for_http {:?} {:?}", query, node); - let ret = PreBinnedValueByteStream::new( + let ret = super::cache::pbv::pre_binned_value_byte_stream_new( query.patch.clone(), query.channel.clone(), query.agg_kind.clone(), diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 58a1e70..c0bb17a 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -9,6 +9,7 @@ use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; +use netpod::streamext::SCC; use netpod::{ AggKind, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, @@ -17,41 +18,32 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct PreBinnedValueByteStream { +pub type PreBinnedValueByteStream = SCC; + +pub struct PreBinnedValueByteStreamInner { inp: PreBinnedValueStream, - errored: bool, - completed: bool, } -impl PreBinnedValueByteStream { - pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: &NodeConfig) -> Self { - Self { - inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), - errored: false, - completed: false, - } - } +pub fn pre_binned_value_byte_stream_new( + patch: PreBinnedPatchCoord, + channel: Channel, + agg_kind: AggKind, + node_config: &NodeConfig, +) -> PreBinnedValueByteStream { + let s1 = PreBinnedValueStream::new(patch, channel, agg_kind, node_config); + let s2 = PreBinnedValueByteStreamInner { inp: s1 }; + SCC::new(s2) } -impl Stream for PreBinnedValueByteStream { +impl Stream for PreBinnedValueByteStreamInner { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match make_frame::(&item) { Ok(buf) => Ready(Some(Ok(buf.freeze()))), - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } + Err(e) => Ready(Some(Err(e.into()))), }, Ready(None) => Ready(None), Pending => Pending, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 280a417..78dc929 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -10,6 +10,8 @@ use timeunits::*; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +pub mod streamext; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { pub channel_config: ChannelConfig, diff --git a/netpod/src/streamext.rs b/netpod/src/streamext.rs new file mode 100644 index 0000000..ae2bef5 --- /dev/null +++ b/netpod/src/streamext.rs @@ -0,0 +1,72 @@ +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub struct SCC +where + S: Stream, +{ + inp: S, + errored: bool, + completed: bool, +} + +impl SCC +where + S: Stream, +{ + pub fn new(inp: S) -> Self { + Self { + inp, + errored: false, + completed: false, + } + } +} + +impl Stream for SCC +where + S: Stream> + Unpin, +{ + type Item = ::Item; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } + } +} + +pub trait IntoSCC +where + S: Stream, +{ + fn into_scc(self) -> SCC; +} + +impl IntoSCC for S +where + S: Stream, +{ + fn into_scc(self) -> SCC { + SCC::new(self) + } +} diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index f54b0be..0c36a29 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -52,21 +52,21 @@ pub async fn get_binned( let g = match item { Ok(frame) => { type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame; - info!("frame len {}", frame.buf().len()); + let n1 = frame.buf().len(); match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => { - info!("item: {:?}", item); + info!("len {} item {:?}", n1, item); bin_count += 1; Some(Ok(item)) } Err(e) => { - error!("error frame: {:?}", e); + error!("len {} error frame {:?}", n1, e); Some(Err(e)) } }, Err(e) => { - error!("bincode error: {:?}", e); + error!("len {} bincode error {:?}", n1, e); Some(Err(e.into())) } }