From 72006fbf175891ad8d11faf433500f88aecd09db Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 20 May 2021 13:27:46 +0200
Subject: [PATCH] Factor more

---
 disk/src/agg/scalarbinbatch.rs                |  15 +-
 disk/src/agg/streams.rs                       |  16 +-
 disk/src/binned.rs                            | 264 ++++++++++--------
 disk/src/frame/makeframe.rs                   |   7 +-
 httpret/src/lib.rs                            |   2 +-
 httpret/static/documentation/status-main.html |   2 +-
 retrieval/src/client.rs                       |  29 +-
 7 files changed, 190 insertions(+), 145 deletions(-)

diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index e7432eb..b5dc530 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,5 +1,5 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
-use crate::agg::streams::{Batchable, Bins};
+use crate::agg::streams::Bins;
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
 use crate::binned::MakeBytesFrame;
 use crate::frame::makeframe::make_frame;
@@ -234,17 +234,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
     }
 }
 
-impl Batchable for MinMaxAvgScalarBinBatch {
-    fn append(&mut self, k: &mut Self) {
-        self.ts1s.append(&mut k.ts1s);
-        self.ts2s.append(&mut k.ts2s);
-        self.counts.append(&mut k.counts);
-        self.mins.append(&mut k.mins);
-        self.maxs.append(&mut k.maxs);
-        self.avgs.append(&mut k.avgs);
-    }
-}
-
 impl Bins for MinMaxAvgScalarBinBatch {
     fn bin_count(&self) -> u32 {
         self.ts1s.len() as u32
@@ -425,7 +414,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
 
 impl MakeBytesFrame for Result<MinMaxAvgScalarBinBatchStreamItem, Error> {
     fn make_bytes_frame(&self) -> Result<Bytes, Error> {
-        Ok(make_frame::(self)?.freeze())
+        Ok(make_frame(self)?.freeze())
     }
 }
 
diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs
index c76e8bf..24f2da0 100644
--- a/disk/src/agg/streams.rs
+++ b/disk/src/agg/streams.rs
@@ -1,4 +1,5 @@
 use crate::streamlog::LogItem;
+use err::Error;
 use netpod::EventDataReadStats;
 use serde::{Deserialize, Serialize};
@@ -18,6 +19,17 @@ pub trait Bins {
     fn bin_count(&self) -> u32;
 }
 
-pub trait Batchable {
-    fn append(&mut self, k: &mut Self);
+pub trait Collected {
+    fn new(bin_count_exp: u32) -> Self;
+    fn timed_out(&mut self, k: bool);
+}
+
+pub trait Collectable {
+    type Collected: Collected;
+    fn append_to(&mut self, collected: &mut Self::Collected);
+}
+
+pub trait ToJsonResult {
+    type Output;
+    fn to_json_result(&self) -> Result<Self::Output, Error>;
 }
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 2a6383d..085e556 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,10 +1,10 @@
 use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
-use crate::agg::streams::{Batchable, Bins, StatsItem, StreamItem};
+use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem, ToJsonResult};
 use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches};
 use crate::cache::{BinnedQuery, MergedFromRemotes};
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
-use crate::frame::makeframe::{make_frame, FrameType};
+use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
 use chrono::{TimeZone, Utc};
@@ -30,14 +30,107 @@ pub enum BinnedScalarStreamItem {
     RangeComplete,
 }
 
-impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
-    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
-        Ok(make_frame::(self)?.freeze())
+pub struct MinMaxAvgScalarBinBatchCollected {
+    batch: MinMaxAvgScalarBinBatch,
+    timed_out: bool,
+    finalised_range: bool,
+    bin_count_exp: u32,
+}
+
+impl MinMaxAvgScalarBinBatchCollected {
+    pub fn empty(bin_count_exp: u32) -> Self {
+        Self {
+            batch: MinMaxAvgScalarBinBatch::empty(),
+            timed_out: false,
+            finalised_range: false,
+            bin_count_exp,
+        }
     }
 }
 
-impl FrameType for Result<StreamItem<BinnedScalarStreamItem>, Error> {
-    const FRAME_TYPE_ID: u32 = 0x02;
+impl Collectable for BinnedScalarStreamItem {
+    type Collected = MinMaxAvgScalarBinBatchCollected;
+
+    fn append_to(&mut self, collected: &mut Self::Collected) {
+        use BinnedScalarStreamItem::*;
+        match self {
+            Values(item) => {
+                append_to_min_max_avg_scalar_bin_batch(&mut collected.batch, item);
+            }
+            RangeComplete => {
+                // TODO use some other batch type in order to raise the range complete flag.
+            }
+        }
+    }
+}
+
+fn append_to_min_max_avg_scalar_bin_batch(batch: &mut MinMaxAvgScalarBinBatch, item: &mut MinMaxAvgScalarBinBatch) {
+    batch.ts1s.append(&mut item.ts1s);
+    batch.ts2s.append(&mut item.ts2s);
+    batch.counts.append(&mut item.counts);
+    batch.mins.append(&mut item.mins);
+    batch.maxs.append(&mut item.maxs);
+    batch.avgs.append(&mut item.avgs);
+}
+
+impl Collected for MinMaxAvgScalarBinBatchCollected {
+    fn new(bin_count_exp: u32) -> Self {
+        Self::empty(bin_count_exp)
+    }
+
+    fn timed_out(&mut self, k: bool) {
+        self.timed_out = k;
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MinMaxAvgScalarBinBatchCollectedJsonResult {
+    ts_bin_edges: Vec<IsoDateTime>,
+    counts: Vec<u64>,
+    #[serde(skip_serializing_if = "Bool::is_false")]
+    finalised_range: bool,
+    #[serde(skip_serializing_if = "Zero::is_zero")]
+    missing_bins: u32,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    continue_at: Option<IsoDateTime>,
+}
+
+impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
+    type Output = MinMaxAvgScalarBinBatchCollectedJsonResult;
+
+    fn to_json_result(&self) -> Result<Self::Output, Error> {
+        let mut tsa: Vec<_> = self
+            .batch
+            .ts1s
+            .iter()
+            .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
+            .collect();
+        if let Some(&z) = self.batch.ts2s.last() {
+            tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
+        }
+        let continue_at = if self.batch.ts1s.len() < self.bin_count_exp as usize {
+            match tsa.last() {
+                Some(k) => Some(k.clone()),
+                None => Err(Error::with_msg("partial_content but no bin in result"))?,
+            }
+        } else {
+            None
+        };
+        let ret = MinMaxAvgScalarBinBatchCollectedJsonResult {
+            counts: self.batch.counts.clone(),
+            missing_bins: self.bin_count_exp - self.batch.ts1s.len() as u32,
+            finalised_range: self.finalised_range,
+            ts_bin_edges: tsa,
+            continue_at,
+        };
+        Ok(ret)
+    }
+}
+
+impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
+    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
+        Ok(make_frame(self)?.freeze())
+    }
 }
 
 fn adapter_to_stream_item(
@@ -135,26 +228,9 @@ pub async fn binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
 ) -> Result {
-    // TODO must decide here already which AggKind so that I can call into the generic code.
-    // Really here?
-    // But don't I need the channel config to make a decision if the requested binning actually makes sense?
-    // In that case, the function I call here, should return a boxed trait object.
-    // What traits must the returned stream fulfill?
-    // Also, the items in that case must be trait objects as well.
-    // But that is ok, because there are only few items in the stream anyways at this stage.
-
-    // TODO what traits do I need downstream from here?
-    // TODO use boxed trait objects from here on.
-    // Must be able to convert them to bytes frames..
-    // TODO but the json endpoint: how should that be handled?
-    // Maybe it is enough if the items can turn themselves into serde_json::Value ?
-
-    //todo::todo;
-
     let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
     let entry = extract_matching_config_entry(query.range(), &channel_config)?;
     info!("binned_bytes_for_http found config entry {:?}", entry);
-
     match query.agg_kind() {
         AggKind::DimXBins1 => {
             let res = binned_scalar_stream(node_config, query).await?;
@@ -241,6 +317,53 @@ impl Serialize for IsoDateTime {
     }
 }
 
+pub async fn collect_all<T>(
+    stream: impl Stream<Item = Result<StreamItem<T>, Error>> + Unpin,
+    bin_count_exp: u32,
+) -> Result<<T as Collectable>::Collected, Error>
+where
+    T: Collectable,
+{
+    let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
+    let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
+    let mut i1 = 0;
+    let mut stream = stream;
+    loop {
+        // TODO use the trait instead to check if we have already at least one bin in the result:
+        let item = if i1 == 0 {
+            stream.next().await
+        } else {
+            match tokio::time::timeout_at(deadline, stream.next()).await {
+                Ok(k) => k,
+                Err(_) => {
+                    main_item.timed_out(true);
+                    None
+                }
+            }
+        };
+        match item {
+            Some(item) => {
+                match item {
+                    Ok(item) => match item {
+                        StreamItem::Log(_) => {}
+                        StreamItem::Stats(_) => {}
+                        StreamItem::DataItem(mut item) => {
+                            item.append_to(&mut main_item);
+                            i1 += 1;
+                        }
+                    },
+                    Err(e) => {
+                        // TODO Need to use some flags to get good enough error message for remote user.
+                        Err(e)?;
+                    }
+                };
+            }
+            None => break,
+        }
+    }
+    Ok(main_item)
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct BinnedJsonResult {
     ts_bin_edges: Vec<IsoDateTime>,
@@ -260,97 +383,8 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
 
     // TODO create the matching stream based on AggKind and ConfigEntry.
 
-    // TODO must batch the whole result stream. Can I have a trait to append all to the first received?
-
-    // TODO must convert the all-batched last item together.
-
-    let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
     let t = binned_scalar_stream(node_config, query).await?;
-    let bin_count_exp = t.range.count;
-    let mut bin_count = 0;
-    let mut items = t.binned_stream;
-    let mut i1 = 0;
-    let mut partial_content = false;
-    let mut finalised_range = false;
-
-    // TODO factor out the collecting:
-    // How can I make this generic on the item type?
-    let mut main_item: Option<MinMaxAvgScalarBinBatch> = None;
-    loop {
-        let item = if i1 == 0 {
-            items.next().await
-        } else {
-            match tokio::time::timeout_at(deadline, items.next()).await {
-                Ok(k) => k,
-                Err(_) => {
-                    partial_content = true;
-                    None
-                }
-            }
-        };
-        match item {
-            Some(item) => {
-                match item {
-                    Ok(item) => match item {
-                        StreamItem::Log(_) => {}
-                        StreamItem::Stats(_) => {}
-                        StreamItem::DataItem(item) => match item {
-                            BinnedScalarStreamItem::RangeComplete => {
-                                finalised_range = true;
-                            }
-                            BinnedScalarStreamItem::Values(mut vals) => {
-                                bin_count += vals.bin_count();
-                                match &mut main_item {
-                                    Some(main) => {
-                                        main.append(&mut vals);
-                                    }
-                                    None => {
-                                        main_item = Some(vals);
-                                    }
-                                }
-                                // TODO solve this via some trait to append to a batch:
-                                // TODO gather stats about the batch sizes.
-                                i1 += 1;
-                            }
-                        },
-                    },
-                    Err(e) => {
-                        // TODO Need to use some flags to get good enough error message for remote user.
-                        Err(e)?;
-                    }
-                };
-            }
-            None => break,
-        }
-    }
-
-    // TODO handle the case when I didn't get any items..
-    let batch = main_item.unwrap();
-    let mut tsa: Vec<_> = batch
-        .ts1s
-        .iter()
-        .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
-        .collect();
-    if let Some(&z) = batch.ts2s.last() {
-        tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
-    }
-    let continue_at = if partial_content {
-        match tsa.last() {
-            Some(k) => Some(k.clone()),
-            None => Err(Error::with_msg("partial_content but no bin in result"))?,
-        }
-    } else {
-        None
-    };
-    if bin_count_exp < bin_count as u64 {
-        Err(Error::with_msg("bin_count_exp < bin_count"))?
-    }
-    let ret = BinnedJsonResult {
-        ts_bin_edges: tsa,
-        counts: batch.counts,
-        missing_bins: bin_count_exp - bin_count as u64,
-        finalised_range,
-        continue_at,
-    };
+    let collected = collect_all(t.binned_stream, t.range.count as u32).await?;
+    let ret = collected.to_json_result()?;
     Ok(serde_json::to_value(ret)?)
 }
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index 1dd764a..318b4c2 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -1,4 +1,5 @@
-use crate::binned::BinnedBytesForHttpStreamFrame;
+use crate::agg::streams::StreamItem;
+use crate::binned::{BinnedBytesForHttpStreamFrame, BinnedScalarStreamItem};
 use crate::cache::pbvfs::PreBinnedItem;
 use crate::frame::inmem::InMemoryFrame;
 use crate::raw::conn::RawConnOut;
@@ -32,6 +33,10 @@ impl FrameType for Result {
     const FRAME_TYPE_ID: u32 = 0x05;
 }
 
+impl FrameType for Result<StreamItem<BinnedScalarStreamItem>, Error> {
+    const FRAME_TYPE_ID: u32 = 0x06;
+}
+
 pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
 where
     FT: FrameType + Serialize,
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 4f5c640..d87f208 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -145,7 +145,7 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
     } else if path.starts_with("/api/4/documentation/") {
         if req.method() == Method::GET {
             static_http!(path, "", "index.html", "text/html");
-            static_http!(path, "style.css", "text/stylesheet");
+            static_http!(path, "style.css", "text/css");
             static_http!(path, "script.js", "text/javascript");
             static_http!(path, "status-main.html", "text/html");
             Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
diff --git a/httpret/static/documentation/status-main.html b/httpret/static/documentation/status-main.html
index da9b9e9..82db750 100644
--- a/httpret/static/documentation/status-main.html
+++ b/httpret/static/documentation/status-main.html
@@ -8,7 +8,7 @@
-        daqbuffer - Main Status
+        Retrieval - Main Status
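Illustration, not part of the patch: the heart of this change is factoring the old ad-hoc collection loop in binned_json into the generic collect_all, driven by the new Collectable and Collected traits (disk/src/binned.rs above). The following self-contained Rust sketch shows the same pattern with simplified stand-in types; Item and Acc are invented for the example and are not the crate's types, and error handling and the StreamItem wrapper are omitted for brevity.

use futures_util::{Stream, StreamExt};
use std::time::Duration;

// Simplified stand-ins for the Collected / Collectable traits from disk/src/agg/streams.rs.
trait Collected {
    fn new(bin_count_exp: u32) -> Self;
    fn timed_out(&mut self, k: bool);
}

trait Collectable {
    type Collected: Collected;
    fn append_to(&mut self, collected: &mut Self::Collected);
}

// Illustrative item and accumulator types.
#[derive(Debug)]
struct Item(Vec<u64>);

#[derive(Debug)]
struct Acc {
    counts: Vec<u64>,
    bin_count_exp: u32,
    timed_out: bool,
}

impl Collected for Acc {
    fn new(bin_count_exp: u32) -> Self {
        Acc { counts: Vec::new(), bin_count_exp, timed_out: false }
    }
    fn timed_out(&mut self, k: bool) {
        self.timed_out = k;
    }
}

impl Collectable for Item {
    type Collected = Acc;
    fn append_to(&mut self, collected: &mut Self::Collected) {
        collected.counts.append(&mut self.0);
    }
}

// Same shape as the patch's collect_all: the first item is awaited without a
// deadline, every later item races the deadline, and hitting the deadline only
// marks the partial result as timed out instead of failing it.
async fn collect_all<T>(mut stream: impl Stream<Item = T> + Unpin, bin_count_exp: u32) -> T::Collected
where
    T: Collectable,
{
    let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
    let mut main_item = <T::Collected as Collected>::new(bin_count_exp);
    let mut i1 = 0;
    loop {
        let item = if i1 == 0 {
            stream.next().await
        } else {
            match tokio::time::timeout_at(deadline, stream.next()).await {
                Ok(k) => k,
                Err(_) => {
                    main_item.timed_out(true);
                    None
                }
            }
        };
        match item {
            Some(mut item) => {
                item.append_to(&mut main_item);
                i1 += 1;
            }
            None => break,
        }
    }
    main_item
}

#[tokio::main]
async fn main() {
    let s = futures_util::stream::iter(vec![Item(vec![1, 2]), Item(vec![3])]);
    let acc = collect_all(s, 3).await;
    println!("{:?}", acc); // Acc { counts: [1, 2, 3], bin_count_exp: 3, timed_out: false }
}

The design point carried over from the patch: a timeout is not an error, it merely flags the accumulated result, so the JSON layer can still report fields such as missing_bins and continue_at for a partial response.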
diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs
index f29c89d..067094a 100644
--- a/retrieval/src/client.rs
+++ b/retrieval/src/client.rs
@@ -1,7 +1,9 @@
 use chrono::{DateTime, Utc};
-use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
+use disk::agg::streams::StreamItem;
+use disk::binned::BinnedScalarStreamItem;
 use disk::cache::CacheUsage;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
+use disk::frame::makeframe::FrameType;
 use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::TryStreamExt;
@@ -75,30 +77,33 @@ pub async fn get_binned(
     let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
     use futures_util::StreamExt;
     use std::future::ready;
-    let mut bin_count = 0;
     let s3 = s2
         .map_err(|e| error!("get_binned {:?}", e))
         .filter_map(|item| {
             let g = match item {
                 Ok(frame) => {
-                    type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame;
+                    type _ExpectedType2 = disk::binned::BinnedBytesForHttpStreamFrame;
+                    type ExpectedType = Result<StreamItem<BinnedScalarStreamItem>, Error>;
+                    let type_id_exp = <ExpectedType as FrameType>::FRAME_TYPE_ID;
+                    if frame.tyid() != type_id_exp {
+                        error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp);
+                    }
                     let n1 = frame.buf().len();
                     match bincode::deserialize::<ExpectedType>(frame.buf()) {
                         Ok(item) => match item {
                             Ok(item) => {
-                                match &item {
-                                    MinMaxAvgScalarBinBatchStreamItem::Log(item) => {
-                                        Streamlog::emit(item);
+                                match item {
+                                    StreamItem::Log(item) => {
+                                        Streamlog::emit(&item);
                                     }
-                                    MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
-                                        bin_count += 1;
-                                        info!("len {} values {:?}", n1, item);
+                                    StreamItem::Stats(item) => {
+                                        info!("Stats: {:?}", item);
                                     }
-                                    item => {
-                                        info!("len {} item {:?}", n1, item);
+                                    StreamItem::DataItem(item) => {
+                                        info!("DataItem: {:?}", item);
                                     }
                                 }
-                                Some(Ok(item))
+                                Some(Ok(()))
                             }
                             Err(e) => {
                                 error!("len {} error frame {:?}", n1, e);
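Illustration, not part of the patch: the client-side change above relies on the FrameType pattern from disk/src/frame/makeframe.rs, where each payload type pins a compile-time FRAME_TYPE_ID that the receiver compares against the frame header before attempting to deserialize. Below is a self-contained sketch of that idea with bincode; the Frame container and the helper names are simplified stand-ins for the crate's real wire format, not its actual API.

use serde::{de::DeserializeOwned, Deserialize, Serialize};

// Each frame payload type pins a wire-level id, like FrameType in the patch.
trait FrameType {
    const FRAME_TYPE_ID: u32;
}

#[derive(Debug, Serialize, Deserialize)]
struct BinnedPayload {
    counts: Vec<u64>,
}

impl FrameType for BinnedPayload {
    const FRAME_TYPE_ID: u32 = 0x06;
}

// Illustrative frame container: a type id header plus a bincode-encoded body.
struct Frame {
    tyid: u32,
    buf: Vec<u8>,
}

fn make_frame<FT: FrameType + Serialize>(item: &FT) -> Result<Frame, bincode::Error> {
    Ok(Frame {
        tyid: FT::FRAME_TYPE_ID,
        buf: bincode::serialize(item)?,
    })
}

fn decode_frame<FT: FrameType + DeserializeOwned>(frame: &Frame) -> Option<FT> {
    // The same guard the patch adds in retrieval/src/client.rs: compare the
    // received type id against the expected one before deserializing.
    if frame.tyid != FT::FRAME_TYPE_ID {
        eprintln!("unexpected type id got {} exp {}", frame.tyid, FT::FRAME_TYPE_ID);
        return None;
    }
    bincode::deserialize(&frame.buf).ok()
}

fn main() {
    let frame = make_frame(&BinnedPayload { counts: vec![1, 2, 3] }).unwrap();
    let decoded: Option<BinnedPayload> = decode_frame(&frame);
    println!("{:?}", decoded); // Some(BinnedPayload { counts: [1, 2, 3] })
}

Checking the id before deserializing keeps a version or routing mismatch from surfacing as an opaque bincode error; the patch logs the mismatch but still attempts the decode, which is a softer variant of the same guard.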