Query parameters camelCase

This commit is contained in:
Dominik Werder
2021-05-27 19:24:44 +02:00
parent 894079d936
commit b1bed52ada
7 changed files with 138 additions and 87 deletions
+58 -20
View File
@@ -5,7 +5,7 @@ use crate::frame::makeframe::FrameType;
use crate::merge::MergedMinMaxAvgScalarStream; use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery; use crate::raw::EventsQuery;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, TimeZone, Utc};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::{pin_mut, StreamExt}; use futures_util::{pin_mut, StreamExt};
@@ -13,7 +13,7 @@ use hyper::{Body, Response};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
use netpod::{ use netpod::{
AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos, AggKind, ByteSize, Channel, Cluster, HostPort, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
@@ -83,16 +83,28 @@ impl Display for CacheUsage {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BinnedQuery { pub struct BinnedQuery {
range: NanoRange,
bin_count: u64,
agg_kind: AggKind,
channel: Channel, channel: Channel,
range: NanoRange,
bin_count: u32,
agg_kind: AggKind,
cache_usage: CacheUsage, cache_usage: CacheUsage,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool, report_error: bool,
} }
impl BinnedQuery { impl BinnedQuery {
pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> BinnedQuery {
BinnedQuery {
channel,
range,
bin_count,
agg_kind,
cache_usage: CacheUsage::Use,
disk_stats_every: ByteSize(1024 * 1024 * 4),
report_error: false,
}
}
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> { pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
let params = netpod::query_params(req.uri.query()); let params = netpod::query_params(req.uri.query());
let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?; let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
@@ -137,7 +149,7 @@ impl BinnedQuery {
&self.channel &self.channel
} }
pub fn bin_count(&self) -> u64 { pub fn bin_count(&self) -> u32 {
self.bin_count self.bin_count
} }
@@ -156,6 +168,32 @@ impl BinnedQuery {
pub fn report_error(&self) -> bool { pub fn report_error(&self) -> bool {
self.report_error self.report_error
} }
pub fn set_cache_usage(&mut self, k: CacheUsage) {
self.cache_usage = k;
}
pub fn set_disk_stats_every(&mut self, k: ByteSize) {
self.disk_stats_every = k;
}
// TODO the BinnedQuery itself should maybe already carry the full HostPort?
// On the other hand, want to keep the flexibility for the fail over possibility..
pub fn url(&self, host: &HostPort) -> String {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
format!(
"http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}",
host.host,
host.port,
self.cache_usage,
self.channel.backend,
self.channel.name,
self.bin_count,
Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
self.disk_stats_every.bytes() / 1024,
)
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -190,45 +228,45 @@ impl PreBinnedQuery {
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> { pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
let params = netpod::query_params(req.uri.query()); let params = netpod::query_params(req.uri.query());
let patch_ix = params let patch_ix = params
.get("patch_ix") .get("patchIx")
.ok_or(Error::with_msg("missing patch_ix"))? .ok_or(Error::with_msg("missing patchIx"))?
.parse()?; .parse()?;
let bin_t_len = params let bin_t_len = params
.get("bin_t_len") .get("binTlen")
.ok_or(Error::with_msg("missing bin_t_len"))? .ok_or(Error::with_msg("missing binTlen"))?
.parse()?; .parse()?;
let patch_t_len = params let patch_t_len = params
.get("patch_t_len") .get("patchTlen")
.ok_or(Error::with_msg("missing patch_t_len"))? .ok_or(Error::with_msg("missing patchTlen"))?
.parse()?; .parse()?;
let disk_stats_every = params let disk_stats_every = params
.get("disk_stats_every_kb") .get("diskStatsEveryKb")
.ok_or(Error::with_msg("missing disk_stats_every_kb"))?; .ok_or(Error::with_msg("missing diskStatsEveryKb"))?;
let disk_stats_every = disk_stats_every let disk_stats_every = disk_stats_every
.parse() .parse()
.map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?; .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
let ret = PreBinnedQuery { let ret = PreBinnedQuery {
patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix), patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
agg_kind: params agg_kind: params
.get("agg_kind") .get("aggKind")
.map_or(&format!("{}", AggKind::DimXBins1), |k| k) .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
.parse() .parse()
.map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?, .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
channel: channel_from_params(&params)?, channel: channel_from_params(&params)?,
cache_usage: CacheUsage::from_params(&params)?, cache_usage: CacheUsage::from_params(&params)?,
disk_stats_every: ByteSize::kb(disk_stats_every), disk_stats_every: ByteSize::kb(disk_stats_every),
report_error: params report_error: params
.get("report_error") .get("reportError")
.map_or("false", |k| k) .map_or("false", |k| k)
.parse() .parse()
.map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?, .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
}; };
Ok(ret) Ok(ret)
} }
pub fn make_query_string(&self) -> String { pub fn make_query_string(&self) -> String {
format!( format!(
"{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}&report_error={}", "{}&channelBackend={}&channelName={}&aggKind={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
self.patch.to_url_params_strings(), self.patch.to_url_params_strings(),
self.channel.backend, self.channel.backend,
self.channel.name, self.channel.name,
+1 -1
View File
@@ -153,7 +153,7 @@ where
} }
// TODO do I need to set up more transformations or binning to deliver the requested data? // TODO do I need to set up more transformations or binning to deliver the requested data?
let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len(); let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
let range = BinnedRange::covering_range(evq.range.clone(), count) let range = BinnedRange::covering_range(evq.range.clone(), count as u32)
.unwrap() .unwrap()
.ok_or(Error::with_msg("covering_range returns None")) .ok_or(Error::with_msg("covering_range returns None"))
.unwrap(); .unwrap();
+4 -4
View File
@@ -34,7 +34,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
async move { async move {
Ok::<_, Error>(service_fn({ Ok::<_, Error>(service_fn({
move |req| { move |req| {
let f = data_api_proxy(req, node_config.clone()); let f = http_service(req, node_config.clone());
Cont { f: Box::pin(f) } Cont { f: Box::pin(f) }
} }
})) }))
@@ -46,8 +46,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
Ok(()) Ok(())
} }
async fn data_api_proxy(req: Request<Body>, node_config: NodeConfigCached) -> Result<Response<Body>, Error> { async fn http_service(req: Request<Body>, node_config: NodeConfigCached) -> Result<Response<Body>, Error> {
match data_api_proxy_try(req, &node_config).await { match http_service_try(req, &node_config).await {
Ok(k) => Ok(k), Ok(k) => Ok(k),
Err(e) => { Err(e) => {
error!("data_api_proxy sees error: {:?}", e); error!("data_api_proxy sees error: {:?}", e);
@@ -107,7 +107,7 @@ macro_rules! static_http {
}; };
} }
async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let uri = req.uri().clone(); let uri = req.uri().clone();
let path = uri.path(); let path = uri.path();
if path == "/api/4/node_status" { if path == "/api/4/node_status" {
+36 -8
View File
@@ -182,6 +182,27 @@ pub struct Channel {
pub name: String, pub name: String,
} }
pub struct HostPort {
pub host: String,
pub port: u16,
}
impl HostPort {
pub fn new<S: Into<String>>(host: S, port: u16) -> Self {
Self {
host: host.into(),
port,
}
}
pub fn from_node(node: &Node) -> Self {
Self {
host: node.host.clone(),
port: node.port,
}
}
}
impl Channel { impl Channel {
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
&self.name &self.name
@@ -218,6 +239,13 @@ impl std::fmt::Debug for NanoRange {
} }
impl NanoRange { impl NanoRange {
pub fn from_date_time(beg: DateTime<Utc>, end: DateTime<Utc>) -> Self {
Self {
beg: beg.timestamp_nanos() as u64,
end: end.timestamp_nanos() as u64,
}
}
pub fn delta(&self) -> u64 { pub fn delta(&self) -> u64 {
self.end - self.beg self.end - self.beg
} }
@@ -364,7 +392,7 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 {
impl PreBinnedPatchRange { impl PreBinnedPatchRange {
/// Cover at least the given range with at least as many as the requested number of bins. /// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> { pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
if min_bin_count < 1 { if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?; Err(Error::with_msg("min_bin_count < 1"))?;
} }
@@ -375,7 +403,7 @@ impl PreBinnedPatchRange {
if dt > DAY * 14 { if dt > DAY * 14 {
Err(Error::with_msg("dt > DAY * 14"))?; Err(Error::with_msg("dt > DAY * 14"))?;
} }
let bs = dt / min_bin_count; let bs = dt / min_bin_count as u64;
let mut i1 = BIN_T_LEN_OPTIONS.len(); let mut i1 = BIN_T_LEN_OPTIONS.len();
loop { loop {
if i1 <= 0 { if i1 <= 0 {
@@ -434,8 +462,8 @@ impl PreBinnedPatchCoord {
} }
} }
pub fn bin_count(&self) -> u64 { pub fn bin_count(&self) -> u32 {
self.spec.patch_t_len() / self.spec.bin_t_len (self.spec.patch_t_len() / self.spec.bin_t_len) as u32
} }
pub fn spec(&self) -> &PreBinnedPatchGridSpec { pub fn spec(&self) -> &PreBinnedPatchGridSpec {
@@ -448,7 +476,7 @@ impl PreBinnedPatchCoord {
pub fn to_url_params_strings(&self) -> String { pub fn to_url_params_strings(&self) -> String {
format!( format!(
"patch_t_len={}&bin_t_len={}&patch_ix={}", "patchTlen={}&binTlen={}&patchIx={}",
self.spec.patch_t_len(), self.spec.patch_t_len(),
self.spec.bin_t_len(), self.spec.bin_t_len(),
self.ix() self.ix()
@@ -542,7 +570,7 @@ pub struct BinnedRange {
} }
impl BinnedRange { impl BinnedRange {
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> { pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
if min_bin_count < 1 { if min_bin_count < 1 {
Err(Error::with_msg("min_bin_count < 1"))?; Err(Error::with_msg("min_bin_count < 1"))?;
} }
@@ -553,7 +581,7 @@ impl BinnedRange {
if dt > DAY * 14 { if dt > DAY * 14 {
Err(Error::with_msg("dt > DAY * 14"))?; Err(Error::with_msg("dt > DAY * 14"))?;
} }
let bs = dt / min_bin_count; let bs = dt / min_bin_count as u64;
let mut i1 = BIN_THRESHOLDS.len(); let mut i1 = BIN_THRESHOLDS.len();
loop { loop {
if i1 <= 0 { if i1 <= 0 {
@@ -742,7 +770,7 @@ pub struct PerfOpts {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ByteSize(u32); pub struct ByteSize(pub u32);
impl ByteSize { impl ByteSize {
pub fn b(b: u32) -> Self { pub fn b(b: u32) -> Self {
+14 -20
View File
@@ -2,7 +2,7 @@ use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::StreamItem; use disk::agg::streams::StreamItem;
use disk::binned::RangeCompletableItem; use disk::binned::RangeCompletableItem;
use disk::cache::CacheUsage; use disk::cache::{BinnedQuery, CacheUsage};
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::frame::makeframe::FrameType; use disk::frame::makeframe::FrameType;
use disk::streamlog::Streamlog; use disk::streamlog::Streamlog;
@@ -11,7 +11,7 @@ use futures_util::TryStreamExt;
use http::StatusCode; use http::StatusCode;
use hyper::Body; use hyper::Body;
use netpod::log::*; use netpod::log::*;
use netpod::PerfOpts; use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts};
pub async fn status(host: String, port: u16) -> Result<(), Error> { pub async fn status(host: String, port: u16) -> Result<(), Error> {
let t1 = Utc::now(); let t1 = Utc::now();
@@ -52,26 +52,20 @@ pub async fn get_binned(
info!("end {}", end_date); info!("end {}", end_date);
info!("-------"); info!("-------");
let t1 = Utc::now(); let t1 = Utc::now();
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let channel = Channel {
let uri = format!( backend: channel_backend.clone(),
concat!( name: channel_name.into(),
"http://{}:{}/api/4/binned?channelBackend={}&channelName={}", };
"&begDate={}&endDate={}&binCount={}&cacheUsage={}", let agg_kind = AggKind::DimXBins1;
"&diskStatsEveryKb={}&reportError=true", let range = NanoRange::from_date_time(beg_date, end_date);
), let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
host, query.set_cache_usage(cache_usage);
port, query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb));
channel_backend, let hp = HostPort { host: host, port: port };
channel_name, let url = query.url(&hp);
beg_date.format(date_fmt),
end_date.format(date_fmt),
bin_count,
cache_usage.query_param_value(),
disk_stats_every_kb,
);
let req = hyper::Request::builder() let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(uri) .uri(url)
.header("accept", "application/octet-stream") .header("accept", "application/octet-stream")
.body(Body::empty())?; .body(Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();
+13 -17
View File
@@ -4,6 +4,7 @@ use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::StreamItem; use disk::agg::streams::StreamItem;
use disk::binned::RangeCompletableItem; use disk::binned::RangeCompletableItem;
use disk::cache::BinnedQuery;
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::streamlog::Streamlog; use disk::streamlog::Streamlog;
use err::Error; use err::Error;
@@ -12,7 +13,7 @@ use futures_util::TryStreamExt;
use http::StatusCode; use http::StatusCode;
use hyper::Body; use hyper::Body;
use netpod::log::*; use netpod::log::*;
use netpod::{ByteSize, Cluster, Database, Node, PerfOpts}; use netpod::{AggKind, Channel, Cluster, Database, HostPort, NanoRange, Node, PerfOpts};
use std::future::ready; use std::future::ready;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
@@ -92,29 +93,24 @@ where
S: AsRef<str>, S: AsRef<str>,
{ {
let t1 = Utc::now(); let t1 = Utc::now();
let agg_kind = AggKind::DimXBins1;
let node0 = &cluster.nodes[0]; let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?; let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
let end_date: DateTime<Utc> = end_date.as_ref().parse()?; let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
let channel_backend = "testbackend"; let channel_backend = "testbackend";
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
let perf_opts = PerfOpts { inmem_bufcap: 512 }; let perf_opts = PerfOpts { inmem_bufcap: 512 };
let disk_stats_every = ByteSize::kb(1024); let channel = Channel {
// TODO have a function to form the uri, including perf opts: backend: channel_backend.into(),
let uri = format!( name: channel_name.into(),
"http://{}:{}/api/4/binned?cache_usage=use&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", };
node0.host, let range = NanoRange::from_date_time(beg_date, end_date);
node0.port, let query = BinnedQuery::new(channel, range, bin_count, agg_kind);
channel_backend, let hp = HostPort::from_node(node0);
channel_name, let url = query.url(&hp);
bin_count, info!("get_binned_channel get {}", url);
beg_date.format(date_fmt),
end_date.format(date_fmt),
disk_stats_every.bytes() / 1024,
);
info!("get_binned_channel get {}", uri);
let req = hyper::Request::builder() let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(uri) .uri(url)
.header("accept", "application/octet-stream") .header("accept", "application/octet-stream")
.body(Body::empty())?; .body(Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();
+12 -17
View File
@@ -1,11 +1,12 @@
use crate::spawn_test_hosts; use crate::spawn_test_hosts;
use crate::test::test_cluster; use crate::test::test_cluster;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use disk::cache::BinnedQuery;
use err::Error; use err::Error;
use http::StatusCode; use http::StatusCode;
use hyper::Body; use hyper::Body;
use netpod::log::*; use netpod::log::*;
use netpod::{ByteSize, Cluster}; use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange};
#[test] #[test]
fn get_binned_json_0() { fn get_binned_json_0() {
@@ -33,28 +34,22 @@ async fn get_binned_json_0_inner2(
cluster: &Cluster, cluster: &Cluster,
) -> Result<(), Error> { ) -> Result<(), Error> {
let t1 = Utc::now(); let t1 = Utc::now();
let agg_kind = AggKind::DimXBins1;
let node0 = &cluster.nodes[0]; let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?; let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?; let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = "testbackend"; let channel_backend = "testbackend";
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let channel = Channel {
let disk_stats_every = ByteSize::kb(1024); backend: channel_backend.into(),
// TODO have a function to form the uri, including perf opts: name: channel_name.into(),
let uri = format!( };
"http://{}:{}/api/4/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", let range = NanoRange::from_date_time(beg_date, end_date);
node0.host, let query = BinnedQuery::new(channel, range, bin_count, agg_kind);
node0.port, let url = query.url(&HostPort::from_node(node0));
channel_backend, info!("get_binned_json_0 get {}", url);
channel_name,
bin_count,
beg_date.format(date_fmt),
end_date.format(date_fmt),
disk_stats_every.bytes() / 1024,
);
info!("get_binned_json_0 get {}", uri);
let req = hyper::Request::builder() let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(uri) .uri(url)
.header("Accept", "application/json") .header("Accept", "application/json")
.body(Body::empty())?; .body(Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();