Return value for find matching config entry, fix warnings

This commit is contained in:
Dominik Werder
2021-05-26 18:28:57 +02:00
parent 11d1df238d
commit b3b2b3e4f7
19 changed files with 173 additions and 129 deletions

View File

@@ -1,5 +1,4 @@
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
@@ -115,7 +114,7 @@ where
fn cycle_current_bin(&mut self) { fn cycle_current_bin(&mut self) {
self.curbin += 1; self.curbin += 1;
let range = self.spec.get_range(self.curbin); let range = self.spec.get_range(self.curbin);
let ret = self let _ret = self
.aggtor .aggtor
.replace( .replace(
<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static( <<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static(
@@ -125,8 +124,10 @@ where
// TODO handle None case, or remove Option if Agg is always present // TODO handle None case, or remove Option if Agg is always present
.unwrap() .unwrap()
.result(); .result();
//self.tmp_agg_results = VecDeque::from(ret); // TODO retire this module
err::todo();
self.tmp_agg_results = VecDeque::new(); self.tmp_agg_results = VecDeque::new();
//self.tmp_agg_results = VecDeque::from(ret);
if self.curbin >= self.spec.count as u32 { if self.curbin >= self.spec.count as u32 {
self.all_bins_emitted = true; self.all_bins_emitted = true;
} }
@@ -165,7 +166,8 @@ where
// TODO cycle_current_bin enqueues the bin, can I return here instead? // TODO cycle_current_bin enqueues the bin, can I return here instead?
None None
} else { } else {
let mut item = item; let item = item;
// TODO can we retire this module?
//ag.ingest(&mut item); //ag.ingest(&mut item);
ag.ingest(err::todoval()); ag.ingest(err::todoval());
let item = item; let item = item;

View File

@@ -1,6 +1,5 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem; use crate::binned::RangeCompletableItem;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;

View File

@@ -1,7 +1,7 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem, RangeOverlapInfo}; use crate::binned::{RangeCompletableItem, RangeOverlapInfo};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;

View File

@@ -2,7 +2,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Appendable, StreamItem}; use crate::agg::streams::{Appendable, StreamItem};
use crate::agg::AggregatableXdim1Bin; use crate::agg::AggregatableXdim1Bin;
use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, WithTimestamps}; use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo};
use crate::frame::makeframe::make_frame; use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use err::Error; use err::Error;

View File

@@ -1,6 +1,4 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::binned::WithLen;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::BinnedStreamKind;
use crate::streamlog::LogItem; use crate::streamlog::LogItem;
use err::Error; use err::Error;
use netpod::EventDataReadStats; use netpod::EventDataReadStats;
@@ -39,7 +37,7 @@ pub trait ToJsonResult {
fn to_json_result(&self) -> Result<Self::Output, Error>; fn to_json_result(&self) -> Result<Self::Output, Error>;
} }
pub trait Appendable { pub trait Appendable: WithLen {
fn empty() -> Self; fn empty() -> Self;
fn append(&mut self, src: &Self); fn append(&mut self, src: &Self);
} }

View File

@@ -1,15 +1,15 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT}; use crate::agg::binnedt::AggregatableTdim;
use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt2::AggregatableTdim2;
use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult}; use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::agg::{Fits, FitsInside};
use crate::binned::scalar::binned_stream; use crate::binned::scalar::binned_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use crate::frame::makeframe::{make_frame, FrameType}; use crate::frame::makeframe::FrameType;
use crate::raw::EventsQuery; use crate::raw::EventsQuery;
use bytes::Bytes; use bytes::Bytes;
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
@@ -23,6 +23,7 @@ use netpod::{
use num_traits::Zero; use num_traits::Zero;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use serde_json::Map;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -55,15 +56,6 @@ impl MinMaxAvgScalarBinBatchCollected {
} }
} }
fn append_to_min_max_avg_scalar_bin_batch(batch: &mut MinMaxAvgScalarBinBatch, item: &mut MinMaxAvgScalarBinBatch) {
batch.ts1s.append(&mut item.ts1s);
batch.ts2s.append(&mut item.ts2s);
batch.counts.append(&mut item.counts);
batch.mins.append(&mut item.mins);
batch.maxs.append(&mut item.maxs);
batch.avgs.append(&mut item.avgs);
}
impl Collected for MinMaxAvgScalarBinBatchCollected { impl Collected for MinMaxAvgScalarBinBatchCollected {
fn new(bin_count_exp: u32) -> Self { fn new(bin_count_exp: u32) -> Self {
Self::empty(bin_count_exp) Self::empty(bin_count_exp)
@@ -162,20 +154,29 @@ pub async fn binned_bytes_for_http(
query: &BinnedQuery, query: &BinnedQuery,
) -> Result<BinnedBytesStreamBox, Error> { ) -> Result<BinnedBytesStreamBox, Error> {
let channel_config = read_local_config(&query.channel(), &node_config.node).await?; let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry = extract_matching_config_entry(query.range(), &channel_config)?; match extract_matching_config_entry(query.range(), &channel_config)? {
info!("binned_bytes_for_http found config entry {:?}", entry); MatchingConfigEntry::None => {
match query.agg_kind() { // TODO can I use the same binned_stream machinery to construct the matching empty result?
AggKind::DimXBins1 => { let s = futures_util::stream::empty();
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; Ok(Box::pin(s))
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
} }
AggKind::DimXBinsN(_) => { MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
// TODO pass a different stream kind here: MatchingConfigEntry::Entry(entry) => {
err::todo(); info!("binned_bytes_for_http found config entry {:?}", entry);
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; match query.agg_kind() {
let ret = BinnedBytesForHttpStream::new(res.binned_stream); AggKind::DimXBins1 => {
Ok(Box::pin(ret)) let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
AggKind::DimXBinsN(_) => {
// TODO pass a different stream kind here:
err::todo();
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
}
} }
} }
} }
@@ -285,7 +286,7 @@ where
StreamItem::Stats(_) => {} StreamItem::Stats(_) => {}
StreamItem::DataItem(item) => match item { StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {} RangeCompletableItem::RangeComplete => {}
RangeCompletableItem::Data(mut item) => { RangeCompletableItem::Data(item) => {
item.append_to(&mut main_item); item.append_to(&mut main_item);
i1 += 1; i1 += 1;
} }
@@ -317,16 +318,36 @@ pub struct BinnedJsonResult {
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> { pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
let channel_config = read_local_config(&query.channel(), &node_config.node).await?; let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry = extract_matching_config_entry(query.range(), &channel_config)?; match extract_matching_config_entry(query.range(), &channel_config)? {
info!("binned_json found config entry {:?}", entry); MatchingConfigEntry::None => {
// TODO can I use the same binned_stream machinery to construct the matching empty result?
// TODO create the matching stream based on AggKind and ConfigEntry. Ok(serde_json::Value::Object(Map::new()))
}
let t = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
// TODO need to collect also timeout, number of missing expected bins, ... MatchingConfigEntry::Entry(entry) => {
let collected = collect_all(t.binned_stream, t.range.count as u32).await?; info!("binned_json found config entry {:?}", entry);
let ret = ToJsonResult::to_json_result(&collected)?; match query.agg_kind() {
Ok(serde_json::to_value(ret)?) AggKind::DimXBins1 => {
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
//let ret = BinnedBytesForHttpStream::new(res.binned_stream);
//Ok(Box::pin(ret))
// TODO need to collect also timeout, number of missing expected bins, ...
let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
Ok(serde_json::to_value(ret)?)
}
AggKind::DimXBinsN(_xbincount) => {
// TODO pass a different stream kind here:
err::todo();
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
// TODO need to collect also timeout, number of missing expected bins, ...
let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
Ok(serde_json::to_value(ret)?)
}
}
}
}
} }
pub struct ReadPbv<T> pub struct ReadPbv<T>
@@ -586,6 +607,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar {
query.cache_usage().clone(), query.cache_usage().clone(),
node_config, node_config,
query.disk_stats_every().clone(), query.disk_stats_every().clone(),
query.report_error(),
self.clone(), self.clone(),
)?; )?;
Ok(BoxedStream::new(Box::pin(s))?) Ok(BoxedStream::new(Box::pin(s))?)
@@ -611,26 +633,3 @@ impl BinnedStreamKind for BinnedStreamKindScalar {
Self::XBinnedToTBinnedStream::new(inp, spec) Self::XBinnedToTBinnedStream::new(inp, spec)
} }
} }
// TODO this code is needed somewhere:
fn pbv_handle_fut2_item(
item: StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>,
) -> Option<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>> {
// TODO make this code work in this context:
// Do I need more parameters here?
/*Ok(item) => match item {
StreamItem::DataItem(item) => match item {
PreBinnedScalarItem::Batch(batch) => {
self.values.ts1s.extend(batch.ts1s.iter());
self.values.ts2s.extend(batch.ts2s.iter());
self.values.counts.extend(batch.counts.iter());
self.values.mins.extend(batch.mins.iter());
self.values.maxs.extend(batch.maxs.iter());
self.values.avgs.extend(batch.avgs.iter());
StreamItem::DataItem(PreBinnedScalarItem::Batch(batch))
}
},
},*/
err::todo();
None
}

