diff --git a/Cargo.toml b/Cargo.toml index 55dd421..b7fe624 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [workspace] -members = ["retrieval", "httpret", "err", "disk"] +members = ["retrieval"] [profile.release] -#debug = 1 -#opt-level = 0 +debug = 1 +#opt-level = 1 #overflow-checks = true #debug-assertions = true diff --git a/disk/src/agg.rs b/disk/src/agg.rs index d36f3e9..4da4396 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -182,6 +182,21 @@ where S: Stream>, { inp: S, + errored: bool, + completed: bool, +} + +impl Dim0F32Stream +where + S: Stream>, +{ + pub fn new(inp: S) -> Self { + Self { + inp, + errored: false, + completed: false, + } + } } impl Stream for Dim0F32Stream @@ -192,6 +207,13 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("Dim0F32Stream poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let mut ret = ValuesDim1 { @@ -236,10 +258,17 @@ where _ => err::todoval(), } } - Ready(Some(Ok(err::todoval()))) + self.errored = true; + Ready(Some(Err(Error::with_msg(format!("TODO not yet implemented"))))) + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), Pending => Pending, } } @@ -256,7 +285,7 @@ where T: Stream>, { fn into_dim_0_f32_stream(self) -> Dim0F32Stream { - Dim0F32Stream { inp: self } + Dim0F32Stream::new(self) } } @@ -269,6 +298,19 @@ where completed: bool, } +impl Dim1F32Stream +where + S: Stream>, +{ + pub fn new(inp: S) -> Self { + Self { + inp, + errored: false, + completed: false, + } + } +} + impl Stream for Dim1F32Stream where S: Stream> + Unpin, @@ -278,7 +320,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; if self.completed { - panic!("poll_next on completed"); + panic!("Dim1F32Stream poll_next on completed"); } if self.errored { self.completed = true; @@ -349,8 +391,14 @@ where } Ready(Some(Ok(ret))) } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } Pending => Pending, } } @@ -367,11 +415,7 @@ where T: Stream>, { fn into_dim_1_f32_stream(self) -> Dim1F32Stream { - Dim1F32Stream { - inp: self, - errored: false, - completed: false, - } + Dim1F32Stream::new(self) } } diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index c2e8449..b04e293 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -66,11 +66,7 @@ where bnew[..].as_mut().put_slice(&self.buf[..self.wp]); self.buf = bnew; } - trace!( - ".............. PREPARE READ FROM wp {} self.buf.len() {}", - self.wp, - self.buf.len(), - ); + trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),); let gg = self.buf.len() - self.wp; let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]); if gg < 1 || gg > 1024 * 1024 * 20 { @@ -265,7 +261,6 @@ where let r = self.poll_upstream(cx); break match r { Ready(Ok(n1)) => { - trace!("poll_upstream GIVES Ready {}", n1); self.wp += n1; if n1 == 0 { let n2 = self.buf.len(); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index b51b856..4c606a5 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -139,7 +139,8 @@ impl FusedFuture for Fopen1 { unsafe impl Send for Fopen1 {} -pub fn raw_concat_channel_read_stream_try_open_in_background( +#[allow(dead_code)] +fn unused_raw_concat_channel_read_stream_try_open_in_background( query: &netpod::AggQuerySingleChannel, node: Node, ) -> impl Stream> + Send { @@ -173,13 +174,13 @@ pub fn raw_concat_channel_read_stream_try_open_in_background( // But next iteration, the file is not available, but reading is, so I should read! // I can not simply drop the reading future, that would lose the request. - if reading.is_some() { - let k: Result<(tokio::fs::File, BytesMut), Error> = reading.as_mut().unwrap().await; + if let Some(read) = &mut reading { + let k: Result<(tokio::fs::File, BytesMut), Error> = read.await; if k.is_err() { error!("LONELY READ ERROR"); } let k = k.unwrap(); - reading.take(); + reading = None; file = Some(k.0); yield Ok(k.1.freeze()); } @@ -266,7 +267,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background( } } -pub fn raw_concat_channel_read_stream_file_pipe( +#[allow(dead_code)] +fn unused_raw_concat_channel_read_stream_file_pipe( range: &NanoRange, channel_config: &ChannelConfig, node: Node, diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 9e4e4c8..2483093 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -16,6 +16,8 @@ where T: AsyncRead + Unpin, { inp: InMemoryFrameAsyncReadStream, + errored: bool, + completed: bool, } impl MinMaxAvgScalarEventBatchStreamFromFrames @@ -23,7 +25,11 @@ where T: AsyncRead + Unpin, { pub fn new(inp: InMemoryFrameAsyncReadStream) -> Self { - Self { inp } + Self { + inp, + errored: false, + completed: false, + } } } @@ -35,6 +41,13 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.completed { + panic!("MinMaxAvgScalarEventBatchStreamFromFrames poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } loop { let j = &mut self.inp; pin_mut!(j); @@ -49,19 +62,29 @@ where match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), - Err(e) => Ready(Some(Err(e))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } }, Err(e) => { error!( "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}", frame.buf().len(), ); + self.errored = true; Ready(Some(Err(e.into()))) } } } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } Pending => Pending, }; } diff --git a/err/src/lib.rs b/err/src/lib.rs index 0f7f1a9..e8726e0 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -63,7 +63,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { Some(k) => k, _ => 0, }; - if is_ours { + if true || is_ours { write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap(); } }