From 414aa594035f4de1feb1f46a5ba43822f920cd79 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 11 May 2021 18:59:37 +0200
Subject: [PATCH] WIP on json endpoint

---
 disk/src/binned.rs          | 173 ++++++++++++++++++++++++++++++++++++
 disk/src/binnedstream.rs    |   6 +-
 disk/src/cache.rs           | 146 ++++++------------------------
 disk/src/frame/makeframe.rs |   2 +-
 disk/src/lib.rs             |   1 +
 httpret/src/lib.rs          |   9 +-
 retrieval/src/client.rs     |   2 +-
 retrieval/src/test.rs       |   4 +-
 retrieval/src/test/json.rs  |  72 +++++++++++++++
 9 files changed, 286 insertions(+), 129 deletions(-)
 create mode 100644 disk/src/binned.rs
 create mode 100644 retrieval/src/test/json.rs

diff --git a/disk/src/binned.rs b/disk/src/binned.rs
new file mode 100644
index 0000000..30ada28
--- /dev/null
+++ b/disk/src/binned.rs
@@ -0,0 +1,173 @@
+use crate::agg::binnedt::IntoBinnedT;
+use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches};
+use crate::cache::{BinnedQuery, MergedFromRemotes};
+use crate::channelconfig::{extract_matching_config_entry, read_local_config};
+use crate::frame::makeframe::make_frame;
+use crate::raw::EventsQuery;
+use bytes::Bytes;
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use netpod::log::*;
+use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
+use serde::{Deserialize, Serialize};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStream, Error> {
+    if query.channel().backend != node_config.node.backend {
+        let err = Error::with_msg(format!(
+            "backend mismatch node: {} requested: {}",
+            node_config.node.backend,
+            query.channel().backend
+        ));
+        return Err(err);
+    }
+    let range = query.range();
+    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
+    let entry = extract_matching_config_entry(range, &channel_config);
+    info!("binned_bytes_for_http found config entry {:?}", entry);
+    let range = BinnedRange::covering_range(range.clone(), query.bin_count()).ok_or(Error::with_msg(format!(
+        "binned_bytes_for_http BinnedRange::covering_range returned None"
+    )))?;
+    let perf_opts = PerfOpts { inmem_bufcap: 512 };
+    match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
+        Some(pre_range) => {
+            info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
+            if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
+                let msg = format!(
+                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
+                    pre_range, range
+                );
+                return Err(Error::with_msg(msg));
+            }
+            let s1 = BinnedStreamFromPreBinnedPatches::new(
+                PreBinnedPatchIterator::from_range(pre_range),
+                query.channel().clone(),
+                range,
+                query.agg_kind().clone(),
+                query.cache_usage().clone(),
+                node_config,
+                query.disk_stats_every().clone(),
+            )?;
+            let ret = BinnedStream::new(Box::pin(s1))?;
+            Ok(ret)
+        }
+        None => {
+            info!(
+                "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
+                range
+            );
+            let evq = EventsQuery {
+                channel: query.channel().clone(),
+                range: query.range().clone(),
+                agg_kind: query.agg_kind().clone(),
+            };
+            // TODO do I need to set up more transformations or binning to deliver the requested data?
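+            // Roughly, the fallback path below is: fetch the raw events for the
+            // requested range from every node in the cluster, merge them into one
+            // time-ordered stream, then bin that stream onto the requested grid:
+            //
+            //   EventsQuery -> MergedFromRemotes -> into_binned_t(range) -> BinnedStream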
+            let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
+            let s1 = s1.into_binned_t(range);
+            let ret = BinnedStream::new(Box::pin(s1))?;
+            Ok(ret)
+        }
+    }
+}
+
+type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
+
+pub async fn binned_bytes_for_http(
+    node_config: &NodeConfigCached,
+    query: &BinnedQuery,
+) -> Result<BinnedStreamBox, Error> {
+    let s1 = binned_stream(node_config, query).await?;
+    let ret = BinnedBytesForHttpStream::new(s1);
+    Ok(Box::pin(ret))
+}
+
+pub type BinnedBytesForHttpStreamFrame = <BinnedStream as Stream>::Item;
+
+pub struct BinnedBytesForHttpStream<S> {
+    inp: S,
+    errored: bool,
+    completed: bool,
+}
+
+impl<S> BinnedBytesForHttpStream<S> {
+    pub fn new(inp: S) -> Self {
+        Self {
+            inp,
+            errored: false,
+            completed: false,
+        }
+    }
+}
+
+impl<S> Stream for BinnedBytesForHttpStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
+{
+    type Item = Result<Bytes, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        if self.completed {
+            panic!("BinnedBytesForHttpStream poll_next on completed");
+        }
+        if self.errored {
+            self.completed = true;
+            return Ready(None);
+        }
+        match self.inp.poll_next_unpin(cx) {
+            Ready(Some(item)) => match make_frame::<BinnedBytesForHttpStreamFrame>(&item) {
+                Ok(buf) => Ready(Some(Ok(buf.freeze()))),
+                Err(e) => {
+                    self.errored = true;
+                    Ready(Some(Err(e.into())))
+                }
+            },
+            Ready(None) => {
+                self.completed = true;
+                Ready(None)
+            }
+            Pending => Pending,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BinnedJsonResult {
+    ts_bin_edges: Vec<u64>,
+    counts: Vec<u64>,
+}
+
+pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
+    let mut batch = MinMaxAvgScalarBinBatch::empty();
+    let mut items = binned_stream(node_config, query).await?;
+    while let Some(item) = items.next().await {
+        // Propagate stream errors instead of silently discarding them.
+        match item? {
+            MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => {
+                batch.ts1s.append(&mut vals.ts1s);
+                batch.ts2s.append(&mut vals.ts2s);
+                batch.counts.append(&mut vals.counts);
+                batch.mins.append(&mut vals.mins);
+                batch.maxs.append(&mut vals.maxs);
+                batch.avgs.append(&mut vals.avgs);
+            }
+            _ => {}
+        }
+    }
+    let mut ret = BinnedJsonResult {
+        ts_bin_edges: batch.ts1s,
+        counts: batch.counts,
+    };
+    if let Some(&z) = batch.ts2s.last() {
+        ret.ts_bin_edges.push(z);
+    }
+    Ok(serde_json::to_value(ret)?)
+}
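+
+// For reference: `binned_json` serializes `BinnedJsonResult`, so the endpoint
+// returns a JSON object of this shape (illustrative values only):
+//
+//   { "ts_bin_edges": [1210000000000, 1270000000000], "counts": [42] }
+//
+// `ts_bin_edges` holds each bin's start timestamp (`ts1s`) plus the end of the
+// last bin (`ts2s.last()`), so it has one more element than `counts`.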
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 55cde27..41ac5f3 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -96,11 +96,11 @@ impl Stream for BinnedStreamFromPreBinnedPatches {
     }
 }
 
-pub struct BinnedStreamFromMerged {
+pub struct BinnedStream {
     inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
 }
 
-impl BinnedStreamFromMerged {
+impl BinnedStream {
     pub fn new(
         inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
     ) -> Result<Self, Error> {
@@ -108,7 +108,7 @@ impl BinnedStreamFromMerged {
     }
 }
 
-impl Stream for BinnedStreamFromMerged {
+impl Stream for BinnedStream {
     // TODO make this generic over all possible things
     type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
 
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 8dc7914..117f8ce 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,11 +1,7 @@
-use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
-use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
-use crate::binnedstream::{BinnedStreamFromMerged, BinnedStreamFromPreBinnedPatches};
+use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::cache::pbv::PreBinnedValueByteStream;
 use crate::cache::pbvfs::PreBinnedItem;
-use crate::channelconfig::{extract_matching_config_entry, read_local_config};
-use crate::frame::makeframe::make_frame;
 use crate::merge::MergedMinMaxAvgScalarStream;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
@@ -15,8 +11,7 @@ use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt};
 use hyper::Response;
 use netpod::{
-    AggKind, BinnedRange, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord,
-    PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
+    AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
 };
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
@@ -90,6 +85,30 @@ impl BinnedQuery {
         info!("BinnedQuery::from_request {:?}", ret);
         Ok(ret)
     }
+
+    pub fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    pub fn channel(&self) -> &Channel {
+        &self.channel
+    }
+
+    pub fn bin_count(&self) -> u64 {
+        self.bin_count
+    }
+
+    pub fn agg_kind(&self) -> &AggKind {
+        &self.agg_kind
+    }
+
+    pub fn cache_usage(&self) -> &CacheUsage {
+        &self.cache_usage
+    }
+
+    pub fn disk_stats_every(&self) -> &ByteSize {
+        &self.disk_stats_every
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -195,119 +214,6 @@ fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUs
-type BinnedBytesForHttpStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
-
-pub async fn binned_bytes_for_http(
-    node_config: &NodeConfigCached,
-    query: &BinnedQuery,
-) -> Result<BinnedBytesForHttpStreamBox, Error> {
-    if query.channel.backend != node_config.node.backend {
-        let err = Error::with_msg(format!(
-            "backend mismatch node: {} requested: {}",
-            node_config.node.backend, query.channel.backend
-        ));
-        return Err(err);
-    }
-    let range = &query.range;
-    let channel_config = read_local_config(&query.channel, &node_config.node).await?;
-    let entry = extract_matching_config_entry(range, &channel_config);
-    info!("binned_bytes_for_http found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.bin_count).ok_or(Error::with_msg(format!(
-        "binned_bytes_for_http BinnedRange::covering_range returned None"
-    )))?;
-    let perf_opts = PerfOpts { inmem_bufcap: 512 };
-    match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
-        Some(pre_range) => {
-            info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
-            if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
-                let msg = format!(
-                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
-                    pre_range, range
-                );
-                return Err(Error::with_msg(msg));
-            }
-            let s1 = BinnedStreamFromPreBinnedPatches::new(
-                PreBinnedPatchIterator::from_range(pre_range),
-                query.channel.clone(),
-                range,
-                query.agg_kind.clone(),
-                query.cache_usage.clone(),
-                node_config,
-                query.disk_stats_every.clone(),
-            )?;
-            let ret = BinnedBytesForHttpStream::new(s1);
-            Ok(Box::pin(ret))
-        }
-        None => {
-            info!(
-                "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
-                range
-            );
-            let evq = EventsQuery {
-                channel: query.channel.clone(),
-                range: query.range.clone(),
-                agg_kind: query.agg_kind.clone(),
-            };
-            // TODO do I need to set up more transformations or binning to deliver the requested data?
-            let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
-            let s1 = s1.into_binned_t(range);
-            let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
-            let ret = BinnedBytesForHttpStream::new(s1);
-            Ok(Box::pin(ret))
-        }
-    }
-}
-
-pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item;
-
-pub struct BinnedBytesForHttpStream<S> {
-    inp: S,
-    errored: bool,
-    completed: bool,
-}
-
-impl<S> BinnedBytesForHttpStream<S> {
-    pub fn new(inp: S) -> Self {
-        Self {
-            inp,
-            errored: false,
-            completed: false,
-        }
-    }
-}
-
-impl<S> Stream for BinnedBytesForHttpStream<S>
-where
-    S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
-{
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        use Poll::*;
-        if self.completed {
-            panic!("BinnedBytesForHttpStream poll_next on completed");
-        }
-        if self.errored {
-            self.completed = true;
-            return Ready(None);
-        }
-        match self.inp.poll_next_unpin(cx) {
-            Ready(Some(item)) => match make_frame::<BinnedBytesForHttpStreamFrame>(&item) {
-                Ok(buf) => Ready(Some(Ok(buf.freeze()))),
-                Err(e) => {
-                    self.errored = true;
-                    Ready(Some(Err(e.into())))
-                }
-            },
-            Ready(None) => {
-                self.completed = true;
-                Ready(None)
-            }
-            Pending => Pending,
-        }
-    }
-}
-
 // NOTE This answers a request for a single valid pre-binned patch.
 // A user must first make sure that the grid spec is valid, and that this node is responsible for it.
 // Otherwise it is an error.
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index f374fb7..876bb37 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -1,5 +1,5 @@
+use crate::binned::BinnedBytesForHttpStreamFrame;
 use crate::cache::pbvfs::PreBinnedItem;
-use crate::cache::BinnedBytesForHttpStreamFrame;
 use crate::frame::inmem::InMemoryFrame;
 use crate::raw::conn::RawConnOut;
 use crate::raw::EventQueryJsonStringFrame;
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 2569a94..dc01b52 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -20,6 +20,7 @@ use tracing::{debug, error, info, span, trace, warn, Level};
 pub mod agg;
 #[cfg(test)]
 pub mod aggtest;
+pub mod binned;
 pub mod binnedstream;
 pub mod cache;
 pub mod channelconfig;
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index fda1792..324135a 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -208,7 +208,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<R
 
 async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     info!("binned_binary");
-    let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
+    let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await {
         Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
         Err(e) => {
             error!("fn binned: {:?}", e);
@@ -220,8 +220,11 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re
 
 async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     info!("binned_json");
-    let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
-        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
+    let ret = match disk::binned::binned_json(node_config, &query).await {
+        Ok(val) => {
+            let body = serde_json::to_string(&val)?;
+            response(StatusCode::OK).body(Body::from(body))?
+        }
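+        // Note: unlike binned_binary above, which streams frames as they are
+        // produced, this arm buffers the complete JSON document in memory before
+        // responding; that should be fine for the modest bin counts requested here.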
         Err(e) => {
             error!("fn binned: {:?}", e);
             response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs
index 6cfed7b..15fde32 100644
--- a/retrieval/src/client.rs
+++ b/retrieval/src/client.rs
@@ -58,7 +58,7 @@ pub async fn get_binned(
         .filter_map(|item| {
             let g = match item {
                 Ok(frame) => {
-                    type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame;
+                    type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame;
                     let n1 = frame.buf().len();
                     match bincode::deserialize::<ExpectedType>(frame.buf()) {
                         Ok(item) => match item {
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index 80ab30b..a8c5d8b 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -14,6 +14,8 @@ use netpod::{ByteSize, Cluster, Database, Node, PerfOpts};
 use std::future::ready;
 use tokio::io::AsyncRead;
 
+pub mod json;
+
 fn test_cluster() -> Cluster {
     let nodes = (0..3)
         .into_iter()
@@ -150,7 +152,7 @@ where
         .filter_map(|item| {
             let g = match item {
                 Ok(frame) => {
-                    type ExpectedType = disk::cache::BinnedBytesForHttpStreamFrame;
+                    type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame;
                     //info!("TEST GOT FRAME len {}", frame.buf().len());
                     match bincode::deserialize::<ExpectedType>(frame.buf()) {
                         Ok(item) => match item {
diff --git a/retrieval/src/test/json.rs b/retrieval/src/test/json.rs
new file mode 100644
index 0000000..e535e9b
--- /dev/null
+++ b/retrieval/src/test/json.rs
@@ -0,0 +1,72 @@
+use crate::spawn_test_hosts;
+use crate::test::test_cluster;
+use chrono::{DateTime, Utc};
+use err::Error;
+use http::StatusCode;
+use hyper::Body;
+use netpod::log::*;
+use netpod::{ByteSize, Cluster};
+
+#[test]
+fn get_binned_json_0() {
+    taskrun::run(get_binned_json_0_inner()).unwrap();
+}
+
+async fn get_binned_json_0_inner() -> Result<(), Error> {
+    let cluster = test_cluster();
+    let _hosts = spawn_test_hosts(cluster.clone());
+    get_binned_json_0_inner2(
+        "wave-f64-be-n21",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T01:20:30.000Z",
+        10,
+        &cluster,
+    )
+    .await
+}
+
+async fn get_binned_json_0_inner2(
+    channel_name: &str,
+    beg_date: &str,
+    end_date: &str,
+    bin_count: u32,
+    cluster: &Cluster,
+) -> Result<(), Error> {
+    let t1 = Utc::now();
+    let node0 = &cluster.nodes[0];
+    let beg_date: DateTime<Utc> = beg_date.parse()?;
+    let end_date: DateTime<Utc> = end_date.parse()?;
+    let channel_backend = "testbackend";
+    let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+    let disk_stats_every = ByteSize::kb(1024);
+    // TODO have a function to form the uri, including perf opts:
+    let uri = format!(
+        "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
+        node0.host,
+        node0.port,
+        channel_backend,
+        channel_name,
+        bin_count,
+        beg_date.format(date_fmt),
+        end_date.format(date_fmt),
+        disk_stats_every.bytes() / 1024,
+    );
+    info!("get_binned_json_0 get {}", uri);
+    let req = hyper::Request::builder()
+        .method(http::Method::GET)
+        .uri(uri)
+        .header("Accept", "application/json")
+        .body(Body::empty())?;
+    let client = hyper::Client::new();
+    let res = client.request(req).await?;
+    if res.status() != StatusCode::OK {
+        error!("client response {:?}", res);
+    }
+    let res = hyper::body::to_bytes(res.into_body()).await?;
+    let res = String::from_utf8(res.to_vec())?;
+    info!("result from endpoint: [[[\n{}\n]]]", res);
+    let t2 = chrono::Utc::now();
+    let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
+    info!("get_binned_json_0 DONE time {} ms", ms);
+    Ok(())
+}
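+
+// One possible shape for the URI-building helper mentioned in the TODO above
+// (hypothetical sketch, not part of this change; all names here are invented):
+//
+//     fn binned_uri(host: &str, port: u16, backend: &str, channel: &str,
+//                   bin_count: u32, beg: &str, end: &str, stats_kb: u64) -> String {
+//         format!(
+//             "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
+//             host, port, backend, channel, bin_count, beg, end, stats_kb
+//         )
+//     }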