Break after deadline

This commit is contained in:
Dominik Werder
2021-05-11 22:23:04 +02:00
parent 414aa59403
commit 033bf22609

View File

@@ -14,6 +14,7 @@ use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Pr
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStream, Error> {
if query.channel().backend != node_config.node.backend {
@@ -141,26 +142,45 @@ pub struct BinnedJsonResult {
}
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
let deadline = tokio::time::Instant::now() + Duration::from_millis(0);
let mut batch = MinMaxAvgScalarBinBatch::empty();
let mut items = binned_stream(node_config, query).await?;
while let Some(item) = items.next().await {
match item {
Ok(item) => {
match item {
MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => {
batch.ts1s.append(&mut vals.ts1s);
batch.ts2s.append(&mut vals.ts2s);
batch.counts.append(&mut vals.counts);
batch.mins.append(&mut vals.mins);
batch.maxs.append(&mut vals.maxs);
batch.avgs.append(&mut vals.avgs);
}
_ => {}
let mut i1 = 0;
loop {
let item = if i1 == 0 {
items.next().await
} else {
match tokio::time::timeout_at(deadline, items.next()).await {
Ok(k) => k,
Err(_) => {
error!("TIMEOUT");
None
}
serde_json::Value::String(format!("all good"))
}
Err(e) => serde_json::Value::String(format!("{:?}", e)),
};
match item {
Some(item) => {
match item {
Ok(item) => {
match item {
MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => {
batch.ts1s.append(&mut vals.ts1s);
batch.ts2s.append(&mut vals.ts2s);
batch.counts.append(&mut vals.counts);
batch.mins.append(&mut vals.mins);
batch.maxs.append(&mut vals.maxs);
batch.avgs.append(&mut vals.avgs);
}
_ => {}
}
serde_json::Value::String(format!("all good"))
}
Err(e) => serde_json::Value::String(format!("{:?}", e)),
};
}
None => break,
}
i1 += 1;
}
let mut ret = BinnedJsonResult {
ts_bin_edges: batch.ts1s,