From 8cedf06d9c1bbca611b76bdbca9af3f67ca328e4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 10 May 2021 13:56:11 +0200 Subject: [PATCH] Adjust caching and disk usage log output --- disk/src/aggtest.rs | 7 +++++- disk/src/binnedstream.rs | 11 ++++++++-- disk/src/cache.rs | 31 ++++++++++++++++++++++++--- disk/src/cache/pbv.rs | 4 +++- disk/src/eventblobs.rs | 13 ++++++++++-- disk/src/eventchunker.rs | 39 +++++++++++++++++++--------------- disk/src/lib.rs | 9 ++++++-- disk/src/merge.rs | 3 +-- disk/src/raw/conn.rs | 5 ++++- httpret/src/lib.rs | 20 ++++------------- netpod/src/lib.rs | 31 ++++++++++++++++++++------- retrieval/src/bin/retrieval.rs | 1 + retrieval/src/cli.rs | 2 ++ retrieval/src/client.rs | 4 +++- retrieval/src/test.rs | 6 ++++-- 15 files changed, 128 insertions(+), 58 deletions(-) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 3d218ea..5b8c9bc 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,9 +1,10 @@ use super::agg::IntoDim1F32Stream; use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedx::IntoBinnedXBins1; +use crate::eventchunker::EventChunkerConf; use futures_util::StreamExt; use netpod::timeunits::*; -use netpod::{BinnedRange, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; +use netpod::{BinnedRange, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; use std::future::ready; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -54,11 +55,13 @@ async fn agg_x_dim_0_inner() { let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let fut1 = super::eventblobs::EventBlobsComplete::new( range.clone(), query.channel_config.clone(), node.clone(), query.buffer_size as usize, + event_chunker_conf, ) .into_dim_1_f32_stream() .into_binned_x_bins_1() @@ -115,11 +118,13 @@ async fn agg_x_dim_1_inner() { let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let fut1 = super::eventblobs::EventBlobsComplete::new( range.clone(), query.channel_config.clone(), node.clone(), query.buffer_size as usize, + event_chunker_conf, ) .into_dim_1_f32_stream() //.take(1000) diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 634f3b6..ffd4b93 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -6,7 +6,7 @@ use futures_core::Stream; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; -use netpod::{AggKind, BinnedRange, Channel, NodeConfigCached, PreBinnedPatchIterator}; +use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PreBinnedPatchIterator}; use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; @@ -23,6 +23,7 @@ impl BinnedStream { agg_kind: AggKind, cache_usage: CacheUsage, node_config: &NodeConfigCached, + disk_stats_every: ByteSize, ) -> Result { let patches: Vec<_> = patch_it.collect(); let mut sp = String::new(); @@ -36,7 +37,13 @@ impl BinnedStream { .map({ let node_config = node_config.clone(); move |patch| { - let query = PreBinnedQuery::new(patch, channel.clone(), agg_kind.clone(), cache_usage.clone()); + let query = PreBinnedQuery::new( + patch, + channel.clone(), + agg_kind.clone(), + cache_usage.clone(), + disk_stats_every.clone(), + ); let s: Pin + Send>> = match PreBinnedValueFetchedStream::new(&query, &node_config) { Ok(k) => Box::pin(k), diff --git a/disk/src/cache.rs b/disk/src/cache.rs index b3c4703..f6cfa03 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -15,7 +15,7 @@ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use hyper::Response; use netpod::{ - AggKind, BinnedRange, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, + AggKind, BinnedRange, ByteSize, Channel, Cluster, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos, }; use serde::{Deserialize, Serialize}; @@ -58,6 +58,7 @@ pub struct BinnedQuery { agg_kind: AggKind, channel: Channel, cache_usage: CacheUsage, + disk_stats_every: ByteSize, } impl BinnedQuery { @@ -65,6 +66,12 @@ impl BinnedQuery { let params = netpod::query_params(req.uri.query()); let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?; let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?; + let disk_stats_every = params + .get("disk_stats_every_kb") + .ok_or(Error::with_msg("missing disk_stats_every_kb"))?; + let disk_stats_every = disk_stats_every + .parse() + .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?; let ret = BinnedQuery { range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), @@ -78,6 +85,7 @@ impl BinnedQuery { agg_kind: AggKind::DimXBins1, channel: channel_from_params(¶ms)?, cache_usage: cache_usage_from_params(¶ms)?, + disk_stats_every: ByteSize::kb(disk_stats_every), }; info!("BinnedQuery::from_request {:?}", ret); Ok(ret) @@ -90,15 +98,23 @@ pub struct PreBinnedQuery { agg_kind: AggKind, channel: Channel, cache_usage: CacheUsage, + disk_stats_every: ByteSize, } impl PreBinnedQuery { - pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cache_usage: CacheUsage) -> Self { + pub fn new( + patch: PreBinnedPatchCoord, + channel: Channel, + agg_kind: AggKind, + cache_usage: CacheUsage, + disk_stats_every: ByteSize, + ) -> Self { Self { patch, agg_kind, channel, cache_usage, + disk_stats_every, } } @@ -112,11 +128,18 @@ impl PreBinnedQuery { .get("bin_t_len") .ok_or(Error::with_msg("missing bin_t_len"))? .parse()?; + let disk_stats_every = params + .get("disk_stats_every_kb") + .ok_or(Error::with_msg("missing disk_stats_every_kb"))?; + let disk_stats_every = disk_stats_every + .parse() + .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?; let ret = PreBinnedQuery { patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix), agg_kind: AggKind::DimXBins1, channel: channel_from_params(¶ms)?, cache_usage: cache_usage_from_params(¶ms)?, + disk_stats_every: ByteSize::kb(disk_stats_every), }; Ok(ret) } @@ -128,12 +151,13 @@ impl PreBinnedQuery { CacheUsage::Recreate => "recreate", }; format!( - "{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}", + "{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}&disk_stats_every_kb={}", self.patch.to_url_params_strings(), self.channel.backend, self.channel.name, self.agg_kind, cache_usage, + self.disk_stats_every.bytes() / 1024, ) } @@ -209,6 +233,7 @@ pub async fn binned_bytes_for_http( query.agg_kind.clone(), query.cache_usage.clone(), node_config, + query.disk_stats_every.clone(), )?; let ret = BinnedBytesForHttpStream::new(s1); Ok(Box::pin(ret)) diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 48062c1..19a74f5 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -136,7 +136,7 @@ impl PreBinnedValueStream { error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); return; } - if g / h > 200 { + if g / h > 1024 * 10 { error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); return; } @@ -149,12 +149,14 @@ impl PreBinnedValueStream { let s = futures_util::stream::iter(patch_it) .map({ let q2 = self.query.clone(); + let disk_stats_every = self.query.disk_stats_every.clone(); move |patch| { let query = PreBinnedQuery { patch, channel: q2.channel.clone(), agg_kind: q2.agg_kind.clone(), cache_usage: q2.cache_usage.clone(), + disk_stats_every: disk_stats_every.clone(), }; PreBinnedValueFetchedStream::new(&query, &node_config) } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index fa1b6d6..4975e0f 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,5 +1,5 @@ use crate::dataopen::open_files; -use crate::eventchunker::{EventChunker, EventChunkerItem}; +use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem}; use crate::file_content_stream; use err::Error; use futures_core::Stream; @@ -14,17 +14,25 @@ pub struct EventBlobsComplete { file_chan: async_channel::Receiver>, evs: Option, buffer_size: usize, + event_chunker_conf: EventChunkerConf, range: NanoRange, errored: bool, completed: bool, } impl EventBlobsComplete { - pub fn new(range: NanoRange, channel_config: ChannelConfig, node: Node, buffer_size: usize) -> Self { + pub fn new( + range: NanoRange, + channel_config: ChannelConfig, + node: Node, + buffer_size: usize, + event_chunker_conf: EventChunkerConf, + ) -> Self { Self { file_chan: open_files(&range, &channel_config, node), evs: None, buffer_size, + event_chunker_conf, channel_config, range, errored: false, @@ -62,6 +70,7 @@ impl Stream for EventBlobsComplete { inp, self.channel_config.clone(), self.range.clone(), + self.event_chunker_conf.clone(), ); self.evs = Some(chunker); continue 'outer; diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 26cc760..3c21e47 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -4,10 +4,9 @@ use bytes::{Buf, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -#[allow(unused_imports)] use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; +use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -19,12 +18,11 @@ pub struct EventChunker { errored: bool, completed: bool, range: NanoRange, + stats_conf: EventChunkerConf, seen_beyond_range: bool, sent_beyond_range: bool, data_emit_complete: bool, final_stats_sent: bool, - data_since_last_stats: u32, - stats_emit_interval: u32, parsed_bytes: u64, } @@ -38,11 +36,23 @@ struct ParseResult { parsed_bytes: u64, } +#[derive(Clone, Debug)] +pub struct EventChunkerConf { + disk_stats_every: ByteSize, +} + +impl EventChunkerConf { + pub fn new(disk_stats_every: ByteSize) -> Self { + Self { disk_stats_every } + } +} + impl EventChunker { pub fn from_start( inp: Pin> + Send>>, channel_config: ChannelConfig, range: NanoRange, + stats_conf: EventChunkerConf, ) -> Self { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); @@ -54,12 +64,11 @@ impl EventChunker { errored: false, completed: false, range, + stats_conf, seen_beyond_range: false, sent_beyond_range: false, data_emit_complete: false, final_stats_sent: false, - data_since_last_stats: 0, - stats_emit_interval: 256, parsed_bytes: 0, } } @@ -68,8 +77,9 @@ impl EventChunker { inp: Pin> + Send>>, channel_config: ChannelConfig, range: NanoRange, + stats_conf: EventChunkerConf, ) -> Self { - let mut ret = Self::from_start(inp, channel_config, range); + let mut ret = Self::from_start(inp, channel_config, range, stats_conf); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -85,7 +95,6 @@ impl EventChunker { let mut parsed_bytes = 0; use byteorder::{ReadBytesExt, BE}; loop { - trace!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min); if (buf.len() as u32) < self.need_min { break; } @@ -209,10 +218,10 @@ impl EventChunker { ) { Ok(c1) => { assert!(c1 as u32 == k1); - trace!("decompress result c1 {} k1 {}", c1, k1); + //trace!("decompress result c1 {} k1 {}", c1, k1); if ts < self.range.beg { } else if ts >= self.range.end { - error!("EVENT AFTER RANGE {}", ts / SEC); + Err(Error::with_msg(format!("event after range {}", ts / SEC)))?; } else { ret.add_event( ts, @@ -287,10 +296,10 @@ impl Stream for EventChunker { } else if self.errored { self.completed = true; Ready(None) - } else if self.data_since_last_stats >= self.stats_emit_interval { - self.data_since_last_stats = 0; + } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 { let item = EventDataReadStats { - parsed_bytes: self.parsed_bytes, + //parsed_bytes: self.parsed_bytes, + parsed_bytes: 1000, }; self.parsed_bytes = 0; let ret = EventChunkerItem::EventDataReadStats(item); @@ -306,7 +315,6 @@ impl Stream for EventChunker { continue 'outer; } } else if self.data_emit_complete { - self.data_since_last_stats = 0; let item = EventDataReadStats { parsed_bytes: self.parsed_bytes, }; @@ -327,19 +335,16 @@ impl Stream for EventChunker { } if self.need_min > 1024 * 8 { let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); - warn!("{}", msg); self.errored = true; Ready(Some(Err(Error::with_msg(msg)))) } else { let x = self.need_min; self.inp.set_need_min(x); - self.data_since_last_stats += 1; let ret = EventChunkerItem::Events(res.events); Ready(Some(Ok(ret))) } } Err(e) => { - error!("EventChunker parse_buf returned error {:?}", e); self.errored = true; Ready(Some(Err(e.into()))) } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 46d930b..2569a94 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,5 +1,6 @@ use crate::dataopen::open_files; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; +use crate::eventchunker::EventChunkerConf; use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; @@ -333,7 +334,11 @@ pub fn file_content_stream( } } -pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Stream> + Send { +pub fn parsed1( + query: &netpod::AggQuerySingleChannel, + node: &Node, + stats_conf: EventChunkerConf, +) -> impl Stream> + Send { let query = query.clone(); let node = node.clone(); async_stream::stream! { @@ -343,7 +348,7 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &Node) -> impl Strea Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); let range = err::todoval(); - let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range); + let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone()); while let Some(evres) = chunker.next().await { use eventchunker::EventChunkerItem; match evres { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 37cf04e..cca7251 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -3,12 +3,11 @@ use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use netpod::log::*; use netpod::EventDataReadStats; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -#[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; pub struct MergedMinMaxAvgScalarStream where diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 3640731..65bdf14 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,13 +3,14 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::IntoDim1F32Stream; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::eventblobs::EventBlobsComplete; +use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use err::Error; use futures_util::StreamExt; use netpod::log::*; -use netpod::{NodeConfigCached, PerfOpts, Shape}; +use netpod::{ByteSize, NodeConfigCached, PerfOpts, Shape}; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; @@ -163,11 +164,13 @@ async fn raw_conn_handler_inner_try( // TODO use the requested buffer size buffer_size: 1024 * 4, }; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let mut s1 = EventBlobsComplete::new( range.clone(), query.channel_config.clone(), node_config.node.clone(), query.buffer_size as usize, + event_chunker_conf, ) .into_dim_1_f32_stream() .into_binned_x_bins_1() diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index eaae09d..007a3f2 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use disk::cache::PreBinnedQuery; +use disk::eventchunker::EventChunkerConf; use disk::raw::conn::raw_service; use err::Error; use future::Future; @@ -9,7 +10,7 @@ use http::{HeaderMap, Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; -use netpod::{Node, NodeConfigCached}; +use netpod::{ByteSize, Node, NodeConfigCached}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use std::{future, net, panic, pin, task}; @@ -123,22 +124,9 @@ async fn parsed_raw(req: Request, node: &Node) -> Result, E let reqbody = req.into_body(); let bodyslice = hyper::body::to_bytes(reqbody).await?; let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; - //let q = disk::read_test_1(&query).await?; - //let s = q.inner; - let s = disk::parsed1(&query, node); + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let s = disk::parsed1(&query, node, event_chunker_conf); let res = response(StatusCode::OK).body(Body::wrap_stream(s))?; - /* - let res = match q { - Ok(k) => { - response(StatusCode::OK) - .body(Body::wrap_stream(k.inner))? - } - Err(e) => { - response(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::empty())? - } - }; - */ Ok(res) } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index d23b134..8d16cb9 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -246,14 +246,13 @@ pub mod timeunits { pub const MIN: u64 = SEC * 60; pub const HOUR: u64 = MIN * 60; pub const DAY: u64 = HOUR * 24; - pub const WEEK: u64 = DAY * 7; } -const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4]; +const BIN_T_LEN_OPTIONS: [u64; 3] = [SEC, MIN * 10, HOUR * 2]; -const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12]; +const PATCH_T_LEN_OPTIONS: [u64; 3] = [MIN * 20, HOUR * 12, DAY * 16]; -const BIN_THRESHOLDS: [u64; 33] = [ +const BIN_THRESHOLDS: [u64; 31] = [ 2, 10, 100, @@ -283,10 +282,8 @@ const BIN_THRESHOLDS: [u64; 33] = [ DAY * 4, DAY * 8, DAY * 16, - WEEK, - WEEK * 2, - WEEK * 10, - WEEK * 60, + DAY * 32, + DAY * 64, ]; #[derive(Clone, Serialize, Deserialize)] @@ -672,3 +669,21 @@ impl EventDataReadStats { pub struct PerfOpts { pub inmem_bufcap: usize, } + +#[derive(Clone, Debug)] +pub struct ByteSize(u32); + +impl ByteSize { + pub fn b(b: u32) -> Self { + Self(b) + } + pub fn kb(kb: u32) -> Self { + Self(1024 * kb) + } + pub fn mb(mb: u32) -> Self { + Self(1024 * 1024 * mb) + } + pub fn bytes(&self) -> u32 { + self.0 + } +} diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index cc60c36..2e1f52d 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -53,6 +53,7 @@ async fn go() -> Result<(), Error> { end, opts.bins, cache_usage, + opts.disk_stats_every_kb, ) .await?; } diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index ae1c612..d078104 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -53,4 +53,6 @@ pub struct BinnedClient { pub ignore_cache: bool, #[clap(long)] pub recreate_cache: bool, + #[clap(long, default_value = "1048576")] + pub disk_stats_every_kb: u32, } diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index df5c076..6cfed7b 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -19,12 +19,13 @@ pub async fn get_binned( end_date: DateTime, bin_count: u32, cache_usage: CacheUsage, + disk_stats_every_kb: u32, ) -> Result<(), Error> { info!("------- get_binned client"); let t1 = Utc::now(); let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( - "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}&cache_usage={}", + "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}&cache_usage={}&disk_stats_every_kb={}", host, port, channel_backend, @@ -33,6 +34,7 @@ pub async fn get_binned( end_date.format(date_fmt), bin_count, cache_usage.query_param_value(), + disk_stats_every_kb, ); info!("get_binned uri {:?}", uri); let req = hyper::Request::builder() diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 5c15124..80ab30b 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -10,7 +10,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{Cluster, Database, Node, PerfOpts}; +use netpod::{ByteSize, Cluster, Database, Node, PerfOpts}; use std::future::ready; use tokio::io::AsyncRead; @@ -94,9 +94,10 @@ where let channel_backend = "testbackend"; let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let perf_opts = PerfOpts { inmem_bufcap: 512 }; + let disk_stats_every = ByteSize::kb(1024); // TODO have a function to form the uri, including perf opts: let uri = format!( - "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}", + "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", node0.host, node0.port, channel_backend, @@ -104,6 +105,7 @@ where bin_count, beg_date.format(date_fmt), end_date.format(date_fmt), + disk_stats_every.bytes() / 1024, ); info!("get_binned_channel get {}", uri); let req = hyper::Request::builder()