diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 8ac803a..0e1a28c 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,5 +1,4 @@ use crate::agg::streams::StreamItem; -use crate::agg::AggregatableXdim1Bin; use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use err::Error; use futures_core::Stream; @@ -115,7 +114,7 @@ where fn cycle_current_bin(&mut self) { self.curbin += 1; let range = self.spec.get_range(self.curbin); - let ret = self + let _ret = self .aggtor .replace( <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( @@ -125,8 +124,10 @@ where // TODO handle None case, or remove Option if Agg is always present .unwrap() .result(); - //self.tmp_agg_results = VecDeque::from(ret); + // TODO retire this module + err::todo(); self.tmp_agg_results = VecDeque::new(); + //self.tmp_agg_results = VecDeque::from(ret); if self.curbin >= self.spec.count as u32 { self.all_bins_emitted = true; } @@ -165,7 +166,8 @@ where // TODO cycle_current_bin enqueues the bin, can I return here instead? None } else { - let mut item = item; + let item = item; + // TODO can we retire this module? //ag.ingest(&mut item); ag.ingest(err::todoval()); let item = item; diff --git a/disk/src/agg/binnedt2.rs b/disk/src/agg/binnedt2.rs index f39c13b..8577ae1 100644 --- a/disk/src/agg/binnedt2.rs +++ b/disk/src/agg/binnedt2.rs @@ -1,6 +1,5 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::agg::AggregatableXdim1Bin; use crate::binned::RangeCompletableItem; use err::Error; use futures_core::Stream; diff --git a/disk/src/agg/binnedt3.rs b/disk/src/agg/binnedt3.rs index 278dfd2..e134a03 100644 --- a/disk/src/agg/binnedt3.rs +++ b/disk/src/agg/binnedt3.rs @@ -1,7 +1,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem, RangeOverlapInfo}; +use crate::binned::{RangeCompletableItem, RangeOverlapInfo}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index ef348d5..9afc69a 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -2,7 +2,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, WithTimestamps}; +use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 8ca70da..0c3dc8e 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -1,6 +1,4 @@ -use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; -use crate::agg::AggregatableXdim1Bin; -use crate::binned::BinnedStreamKind; +use crate::binned::WithLen; use crate::streamlog::LogItem; use err::Error; use netpod::EventDataReadStats; @@ -39,7 +37,7 @@ pub trait ToJsonResult { fn to_json_result(&self) -> Result; } -pub trait Appendable { +pub trait Appendable: WithLen { fn empty() -> Self; fn append(&mut self, src: &Self); } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 61ae5c7..089bc0b 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,15 +1,15 @@ -use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT}; +use crate::agg::binnedt::AggregatableTdim; use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult}; -use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; +use crate::agg::{Fits, FitsInside}; use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::cache::{BinnedQuery, MergedFromRemotes}; -use crate::channelconfig::{extract_matching_config_entry, read_local_config}; -use crate::frame::makeframe::{make_frame, FrameType}; +use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use crate::frame::makeframe::FrameType; use crate::raw::EventsQuery; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -23,6 +23,7 @@ use netpod::{ use num_traits::Zero; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; +use serde_json::Map; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -55,15 +56,6 @@ impl MinMaxAvgScalarBinBatchCollected { } } -fn append_to_min_max_avg_scalar_bin_batch(batch: &mut MinMaxAvgScalarBinBatch, item: &mut MinMaxAvgScalarBinBatch) { - batch.ts1s.append(&mut item.ts1s); - batch.ts2s.append(&mut item.ts2s); - batch.counts.append(&mut item.counts); - batch.mins.append(&mut item.mins); - batch.maxs.append(&mut item.maxs); - batch.avgs.append(&mut item.avgs); -} - impl Collected for MinMaxAvgScalarBinBatchCollected { fn new(bin_count_exp: u32) -> Self { Self::empty(bin_count_exp) @@ -162,20 +154,29 @@ pub async fn binned_bytes_for_http( query: &BinnedQuery, ) -> Result { let channel_config = read_local_config(&query.channel(), &node_config.node).await?; - let entry = extract_matching_config_entry(query.range(), &channel_config)?; - info!("binned_bytes_for_http found config entry {:?}", entry); - match query.agg_kind() { - AggKind::DimXBins1 => { - let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; - let ret = BinnedBytesForHttpStream::new(res.binned_stream); - Ok(Box::pin(ret)) + match extract_matching_config_entry(query.range(), &channel_config)? { + MatchingConfigEntry::None => { + // TODO can I use the same binned_stream machinery to construct the matching empty result? + let s = futures_util::stream::empty(); + Ok(Box::pin(s)) } - AggKind::DimXBinsN(_) => { - // TODO pass a different stream kind here: - err::todo(); - let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; - let ret = BinnedBytesForHttpStream::new(res.binned_stream); - Ok(Box::pin(ret)) + MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => { + info!("binned_bytes_for_http found config entry {:?}", entry); + match query.agg_kind() { + AggKind::DimXBins1 => { + let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; + let ret = BinnedBytesForHttpStream::new(res.binned_stream); + Ok(Box::pin(ret)) + } + AggKind::DimXBinsN(_) => { + // TODO pass a different stream kind here: + err::todo(); + let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; + let ret = BinnedBytesForHttpStream::new(res.binned_stream); + Ok(Box::pin(ret)) + } + } } } } @@ -285,7 +286,7 @@ where StreamItem::Stats(_) => {} StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => {} - RangeCompletableItem::Data(mut item) => { + RangeCompletableItem::Data(item) => { item.append_to(&mut main_item); i1 += 1; } @@ -317,16 +318,36 @@ pub struct BinnedJsonResult { pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { let channel_config = read_local_config(&query.channel(), &node_config.node).await?; - let entry = extract_matching_config_entry(query.range(), &channel_config)?; - info!("binned_json found config entry {:?}", entry); - - // TODO create the matching stream based on AggKind and ConfigEntry. - - let t = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; - // TODO need to collect also timeout, number of missing expected bins, ... - let collected = collect_all(t.binned_stream, t.range.count as u32).await?; - let ret = ToJsonResult::to_json_result(&collected)?; - Ok(serde_json::to_value(ret)?) + match extract_matching_config_entry(query.range(), &channel_config)? { + MatchingConfigEntry::None => { + // TODO can I use the same binned_stream machinery to construct the matching empty result? + Ok(serde_json::Value::Object(Map::new())) + } + MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => { + info!("binned_json found config entry {:?}", entry); + match query.agg_kind() { + AggKind::DimXBins1 => { + let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; + //let ret = BinnedBytesForHttpStream::new(res.binned_stream); + //Ok(Box::pin(ret)) + // TODO need to collect also timeout, number of missing expected bins, ... + let collected = collect_all(res.binned_stream, res.range.count as u32).await?; + let ret = ToJsonResult::to_json_result(&collected)?; + Ok(serde_json::to_value(ret)?) + } + AggKind::DimXBinsN(_xbincount) => { + // TODO pass a different stream kind here: + err::todo(); + let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; + // TODO need to collect also timeout, number of missing expected bins, ... + let collected = collect_all(res.binned_stream, res.range.count as u32).await?; + let ret = ToJsonResult::to_json_result(&collected)?; + Ok(serde_json::to_value(ret)?) + } + } + } + } } pub struct ReadPbv @@ -586,6 +607,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar { query.cache_usage().clone(), node_config, query.disk_stats_every().clone(), + query.report_error(), self.clone(), )?; Ok(BoxedStream::new(Box::pin(s))?) @@ -611,26 +633,3 @@ impl BinnedStreamKind for BinnedStreamKindScalar { Self::XBinnedToTBinnedStream::new(inp, spec) } } - -// TODO this code is needed somewhere: -fn pbv_handle_fut2_item( - item: StreamItem>, -) -> Option>> { - // TODO make this code work in this context: - // Do I need more parameters here? - /*Ok(item) => match item { - StreamItem::DataItem(item) => match item { - PreBinnedScalarItem::Batch(batch) => { - self.values.ts1s.extend(batch.ts1s.iter()); - self.values.ts2s.extend(batch.ts2s.iter()); - self.values.counts.extend(batch.counts.iter()); - self.values.mins.extend(batch.mins.iter()); - self.values.maxs.extend(batch.maxs.iter()); - self.values.avgs.extend(batch.avgs.iter()); - StreamItem::DataItem(PreBinnedScalarItem::Batch(batch)) - } - }, - },*/ - err::todo(); - None -} diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index 918c6bf..8a9f1a0 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -1,12 +1,8 @@ -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, BinnedStreamRes}; use crate::binnedstream::BoxedStream; use crate::cache::BinnedQuery; -use crate::frame::makeframe::FrameType; use crate::raw::EventsQuery; use err::Error; -use futures_core::Stream; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange}; diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 601650e..feec921 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -22,7 +22,7 @@ where + Send, >, >, - stream_kind: BK, + _stream_kind: BK, } impl BinnedScalarStreamFromPreBinnedPatches @@ -38,6 +38,7 @@ where cache_usage: CacheUsage, node_config: &NodeConfigCached, disk_stats_every: ByteSize, + report_error: bool, stream_kind: BK, ) -> Result { let patches: Vec<_> = patch_it.collect(); @@ -61,6 +62,7 @@ where agg_kind.clone(), cache_usage.clone(), disk_stats_every.clone(), + report_error, ); let ret: Pin + Send>> = match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) { @@ -104,7 +106,7 @@ where let inp = crate::agg::binnedt2::IntoBinnedT::into_binned_t(inp, range); Ok(Self { inp: Box::pin(inp), - stream_kind, + _stream_kind: stream_kind, }) } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 1ec494d..204ebd0 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,4 +1,3 @@ -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::cache::pbv::PreBinnedValueByteStream; @@ -77,6 +76,7 @@ pub struct BinnedQuery { channel: Channel, cache_usage: CacheUsage, disk_stats_every: ByteSize, + report_error: bool, } impl BinnedQuery { @@ -106,6 +106,11 @@ impl BinnedQuery { channel: channel_from_params(¶ms)?, cache_usage: CacheUsage::from_params(¶ms)?, disk_stats_every: ByteSize::kb(disk_stats_every), + report_error: params + .get("report_error") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?, }; info!("BinnedQuery::from_request {:?}", ret); Ok(ret) @@ -134,6 +139,10 @@ impl BinnedQuery { pub fn disk_stats_every(&self) -> &ByteSize { &self.disk_stats_every } + + pub fn report_error(&self) -> bool { + self.report_error + } } #[derive(Clone, Debug)] @@ -143,6 +152,7 @@ pub struct PreBinnedQuery { channel: Channel, cache_usage: CacheUsage, disk_stats_every: ByteSize, + report_error: bool, } impl PreBinnedQuery { @@ -152,6 +162,7 @@ impl PreBinnedQuery { agg_kind: AggKind, cache_usage: CacheUsage, disk_stats_every: ByteSize, + report_error: bool, ) -> Self { Self { patch, @@ -159,6 +170,7 @@ impl PreBinnedQuery { channel, cache_usage, disk_stats_every, + report_error, } } @@ -192,25 +204,35 @@ impl PreBinnedQuery { channel: channel_from_params(¶ms)?, cache_usage: CacheUsage::from_params(¶ms)?, disk_stats_every: ByteSize::kb(disk_stats_every), + report_error: params + .get("report_error") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?, }; Ok(ret) } pub fn make_query_string(&self) -> String { format!( - "{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}", + "{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}&report_error={}", self.patch.to_url_params_strings(), self.channel.backend, self.channel.name, self.agg_kind, self.cache_usage, self.disk_stats_every.bytes() / 1024, + self.report_error(), ) } pub fn patch(&self) -> &PreBinnedPatchCoord { &self.patch } + + pub fn report_error(&self) -> bool { + self.report_error + } } fn channel_from_params(params: &BTreeMap) -> Result { @@ -508,13 +530,17 @@ impl CacheFileDesc { } } +pub struct WrittenPbCache { + bytes: u64, +} + pub async fn write_pb_cache_min_max_avg_scalar( values: T, patch: PreBinnedPatchCoord, agg_kind: AggKind, channel: Channel, node_config: NodeConfigCached, -) -> Result<(), Error> +) -> Result where T: Serialize, { @@ -527,7 +553,7 @@ where let enc = serde_cbor::to_vec(&values)?; info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path); tokio::fs::create_dir_all(path.parent().unwrap()).await?; - tokio::task::spawn_blocking({ + let res = tokio::task::spawn_blocking({ let path = path.clone(); move || { use fs2::FileExt; @@ -540,9 +566,10 @@ where f.lock_exclusive()?; f.write_all(&enc)?; f.unlock()?; - Ok::<_, Error>(()) + Ok::<_, Error>(enc.len()) } }) .await??; - Ok(()) + let ret = WrittenPbCache { bytes: res as u64 }; + Ok(ret) } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 8c819d0..8cf3a0b 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,9 +1,7 @@ -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem}; -use crate::binned::RangeCompletableItem::RangeComplete; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::agg::streams::{Appendable, StreamItem}; +use crate::binned::{BinnedStreamKind, RangeCompletableItem, WithLen}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; -use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery}; +use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery, WrittenPbCache}; use crate::frame::makeframe::{make_frame, FrameType}; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; @@ -96,7 +94,7 @@ where completed: bool, streamlog: Streamlog, values: ::TBinnedBins, - write_fut: Option> + Send>>>, + write_fut: Option> + Send>>>, read_cache_fut: Option< Pin< Box< @@ -200,6 +198,7 @@ where let q2 = self.query.clone(); let disk_stats_every = self.query.disk_stats_every.clone(); let stream_kind = self.stream_kind.clone(); + let report_error = self.query.report_error(); move |patch| { let query = PreBinnedQuery { patch, @@ -207,6 +206,7 @@ where agg_kind: q2.agg_kind.clone(), cache_usage: q2.cache_usage.clone(), disk_stats_every: disk_stats_every.clone(), + report_error, }; PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) } @@ -260,8 +260,9 @@ where self.cache_written = true; self.write_fut = None; match item { - Ok(()) => { - self.streamlog.append(Level::INFO, format!("cache file written")); + Ok(res) => { + self.streamlog + .append(Level::INFO, format!("cache file written bytes: {}", res.bytes)); continue 'outer; } Err(e) => { @@ -309,13 +310,10 @@ where } else { match self.query.cache_usage { super::CacheUsage::Use | super::CacheUsage::Recreate => { - err::todo(); let msg = format!( "write cache file query: {:?} bin count: {}", self.query.patch, - //self.values.ts1s.len() - // TODO create trait to extract number of bins from item: - 0 + self.values.len(), ); self.streamlog.append(Level::INFO, msg); let values = std::mem::replace( @@ -373,7 +371,6 @@ where match item { Ok(file) => { self.read_from_cache = true; - use crate::binned::ReadableFromFile; let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 29932a6..0b03984 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,4 +1,3 @@ -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; @@ -10,7 +9,6 @@ use futures_util::{pin_mut, FutureExt}; use http::StatusCode; use netpod::log::*; use netpod::{NodeConfigCached, PerfOpts}; -use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -23,7 +21,7 @@ where res: Option>, errored: bool, completed: bool, - stream_kind: BK, + _stream_kind: BK, } impl PreBinnedScalarValueFetchedStream @@ -46,7 +44,7 @@ where res: None, errored: false, completed: false, - stream_kind: stream_kind.clone(), + _stream_kind: stream_kind.clone(), }; Ok(ret) } diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 55e298b..5eb88e1 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -268,10 +268,16 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { + None, + Multiple, + Entry(&'a ConfigEntry), +} + pub fn extract_matching_config_entry<'a>( range: &NanoRange, channel_config: &'a Config, -) -> Result<&'a ConfigEntry, Error> { +) -> Result, Error> { let mut ixs = vec![]; for i1 in 0..channel_config.entries.len() { let e1 = &channel_config.entries[i1]; @@ -287,11 +293,12 @@ pub fn extract_matching_config_entry<'a>( } } if ixs.len() == 0 { - return Err(Error::with_msg(format!("no config entries found"))); + Ok(MatchingConfigEntry::None) } else if ixs.len() > 1 { - return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len()))); + Ok(MatchingConfigEntry::Multiple) + } else { + Ok(MatchingConfigEntry::Entry(&channel_config.entries[ixs[0]])) } - Ok(&channel_config.entries[ixs[0]]) } #[cfg(test)] diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 0fa1d5c..f365f29 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,7 +1,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, XBinnedEvents}; +use crate::binned::RangeCompletableItem; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; use bytes::{BufMut, BytesMut}; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index b4f90e7..ebab73e 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,6 +1,4 @@ -use crate::agg::binnedt::AggregatableTdim; -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::streams::{Appendable, Collectable, Collected, StatsItem, StreamItem}; +use crate::agg::streams::{Appendable, StatsItem, StreamItem}; use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; use crate::streamlog::LogItem; use err::Error; diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index f4ebf1b..1881579 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,7 +1,7 @@ use crate::agg::streams::StreamItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, FrameType}; +use crate::frame::makeframe::decode_frame; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -18,7 +18,7 @@ where inp: InMemoryFrameAsyncReadStream, errored: bool, completed: bool, - stream_kind: SK, + _stream_kind: SK, } impl EventsFromFrames @@ -31,7 +31,7 @@ where inp, errored: false, completed: false, - stream_kind, + _stream_kind: stream_kind, } } } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 261e2d8..3df22f5 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,10 +1,9 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem}; -use crate::channelconfig::{extract_matching_config_entry, read_local_config}; +use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; @@ -136,10 +135,15 @@ async fn events_conn_handler_inner_try( Ok(k) => k, Err(e) => return Err((e, netout))?, }; - let entry = match extract_matching_config_entry(range, &channel_config) { + let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, Err(e) => return Err((e, netout))?, }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err((Error::with_msg("no config entry found"), netout))?, + MatchingConfigEntry::Multiple => return Err((Error::with_msg("multiple config entries found"), netout))?, + MatchingConfigEntry::Entry(entry) => entry, + }; let shape = match entry.to_shape() { Ok(k) => k, Err(e) => return Err((e, netout))?, @@ -196,7 +200,7 @@ async fn events_conn_handler_inner_try( } } // TODO define this case: - AggKind::DimXBinsN(n1) => match make_frame::< + AggKind::DimXBinsN(_xbincount) => match make_frame::< Result< StreamItem::XBinnedEvents>>, Error, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 937f7c7..adf6ef0 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -244,27 +244,33 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result Result, Error> { - info!("binned_binary"); let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, Err(e) => { - error!("fn binned: {:?}", e); - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + if query.report_error() { + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? + } else { + error!("fn binned_binary: {:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + } } }; Ok(ret) } async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { - info!("binned_json"); let ret = match disk::binned::binned_json(node_config, &query).await { Ok(val) => { let body = serde_json::to_string(&val)?; response(StatusCode::OK).body(Body::from(body)) }?, Err(e) => { - error!("fn binned: {:?}", e); - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + if query.report_error() { + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? + } else { + error!("fn binned_json: {:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + } } }; Ok(ret) @@ -291,8 +297,12 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result ), ))?, Err(e) => { - error!("fn prebinned: {:?}", e); - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + if query.report_error() { + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? + } else { + error!("fn prebinned: {:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + } } }; Ok(ret) diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 042384a..695b73f 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -50,7 +50,11 @@ pub async fn get_binned( let t1 = Utc::now(); let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( - "http://{}:{}/api/4/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}&cache_usage={}&disk_stats_every_kb={}", + concat!( + "http://{}:{}/api/4/binned?channel_backend={}&channel_name={}", + "&beg_date={}&end_date={}&bin_count={}&cache_usage={}", + "&disk_stats_every_kb={}&report_error=true", + ), host, port, channel_backend, @@ -65,13 +69,16 @@ pub async fn get_binned( let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) - .header("aCCepT", "application/octet-stream") + .header("accept", "application/octet-stream") .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); - return Err(Error::with_msg(format!("Server error {:?}", res))); + let (head, body) = res.into_parts(); + let buf = hyper::body::to_bytes(body).await?; + let s = String::from_utf8_lossy(&buf); + return Err(Error::with_msg(format!("Server error {:?}\n---------------------- message from http body:\n{}\n---------------------- end of http body", head, s))); } let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index b305e41..73c19cb 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -101,7 +101,7 @@ where let disk_stats_every = ByteSize::kb(1024); // TODO have a function to form the uri, including perf opts: let uri = format!( - "http://{}:{}/api/4/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", + "http://{}:{}/api/4/binned?cache_usage=use&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", node0.host, node0.port, channel_backend,