From ef9f713ee1037020968db7e82e7073c73caf6dc8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 23 Apr 2021 16:45:14 +0200 Subject: [PATCH] RetStreamExt to pass only first error --- disk/Cargo.toml | 1 + disk/src/agg.rs | 8 ++- disk/src/cache.rs | 139 ++++++++++++++++++++++++++-------------- disk/src/raw.rs | 143 +++++++++++++++++++++++++++++++----------- err/Cargo.toml | 2 + err/src/lib.rs | 67 ++++++++++++-------- netpod/Cargo.toml | 1 + netpod/src/lib.rs | 62 +++++++++++++++++- retrieval/Cargo.toml | 1 + retrieval/src/test.rs | 32 +++++++--- 10 files changed, 335 insertions(+), 121 deletions(-) diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 4c1eac8..75627ef 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -13,6 +13,7 @@ tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "t hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } async-channel = "1.6" bytes = "1.0.1" +bincode = "1.3.3" arrayref = "0.3.6" byteorder = "1.4.3" futures-core = "0.3.14" diff --git a/disk/src/agg.rs b/disk/src/agg.rs index fe85b67..1b7d3c8 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -10,6 +10,7 @@ use futures_core::Stream; use futures_util::StreamExt; use netpod::BinSpecDimT; use netpod::{Node, ScalarType}; +use serde::{Deserialize, Serialize}; use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; @@ -200,6 +201,7 @@ impl AggregatableXdim1Bin for ValuesDim0 { } } +#[derive(Serialize, Deserialize)] pub struct MinMaxAvgScalarEventBatch { pub tss: Vec, pub mins: Vec, @@ -413,6 +415,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } #[allow(dead_code)] +#[derive(Serialize, Deserialize)] pub struct MinMaxAvgScalarBinBatch { ts1s: Vec, ts2s: Vec, @@ -935,11 +938,12 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { trace!("IntoBinnedTDefaultStream poll_next"); use Poll::*; + if self.completed { + panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + } if self.errored { self.completed = true; return Ready(None); - } else if self.completed { - panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); } 'outer: loop { let cur = if self.curbin as u64 >= self.spec.count { diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 6e08338..005392d 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,6 +1,6 @@ use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; use crate::merge::MergedMinMaxAvgScalarStream; -use crate::raw::{EventsQuery, Frameable, InMemoryFrameAsyncReadStream}; +use crate::raw::{EventsQuery, InMemoryFrameAsyncReadStream}; use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, Utc}; use err::Error; @@ -12,7 +12,7 @@ use netpod::{ PreBinnedPatchRange, ToNanos, }; use serde::{Deserialize, Serialize}; -use std::future::Future; +use std::future::{ready, Future}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -113,13 +113,21 @@ pub async fn binned_bytes_for_http( } } +pub type BinnedBytesForHttpStreamFrame = ::Item; + pub struct BinnedBytesForHttpStream { inp: BinnedStream, + errored: bool, + completed: bool, } impl BinnedBytesForHttpStream { pub fn new(inp: BinnedStream) -> Self { - Self { inp } + Self { + inp, + errored: false, + completed: false, + } } } @@ -128,25 +136,36 @@ impl Stream for BinnedBytesForHttpStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("BinnedBytesForHttpStream poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - // TODO optimize this... - const HEAD: usize = super::raw::INMEM_FRAME_HEAD; - let mut buf = BytesMut::with_capacity(HEAD); - buf.resize(HEAD, 0); - let k = k.serialized(); - info!("BinnedBytesForHttpStream serialized slice has len {}", k.len()); - let n1 = k.len(); - buf.put_slice(&k); - assert!(buf.len() == n1 + HEAD); - buf[0..4].as_mut().put_u32_le(super::raw::INMEM_FRAME_MAGIC); - buf[4..8].as_mut().put_u32_le(n1 as u32); - buf[8..12].as_mut().put_u32_le(0); - info!("BinnedBytesForHttpStream emit buf len {}", buf.len()); - Ready(Some(Ok(buf.freeze()))) + Ready(Some(item)) => { + match bincode::serialize(&item) { + Ok(enc) => { + // TODO optimize this... + const HEAD: usize = super::raw::INMEM_FRAME_HEAD; + let mut buf = BytesMut::with_capacity(enc.len() + HEAD); + buf.put_u32_le(super::raw::INMEM_FRAME_MAGIC); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(0); + buf.put(enc.as_ref()); + Ready(Some(Ok(buf.freeze()))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + } + Ready(None) => { + self.completed = true; + Ready(None) } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), Pending => Pending, } } @@ -194,7 +213,8 @@ pub fn pre_binned_bytes_for_http( pub struct PreBinnedValueByteStream { inp: PreBinnedValueStream, - left: Option, + errored: bool, + completed: bool, } impl PreBinnedValueByteStream { @@ -202,7 +222,8 @@ impl PreBinnedValueByteStream { warn!("PreBinnedValueByteStream"); Self { inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), - left: None, + errored: false, + completed: false, } } } @@ -212,23 +233,32 @@ impl Stream for PreBinnedValueByteStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if let Some(buf) = self.left.take() { - return Ready(Some(Ok(buf))); + if self.completed { + panic!("poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); } match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { + Ready(Some(item)) => { // TODO optimize this const HEAD: usize = super::raw::INMEM_FRAME_HEAD; - let buf = k.serialized(); - let n1 = buf.len(); - self.left = Some(buf); - let mut buf2 = BytesMut::with_capacity(HEAD); - buf2.put_u32_le(super::raw::INMEM_FRAME_MAGIC); - buf2.put_u32_le(n1 as u32); - buf2.put_u32_le(0); - Ready(Some(Ok(buf2.freeze()))) + match bincode::serialize::(&item) { + Ok(enc) => { + let mut buf = BytesMut::with_capacity(enc.len() + HEAD); + buf.put_u32_le(super::raw::INMEM_FRAME_MAGIC); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(0); + buf.put(enc.as_ref()); + Ready(Some(Ok(buf.freeze()))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + } } - Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } @@ -324,7 +354,7 @@ impl PreBinnedValueStream { let s2 = s1 .map(|k| { if k.is_err() { - error!(".................. try_setup_fetch_prebinned_higher_res got ERROR"); + error!("\n\n\n.................. try_setup_fetch_prebinned_higher_res got ERROR"); } else { trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes"); } @@ -344,13 +374,25 @@ impl PreBinnedValueStream { } }) .filter_map(|k| { - use std::future::ready; let g = match k { Ok(Some(k)) => Some(Ok(k)), Ok(None) => None, Err(e) => Some(Err(e)), }; ready(g) + }) + .take_while({ + let mut run = true; + move |k| { + if !run { + ready(false) + } else { + if k.is_err() { + run = false; + } + ready(true) + } + } }); self.fut2 = Some(Box::pin(s2)); } @@ -432,9 +474,11 @@ impl PreBinnedValueFetchedStream { } } +pub type PreBinnedHttpFrame = Result; + impl Stream for PreBinnedValueFetchedStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; + type Item = PreBinnedHttpFrame; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -442,10 +486,10 @@ impl Stream for PreBinnedValueFetchedStream { break if let Some(res) = self.res.as_mut() { pin_mut!(res); match res.poll_next(cx) { - Ready(Some(Ok(buf))) => { - let item = MinMaxAvgScalarBinBatch::from_full_frame(&buf); - Ready(Some(Ok(item))) - } + Ready(Some(Ok(buf))) => match bincode::deserialize::(&buf) { + Ok(item) => Ready(Some(item)), + Err(e) => Ready(Some(Err(e.into()))), + }, Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(None) => Ready(None), Pending => Pending, @@ -454,7 +498,7 @@ impl Stream for PreBinnedValueFetchedStream { match resfut.poll_unpin(cx) { Ready(res) => match res { Ok(res) => { - info!("GOT result from SUB REQUEST: {:?}", res); + info!("PreBinnedValueFetchedStream GOT result from SUB REQUEST: {:?}", res); let s1 = HttpBodyAsAsyncRead::new(res); let s2 = InMemoryFrameAsyncReadStream::new(s1); self.res = Some(s2); @@ -473,7 +517,7 @@ impl Stream for PreBinnedValueFetchedStream { .uri(&self.uri) .body(hyper::Body::empty())?; let client = hyper::Client::new(); - info!("START REQUEST FOR {:?}", req); + info!("PreBinnedValueFetchedStream START REQUEST FOR {:?}", req); self.resfut = Some(client.request(req)); continue 'outer; }; @@ -576,12 +620,13 @@ impl Stream for MergedFromRemotes { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + } if self.errored { warn!("MergedFromRemotes return None after Err"); self.completed = true; return Ready(None); - } else if self.completed { - panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); } 'outer: loop { break if let Some(fut) = &mut self.merged { @@ -654,19 +699,21 @@ impl BinnedStream { agg_kind: AggKind, node_config: Arc, ) -> Self { + use netpod::RetStreamExt; warn!("BinnedStream will open a PreBinnedValueStream"); let inp = futures_util::stream::iter(patch_it) .map(move |coord| { PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) }) .flatten() + .only_first_error() .map(|k| { match k { Ok(ref k) => { info!("BinnedStream got good item {:?}", k); } Err(_) => { - error!("BinnedStream got error") + error!("\n\n----------------------------------------------------- BinnedStream got error") } } k diff --git a/disk/src/raw.rs b/disk/src/raw.rs index e189cf2..a2c7e9c 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; #[allow(unused_imports)] @@ -36,11 +37,8 @@ pub async fn x_processed_stream_from_node( query: Arc, node: Arc, ) -> Result> + Send>>, Error> { - debug!("x_processed_stream_from_node ENTER"); let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - debug!("x_processed_stream_from_node CONNECTED"); let qjs = serde_json::to_vec(query.as_ref())?; - debug!("x_processed_stream_from_node qjs len {}", qjs.len()); let (netin, mut netout) = net.into_split(); // TODO this incorrect magic MUST bubble up into the final result and be reported. @@ -96,8 +94,14 @@ where "MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}", buf.len() ); - let item = MinMaxAvgScalarEventBatch::from_full_frame(&buf); - Ready(Some(Ok(item))) + //let item = MinMaxAvgScalarEventBatch::from_full_frame(&buf); + match bincode::deserialize::(buf.as_ref()) { + Ok(item) => match item { + Ok(item) => Ready(Some(Ok(item))), + Err(e) => Ready(Some(Err(e))), + }, + Err(e) => Ready(Some(Err(e.into()))), + } } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), @@ -162,10 +166,6 @@ where ))))); } if len == 0 { - warn!( - "InMemoryFrameAsyncReadStream tryparse STOP FRAME self.wp {}", - self.wp - ); if self.wp != HEAD { return Some(Some(Err(Error::with_msg(format!( "InMemoryFrameAsyncReadStream tryparse unexpected amount left {}", @@ -202,7 +202,7 @@ where buf.advance(HEAD); self.wp = self.wp - nl; self.buf = buf3; - self.inp_bytes_consumed += buf.len() as u64 + 4; + self.inp_bytes_consumed += nl as u64; Some(Some(Ok(buf.freeze()))) } else { self.buf = buf; @@ -224,14 +224,15 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + assert!(!self.completed); if self.errored { self.completed = true; return Ready(None); } - assert!(!self.completed); 'outer: loop { if self.tryparse { - break match self.tryparse() { + let r = self.tryparse(); + break match r { None => { self.tryparse = false; continue 'outer; @@ -243,6 +244,7 @@ where } Some(Some(Ok(k))) => Ready(Some(Ok(k))), Some(Some(Err(e))) => { + self.tryparse = false; self.errored = true; Ready(Some(Err(e))) } @@ -257,22 +259,23 @@ where assert!(buf2.filled().len() == 0); assert!(buf2.capacity() > 0); assert!(buf2.remaining() > 0); - let r1 = buf2.remaining(); let j = &mut self.inp; pin_mut!(j); break match AsyncRead::poll_read(j, cx, &mut buf2) { Ready(Ok(())) => { - let _n1 = buf2.filled().len(); - let r2 = buf2.remaining(); - if r2 == r1 { + let n1 = buf2.filled().len(); + if n1 == 0 { if self.wp != 0 { - error!("InMemoryFrameAsyncReadStream self.wp != 0 {}", self.wp); + error!( + "InMemoryFrameAsyncReadStream self.wp != 0 wp {} consumed {}", + self.wp, self.inp_bytes_consumed + ); } self.buf = buf0; + self.completed = true; Ready(None) } else { - let n = buf2.filled().len(); - self.wp += n; + self.wp += n1; self.buf = buf0; self.tryparse = true; continue 'outer; @@ -326,19 +329,74 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err raw_conn_handler_inner(stream, addr).instrument(span1).await } +type RawConnOut = Result; + async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { + match raw_conn_handler_inner_try(stream, addr).await { + Ok(_) => (), + Err(mut ce) => { + let ret: RawConnOut = Err(ce.err); + let enc = bincode::serialize(&ret)?; + // TODO optimize + let mut buf = BytesMut::with_capacity(enc.len() + 32); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(0); + buf.put(enc.as_ref()); + match ce.netout.write(&buf).await { + Ok(_) => (), + Err(e) => return Err(e)?, + } + } + } + Ok(()) +} + +struct ConnErr { + err: Error, + netout: OwnedWriteHalf, +} + +impl From<(Error, OwnedWriteHalf)> for ConnErr { + fn from((err, netout): (Error, OwnedWriteHalf)) -> Self { + Self { err, netout } + } +} + +impl From<(std::io::Error, OwnedWriteHalf)> for ConnErr { + fn from((err, netout): (std::io::Error, OwnedWriteHalf)) -> Self { + Self { + err: err.into(), + netout, + } + } +} + +async fn raw_conn_handler_inner_try(stream: TcpStream, addr: SocketAddr) -> Result<(), ConnErr> { info!("raw_conn_handler SPAWNED for {:?}", addr); let (netin, mut netout) = stream.into_split(); let mut h = InMemoryFrameAsyncReadStream::new(netin); - let inp_read_span = span!(Level::INFO, "raw_conn_handler INPUT STREAM READ"); - while let Some(k) = h.next().instrument(inp_read_span).await { - warn!("raw_conn_handler FRAME RECV {}", k.is_ok()); - break; + let mut frames = vec![]; + while let Some(k) = h + .next() + .instrument(span!(Level::INFO, "raw_conn_handler INPUT STREAM READ")) + .await + { + match k { + Ok(_) => { + info!(". . . . . . . . . . . . . . . . . . . . . . . . . . raw_conn_handler FRAME RECV"); + frames.push(k); + } + Err(e) => { + return Err((e, netout))?; + } + } } - - warn!("TODO decide on response content based on the parsed json query"); - - warn!("raw_conn_handler INPUT STREAM END"); + if frames.len() != 1 { + error!("expect a command frame"); + return Err((Error::with_msg("expect a command frame"), netout))?; + } + error!("TODO decide on response content based on the parsed json query"); let mut batch = MinMaxAvgScalarEventBatch::empty(); batch.tss.push(42); batch.tss.push(43); @@ -351,16 +409,27 @@ async fn raw_conn_handler_inner(stream: TcpStream, addr: SocketAddr) -> Result<( let mut s1 = futures_util::stream::iter(vec![batch]); while let Some(item) = s1.next().await { let fr = item.serialized(); - netout.write_u32_le(INMEM_FRAME_MAGIC).await?; - netout.write_u32_le(fr.len() as u32).await?; - netout.write_u32_le(0).await?; - netout.write(fr.as_ref()).await?; + let mut buf = BytesMut::with_capacity(fr.len() + 32); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(fr.len() as u32); + buf.put_u32_le(0); + buf.put(fr.as_ref()); + match netout.write(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + } + } + let mut buf = BytesMut::with_capacity(32); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(0); + buf.put_u32_le(0); + match netout.write(&buf).await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, } - netout.write_u32_le(INMEM_FRAME_MAGIC).await?; - netout.write_u32_le(0).await?; - netout.write_u32_le(0).await?; - netout.flush().await?; - netout.forget(); - warn!("raw_conn_handler DONE"); Ok(()) } diff --git a/err/Cargo.toml b/err/Cargo.toml index 01e86de..af5bcbb 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -9,7 +9,9 @@ hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "t http = "0.2" tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } backtrace = "0.3.56" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +bincode = "1.3.3" async-channel = "1.6" chrono = { version = "0.4.19", features = ["serde"] } nom = "6.1.2" diff --git a/err/src/lib.rs b/err/src/lib.rs index 2377d1f..e93bc91 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -3,6 +3,7 @@ Error handling and reporting. */ use nom::error::ErrorKind; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::net::AddrParseError; use std::num::ParseIntError; @@ -12,16 +13,18 @@ use tokio::task::JoinError; /** The common error type for this application. */ +#[derive(Serialize, Deserialize)] pub struct Error { msg: String, - trace: backtrace::Backtrace, + #[serde(skip)] + trace: Option, } impl Error { pub fn with_msg>(s: S) -> Self { Self { msg: s.into(), - trace: backtrace::Backtrace::new(), + trace: Some(backtrace::Backtrace::new()), } } } @@ -30,33 +33,39 @@ impl std::fmt::Debug for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { use std::io::Write; let mut buf = vec![]; - for fr in self.trace.frames() { - for sy in fr.symbols() { - let is_ours = match sy.filename() { - None => false, - Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"), - }; - let name = match sy.name() { - Some(k) => k.to_string(), - _ => "[err]".into(), - }; - let filename = match sy.filename() { - Some(k) => match k.to_str() { - Some(k) => k, - _ => "[err]", - }, - _ => "[err]", - }; - let lineno = match sy.lineno() { - Some(k) => k, - _ => 0, - }; - if is_ours { - write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); + match &self.trace { + Some(trace) => { + for fr in trace.frames() { + for sy in fr.symbols() { + let is_ours = match sy.filename() { + None => false, + Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"), + }; + let name = match sy.name() { + Some(k) => k.to_string(), + _ => "[err]".into(), + }; + let filename = match sy.filename() { + Some(k) => match k.to_str() { + Some(k) => k, + _ => "[err]", + }, + _ => "[err]", + }; + let lineno = match sy.lineno() { + Some(k) => k, + _ => 0, + }; + if is_ours { + write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); + } + } } } + None => { + write!(&mut buf, "NO_TRACE").unwrap(); + } } - //write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) write!(fmt, "Error {}\nTrace:{}", self.msg, String::from_utf8(buf).unwrap()) } } @@ -157,6 +166,12 @@ impl From for Error { } } +impl From> for Error { + fn from(k: Box) -> Self { + Self::with_msg(format!("bincode::ErrorKind {:?}", k)) + } +} + pub fn todoval() -> T { todo!("TODO todoval") } diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 66fbf88..6a9ca92 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -10,6 +10,7 @@ async-channel = "1.6" bytes = "1.0.1" chrono = { version = "0.4.19", features = ["serde"] } futures-core = "0.3.12" +futures-util = "0.3.14" tracing = "0.1.25" url = "2.2" err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 45c9f8e..2e95c59 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -1,12 +1,13 @@ -#[doc(inline)] -pub use std; - use chrono::{DateTime, TimeZone, Utc}; use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use timeunits::*; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -480,3 +481,58 @@ impl ToNanos for DateTime { self.timestamp() as u64 * timeunits::SEC + self.timestamp_subsec_nanos() as u64 } } + +pub trait RetStreamExt: Stream { + fn only_first_error(self) -> OnlyFirstError + where + Self: Sized; +} + +pub struct OnlyFirstError { + inp: T, + errored: bool, + completed: bool, +} + +impl Stream for OnlyFirstError +where + T: Stream> + Unpin, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } +} + +impl RetStreamExt for T +where + T: Stream, +{ + fn only_first_error(self) -> OnlyFirstError { + OnlyFirstError { + inp: self, + errored: false, + completed: false, + } + } +} diff --git a/retrieval/Cargo.toml b/retrieval/Cargo.toml index 0974388..0ba2257 100644 --- a/retrieval/Cargo.toml +++ b/retrieval/Cargo.toml @@ -13,6 +13,7 @@ tracing-subscriber = "0.2.17" futures-core = "0.3.14" futures-util = "0.3.14" bytes = "1.0.1" +bincode = "1.3.3" #async-channel = "1" #dashmap = "3" tokio-postgres = "0.7" diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 9ea1a50..d32f63d 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,6 +1,5 @@ use crate::spawn_test_hosts; use chrono::Utc; -use disk::agg::MinMaxAvgScalarBinBatch; use err::Error; use futures_util::TryStreamExt; use hyper::Body; @@ -86,12 +85,31 @@ async fn get_cached_0_inner() -> Result<(), Error> { let mut bin_count = 0; let s3 = s2 .map_err(|e| error!("TEST GOT ERROR {:?}", e)) - .map_ok(|k| { - info!("TEST GOT ITEM: {:?}", k); - let z = MinMaxAvgScalarBinBatch::from_full_frame(&k); - info!("TEST GOT BATCH: {:?}", z); - bin_count += 1; - z + .filter_map(|item| { + let g = match item { + Ok(buf) => { + info!("TEST GOT FRAME len {}", buf.len()); + match bincode::deserialize::(&buf) { + Ok(item) => match item { + Ok(item) => { + info!("TEST GOT ITEM"); + bin_count += 1; + Some(Ok(item)) + } + Err(e) => { + error!("TEST GOT ERROR FRAME: {:?}", e); + Some(Err(e)) + } + }, + Err(e) => { + error!("bincode error: {:?}", e); + Some(Err(e.into())) + } + } + } + Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), + }; + ready(g) }) .for_each(|_| ready(())); s3.await;