From 033bf2260935837ab769dcd2901f0a33e6756421 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 11 May 2021 22:23:04 +0200 Subject: [PATCH] Break after deadline --- disk/src/binned.rs | 50 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 30ada28..63ae254 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -14,6 +14,7 @@ use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Pr use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { if query.channel().backend != node_config.node.backend { @@ -141,26 +142,45 @@ pub struct BinnedJsonResult { } pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { + let deadline = tokio::time::Instant::now() + Duration::from_millis(0); let mut batch = MinMaxAvgScalarBinBatch::empty(); let mut items = binned_stream(node_config, query).await?; - while let Some(item) = items.next().await { - match item { - Ok(item) => { - match item { - MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => { - batch.ts1s.append(&mut vals.ts1s); - batch.ts2s.append(&mut vals.ts2s); - batch.counts.append(&mut vals.counts); - batch.mins.append(&mut vals.mins); - batch.maxs.append(&mut vals.maxs); - batch.avgs.append(&mut vals.avgs); - } - _ => {} + let mut i1 = 0; + loop { + let item = if i1 == 0 { + items.next().await + } else { + match tokio::time::timeout_at(deadline, items.next()).await { + Ok(k) => k, + Err(_) => { + error!("TIMEOUT"); + None } - serde_json::Value::String(format!("all good")) } - Err(e) => serde_json::Value::String(format!("{:?}", e)), }; + match item { + Some(item) => { + match item { + Ok(item) => { + match item { + MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => { + batch.ts1s.append(&mut vals.ts1s); + batch.ts2s.append(&mut vals.ts2s); + batch.counts.append(&mut vals.counts); + batch.mins.append(&mut vals.mins); + batch.maxs.append(&mut vals.maxs); + batch.avgs.append(&mut vals.avgs); + } + _ => {} + } + serde_json::Value::String(format!("all good")) + } + Err(e) => serde_json::Value::String(format!("{:?}", e)), + }; + } + None => break, + } + i1 += 1; } let mut ret = BinnedJsonResult { ts_bin_edges: batch.ts1s,