Add type to handling
This commit is contained in:
+19
-8
@@ -343,6 +343,7 @@ impl PipelinePostProcessA for CollectForJson {}
|
|||||||
|
|
||||||
pub struct JsonCollector {
|
pub struct JsonCollector {
|
||||||
fut: Pin<Box<dyn Future<Output = Result<serde_json::Value, Error>> + Send>>,
|
fut: Pin<Box<dyn Future<Output = Result<serde_json::Value, Error>> + Send>>,
|
||||||
|
completed: bool,
|
||||||
done: bool,
|
done: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -358,7 +359,11 @@ impl JsonCollector {
|
|||||||
{
|
{
|
||||||
let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count);
|
let fut = collect_all(inp, bin_count_exp, timeout, abort_after_bin_count);
|
||||||
let fut = Box::pin(fut);
|
let fut = Box::pin(fut);
|
||||||
Self { fut, done: false }
|
Self {
|
||||||
|
fut,
|
||||||
|
completed: false,
|
||||||
|
done: false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -380,12 +385,12 @@ impl ToJsonResult for Sitemty<serde_json::Value> {
|
|||||||
Ok(item) => match item {
|
Ok(item) => match item {
|
||||||
StreamItem::DataItem(item) => match item {
|
StreamItem::DataItem(item) => match item {
|
||||||
RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())),
|
RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())),
|
||||||
RangeCompletableItem::RangeComplete => Err(Error::with_msg("logic")),
|
RangeCompletableItem::RangeComplete => Err(Error::with_msg("RangeComplete")),
|
||||||
},
|
},
|
||||||
StreamItem::Log(_) => Err(Error::with_msg("logic")),
|
StreamItem::Log(item) => Err(Error::with_msg(format!("Log {:?}", item))),
|
||||||
StreamItem::Stats(_) => Err(Error::with_msg("logic")),
|
StreamItem::Stats(item) => Err(Error::with_msg(format!("Stats {:?}", item))),
|
||||||
},
|
},
|
||||||
Err(_) => Err(Error::with_msg("logic")),
|
Err(e) => Err(Error::with_msg(format!("Error {:?}", e))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -396,7 +401,10 @@ impl Stream for JsonCollector {
|
|||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
loop {
|
loop {
|
||||||
break if self.done {
|
break if self.completed {
|
||||||
|
panic!("poll_next on completed")
|
||||||
|
} else if self.done {
|
||||||
|
self.completed = true;
|
||||||
Ready(None)
|
Ready(None)
|
||||||
} else {
|
} else {
|
||||||
match self.fut.poll_unpin(cx) {
|
match self.fut.poll_unpin(cx) {
|
||||||
@@ -407,8 +415,10 @@ impl Stream for JsonCollector {
|
|||||||
Ready(Some(item))
|
Ready(Some(item))
|
||||||
}
|
}
|
||||||
Ready(Err(e)) => {
|
Ready(Err(e)) => {
|
||||||
|
// TODO don't emit the error as json.
|
||||||
let item = Err::<StreamItem<RangeCompletableItem<serde_json::Value>>, _>(e);
|
let item = Err::<StreamItem<RangeCompletableItem<serde_json::Value>>, _>(e);
|
||||||
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
|
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
|
||||||
|
self.done = true;
|
||||||
Ready(Some(item))
|
Ready(Some(item))
|
||||||
}
|
}
|
||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
@@ -927,8 +937,9 @@ where
|
|||||||
NTY: NumOps,
|
NTY: NumOps,
|
||||||
{
|
{
|
||||||
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
|
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
|
||||||
// not available.
|
Ok(Box::new(serde_json::Value::String(format!(
|
||||||
panic!()
|
"MinMaxAvgBins/non-json-item"
|
||||||
|
))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -89,7 +89,7 @@ fn make_num_pipeline(
|
|||||||
ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config),
|
ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config),
|
||||||
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
|
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
|
||||||
ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config),
|
ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config),
|
||||||
ScalarType::F32 => match_end!(f64, byte_order, shape, query, node_config),
|
ScalarType::F32 => match_end!(f32, byte_order, shape, query, node_config),
|
||||||
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
|
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user