diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs
index 4e32540..8ed34bb 100644
--- a/daqbuffer/src/test.rs
+++ b/daqbuffer/src/test.rs
@@ -1,11 +1,11 @@
 use crate::spawn_test_hosts;
 use bytes::BytesMut;
 use chrono::{DateTime, Utc};
-use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use disk::agg::streams::{Bins, StatsItem, StreamItem};
 use disk::binned::query::{BinnedQuery, CacheUsage};
 use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
+use disk::frame::makeframe::FrameType;
 use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::StreamExt;
@@ -227,8 +227,10 @@ where
                 None
             }
             StreamItem::DataItem(frame) => {
-                info!("test receives tyid {:x}", frame.tyid());
                 type ExpectedType = Result<StreamItem<RangeCompletableItem<MinMaxAvgBins<i32>>>, Error>;
+                if frame.tyid() != <ExpectedType as FrameType>::FRAME_TYPE_ID {
+                    error!("test receives unexpected tyid {:x}", frame.tyid());
+                }
                 match bincode::deserialize::<ExpectedType>(frame.buf()) {
                     Ok(item) => match item {
                         Ok(item) => match item {
diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs
index 18bbeb7..affb846 100644
--- a/disk/src/agg/binnedt4.rs
+++ b/disk/src/agg/binnedt4.rs
@@ -21,6 +21,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::fs::File;
 
+// TODO no longer needed
 pub struct DefaultScalarEventsTimeBinner<NTY> {
     _m1: PhantomData<NTY>,
 }
@@ -38,6 +39,7 @@ where
     }
 }
 
+// TODO no longer needed
 pub struct DefaultSingleXBinTimeBinner<NTY> {
     _m1: PhantomData<NTY>,
 }
diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs
index 0c3dc8e..01ff468 100644
--- a/disk/src/agg/streams.rs
+++ b/disk/src/agg/streams.rs
@@ -3,6 +3,7 @@ use crate::streamlog::LogItem;
 use err::Error;
 use netpod::EventDataReadStats;
 use serde::{Deserialize, Serialize};
+use std::any::Any;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum StatsItem {
@@ -32,6 +33,21 @@ pub trait Collectable {
     fn append_to(&self, collected: &mut Self::Collected);
 }
 
+pub trait Collectable2: Any {
+    fn as_any_ref(&self) -> &dyn Any;
+    fn append(&mut self, src: &dyn Any);
+}
+
+pub trait CollectionSpec2 {
+    // TODO Can I use here associated types and return concrete types?
+    // Probably not object safe.
+    fn empty(&self) -> Box<dyn Collectable2>;
+}
+
+pub trait CollectionSpecMaker2 {
+    fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2>;
+}
+
 pub trait ToJsonResult {
     type Output;
     fn to_json_result(&self) -> Result<Self::Output, Error>;
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 21a7e8a..301391f 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -8,7 +8,9 @@ use crate::agg::binnedt4::{
 use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
-use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult};
+use crate::agg::streams::{
+    Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonResult,
+};
 use crate::agg::{Fits, FitsInside};
 use crate::binned::binnedfrompbv::BinnedFromPreBinned;
 use crate::binned::query::{BinnedQuery, PreBinnedQuery};
@@ -39,6 +41,7 @@ use parse::channelconfig::{extract_matching_config_entry, read_local_config, Mat
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize, Serializer};
 use serde_json::Map;
+use std::any::Any;
 use std::future::Future;
 use std::marker::PhantomData;
 use std::pin::Pin;
@@ -49,6 +52,7 @@ use tokio::io::{AsyncRead, ReadBuf};
 
 pub mod binnedfrompbv;
 pub mod pbv;
+// TODO get rid of whole pbv2 mod?
 pub mod pbv2;
 pub mod prebinned;
 pub mod query;
@@ -169,41 +173,40 @@ impl ToJsonResult for MinMaxAvgScalarBinBatch {
     }
 }
 
-type BinnedBytesStreamBox = Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
+pub trait BinnedResponseItem: Framable {}
+impl<T> BinnedResponseItem for T where T: Framable {}
+
+pub struct BinnedResponse {
+    stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
+    bin_count: u32,
+    collection_spec: Box<dyn CollectionSpec2>,
+}
 
 // TODO Can I unify these functions with the ones from prebinned.rs?
 // They also must resolve to the same types, so would be good to unify.
-fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
+fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
     query: BinnedQuery,
     _event_value_shape: EVS,
     node_config: &NodeConfigCached,
-) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
+) -> Result<BinnedResponse, Error>
 where
     NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
     END: Endianness + 'static,
     EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
     ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
-    ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
     <ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
     <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
         TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
-    <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
     Sitemty<
         <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
     >: Framable,
     // TODO require these things in general?
     Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
-    Sitemty<<ETB as EventsTimeBinner>::Output>: FrameType + Framable + DeserializeOwned,
-    Sitemty<
-        <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
-    >: FrameType + Framable + DeserializeOwned,
+    Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
+        FrameType + Framable + DeserializeOwned,
+    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: CollectionSpecMaker2,
 {
-    // TODO construct the binned pipeline:
-    // Either take from prebinned sub sstream, or directly from a merged.
-    //let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
-    //let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
-    //Box::pin(ret)
     if query.channel().backend != node_config.node.backend {
         let err = Error::with_msg(format!(
             "backend mismatch node: {} requested: {}",
@@ -237,14 +240,23 @@ where
                 query.disk_stats_every().clone(),
                 query.report_error(),
             )?
-            .map(|item| Box::new(item) as Box<dyn Framable>);
-            Ok(Box::pin(s))
+            .map(|item| Box::new(item) as Box<dyn BinnedResponseItem>);
+            let ret = BinnedResponse {
+                stream: Box::pin(s),
+                bin_count: range.count as u32,
+                collection_spec:
+                    <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
+                        range.count as u32,
+                    ),
+            };
+            Ok(ret)
         }
         Ok(None) => {
             info!(
                 "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
                 range
            );
+            let bin_count = range.count as u32;
             let evq = EventsQuery {
                 channel: query.channel().clone(),
                 range: query.range().clone(),
@@ -252,8 +264,16 @@
             };
             let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
             let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
-            let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn Framable>);
-            Ok(Box::pin(s))
+            let s = StreamExt::map(s, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
+            let ret = BinnedResponse {
+                stream: Box::pin(s),
+                bin_count,
+                collection_spec:
+                    <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as CollectionSpecMaker2>::spec(
+                        range.count as u32,
+                    ),
+            };
+            Ok(ret)
         }
         Err(e) => Err(e),
     }
@@ -263,26 +283,22 @@ fn make_num_pipeline_nty_end<NTY, END>(
     shape: Shape,
     query: BinnedQuery,
     node_config: &NodeConfigCached,
-) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error>
+) -> Result<BinnedResponse, Error>
 where
     NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
     END: Endianness + 'static,
 {
     match shape {
-        Shape::Scalar => {
-            make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
-                query,
-                EventValuesDim0Case::new(),
-                node_config,
-            )
-        }
-        Shape::Wave(n) => {
-            make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
-                query,
-                EventValuesDim1Case::new(n),
-                node_config,
-            )
-        }
+        Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>>(
+            query,
+            EventValuesDim0Case::new(),
+            node_config,
+        ),
+        Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>>(
+            query,
+            EventValuesDim1Case::new(n),
+            node_config,
+        ),
     }
 }
 
@@ -301,17 +317,27 @@ fn make_num_pipeline(
     shape: Shape,
     query: BinnedQuery,
     node_config: &NodeConfigCached,
-) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+) -> Result<BinnedResponse, Error> {
     match scalar_type {
         ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
         ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
     }
 }
 
-pub async fn binned_bytes_for_http(
-    node_config: &NodeConfigCached,
+pub trait PipelinePostProcess {
+    type Input;
+    type Output;
+    fn post(inp: Self::Input) -> Self::Output;
+}
+
+// TODO return impl Stream instead.
+async fn make_num_pipeline_for_entry<PPP>(
     query: &BinnedQuery,
-) -> Result<BinnedBytesStreamBox, Error> {
+    node_config: &NodeConfigCached,
+) -> Result<BinnedResponse, Error>
+where
+    PPP: PipelinePostProcess,
+{
     let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
     match extract_matching_config_entry(query.range(), &channel_config)? {
         MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
@@ -320,43 +346,51 @@ pub async fn binned_bytes_for_http(
             // TODO can I use the same binned_stream machinery to construct the matching empty result?
             // Need the requested range all with empty/nan values and zero counts.
             let s = futures_util::stream::empty();
-            Ok(Box::pin(s))
+            let ret = BinnedResponse {
+                stream: Box::pin(s),
+                bin_count: 0,
+                collection_spec: <MinMaxAvgBins<f32> as CollectionSpecMaker2>::spec(0),
+            };
+            Ok(ret)
         }
         MatchingConfigEntry::Entry(entry) => {
             // TODO make this a stream log:
             info!("binned_bytes_for_http found config entry {:?}", entry);
-            let res = make_num_pipeline(
+            let ret = make_num_pipeline(
                 entry.scalar_type.clone(),
                 entry.byte_order.clone(),
                 entry.to_shape()?,
                 query.clone(),
                 node_config,
             )?;
-            let res = res.map(|item| item.make_frame());
-            let res = res.map(|item| match item {
-                Ok(item) => Ok(item.freeze()),
-                Err(e) => Err(e),
-            });
-            let res = Box::pin(res);
-            return Ok(res);
-            match query.agg_kind() {
-                AggKind::DimXBins1 => {
-                    let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
-                    let ret = BinnedBytesForHttpStream::new(res.binned_stream);
-                    Ok(Box::pin(ret))
-                }
-                AggKind::DimXBinsN(_) => {
-                    // TODO pass a different stream kind here:
-                    err::todo();
-                    let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
-                    let ret = BinnedBytesForHttpStream::new(res.binned_stream);
-                    Ok(Box::pin(ret))
-                }
-            }
+            Ok(ret)
         }
     }
 }
 
+struct ToFrameBytes {}
+
+impl PipelinePostProcess for ToFrameBytes {
+    type Input = ();
+    type Output = ();
+
+    fn post(inp: Self::Input) -> Self::Output {
+        todo!()
+    }
+}
+
+pub async fn binned_bytes_for_http(
+    node_config: &NodeConfigCached,
+    query: &BinnedQuery,
+) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
+    let pl = make_num_pipeline_for_entry::<ToFrameBytes>(query, node_config).await?;
+    let ret = pl.stream.map(|item| match item.make_frame() {
+        Ok(item) => Ok(item.freeze()),
+        Err(e) => Err(e),
+    });
+    Ok(Box::pin(ret))
+}
+
 pub struct BinnedBytesForHttpStream<S> {
     inp: S,
     errored: bool,
@@ -430,15 +464,13 @@ impl Serialize for IsoDateTime {
     }
 }
 
-pub async fn collect_all<T>(
-    stream: impl Stream<Item = Result<StreamItem<RangeCompletableItem<T>>, Error>> + Unpin,
-    bin_count_exp: u32,
-) -> Result<<T as Collectable>::Collected, Error>
+pub async fn collect_all<S, T>(stream: S, collection_spec: Box<dyn CollectionSpec2>) -> Result<T, Error>
 where
-    T: Collectable,
+    S: Stream<Item = Sitemty<T>> + Unpin,
 {
     let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
-    let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
+    //let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
+    let mut main_item = collection_spec.empty();
     let mut i1 = 0;
     let mut stream = stream;
     loop {
@@ -449,7 +481,13 @@
             match tokio::time::timeout_at(deadline, stream.next()).await {
                 Ok(k) => k,
                 Err(_) => {
-                    main_item.timed_out(true);
+                    // TODO
+
+                    // TODO
+
+                    // TODO
+
+                    //main_item.timed_out(true);
                     None
                 }
             }
@@ -463,7 +501,8 @@
                 StreamItem::DataItem(item) => match item {
                     RangeCompletableItem::RangeComplete => {}
                     RangeCompletableItem::Data(item) => {
-                        item.append_to(&mut main_item);
+                        main_item.append(&item);
+                        //item.append_to(&mut main_item);
                         i1 += 1;
                     }
                 },
@@ -477,11 +516,12 @@
             None => break,
         }
     }
-    Ok(main_item)
+    Ok(err::todoval())
 }
 
+// TODO remove
 #[derive(Debug, Serialize, Deserialize)]
-pub struct BinnedJsonResult {
+pub struct UnusedBinnedJsonResult {
     ts_bin_edges: Vec<IsoDateTime>,
     counts: Vec<u64>,
     #[serde(skip_serializing_if = "Bool::is_false")]
@@ -493,37 +533,12 @@
 }
 
 pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
-    let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
-    match extract_matching_config_entry(query.range(), &channel_config)? {
-        MatchingConfigEntry::None => {
-            // TODO can I use the same binned_stream machinery to construct the matching empty result?
-            Ok(serde_json::Value::Object(Map::new()))
-        }
-        MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?,
-        MatchingConfigEntry::Entry(entry) => {
-            info!("binned_json found config entry {:?}", entry);
-            match query.agg_kind() {
-                AggKind::DimXBins1 => {
-                    let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
-                    //let ret = BinnedBytesForHttpStream::new(res.binned_stream);
-                    //Ok(Box::pin(ret))
-                    // TODO need to collect also timeout, number of missing expected bins, ...
-                    let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
-                    let ret = ToJsonResult::to_json_result(&collected)?;
-                    Ok(serde_json::to_value(ret)?)
-                }
-                AggKind::DimXBinsN(_xbincount) => {
-                    // TODO pass a different stream kind here:
-                    err::todo();
-                    let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
-                    // TODO need to collect also timeout, number of missing expected bins, ...
-                    let collected = collect_all(res.binned_stream, res.range.count as u32).await?;
-                    let ret = ToJsonResult::to_json_result(&collected)?;
-                    Ok(serde_json::to_value(ret)?)
-                }
-            }
-        }
-    }
+    err::todoval()
+    /*let pl = make_num_pipeline_for_entry(query, node_config).await?;
+    let collected = collect_all(pl.stream, pl.bin_count).await?;
+    let ret = ToJsonResult::to_json_result(&collected)?;
+    let ret = serde_json::to_value(ret)?;
+    Ok(ret)*/
 }
 
 pub struct ReadPbv<T>
 where
@@ -933,6 +948,52 @@
     }
 }
 
+pub struct MinMaxAvgBinsCollected<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> MinMaxAvgBinsCollected<NTY> {
+    pub fn new() -> Self {
+        Self { _m1: PhantomData }
+    }
+}
+
+impl<NTY> Collectable2 for Sitemty<MinMaxAvgBins<NTY>>
+where
+    NTY: 'static,
+{
+    fn as_any_ref(&self) -> &dyn Any {
+        self
+    }
+
+    fn append(&mut self, src: &dyn Any) {
+        todo!()
+    }
+}
+
+pub struct MinMaxAvgBinsCollectionSpec<NTY> {
+    bin_count_exp: u32,
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> CollectionSpec2 for MinMaxAvgBinsCollectionSpec<NTY> {
+    fn empty(&self) -> Box<dyn Collectable2> {
+        Box::new(MinMaxAvgBins::<NTY>::empty())
+    }
+}
+
+impl<NTY> CollectionSpecMaker2 for MinMaxAvgBins<NTY>
+where
+    NTY: 'static,
+{
+    fn spec(bin_count_exp: u32) -> Box<dyn CollectionSpec2> {
+        Box::new(MinMaxAvgBinsCollectionSpec::<NTY> {
+            bin_count_exp,
+            _m1: PhantomData,
+        })
+    }
+}
+
 pub struct MinMaxAvgAggregator {
     range: NanoRange,
     count: u32,
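
Note on the object-safety question raised in the CollectionSpec2 TODO above: a trait with an associated collected type could not be held as Box<dyn CollectionSpec2> inside collect_all, which is why empty() returns a boxed Collectable2 and append takes &dyn Any and downcasts. The following is a minimal self-contained sketch of that pattern in isolation; Bins and BinsSpec are hypothetical stand-ins for illustration, not types from this patch.

// Illustrative sketch only: shows the type-erased collection pattern that
// Collectable2/CollectionSpec2 use. Bins and BinsSpec are made-up names.
use std::any::Any;

trait Collectable2: Any {
    fn as_any_ref(&self) -> &dyn Any;
    fn append(&mut self, src: &dyn Any);
}

trait CollectionSpec2 {
    // An associated `type Collected` here would make the trait not object
    // safe once it is held as a trait object, hence the boxed return value.
    fn empty(&self) -> Box<dyn Collectable2>;
}

struct Bins {
    counts: Vec<u64>,
}

impl Collectable2 for Bins {
    fn as_any_ref(&self) -> &dyn Any {
        self
    }

    fn append(&mut self, src: &dyn Any) {
        // Downcast recovers the concrete batch type behind the erased reference.
        if let Some(other) = src.downcast_ref::<Bins>() {
            self.counts.extend_from_slice(&other.counts);
        }
    }
}

struct BinsSpec {
    bin_count_exp: u32,
}

impl CollectionSpec2 for BinsSpec {
    fn empty(&self) -> Box<dyn Collectable2> {
        Box::new(Bins {
            counts: Vec::with_capacity(self.bin_count_exp as usize),
        })
    }
}

fn main() {
    // Mirrors the collect_all loop: build the empty collector from the spec,
    // then append type-erased items as they arrive from the stream.
    let spec: Box<dyn CollectionSpec2> = Box::new(BinsSpec { bin_count_exp: 4 });
    let mut main_item = spec.empty();
    let batch = Bins { counts: vec![1, 2, 3] };
    main_item.append(&batch);
    let bins = main_item.as_any_ref().downcast_ref::<Bins>().unwrap();
    assert_eq!(bins.counts, vec![1, 2, 3]);
}

The cost of this pattern is the runtime downcast in append; the existing Collectable trait does the same job statically via its associated Collected type, but cannot be driven through a type-erased BinnedResponse.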