View File

@@ -1,12 +1,8 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::binned::{BinnedStreamKind, BinnedStreamRes};
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem};
use crate::binnedstream::BoxedStream; use crate::binnedstream::BoxedStream;
use crate::cache::BinnedQuery; use crate::cache::BinnedQuery;
use crate::frame::makeframe::FrameType;
use crate::raw::EventsQuery; use crate::raw::EventsQuery;
use err::Error; use err::Error;
use futures_core::Stream;
use netpod::log::*; use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange}; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};

View File

@@ -22,7 +22,7 @@ where
+ Send, + Send,
>, >,
>, >,
stream_kind: BK, _stream_kind: BK,
} }
impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK> impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK>
@@ -38,6 +38,7 @@ where
cache_usage: CacheUsage, cache_usage: CacheUsage,
node_config: &NodeConfigCached, node_config: &NodeConfigCached,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool,
stream_kind: BK, stream_kind: BK,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let patches: Vec<_> = patch_it.collect(); let patches: Vec<_> = patch_it.collect();
@@ -61,6 +62,7 @@ where
agg_kind.clone(), agg_kind.clone(),
cache_usage.clone(), cache_usage.clone(),
disk_stats_every.clone(), disk_stats_every.clone(),
report_error,
); );
let ret: Pin<Box<dyn Stream<Item = _> + Send>> = let ret: Pin<Box<dyn Stream<Item = _> + Send>> =
match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) { match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) {
@@ -104,7 +106,7 @@ where
let inp = crate::agg::binnedt2::IntoBinnedT::into_binned_t(inp, range); let inp = crate::agg::binnedt2::IntoBinnedT::into_binned_t(inp, range);
Ok(Self { Ok(Self {
inp: Box::pin(inp), inp: Box::pin(inp),
stream_kind, _stream_kind: stream_kind,
}) })
} }
} }

