diff --git a/disk/src/cache.rs b/disk/src/cache.rs index f99bd2f..ca4ea2d 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -5,7 +5,7 @@ use crate::frame::makeframe::FrameType; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; use bytes::Bytes; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; @@ -13,7 +13,7 @@ use hyper::{Body, Response}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - AggKind, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos, + AggKind, ByteSize, Channel, Cluster, HostPort, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, ToNanos, }; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, VecDeque}; @@ -83,16 +83,28 @@ impl Display for CacheUsage { #[derive(Clone, Debug)] pub struct BinnedQuery { - range: NanoRange, - bin_count: u64, - agg_kind: AggKind, channel: Channel, + range: NanoRange, + bin_count: u32, + agg_kind: AggKind, cache_usage: CacheUsage, disk_stats_every: ByteSize, report_error: bool, } impl BinnedQuery { + pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> BinnedQuery { + BinnedQuery { + channel, + range, + bin_count, + agg_kind, + cache_usage: CacheUsage::Use, + disk_stats_every: ByteSize(1024 * 1024 * 4), + report_error: false, + } + } + pub fn from_request(req: &http::request::Parts) -> Result { let params = netpod::query_params(req.uri.query()); let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?; @@ -137,7 +149,7 @@ impl BinnedQuery { &self.channel } - pub fn bin_count(&self) -> u64 { + pub fn bin_count(&self) -> u32 { self.bin_count } @@ -156,6 +168,32 @@ impl BinnedQuery { pub fn report_error(&self) -> bool { self.report_error } + + pub fn set_cache_usage(&mut self, k: CacheUsage) { + self.cache_usage = k; + } + + pub fn 
set_disk_stats_every(&mut self, k: ByteSize) { + self.disk_stats_every = k; + } + + // TODO the BinnedQuery itself should maybe already carry the full HostPort? + // On the other hand, want to keep the flexibility for the fail over possibility.. + pub fn url(&self, host: &HostPort) -> String { + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + format!( + "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}", + host.host, + host.port, + self.cache_usage, + self.channel.backend, + self.channel.name, + self.bin_count, + Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), + Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), + self.disk_stats_every.bytes() / 1024, + ) + } } #[derive(Clone, Debug)] @@ -190,45 +228,45 @@ impl PreBinnedQuery { pub fn from_request(req: &http::request::Parts) -> Result { let params = netpod::query_params(req.uri.query()); let patch_ix = params - .get("patch_ix") - .ok_or(Error::with_msg("missing patch_ix"))? + .get("patchIx") + .ok_or(Error::with_msg("missing patchIx"))? .parse()?; let bin_t_len = params - .get("bin_t_len") - .ok_or(Error::with_msg("missing bin_t_len"))? + .get("binTlen") + .ok_or(Error::with_msg("missing binTlen"))? .parse()?; let patch_t_len = params - .get("patch_t_len") - .ok_or(Error::with_msg("missing patch_t_len"))? + .get("patchTlen") + .ok_or(Error::with_msg("missing patchTlen"))? 
             .parse()?;
         let disk_stats_every = params
-            .get("disk_stats_every_kb")
-            .ok_or(Error::with_msg("missing disk_stats_every_kb"))?;
+            .get("diskStatsEveryKb")
+            .ok_or(Error::with_msg("missing diskStatsEveryKb"))?;
         let disk_stats_every = disk_stats_every
             .parse()
-            .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?;
+            .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
         let ret = PreBinnedQuery {
             patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
             agg_kind: params
-                .get("agg_kind")
+                .get("aggKind")
                 .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
                 .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse agg_kind {:?}", e)))?,
+                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
             channel: channel_from_params(&params)?,
             cache_usage: CacheUsage::from_params(&params)?,
             disk_stats_every: ByteSize::kb(disk_stats_every),
             report_error: params
-                .get("report_error")
+                .get("reportError")
                 .map_or("false", |k| k)
                 .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?,
+                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
         };
         Ok(ret)
     }
 
     pub fn make_query_string(&self) -> String {
         format!(
-            "{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}&report_error={}",
+            "{}&channelBackend={}&channelName={}&aggKind={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
             self.patch.to_url_params_strings(),
             self.channel.backend,
             self.channel.name,
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 8cf3a0b..e8ace01 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -153,7 +153,7 @@ where
         }
         // TODO do I need to set up more transformations or binning to deliver the requested data?
let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count) + let range = BinnedRange::covering_range(evq.range.clone(), count as u32) .unwrap() .ok_or(Error::with_msg("covering_range returns None")) .unwrap(); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 71c0232..4620425 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -34,7 +34,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { async move { Ok::<_, Error>(service_fn({ move |req| { - let f = data_api_proxy(req, node_config.clone()); + let f = http_service(req, node_config.clone()); Cont { f: Box::pin(f) } } })) @@ -46,8 +46,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { Ok(()) } -async fn data_api_proxy(req: Request, node_config: NodeConfigCached) -> Result, Error> { - match data_api_proxy_try(req, &node_config).await { +async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, Error> { + match http_service_try(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { error!("data_api_proxy sees error: {:?}", e); @@ -107,7 +107,7 @@ macro_rules! 
static_http {
     };
 }
 
-async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let uri = req.uri().clone();
     let path = uri.path();
     if path == "/api/4/node_status" {
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 89cffbe..ea31ca3 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -182,6 +182,27 @@ pub struct Channel {
     pub name: String,
 }
 
+pub struct HostPort {
+    pub host: String,
+    pub port: u16,
+}
+
+impl HostPort {
+    pub fn new<S: Into<String>>(host: S, port: u16) -> Self {
+        Self {
+            host: host.into(),
+            port,
+        }
+    }
+
+    pub fn from_node(node: &Node) -> Self {
+        Self {
+            host: node.host.clone(),
+            port: node.port,
+        }
+    }
+}
+
 impl Channel {
     pub fn name(&self) -> &str {
         &self.name
@@ -218,6 +239,13 @@ impl std::fmt::Debug for NanoRange {
 }
 
 impl NanoRange {
+    pub fn from_date_time(beg: DateTime<Utc>, end: DateTime<Utc>) -> Self {
+        Self {
+            beg: beg.timestamp_nanos() as u64,
+            end: end.timestamp_nanos() as u64,
+        }
+    }
+
     pub fn delta(&self) -> u64 {
         self.end - self.beg
     }
@@ -364,7 +392,7 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 {
 impl PreBinnedPatchRange {
     /// Cover at least the given range with at least as many as the requested number of bins.
-    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
+    pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
         if min_bin_count < 1 {
             Err(Error::with_msg("min_bin_count < 1"))?;
         }
@@ -375,7 +403,7 @@ impl PreBinnedPatchRange {
         if dt > DAY * 14 {
             Err(Error::with_msg("dt > DAY * 14"))?;
         }
-        let bs = dt / min_bin_count;
+        let bs = dt / min_bin_count as u64;
         let mut i1 = BIN_T_LEN_OPTIONS.len();
         loop {
             if i1 <= 0 {
@@ -434,8 +462,8 @@ impl PreBinnedPatchCoord {
         }
     }
 
-    pub fn bin_count(&self) -> u64 {
-        self.spec.patch_t_len() / self.spec.bin_t_len
+    pub fn bin_count(&self) -> u32 {
+        (self.spec.patch_t_len() / self.spec.bin_t_len) as u32
     }
 
     pub fn spec(&self) -> &PreBinnedPatchGridSpec {
@@ -448,7 +476,7 @@ impl PreBinnedPatchCoord {
 
     pub fn to_url_params_strings(&self) -> String {
         format!(
-            "patch_t_len={}&bin_t_len={}&patch_ix={}",
+            "patchTlen={}&binTlen={}&patchIx={}",
             self.spec.patch_t_len(),
             self.spec.bin_t_len(),
             self.ix()
@@ -542,7 +570,7 @@ pub struct BinnedRange {
 }
 
 impl BinnedRange {
-    pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Result<Option<Self>, Error> {
+    pub fn covering_range(range: NanoRange, min_bin_count: u32) -> Result<Option<Self>, Error> {
         if min_bin_count < 1 {
             Err(Error::with_msg("min_bin_count < 1"))?;
         }
@@ -553,7 +581,7 @@
         if dt > DAY * 14 {
             Err(Error::with_msg("dt > DAY * 14"))?;
         }
-        let bs = dt / min_bin_count;
+        let bs = dt / min_bin_count as u64;
         let mut i1 = BIN_THRESHOLDS.len();
         loop {
             if i1 <= 0 {
@@ -742,7 +770,7 @@ pub struct PerfOpts {
 }
 
 #[derive(Clone, Debug)]
-pub struct ByteSize(u32);
+pub struct ByteSize(pub u32);
 
 impl ByteSize {
     pub fn b(b: u32) -> Self {
diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs
index 89e2163..247b006 100644
--- a/retrieval/src/client.rs
+++ b/retrieval/src/client.rs
@@ -2,7 +2,7 @@ use chrono::{DateTime, Utc};
 use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use disk::agg::streams::StreamItem;
 use
disk::binned::RangeCompletableItem; -use disk::cache::CacheUsage; +use disk::cache::{BinnedQuery, CacheUsage}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::makeframe::FrameType; use disk::streamlog::Streamlog; @@ -11,7 +11,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::PerfOpts; +use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts}; pub async fn status(host: String, port: u16) -> Result<(), Error> { let t1 = Utc::now(); @@ -52,26 +52,20 @@ pub async fn get_binned( info!("end {}", end_date); info!("-------"); let t1 = Utc::now(); - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - let uri = format!( - concat!( - "http://{}:{}/api/4/binned?channelBackend={}&channelName={}", - "&begDate={}&endDate={}&binCount={}&cacheUsage={}", - "&diskStatsEveryKb={}&reportError=true", - ), - host, - port, - channel_backend, - channel_name, - beg_date.format(date_fmt), - end_date.format(date_fmt), - bin_count, - cache_usage.query_param_value(), - disk_stats_every_kb, - ); + let channel = Channel { + backend: channel_backend.clone(), + name: channel_name.into(), + }; + let agg_kind = AggKind::DimXBins1; + let range = NanoRange::from_date_time(beg_date, end_date); + let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + query.set_cache_usage(cache_usage); + query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb)); + let hp = HostPort { host: host, port: port }; + let url = query.url(&hp); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(uri) + .uri(url) .header("accept", "application/octet-stream") .body(Body::empty())?; let client = hyper::Client::new(); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 73c19cb..c809b8f 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc}; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::streams::StreamItem; use 
disk::binned::RangeCompletableItem;
+use disk::cache::BinnedQuery;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::streamlog::Streamlog;
 use err::Error;
@@ -12,7 +13,7 @@ use futures_util::TryStreamExt;
 use http::StatusCode;
 use hyper::Body;
 use netpod::log::*;
-use netpod::{ByteSize, Cluster, Database, Node, PerfOpts};
+use netpod::{AggKind, Channel, Cluster, Database, HostPort, NanoRange, Node, PerfOpts};
 use std::future::ready;
 use tokio::io::AsyncRead;
@@ -92,29 +93,24 @@ where
     S: AsRef<str>,
 {
     let t1 = Utc::now();
+    let agg_kind = AggKind::DimXBins1;
     let node0 = &cluster.nodes[0];
     let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
     let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
     let channel_backend = "testbackend";
-    let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
     let perf_opts = PerfOpts { inmem_bufcap: 512 };
-    let disk_stats_every = ByteSize::kb(1024);
-    // TODO have a function to form the uri, including perf opts:
-    let uri = format!(
-        "http://{}:{}/api/4/binned?cache_usage=use&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
-        node0.host,
-        node0.port,
-        channel_backend,
-        channel_name,
-        bin_count,
-        beg_date.format(date_fmt),
-        end_date.format(date_fmt),
-        disk_stats_every.bytes() / 1024,
-    );
-    info!("get_binned_channel get {}", uri);
+    let channel = Channel {
+        backend: channel_backend.into(),
+        name: channel_name.into(),
+    };
+    let range = NanoRange::from_date_time(beg_date, end_date);
+    let query = BinnedQuery::new(channel, range, bin_count, agg_kind);
+    let hp = HostPort::from_node(node0);
+    let url = query.url(&hp);
+    info!("get_binned_channel get {}", url);
     let req = hyper::Request::builder()
         .method(http::Method::GET)
-        .uri(uri)
+        .uri(url)
         .header("accept", "application/octet-stream")
         .body(Body::empty())?;
     let client = hyper::Client::new();
diff --git a/retrieval/src/test/json.rs b/retrieval/src/test/json.rs
index 511c66a..3700719 100644
--- a/retrieval/src/test/json.rs
+++
b/retrieval/src/test/json.rs @@ -1,11 +1,12 @@ use crate::spawn_test_hosts; use crate::test::test_cluster; use chrono::{DateTime, Utc}; +use disk::cache::BinnedQuery; use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{ByteSize, Cluster}; +use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange}; #[test] fn get_binned_json_0() { @@ -33,28 +34,22 @@ async fn get_binned_json_0_inner2( cluster: &Cluster, ) -> Result<(), Error> { let t1 = Utc::now(); + let agg_kind = AggKind::DimXBins1; let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let channel_backend = "testbackend"; - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - let disk_stats_every = ByteSize::kb(1024); - // TODO have a function to form the uri, including perf opts: - let uri = format!( - "http://{}:{}/api/4/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", - node0.host, - node0.port, - channel_backend, - channel_name, - bin_count, - beg_date.format(date_fmt), - end_date.format(date_fmt), - disk_stats_every.bytes() / 1024, - ); - info!("get_binned_json_0 get {}", uri); + let channel = Channel { + backend: channel_backend.into(), + name: channel_name.into(), + }; + let range = NanoRange::from_date_time(beg_date, end_date); + let query = BinnedQuery::new(channel, range, bin_count, agg_kind); + let url = query.url(&HostPort::from_node(node0)); + info!("get_binned_json_0 get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(uri) + .uri(url) .header("Accept", "application/json") .body(Body::empty())?; let client = hyper::Client::new();