WIP on json endpoint

Dominik Werder
2021-05-11 18:59:37 +02:00
parent c4b021668d
commit 414aa59403
9 changed files with 286 additions and 129 deletions

disk/src/binned.rs Normal file

@@ -0,0 +1,173 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches};
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame;
use crate::raw::EventsQuery;
use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStream, Error> {
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
node_config.node.backend,
query.channel().backend
));
return Err(err);
}
let range = query.range();
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry = extract_matching_config_entry(range, &channel_config);
info!("binned_bytes_for_http found config entry {:?}", entry);
let range = BinnedRange::covering_range(range.clone(), query.bin_count()).ok_or(Error::with_msg(format!(
"binned_stream BinnedRange::covering_range returned None"
)))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
Some(pre_range) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
pre_range, range
);
return Err(Error::with_msg(msg));
}
let s1 = BinnedStreamFromPreBinnedPatches::new(
PreBinnedPatchIterator::from_range(pre_range),
query.channel().clone(),
range,
query.agg_kind().clone(),
query.cache_usage().clone(),
node_config,
query.disk_stats_every().clone(),
)?;
let ret = BinnedStream::new(Box::pin(s1))?;
Ok(ret)
}
None => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let evq = EventsQuery {
channel: query.channel().clone(),
range: query.range().clone(),
agg_kind: query.agg_kind().clone(),
};
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s1 = s1.into_binned_t(range);
let ret = BinnedStream::new(Box::pin(s1))?;
Ok(ret)
}
}
}
type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<BinnedStreamBox, Error> {
let s1 = binned_stream(node_config, query).await?;
let ret = BinnedBytesForHttpStream::new(s1);
Ok(Box::pin(ret))
}
pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
pub struct BinnedBytesForHttpStream<S> {
inp: S,
errored: bool,
completed: bool,
}
impl<S> BinnedBytesForHttpStream<S> {
pub fn new(inp: S) -> Self {
Self {
inp,
errored: false,
completed: false,
}
}
}
impl<S> Stream for BinnedBytesForHttpStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("BinnedBytesForHttpStream poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match make_frame::<BinnedBytesForHttpStreamFrame>(&item) {
Ok(buf) => Ready(Some(Ok(buf.freeze()))),
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
},
Ready(None) => {
self.completed = true;
Ready(None)
}
Pending => Pending,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BinnedJsonResult {
ts_bin_edges: Vec<u64>,
counts: Vec<u64>,
}
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
let mut batch = MinMaxAvgScalarBinBatch::empty();
let mut items = binned_stream(node_config, query).await?;
while let Some(item) = items.next().await {
// Propagate stream errors instead of silently discarding them.
match item? {
MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => {
batch.ts1s.append(&mut vals.ts1s);
batch.ts2s.append(&mut vals.ts2s);
batch.counts.append(&mut vals.counts);
batch.mins.append(&mut vals.mins);
batch.maxs.append(&mut vals.maxs);
batch.avgs.append(&mut vals.avgs);
}
// Other item kinds (stats, range-complete markers) are not represented in the JSON result yet.
_ => {}
}
}
let mut ret = BinnedJsonResult {
ts_bin_edges: batch.ts1s,
counts: batch.counts,
};
if let Some(&z) = batch.ts2s.last() {
ret.ts_bin_edges.push(z);
}
Ok(serde_json::to_value(ret)?)
}
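
For reference, the JSON shape this endpoint currently produces is just bin edges plus counts. A minimal standalone sketch of what binned_json serializes, with the struct mirrored locally and the values purely illustrative:

use serde::Serialize;

// Local mirror of BinnedJsonResult, for illustration only.
#[derive(Serialize)]
struct BinnedJsonResult {
    ts_bin_edges: Vec<u64>,
    counts: Vec<u64>,
}

fn main() -> Result<(), serde_json::Error> {
    // Three bins yield four edges: each bin's ts1 plus the trailing ts2.
    let ret = BinnedJsonResult {
        ts_bin_edges: vec![1000, 2000, 3000, 4000],
        counts: vec![42, 40, 41],
    };
    println!("{}", serde_json::to_string(&ret)?);
    // prints: {"ts_bin_edges":[1000,2000,3000,4000],"counts":[42,40,41]}
    Ok(())
}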


@@ -96,11 +96,11 @@ impl Stream for BinnedStreamFromPreBinnedPatches {
}
}
pub struct BinnedStreamFromMerged {
pub struct BinnedStream {
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
}
impl BinnedStreamFromMerged {
impl BinnedStream {
pub fn new(
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
) -> Result<Self, Error> {
@@ -108,7 +108,7 @@ impl BinnedStreamFromMerged {
}
}
impl Stream for BinnedStreamFromMerged {
impl Stream for BinnedStream {
// TODO make this generic over all possible things
type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
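
The renamed BinnedStream is a thin newtype over a boxed trait object, so both the pre-binned and the merged-from-remotes pipelines can be returned behind one name. The same pattern in miniature, a sketch with stream::iter standing in for the real pipeline:

use futures_core::Stream;
use std::pin::Pin;

// Type-erase a concrete stream behind one named type; Send keeps it usable
// across task boundaries, as with BinnedStreamBox in binned.rs.
type BoxedBinStream<T, E> = Pin<Box<dyn Stream<Item = Result<T, E>> + Send>>;

fn make_boxed() -> BoxedBinStream<u64, std::io::Error> {
    let items: Vec<Result<u64, std::io::Error>> = vec![Ok(1), Ok(2), Ok(3)];
    Box::pin(futures_util::stream::iter(items))
}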


@@ -1,11 +1,7 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::binnedstream::{BinnedStreamFromMerged, BinnedStreamFromPreBinnedPatches};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::pbv::PreBinnedValueByteStream;
use crate::cache::pbvfs::PreBinnedItem;
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame;
use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery;
use bytes::Bytes;
@@ -15,8 +11,7 @@ use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use hyper::Response;
use netpod::{
AggKind, BinnedRange, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord,
PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -90,6 +85,30 @@ impl BinnedQuery {
info!("BinnedQuery::from_request {:?}", ret);
Ok(ret)
}
pub fn range(&self) -> &NanoRange {
&self.range
}
pub fn channel(&self) -> &Channel {
&self.channel
}
pub fn bin_count(&self) -> u64 {
self.bin_count
}
pub fn agg_kind(&self) -> &AggKind {
&self.agg_kind
}
pub fn cache_usage(&self) -> &CacheUsage {
&self.cache_usage
}
pub fn disk_stats_every(&self) -> &ByteSize {
&self.disk_stats_every
}
}
#[derive(Clone, Debug)]
@@ -195,119 +214,6 @@ fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUsa
Ok(ret)
}
type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
pub async fn binned_bytes_for_http(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<BinnedStreamBox, Error> {
if query.channel.backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
node_config.node.backend, query.channel.backend
));
return Err(err);
}
let range = &query.range;
let channel_config = read_local_config(&query.channel, &node_config.node).await?;
let entry = extract_matching_config_entry(range, &channel_config);
info!("binned_bytes_for_http found config entry {:?}", entry);
let range = BinnedRange::covering_range(range.clone(), query.bin_count).ok_or(Error::with_msg(format!(
"binned_bytes_for_http BinnedRange::covering_range returned None"
)))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
Some(pre_range) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
pre_range, range
);
return Err(Error::with_msg(msg));
}
let s1 = BinnedStreamFromPreBinnedPatches::new(
PreBinnedPatchIterator::from_range(pre_range),
query.channel.clone(),
range,
query.agg_kind.clone(),
query.cache_usage.clone(),
node_config,
query.disk_stats_every.clone(),
)?;
let ret = BinnedBytesForHttpStream::new(s1);
Ok(Box::pin(ret))
}
None => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let evq = EventsQuery {
channel: query.channel.clone(),
range: query.range.clone(),
agg_kind: query.agg_kind.clone(),
};
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s1 = s1.into_binned_t(range);
let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
let ret = BinnedBytesForHttpStream::new(s1);
Ok(Box::pin(ret))
}
}
}
pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
pub struct BinnedBytesForHttpStream<S> {
inp: S,
errored: bool,
completed: bool,
}
impl<S> BinnedBytesForHttpStream<S> {
pub fn new(inp: S) -> Self {
Self {
inp,
errored: false,
completed: false,
}
}
}
impl<S> Stream for BinnedBytesForHttpStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("BinnedBytesForHttpStream poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match make_frame::<BinnedBytesForHttpStreamFrame>(&item) {
Ok(buf) => Ready(Some(Ok(buf.freeze()))),
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
},
Ready(None) => {
self.completed = true;
Ready(None)
}
Pending => Pending,
}
}
}
// NOTE This answers a request for a single valid pre-binned patch.
// A user must first make sure that the grid spec is valid, and that this node is responsible for it.
// Otherwise it is an error.
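
A responsibility check in the spirit of that NOTE could look like the sketch below. The modulo assignment of patches to nodes is purely an assumption for illustration; the actual assignment scheme is not shown in this diff.

// Hypothetical guard: reject a pre-binned patch request unless this node
// is responsible for the patch. The modulo scheme is an assumption.
fn check_patch_responsibility(patch_ix: u64, node_ix: u64, node_count: u64) -> Result<(), String> {
    if node_count == 0 {
        return Err("empty cluster".to_string());
    }
    if patch_ix % node_count == node_ix {
        Ok(())
    } else {
        Err(format!(
            "node {} is not responsible for patch {} in a cluster of {}",
            node_ix, patch_ix, node_count
        ))
    }
}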


@@ -1,5 +1,5 @@
use crate::binned::BinnedBytesForHttpStreamFrame;
use crate::cache::pbvfs::PreBinnedItem;
use crate::cache::BinnedBytesForHttpStreamFrame;
use crate::frame::inmem::InMemoryFrame;
use crate::raw::conn::RawConnOut;
use crate::raw::EventQueryJsonStringFrame;
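
make_frame itself is outside this diff, but the clients below deserialize frame.buf() with bincode, which suggests a length-delimited bincode payload. A rough sketch of such a framing step, assuming a u32 little-endian length prefix (the real wire format is not shown here):

use bytes::{BufMut, BytesMut};
use serde::Serialize;

// Sketch only: bincode-serialize the item and prepend its length. The
// resulting BytesMut can be frozen into Bytes for the HTTP body.
fn make_frame_sketch<T: Serialize>(item: &T) -> Result<BytesMut, bincode::Error> {
    let payload = bincode::serialize(item)?;
    let mut buf = BytesMut::with_capacity(4 + payload.len());
    buf.put_u32_le(payload.len() as u32);
    buf.put_slice(&payload);
    Ok(buf)
}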


@@ -20,6 +20,7 @@ use tracing::{debug, error, info, span, trace, warn, Level};
pub mod agg;
#[cfg(test)]
pub mod aggtest;
pub mod binned;
pub mod binnedstream;
pub mod cache;
pub mod channelconfig;


@@ -208,7 +208,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("binned_binary");
let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
Err(e) => {
error!("fn binned: {:?}", e);
@@ -220,8 +220,11 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re
async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("binned_json");
let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
let ret = match disk::binned::binned_json(node_config, &query).await {
Ok(val) => {
let body = serde_json::to_string(&val)?;
response(StatusCode::OK).body(Body::from(body))
}?,
Err(e) => {
error!("fn binned: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?


@@ -58,7 +58,7 @@ pub async fn get_binned(
.filter_map(|item| {
let g = match item {
Ok(frame) => {
type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame;
type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame;
let n1 = frame.buf().len();
match bincode::deserialize::<ExpectedType>(frame.buf()) {
Ok(item) => match item {


@@ -14,6 +14,8 @@ use netpod::{ByteSize, Cluster, Database, Node, PerfOpts};
use std::future::ready;
use tokio::io::AsyncRead;
pub mod json;
fn test_cluster() -> Cluster {
let nodes = (0..3)
.into_iter()
@@ -150,7 +152,7 @@ where
.filter_map(|item| {
let g = match item {
Ok(frame) => {
type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame;
type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame;
//info!("TEST GOT FRAME len {}", frame.buf().len());
match bincode::deserialize::<ExpectedType>(frame.buf()) {
Ok(item) => match item {


@@ -0,0 +1,72 @@
use crate::spawn_test_hosts;
use crate::test::test_cluster;
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{ByteSize, Cluster};
#[test]
fn get_binned_json_0() {
taskrun::run(get_binned_json_0_inner()).unwrap();
}
async fn get_binned_json_0_inner() -> Result<(), Error> {
let cluster = test_cluster();
let _hosts = spawn_test_hosts(cluster.clone());
get_binned_json_0_inner2(
"wave-f64-be-n21",
"1970-01-01T00:20:10.000Z",
"1970-01-01T01:20:30.000Z",
10,
&cluster,
)
.await
}
async fn get_binned_json_0_inner2(
channel_name: &str,
beg_date: &str,
end_date: &str,
bin_count: u32,
cluster: &Cluster,
) -> Result<(), Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = "testbackend";
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
let disk_stats_every = ByteSize::kb(1024);
// TODO have a function to form the uri, including perf opts:
let uri = format!(
"http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
node0.host,
node0.port,
channel_backend,
channel_name,
bin_count,
beg_date.format(date_fmt),
end_date.format(date_fmt),
disk_stats_every.bytes() / 1024,
);
info!("get_binned_json_0 get {}", uri);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(uri)
.header("Accept", "application/json")
.body(Body::empty())?;
let client = hyper::Client::new();
let res = client.request(req).await?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
}
let res = hyper::body::to_bytes(res.into_body()).await?;
let res = String::from_utf8(res.to_vec())?;
info!("result from endpoint: [[[\n{}\n]]]", res);
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
info!("get_binned_json_0 DONE time {} ms", ms);
Ok(())
}
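
The TODO above asks for a helper to form the URI; one possible shape, kept deliberately close to the query string the test builds by hand (the function and its parameter list are hypothetical):

use chrono::{DateTime, Utc};

// Hypothetical URI builder mirroring the test's hand-built query string;
// perf opts would be added as further parameters.
fn binned_uri(
    host: &str,
    port: u16,
    channel_backend: &str,
    channel_name: &str,
    bin_count: u32,
    beg_date: &DateTime<Utc>,
    end_date: &DateTime<Utc>,
    disk_stats_every_kb: u64,
) -> String {
    let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
    format!(
        "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
        host, port, channel_backend, channel_name, bin_count,
        beg_date.format(date_fmt), end_date.format(date_fmt), disk_stats_every_kb,
    )
}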