View File

@@ -1,4 +1,3 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::cache::pbv::PreBinnedValueByteStream; use crate::cache::pbv::PreBinnedValueByteStream;
@@ -77,6 +76,7 @@ pub struct BinnedQuery {
channel: Channel, channel: Channel,
cache_usage: CacheUsage, cache_usage: CacheUsage,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool,
} }
impl BinnedQuery { impl BinnedQuery {
@@ -106,6 +106,11 @@ impl BinnedQuery {
channel: channel_from_params(&params)?, channel: channel_from_params(&params)?,
cache_usage: CacheUsage::from_params(&params)?, cache_usage: CacheUsage::from_params(&params)?,
disk_stats_every: ByteSize::kb(disk_stats_every), disk_stats_every: ByteSize::kb(disk_stats_every),
report_error: params
.get("report_error")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?,
}; };
info!("BinnedQuery::from_request {:?}", ret); info!("BinnedQuery::from_request {:?}", ret);
Ok(ret) Ok(ret)
@@ -134,6 +139,10 @@ impl BinnedQuery {
pub fn disk_stats_every(&self) -> &ByteSize { pub fn disk_stats_every(&self) -> &ByteSize {
&self.disk_stats_every &self.disk_stats_every
} }
pub fn report_error(&self) -> bool {
self.report_error
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -143,6 +152,7 @@ pub struct PreBinnedQuery {
channel: Channel, channel: Channel,
cache_usage: CacheUsage, cache_usage: CacheUsage,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool,
} }
impl PreBinnedQuery { impl PreBinnedQuery {
@@ -152,6 +162,7 @@ impl PreBinnedQuery {
agg_kind: AggKind, agg_kind: AggKind,
cache_usage: CacheUsage, cache_usage: CacheUsage,
disk_stats_every: ByteSize, disk_stats_every: ByteSize,
report_error: bool,
) -> Self { ) -> Self {
Self { Self {
patch, patch,
@@ -159,6 +170,7 @@ impl PreBinnedQuery {
channel, channel,
cache_usage, cache_usage,
disk_stats_every, disk_stats_every,
report_error,
} }
} }
@@ -192,25 +204,35 @@ impl PreBinnedQuery {
channel: channel_from_params(&params)?, channel: channel_from_params(&params)?,
cache_usage: CacheUsage::from_params(&params)?, cache_usage: CacheUsage::from_params(&params)?,
disk_stats_every: ByteSize::kb(disk_stats_every), disk_stats_every: ByteSize::kb(disk_stats_every),
report_error: params
.get("report_error")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_msg(format!("can not parse report_error {:?}", e)))?,
}; };
Ok(ret) Ok(ret)
} }
pub fn make_query_string(&self) -> String { pub fn make_query_string(&self) -> String {
format!( format!(
"{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}", "{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={}&disk_stats_every_kb={}&report_error={}",
self.patch.to_url_params_strings(), self.patch.to_url_params_strings(),
self.channel.backend, self.channel.backend,
self.channel.name, self.channel.name,
self.agg_kind, self.agg_kind,
self.cache_usage, self.cache_usage,
self.disk_stats_every.bytes() / 1024, self.disk_stats_every.bytes() / 1024,
self.report_error(),
) )
} }
pub fn patch(&self) -> &PreBinnedPatchCoord { pub fn patch(&self) -> &PreBinnedPatchCoord {
&self.patch &self.patch
} }
pub fn report_error(&self) -> bool {
self.report_error
}
} }
fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> { fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
@@ -508,13 +530,17 @@ impl CacheFileDesc {
} }
} }
pub struct WrittenPbCache {
bytes: u64,
}
pub async fn write_pb_cache_min_max_avg_scalar<T>( pub async fn write_pb_cache_min_max_avg_scalar<T>(
values: T, values: T,
patch: PreBinnedPatchCoord, patch: PreBinnedPatchCoord,
agg_kind: AggKind, agg_kind: AggKind,
channel: Channel, channel: Channel,
node_config: NodeConfigCached, node_config: NodeConfigCached,
) -> Result<(), Error> ) -> Result<WrittenPbCache, Error>
where where
T: Serialize, T: Serialize,
{ {
@@ -527,7 +553,7 @@ where
let enc = serde_cbor::to_vec(&values)?; let enc = serde_cbor::to_vec(&values)?;
info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path); info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path);
tokio::fs::create_dir_all(path.parent().unwrap()).await?; tokio::fs::create_dir_all(path.parent().unwrap()).await?;
tokio::task::spawn_blocking({ let res = tokio::task::spawn_blocking({
let path = path.clone(); let path = path.clone();
move || { move || {
use fs2::FileExt; use fs2::FileExt;
@@ -540,9 +566,10 @@ where
f.lock_exclusive()?; f.lock_exclusive()?;
f.write_all(&enc)?; f.write_all(&enc)?;
f.unlock()?; f.unlock()?;
Ok::<_, Error>(()) Ok::<_, Error>(enc.len())
} }
}) })
.await??; .await??;
Ok(()) let ret = WrittenPbCache { bytes: res as u64 };
Ok(ret)
} }

