json binned with iso timestamps and partial flag and continue at date
This commit is contained in:
+59
-12
@@ -6,12 +6,13 @@ use crate::channelconfig::{extract_matching_config_entry, read_local_config};
|
|||||||
use crate::frame::makeframe::make_frame;
|
use crate::frame::makeframe::make_frame;
|
||||||
use crate::raw::EventsQuery;
|
use crate::raw::EventsQuery;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
|
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize, Serializer};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -137,17 +138,45 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct Bool {}
|
||||||
|
|
||||||
|
impl Bool {
|
||||||
|
pub fn is_false(x: &bool) -> bool {
|
||||||
|
*x == false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct IsoDateTime(chrono::DateTime<Utc>);
|
||||||
|
|
||||||
|
impl Serialize for IsoDateTime {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct BinnedJsonResult {
|
pub struct BinnedJsonResult {
|
||||||
ts_bin_edges: Vec<u64>,
|
ts_bin_edges: Vec<IsoDateTime>,
|
||||||
counts: Vec<u64>,
|
counts: Vec<u64>,
|
||||||
|
#[serde(skip_serializing_if = "Bool::is_false")]
|
||||||
|
partial_content: bool,
|
||||||
|
#[serde(skip_serializing_if = "Bool::is_false")]
|
||||||
|
finalised_range: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
continue_at: Option<IsoDateTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
|
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
|
||||||
let deadline = tokio::time::Instant::now() + Duration::from_millis(0);
|
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
|
||||||
let mut batch = MinMaxAvgScalarBinBatch::empty();
|
let mut batch = MinMaxAvgScalarBinBatch::empty();
|
||||||
let mut items = binned_stream(node_config, query).await?;
|
let mut items = binned_stream(node_config, query).await?;
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
|
let mut partial_content = false;
|
||||||
|
let mut finalised_range = false;
|
||||||
loop {
|
loop {
|
||||||
let item = if i1 == 0 {
|
let item = if i1 == 0 {
|
||||||
items.next().await
|
items.next().await
|
||||||
@@ -155,7 +184,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
|
|||||||
match tokio::time::timeout_at(deadline, items.next()).await {
|
match tokio::time::timeout_at(deadline, items.next()).await {
|
||||||
Ok(k) => k,
|
Ok(k) => k,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
error!("TIMEOUT");
|
partial_content = true;
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -166,7 +195,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
|
|||||||
match item {
|
match item {
|
||||||
Ok(item) => match item {
|
Ok(item) => match item {
|
||||||
Values(mut vals) => {
|
Values(mut vals) => {
|
||||||
info!("APPEND BATCH {}", vals.ts1s.len());
|
// TODO gather stats about the batch sizes.
|
||||||
batch.ts1s.append(&mut vals.ts1s);
|
batch.ts1s.append(&mut vals.ts1s);
|
||||||
batch.ts2s.append(&mut vals.ts2s);
|
batch.ts2s.append(&mut vals.ts2s);
|
||||||
batch.counts.append(&mut vals.counts);
|
batch.counts.append(&mut vals.counts);
|
||||||
@@ -177,23 +206,41 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
|
|||||||
}
|
}
|
||||||
Log(_) => {}
|
Log(_) => {}
|
||||||
EventDataReadStats(_) => {}
|
EventDataReadStats(_) => {}
|
||||||
RangeComplete => {}
|
RangeComplete => {
|
||||||
|
finalised_range = true;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// TODO Need to use some flags to get good enough error message for remote user.
|
// TODO Need to use some flags to get good enough error message for remote user.
|
||||||
Err(e)?
|
Err(e)?;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut ret = BinnedJsonResult {
|
let mut tsa: Vec<_> = batch
|
||||||
ts_bin_edges: batch.ts1s,
|
.ts1s
|
||||||
counts: batch.counts,
|
.iter()
|
||||||
};
|
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
|
||||||
|
.collect();
|
||||||
if let Some(&z) = batch.ts2s.last() {
|
if let Some(&z) = batch.ts2s.last() {
|
||||||
ret.ts_bin_edges.push(z);
|
tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
|
||||||
}
|
}
|
||||||
|
let continue_at = if partial_content {
|
||||||
|
match tsa.last() {
|
||||||
|
Some(k) => Some(k.clone()),
|
||||||
|
None => Err(Error::with_msg("partial_content but no bin in result"))?,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
let ret = BinnedJsonResult {
|
||||||
|
ts_bin_edges: tsa,
|
||||||
|
counts: batch.counts,
|
||||||
|
partial_content,
|
||||||
|
finalised_range,
|
||||||
|
continue_at,
|
||||||
|
};
|
||||||
Ok(serde_json::to_value(ret)?)
|
Ok(serde_json::to_value(ret)?)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user