This commit is contained in:
Dominik Werder
2021-05-07 12:48:47 +02:00
parent db93ae1545
commit 073fde5fa8
14 changed files with 265 additions and 207 deletions

View File

@@ -111,12 +111,18 @@ impl PreBinnedQuery {
}
pub fn make_query_string(&self) -> String {
let cache_usage = match self.cache_usage {
CacheUsage::Use => "use",
CacheUsage::Ignore => "ignore",
CacheUsage::Recreate => "recreate",
};
format!(
"{}&channel_backend={}&channel_name={}&agg_kind={:?}",
"{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}",
self.patch.to_url_params_strings(),
self.channel.backend,
self.channel.name,
self.agg_kind
self.agg_kind,
cache_usage,
)
}
@@ -209,16 +215,6 @@ pub async fn binned_bytes_for_http(
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s1 = s1.into_binned_t(range);
/*let s1 = s1.map(|k| {
use super::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem::*;
match k {
Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete),
Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)),
Ok(Log(item)) => Ok(PreBinnedItem::Log(item)),
Err(e) => Err(e),
}
});*/
let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
let ret = BinnedBytesForHttpStream::new(s1);
Ok(Box::pin(ret))
@@ -445,12 +441,12 @@ impl Stream for MergedFromRemotes {
Pending
} else {
if c1 == self.tcp_establish_futs.len() {
debug!("MergedFromRemotes SETTING UP MERGED STREAM");
debug!("MergedFromRemotes setting up merged stream");
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s1 = MergedMinMaxAvgScalarStream::new(inps);
self.merged = Some(Box::pin(s1));
} else {
info!(
debug!(
"MergedFromRemotes raw / estab {} {}",
c1,
self.tcp_establish_futs.len()