23
disk/src/cache/pbv.rs vendored
View File

@@ -1,9 +1,7 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem};
use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem}; use crate::binned::{BinnedStreamKind, RangeCompletableItem, WithLen};
use crate::binned::RangeCompletableItem::RangeComplete;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery}; use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery, WrittenPbCache};
use crate::frame::makeframe::{make_frame, FrameType}; use crate::frame::makeframe::{make_frame, FrameType};
use crate::raw::EventsQuery; use crate::raw::EventsQuery;
use crate::streamlog::Streamlog; use crate::streamlog::Streamlog;
@@ -96,7 +94,7 @@ where
completed: bool, completed: bool,
streamlog: Streamlog, streamlog: Streamlog,
values: <SK as BinnedStreamKind>::TBinnedBins, values: <SK as BinnedStreamKind>::TBinnedBins,
write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>, write_fut: Option<Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>>,
read_cache_fut: Option< read_cache_fut: Option<
Pin< Pin<
Box< Box<
@@ -200,6 +198,7 @@ where
let q2 = self.query.clone(); let q2 = self.query.clone();
let disk_stats_every = self.query.disk_stats_every.clone(); let disk_stats_every = self.query.disk_stats_every.clone();
let stream_kind = self.stream_kind.clone(); let stream_kind = self.stream_kind.clone();
let report_error = self.query.report_error();
move |patch| { move |patch| {
let query = PreBinnedQuery { let query = PreBinnedQuery {
patch, patch,
@@ -207,6 +206,7 @@ where
agg_kind: q2.agg_kind.clone(), agg_kind: q2.agg_kind.clone(),
cache_usage: q2.cache_usage.clone(), cache_usage: q2.cache_usage.clone(),
disk_stats_every: disk_stats_every.clone(), disk_stats_every: disk_stats_every.clone(),
report_error,
}; };
PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind)
} }
@@ -260,8 +260,9 @@ where
self.cache_written = true; self.cache_written = true;
self.write_fut = None; self.write_fut = None;
match item { match item {
Ok(()) => { Ok(res) => {
self.streamlog.append(Level::INFO, format!("cache file written")); self.streamlog
.append(Level::INFO, format!("cache file written bytes: {}", res.bytes));
continue 'outer; continue 'outer;
} }
Err(e) => { Err(e) => {
@@ -309,13 +310,10 @@ where
} else { } else {
match self.query.cache_usage { match self.query.cache_usage {
super::CacheUsage::Use | super::CacheUsage::Recreate => { super::CacheUsage::Use | super::CacheUsage::Recreate => {
err::todo();
let msg = format!( let msg = format!(
"write cache file query: {:?} bin count: {}", "write cache file query: {:?} bin count: {}",
self.query.patch, self.query.patch,
//self.values.ts1s.len() self.values.len(),
// TODO create trait to extract number of bins from item:
0
); );
self.streamlog.append(Level::INFO, msg); self.streamlog.append(Level::INFO, msg);
let values = std::mem::replace( let values = std::mem::replace(
@@ -373,7 +371,6 @@ where
match item { match item {
Ok(file) => { Ok(file) => {
self.read_from_cache = true; self.read_from_cache = true;
use crate::binned::ReadableFromFile;
let fut = <<SK as BinnedStreamKind>::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; let fut = <<SK as BinnedStreamKind>::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?;
self.read_cache_fut = Some(Box::pin(fut)); self.read_cache_fut = Some(Box::pin(fut));
continue 'outer; continue 'outer;

View File

@@ -1,4 +1,3 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
@@ -10,7 +9,6 @@ use futures_util::{pin_mut, FutureExt};
use http::StatusCode; use http::StatusCode;
use netpod::log::*; use netpod::log::*;
use netpod::{NodeConfigCached, PerfOpts}; use netpod::{NodeConfigCached, PerfOpts};
use serde::{Deserialize, Serialize};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -23,7 +21,7 @@ where
res: Option<InMemoryFrameAsyncReadStream<HttpBodyAsAsyncRead>>, res: Option<InMemoryFrameAsyncReadStream<HttpBodyAsAsyncRead>>,
errored: bool, errored: bool,
completed: bool, completed: bool,
stream_kind: BK, _stream_kind: BK,
} }
impl<BK> PreBinnedScalarValueFetchedStream<BK> impl<BK> PreBinnedScalarValueFetchedStream<BK>
@@ -46,7 +44,7 @@ where
res: None, res: None,
errored: false, errored: false,
completed: false, completed: false,
stream_kind: stream_kind.clone(), _stream_kind: stream_kind.clone(),
}; };
Ok(ret) Ok(ret)
} }

View File

@@ -268,10 +268,16 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result<Config,
Ok(config.1) Ok(config.1)
} }
pub enum MatchingConfigEntry<'a> {
None,
Multiple,
Entry(&'a ConfigEntry),
}
pub fn extract_matching_config_entry<'a>( pub fn extract_matching_config_entry<'a>(
range: &NanoRange, range: &NanoRange,
channel_config: &'a Config, channel_config: &'a Config,
) -> Result<&'a ConfigEntry, Error> { ) -> Result<MatchingConfigEntry<'a>, Error> {
let mut ixs = vec![]; let mut ixs = vec![];
for i1 in 0..channel_config.entries.len() { for i1 in 0..channel_config.entries.len() {
let e1 = &channel_config.entries[i1]; let e1 = &channel_config.entries[i1];
@@ -287,11 +293,12 @@ pub fn extract_matching_config_entry<'a>(
} }
} }
if ixs.len() == 0 { if ixs.len() == 0 {
return Err(Error::with_msg(format!("no config entries found"))); Ok(MatchingConfigEntry::None)
} else if ixs.len() > 1 { } else if ixs.len() > 1 {
return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len()))); Ok(MatchingConfigEntry::Multiple)
} else {
Ok(MatchingConfigEntry::Entry(&channel_config.entries[ixs[0]]))
} }
Ok(&channel_config.entries[ixs[0]])
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -1,7 +1,7 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{RangeCompletableItem, XBinnedEvents}; use crate::binned::RangeCompletableItem;
use crate::frame::inmem::InMemoryFrame; use crate::frame::inmem::InMemoryFrame;
use crate::raw::EventQueryJsonStringFrame; use crate::raw::EventQueryJsonStringFrame;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};

View File

@@ -1,6 +1,4 @@
use crate::agg::binnedt::AggregatableTdim; use crate::agg::streams::{Appendable, StatsItem, StreamItem};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::{Appendable, Collectable, Collected, StatsItem, StreamItem};
use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps};
use crate::streamlog::LogItem; use crate::streamlog::LogItem;
use err::Error; use err::Error;

