From 99c45985ef10fcc182d5f0a7fcfaafe14f458903 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 10 Jun 2021 11:34:20 +0200 Subject: [PATCH] Add type to handling --- disk/src/binned.rs | 27 +++++++++++++++++++-------- disk/src/binned/prebinned.rs | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 862a63c..956fd9b 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -343,6 +343,7 @@ impl PipelinePostProcessA for CollectForJson {} pub struct JsonCollector { fut: Pin> + Send>>, + completed: bool, done: bool, } @@ -358,7 +359,11 @@ impl JsonCollector { { let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count); let fut = Box::pin(fut); - Self { fut, done: false } + Self { + fut, + completed: false, + done: false, + } } } @@ -380,12 +385,12 @@ impl ToJsonResult for Sitemty { Ok(item) => match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())), - RangeCompletableItem::RangeComplete => Err(Error::with_msg("logic")), + RangeCompletableItem::RangeComplete => Err(Error::with_msg("RangeComplete")), }, - StreamItem::Log(_) => Err(Error::with_msg("logic")), - StreamItem::Stats(_) => Err(Error::with_msg("logic")), + StreamItem::Log(item) => Err(Error::with_msg(format!("Log {:?}", item))), + StreamItem::Stats(item) => Err(Error::with_msg(format!("Stats {:?}", item))), }, - Err(_) => Err(Error::with_msg("logic")), + Err(e) => Err(Error::with_msg(format!("Error {:?}", e))), } } } @@ -396,7 +401,10 @@ impl Stream for JsonCollector { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - break if self.done { + break if self.completed { + panic!("poll_next on completed") + } else if self.done { + self.completed = true; Ready(None) } else { match self.fut.poll_unpin(cx) { @@ -407,8 +415,10 @@ impl Stream for JsonCollector { Ready(Some(item)) } Ready(Err(e)) => { + // TODO don't emit the error as json. let item = Err::>, _>(e); let item = Box::new(item) as Box; + self.done = true; Ready(Some(item)) } Pending => Pending, @@ -927,8 +937,9 @@ where NTY: NumOps, { fn to_json_result(&self) -> Result, Error> { - // not available. - panic!() + Ok(Box::new(serde_json::Value::String(format!( + "MinMaxAvgBins/non-json-item" + )))) } } diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 775a48a..19c619b 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -89,7 +89,7 @@ fn make_num_pipeline( ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config), ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config), ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config), - ScalarType::F32 => match_end!(f64, byte_order, shape, query, node_config), + ScalarType::F32 => match_end!(f32, byte_order, shape, query, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config), } }