diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 8e8f015..6676d54 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -6,12 +6,13 @@ use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::frame::makeframe::make_frame; use crate::raw::EventsQuery; use bytes::Bytes; +use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -137,17 +138,45 @@ where } } +struct Bool {} + +impl Bool { + pub fn is_false(x: &bool) -> bool { + *x == false + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct IsoDateTime(chrono::DateTime); + +impl Serialize for IsoDateTime { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct BinnedJsonResult { - ts_bin_edges: Vec, + ts_bin_edges: Vec, counts: Vec, + #[serde(skip_serializing_if = "Bool::is_false")] + partial_content: bool, + #[serde(skip_serializing_if = "Bool::is_false")] + finalised_range: bool, + #[serde(skip_serializing_if = "Option::is_none")] + continue_at: Option, } pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { - let deadline = tokio::time::Instant::now() + Duration::from_millis(0); + let deadline = tokio::time::Instant::now() + Duration::from_millis(1000); let mut batch = MinMaxAvgScalarBinBatch::empty(); let mut items = binned_stream(node_config, query).await?; let mut i1 = 0; + let mut partial_content = false; + let mut finalised_range = false; loop { let item = if i1 == 0 { items.next().await @@ -155,7 +184,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> match tokio::time::timeout_at(deadline, items.next()).await { Ok(k) => k, Err(_) => { - error!("TIMEOUT"); + partial_content = true; None } } @@ -166,7 +195,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> match item { Ok(item) => match item { Values(mut vals) => { - info!("APPEND BATCH {}", vals.ts1s.len()); + // TODO gather stats about the batch sizes. batch.ts1s.append(&mut vals.ts1s); batch.ts2s.append(&mut vals.ts2s); batch.counts.append(&mut vals.counts); @@ -177,23 +206,41 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> } Log(_) => {} EventDataReadStats(_) => {} - RangeComplete => {} + RangeComplete => { + finalised_range = true; + } }, Err(e) => { // TODO Need to use some flags to get good enough error message for remote user. - Err(e)? + Err(e)?; } }; } None => break, } } - let mut ret = BinnedJsonResult { - ts_bin_edges: batch.ts1s, - counts: batch.counts, - }; + let mut tsa: Vec<_> = batch + .ts1s + .iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect(); if let Some(&z) = batch.ts2s.last() { - ret.ts_bin_edges.push(z); + tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64))); } + let continue_at = if partial_content { + match tsa.last() { + Some(k) => Some(k.clone()), + None => Err(Error::with_msg("partial_content but no bin in result"))?, + } + } else { + None + }; + let ret = BinnedJsonResult { + ts_bin_edges: tsa, + counts: batch.counts, + partial_content, + finalised_range, + continue_at, + }; Ok(serde_json::to_value(ret)?) }