View File

@@ -1,7 +1,7 @@
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents}; use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents};
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{decode_frame, FrameType}; use crate::frame::makeframe::decode_frame;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -18,7 +18,7 @@ where
inp: InMemoryFrameAsyncReadStream<T>, inp: InMemoryFrameAsyncReadStream<T>,
errored: bool, errored: bool,
completed: bool, completed: bool,
stream_kind: SK, _stream_kind: SK,
} }
impl<T, SK> EventsFromFrames<T, SK> impl<T, SK> EventsFromFrames<T, SK>
@@ -31,7 +31,7 @@ where
inp, inp,
errored: false, errored: false,
completed: false, completed: false,
stream_kind, _stream_kind: stream_kind,
} }
} }
} }

View File

@@ -1,10 +1,9 @@
use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::IntoDim1F32Stream; use crate::agg::IntoDim1F32Stream;
use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem};
use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf; use crate::eventchunker::EventChunkerConf;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
@@ -136,10 +135,15 @@ async fn events_conn_handler_inner_try(
Ok(k) => k, Ok(k) => k,
Err(e) => return Err((e, netout))?, Err(e) => return Err((e, netout))?,
}; };
let entry = match extract_matching_config_entry(range, &channel_config) { let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k, Ok(k) => k,
Err(e) => return Err((e, netout))?, Err(e) => return Err((e, netout))?,
}; };
let entry = match entry_res {
MatchingConfigEntry::None => return Err((Error::with_msg("no config entry found"), netout))?,
MatchingConfigEntry::Multiple => return Err((Error::with_msg("multiple config entries found"), netout))?,
MatchingConfigEntry::Entry(entry) => entry,
};
let shape = match entry.to_shape() { let shape = match entry.to_shape() {
Ok(k) => k, Ok(k) => k,
Err(e) => return Err((e, netout))?, Err(e) => return Err((e, netout))?,
@@ -196,7 +200,7 @@ async fn events_conn_handler_inner_try(
} }
} }
// TODO define this case: // TODO define this case:
AggKind::DimXBinsN(n1) => match make_frame::< AggKind::DimXBinsN(_xbincount) => match make_frame::<
Result< Result<
StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>, StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>,
Error, Error,

