This commit is contained in:
Dominik Werder
2021-06-25 12:56:42 +02:00
parent 7e0a540aa6
commit 4b2048c103
20 changed files with 433 additions and 286 deletions
+2
View File
@@ -168,6 +168,7 @@ where
shape: Shape,
agg_kind: AggKind,
cache_usage: CacheUsage,
disk_io_buffer_size: usize,
node_config: &NodeConfigCached,
disk_stats_every: ByteSize,
report_error: bool,
@@ -193,6 +194,7 @@ where
channel.clone(),
agg_kind.clone(),
cache_usage.clone(),
disk_io_buffer_size,
disk_stats_every.clone(),
report_error,
);
+3
View File
@@ -114,6 +114,7 @@ where
channel: self.query.channel().clone(),
range: self.query.patch().patch_range(),
agg_kind: self.query.agg_kind().clone(),
disk_io_buffer_size: self.query.disk_io_buffer_size(),
};
if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {
let msg = format!(
@@ -172,6 +173,7 @@ where
let s = futures_util::stream::iter(patch_it)
.map({
let q2 = self.query.clone();
let disk_io_buffer_size = self.query.disk_io_buffer_size();
let disk_stats_every = self.query.disk_stats_every().clone();
let report_error = self.query.report_error();
move |patch| {
@@ -180,6 +182,7 @@ where
q2.channel().clone(),
q2.agg_kind().clone(),
q2.cache_usage().clone(),
disk_io_buffer_size,
disk_stats_every.clone(),
report_error,
);
+29
View File
@@ -16,6 +16,7 @@ pub struct PreBinnedQuery {
agg_kind: AggKind,
channel: Channel,
cache_usage: CacheUsage,
disk_io_buffer_size: usize,
disk_stats_every: ByteSize,
report_error: bool,
}
@@ -26,6 +27,7 @@ impl PreBinnedQuery {
channel: Channel,
agg_kind: AggKind,
cache_usage: CacheUsage,
disk_io_buffer_size: usize,
disk_stats_every: ByteSize,
report_error: bool,
) -> Self {
@@ -34,6 +36,7 @@ impl PreBinnedQuery {
agg_kind,
channel,
cache_usage,
disk_io_buffer_size,
disk_stats_every,
report_error,
}
@@ -68,6 +71,11 @@ impl PreBinnedQuery {
channel: channel_from_pairs(&pairs)?,
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
cache_usage: CacheUsage::from_pairs(&pairs)?,
disk_io_buffer_size: pairs
.get("diskIoBufferSize")
.map_or("4096", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
disk_stats_every: ByteSize::kb(disk_stats_every),
report_error: pairs
.get("reportError")
@@ -107,6 +115,10 @@ impl PreBinnedQuery {
pub fn cache_usage(&self) -> CacheUsage {
self.cache_usage.clone()
}
pub fn disk_io_buffer_size(&self) -> usize {
self.disk_io_buffer_size
}
}
impl AppendToUrl for PreBinnedQuery {
@@ -117,6 +129,7 @@ impl AppendToUrl for PreBinnedQuery {
g.append_pair("channelBackend", &self.channel.backend);
g.append_pair("channelName", &self.channel.name);
g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value()));
g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("reportError", &format!("{}", self.report_error()));
}
@@ -181,6 +194,7 @@ pub struct BinnedQuery {
bin_count: u32,
agg_kind: AggKind,
cache_usage: CacheUsage,
disk_io_buffer_size: usize,
disk_stats_every: ByteSize,
report_error: bool,
timeout: Duration,
@@ -196,6 +210,7 @@ impl BinnedQuery {
bin_count,
agg_kind,
cache_usage: CacheUsage::Use,
disk_io_buffer_size: 1024 * 4,
disk_stats_every: ByteSize(1024 * 1024 * 4),
report_error: false,
timeout: Duration::from_millis(2000),
@@ -228,6 +243,10 @@ impl BinnedQuery {
&self.disk_stats_every
}
pub fn disk_io_buffer_size(&self) -> usize {
self.disk_io_buffer_size
}
pub fn report_error(&self) -> bool {
self.report_error
}
@@ -255,6 +274,10 @@ impl BinnedQuery {
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
pub fn set_disk_io_buffer_size(&mut self, k: usize) {
self.disk_io_buffer_size = k;
}
}
impl HasBackend for BinnedQuery {
@@ -291,6 +314,11 @@ impl FromUrl for BinnedQuery {
.map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
cache_usage: CacheUsage::from_pairs(&pairs)?,
disk_io_buffer_size: pairs
.get("diskIoBufferSize")
.map_or("4096", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
disk_stats_every: ByteSize::kb(disk_stats_every),
report_error: pairs
.get("reportError")
@@ -342,6 +370,7 @@ impl AppendToUrl for BinnedQuery {
}
{
let mut g = url.query_pairs_mut();
g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));