See some non-deterministic failure on production
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
[workspace]
|
||||
members = ["retrieval", "httpret", "err", "disk"]
|
||||
members = ["retrieval"]
|
||||
|
||||
[profile.release]
|
||||
#debug = 1
|
||||
#opt-level = 0
|
||||
debug = 1
|
||||
#opt-level = 1
|
||||
#overflow-checks = true
|
||||
#debug-assertions = true
|
||||
|
||||
@@ -182,6 +182,21 @@ where
|
||||
S: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
inp: S,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<S> Dim0F32Stream<S>
|
||||
where
|
||||
S: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self {
|
||||
inp,
|
||||
errored: false,
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for Dim0F32Stream<S>
|
||||
@@ -192,6 +207,13 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("Dim0F32Stream poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let mut ret = ValuesDim1 {
|
||||
@@ -236,10 +258,17 @@ where
|
||||
_ => err::todoval(),
|
||||
}
|
||||
}
|
||||
Ready(Some(Ok(err::todoval())))
|
||||
self.errored = true;
|
||||
Ready(Some(Err(Error::with_msg(format!("TODO not yet implemented")))))
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
@@ -256,7 +285,7 @@ where
|
||||
T: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
|
||||
Dim0F32Stream { inp: self }
|
||||
Dim0F32Stream::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,6 +298,19 @@ where
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<S> Dim1F32Stream<S>
|
||||
where
|
||||
S: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self {
|
||||
inp,
|
||||
errored: false,
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for Dim1F32Stream<S>
|
||||
where
|
||||
S: Stream<Item = Result<EventFull, Error>> + Unpin,
|
||||
@@ -278,7 +320,7 @@ where
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("poll_next on completed");
|
||||
panic!("Dim1F32Stream poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
@@ -349,8 +391,14 @@ where
|
||||
}
|
||||
Ready(Some(Ok(ret)))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
@@ -367,11 +415,7 @@ where
|
||||
T: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
|
||||
Dim1F32Stream {
|
||||
inp: self,
|
||||
errored: false,
|
||||
completed: false,
|
||||
}
|
||||
Dim1F32Stream::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,11 +66,7 @@ where
|
||||
bnew[..].as_mut().put_slice(&self.buf[..self.wp]);
|
||||
self.buf = bnew;
|
||||
}
|
||||
trace!(
|
||||
".............. PREPARE READ FROM wp {} self.buf.len() {}",
|
||||
self.wp,
|
||||
self.buf.len(),
|
||||
);
|
||||
trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),);
|
||||
let gg = self.buf.len() - self.wp;
|
||||
let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
|
||||
if gg < 1 || gg > 1024 * 1024 * 20 {
|
||||
@@ -265,7 +261,6 @@ where
|
||||
let r = self.poll_upstream(cx);
|
||||
break match r {
|
||||
Ready(Ok(n1)) => {
|
||||
trace!("poll_upstream GIVES Ready {}", n1);
|
||||
self.wp += n1;
|
||||
if n1 == 0 {
|
||||
let n2 = self.buf.len();
|
||||
|
||||
@@ -139,7 +139,8 @@ impl FusedFuture for Fopen1 {
|
||||
|
||||
unsafe impl Send for Fopen1 {}
|
||||
|
||||
pub fn raw_concat_channel_read_stream_try_open_in_background(
|
||||
#[allow(dead_code)]
|
||||
fn unused_raw_concat_channel_read_stream_try_open_in_background(
|
||||
query: &netpod::AggQuerySingleChannel,
|
||||
node: Node,
|
||||
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
|
||||
@@ -173,13 +174,13 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
|
||||
// But next iteration, the file is not available, but reading is, so I should read!
|
||||
// I can not simply drop the reading future, that would lose the request.
|
||||
|
||||
if reading.is_some() {
|
||||
let k: Result<(tokio::fs::File, BytesMut), Error> = reading.as_mut().unwrap().await;
|
||||
if let Some(read) = &mut reading {
|
||||
let k: Result<(tokio::fs::File, BytesMut), Error> = read.await;
|
||||
if k.is_err() {
|
||||
error!("LONELY READ ERROR");
|
||||
}
|
||||
let k = k.unwrap();
|
||||
reading.take();
|
||||
reading = None;
|
||||
file = Some(k.0);
|
||||
yield Ok(k.1.freeze());
|
||||
}
|
||||
@@ -266,7 +267,8 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn raw_concat_channel_read_stream_file_pipe(
|
||||
#[allow(dead_code)]
|
||||
fn unused_raw_concat_channel_read_stream_file_pipe(
|
||||
range: &NanoRange,
|
||||
channel_config: &ChannelConfig,
|
||||
node: Node,
|
||||
|
||||
@@ -16,6 +16,8 @@ where
|
||||
T: AsyncRead + Unpin,
|
||||
{
|
||||
inp: InMemoryFrameAsyncReadStream<T>,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<T> MinMaxAvgScalarEventBatchStreamFromFrames<T>
|
||||
@@ -23,7 +25,11 @@ where
|
||||
T: AsyncRead + Unpin,
|
||||
{
|
||||
pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
|
||||
Self { inp }
|
||||
Self {
|
||||
inp,
|
||||
errored: false,
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +41,13 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("MinMaxAvgScalarEventBatchStreamFromFrames poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
loop {
|
||||
let j = &mut self.inp;
|
||||
pin_mut!(j);
|
||||
@@ -49,19 +62,29 @@ where
|
||||
match bincode::deserialize::<ExpectedType>(frame.buf()) {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => Ready(Some(Ok(item))),
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!(
|
||||
"MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}",
|
||||
frame.buf().len(),
|
||||
);
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
|
||||
Some(k) => k,
|
||||
_ => 0,
|
||||
};
|
||||
if is_ours {
|
||||
if true || is_ours {
|
||||
write!(&mut buf, "\n {}\n {} {}", name, filename, lineno).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user