View File

@@ -244,27 +244,33 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
} }
async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("binned_binary");
let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await { let ret = match disk::binned::binned_bytes_for_http(node_config, &query).await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
Err(e) => { Err(e) => {
error!("fn binned: {:?}", e); if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn binned_binary: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
} }
}; };
Ok(ret) Ok(ret)
} }
async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("binned_json");
let ret = match disk::binned::binned_json(node_config, &query).await { let ret = match disk::binned::binned_json(node_config, &query).await {
Ok(val) => { Ok(val) => {
let body = serde_json::to_string(&val)?; let body = serde_json::to_string(&val)?;
response(StatusCode::OK).body(Body::from(body)) response(StatusCode::OK).body(Body::from(body))
}?, }?,
Err(e) => { Err(e) => {
error!("fn binned: {:?}", e); if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn binned_json: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
} }
}; };
Ok(ret) Ok(ret)
@@ -291,8 +297,12 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
), ),
))?, ))?,
Err(e) => { Err(e) => {
error!("fn prebinned: {:?}", e); if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn prebinned: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
} }
}; };
Ok(ret) Ok(ret)

View File

@@ -50,7 +50,11 @@ pub async fn get_binned(
let t1 = Utc::now(); let t1 = Utc::now();
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
let uri = format!( let uri = format!(
"http://{}:{}/api/4/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}&cache_usage={}&disk_stats_every_kb={}", concat!(
"http://{}:{}/api/4/binned?channel_backend={}&channel_name={}",
"&beg_date={}&end_date={}&bin_count={}&cache_usage={}",
"&disk_stats_every_kb={}&report_error=true",
),
host, host,
port, port,
channel_backend, channel_backend,
@@ -65,13 +69,16 @@ pub async fn get_binned(
let req = hyper::Request::builder() let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(uri) .uri(uri)
.header("aCCepT", "application/octet-stream") .header("accept", "application/octet-stream")
.body(Body::empty())?; .body(Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();
let res = client.request(req).await?; let res = client.request(req).await?;
if res.status() != StatusCode::OK { if res.status() != StatusCode::OK {
error!("Server error {:?}", res); error!("Server error {:?}", res);
return Err(Error::with_msg(format!("Server error {:?}", res))); let (head, body) = res.into_parts();
let buf = hyper::body::to_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::with_msg(format!("Server error {:?}\n---------------------- message from http body:\n{}\n---------------------- end of http body", head, s)));
} }
let perf_opts = PerfOpts { inmem_bufcap: 512 }; let perf_opts = PerfOpts { inmem_bufcap: 512 };
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);

View File

@@ -101,7 +101,7 @@ where
let disk_stats_every = ByteSize::kb(1024); let disk_stats_every = ByteSize::kb(1024);
// TODO have a function to form the uri, including perf opts: // TODO have a function to form the uri, including perf opts:
let uri = format!( let uri = format!(
"http://{}:{}/api/4/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}", "http://{}:{}/api/4/binned?cache_usage=use&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}&disk_stats_every_kb={}",
node0.host, node0.host,
node0.port, node0.port,
channel_backend, channel_backend,