diff --git a/daqbuffer/src/test/json.rs b/daqbuffer/src/test/json.rs index 58f1baf..0e236c0 100644 --- a/daqbuffer/src/test/json.rs +++ b/daqbuffer/src/test/json.rs @@ -58,7 +58,11 @@ async fn get_binned_json_0_inner2( } let res = hyper::body::to_bytes(res.into_body()).await?; let res = String::from_utf8(res.to_vec())?; - info!("result from endpoint: [[[\n{}\n]]]", res); + let res: serde_json::Value = serde_json::from_str(res.as_str())?; + info!( + "result from endpoint: --------------\n{}\n--------------", + serde_json::to_string_pretty(&res)? + ); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("get_binned_json_0 DONE time {} ms", ms); diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index d13eb2c..06d52cb 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -17,20 +17,29 @@ pub enum StreamItem { Stats(StatsItem), } +// TODO remove in favor of WithLen: pub trait Bins { fn bin_count(&self) -> u32; } -// TODO this is meant for bins, count them, collect range-complete, and deliver these -// information also to the client. +// TODO remove: pub trait Collected { fn new(bin_count_exp: u32) -> Self; fn timed_out(&mut self, k: bool); } +pub trait Collector { + type Input: Collectable; + type Output: Serialize; + fn ingest(&mut self, src: &Self::Input); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn result(self) -> Result; +} + pub trait Collectable { - type Collected: Collected; - fn append_to(&self, collected: &mut Self::Collected); + type Collector: Collector; + fn new_collector(bin_count_exp: u32) -> Self::Collector; } // TODO can be removed? @@ -39,12 +48,14 @@ pub trait Collectable2: Any { fn append(&mut self, src: &dyn Any); } +// TODO remove pub trait CollectionSpec2 { // TODO Can I use here associated types and return concrete types? // Probably not object safe. fn empty(&self) -> Box; } +// TODO rmove pub trait CollectionSpecMaker2 { fn spec(bin_count_exp: u32) -> Box; } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index c0219b0..93c3045 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -9,8 +9,8 @@ use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{ - Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonBytes, - ToJsonResult, + Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, Collector, StreamItem, + ToJsonBytes, ToJsonResult, }; use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; @@ -89,20 +89,6 @@ impl Collected for MinMaxAvgScalarBinBatchCollected { } } -impl Collectable for MinMaxAvgScalarBinBatch { - type Collected = MinMaxAvgScalarBinBatchCollected; - - fn append_to(&self, collected: &mut Self::Collected) { - let batch = &mut collected.batch; - batch.ts1s.extend_from_slice(&self.ts1s); - batch.ts2s.extend_from_slice(&self.ts2s); - batch.counts.extend_from_slice(&self.counts); - batch.mins.extend_from_slice(&self.mins); - batch.maxs.extend_from_slice(&self.maxs); - batch.avgs.extend_from_slice(&self.avgs); - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct MinMaxAvgScalarBinBatchCollectedJsonResult { #[serde(rename = "tsBinEdges")] @@ -293,9 +279,8 @@ where FrameType + Framable + DeserializeOwned, Sitemty<<::Output as TimeBinnableType>::Output>: ToJsonResult + Framable, { - let _ = ppp; let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?; - let s = PPP::convert(res.stream); + let s = ppp.convert(res.stream, res.bin_count); let ret = BinnedResponseDyn { stream: Box::pin(s), bin_count: res.bin_count, @@ -411,41 +396,135 @@ where } } -pub trait PipelinePostProcessA { - fn unused(&self); -} +pub trait PipelinePostProcessA {} -struct Ppp1 {} +struct MakeBoxedItems {} -impl PipelinePostProcessA for Ppp1 { - fn unused(&self) { - todo!() - } -} +impl PipelinePostProcessA for MakeBoxedItems {} pub trait PipelinePostProcessB { fn convert( + &self, inp: Pin> + Send>>, + bin_count_exp: u32, ) -> Pin> + Send>>; } -impl PipelinePostProcessB> for Ppp1 +impl PipelinePostProcessB> for MakeBoxedItems where NTY: NumOps, { fn convert( + &self, inp: Pin>> + Send>>, + bin_count_exp: u32, ) -> Pin> + Send>> { let s = StreamExt::map(inp, |item| Box::new(item) as Box); Box::pin(s) } } +struct CollectForJson {} + +impl CollectForJson { + pub fn new() -> Self { + Self {} + } +} + +impl PipelinePostProcessA for CollectForJson {} + +pub struct JsonCollector { + fut: Pin> + Send>>, + done: bool, +} + +impl JsonCollector { + pub fn new(inp: Pin>> + Send>>, bin_count_exp: u32) -> Self + where + NTY: NumOps + Serialize + 'static, + { + let fut = collect_all(inp, bin_count_exp); + let fut = Box::pin(fut); + Self { fut, done: false } + } +} + +impl Framable for Sitemty { + fn make_frame(&self) -> Result { + panic!() + } +} + +impl ToJsonBytes for serde_json::Value { + fn to_json_bytes(&self) -> Result, Error> { + Ok(serde_json::to_vec(self)?) + } +} + +impl ToJsonResult for Sitemty { + fn to_json_result(&self) -> Result, Error> { + match self { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())), + RangeCompletableItem::RangeComplete => Err(Error::with_msg("logic")), + }, + StreamItem::Log(_) => Err(Error::with_msg("logic")), + StreamItem::Stats(_) => Err(Error::with_msg("logic")), + }, + Err(_) => Err(Error::with_msg("logic")), + } + } +} + +impl Stream for JsonCollector { + type Item = Box; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.done { + Ready(None) + } else { + match self.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let item = Box::new(item) as Box; + self.done = true; + Ready(Some(item)) + } + Ready(Err(e)) => { + let item = Err::>, _>(e); + let item = Box::new(item) as Box; + Ready(Some(item)) + } + Pending => Pending, + } + }; + } + } +} + +impl PipelinePostProcessB> for CollectForJson +where + NTY: NumOps, +{ + fn convert( + &self, + inp: Pin>> + Send>>, + bin_count_exp: u32, + ) -> Pin> + Send>> { + let s = JsonCollector::new(inp, bin_count_exp); + Box::pin(s) + } +} + pub async fn binned_bytes_for_http( query: &BinnedQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let pl = make_num_pipeline::(query, Ppp1 {}, node_config).await?; + let pl = make_num_pipeline::(query, MakeBoxedItems {}, node_config).await?; let ret = pl.stream.map(|item| { let fr = item.make_frame(); let fr = fr?; @@ -527,13 +606,13 @@ impl Serialize for IsoDateTime { } } -pub async fn collect_all(stream: S, collection_spec: Box) -> Result +pub async fn collect_all(stream: S, bin_count_exp: u32) -> Result where - S: Stream>> + Unpin, + S: Stream> + Unpin, + T: Collectable, { let deadline = tokio::time::Instant::now() + Duration::from_millis(1000); - //let mut main_item = ::Collected::new(bin_count_exp); - let mut main_item = collection_spec.empty(); + let mut collector = ::new_collector(bin_count_exp); let mut i1 = 0; let mut stream = stream; loop { @@ -544,13 +623,7 @@ where match tokio::time::timeout_at(deadline, stream.next()).await { Ok(k) => k, Err(_) => { - // TODO - - // TODO - - // TODO - - //main_item.timed_out(true); + collector.set_timed_out(); None } } @@ -562,10 +635,11 @@ where StreamItem::Log(_) => {} StreamItem::Stats(_) => {} StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => {} + RangeCompletableItem::RangeComplete => { + collector.set_range_complete(); + } RangeCompletableItem::Data(item) => { - main_item.append(&item); - //item.append_to(&mut main_item); + collector.ingest(&item); i1 += 1; } }, @@ -579,31 +653,21 @@ where None => break, } } - Ok(err::todoval()) -} - -// TODO remove -#[derive(Debug, Serialize, Deserialize)] -pub struct UnusedBinnedJsonResult { - ts_bin_edges: Vec, - counts: Vec, - #[serde(skip_serializing_if = "Bool::is_false")] - finalised_range: bool, - #[serde(skip_serializing_if = "Zero::is_zero")] - missing_bins: u64, - #[serde(skip_serializing_if = "Option::is_none")] - continue_at: Option, -} - -pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { - err::todoval() - /* - let pl = make_num_pipeline_for_entry(query, node_config).await?; - let collected = collect_all(pl.stream, pl.bin_count).await?; - let ret = ToJsonResult::to_json_result(&collected)?; - let ret = serde_json::to_value(ret)?; + let ret = serde_json::to_value(collector.result()?)?; Ok(ret) - */ +} + +pub async fn binned_json( + node_config: &NodeConfigCached, + query: &BinnedQuery, +) -> Result> + Send>>, Error> { + let pl = make_num_pipeline(query, CollectForJson::new(), node_config).await?; + let ret = pl.stream.map(|item| { + let fr = item.to_json_result()?; + let buf = fr.to_json_bytes()?; + Ok(Bytes::from(buf)) + }); + Ok(Box::pin(ret)) } pub struct ReadPbv @@ -767,7 +831,6 @@ pub trait TBinnedBins: + Send + Serialize + DeserializeOwned - + Collectable + ReadableFromFile + FilterFittingInside + AggregatableTdim2 @@ -1063,6 +1126,105 @@ where } } +#[derive(Serialize)] +pub struct MinMaxAvgBinsCollectedResult { + ts_bin_edges: Vec, + counts: Vec, + mins: Vec>, + maxs: Vec>, + avgs: Vec>, + #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] + finalised_range: bool, + #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] + missing_bins: u32, + #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] + continue_at: Option, +} + +pub struct MinMaxAvgBinsCollector { + bin_count_exp: u32, + timed_out: bool, + range_complete: bool, + vals: MinMaxAvgBins, + _m1: PhantomData, +} + +impl MinMaxAvgBinsCollector { + pub fn new(bin_count_exp: u32) -> Self { + Self { + bin_count_exp, + timed_out: false, + range_complete: false, + vals: MinMaxAvgBins::::empty(), + _m1: PhantomData, + } + } +} + +impl Collector for MinMaxAvgBinsCollector +where + NTY: NumOps + Serialize, +{ + type Input = MinMaxAvgBins; + type Output = MinMaxAvgBinsCollectedResult; + + fn ingest(&mut self, src: &Self::Input) { + Appendable::append(&mut self.vals, src); + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(self) -> Result { + let bin_count = self.vals.ts1s.len() as u32; + let mut tsa: Vec<_> = self + .vals + .ts1s + .iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect(); + if let Some(&z) = self.vals.ts2s.last() { + tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64))); + } + let tsa = tsa; + let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { + match tsa.last() { + Some(k) => Some(k.clone()), + None => Err(Error::with_msg("partial_content but no bin in result"))?, + } + } else { + None + }; + let ret = MinMaxAvgBinsCollectedResult:: { + ts_bin_edges: tsa, + counts: vec![], + mins: self.vals.mins, + maxs: self.vals.maxs, + avgs: self.vals.avgs, + finalised_range: self.range_complete, + missing_bins: self.bin_count_exp - bin_count, + continue_at, + }; + Ok(ret) + } +} + +impl Collectable for MinMaxAvgBins +where + NTY: NumOps + Serialize, +{ + type Collector = MinMaxAvgBinsCollector; + + fn new_collector(bin_count_exp: u32) -> Self::Collector { + Self::Collector::new(bin_count_exp) + } +} + pub struct MinMaxAvgAggregator { range: NanoRange, count: u32, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 93a0854..68a53ea 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -331,7 +331,7 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result Result, Error> { let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await { - Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?, + Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_binary")))?, Err(e) => { if query.report_error() { response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? @@ -346,10 +346,7 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { let ret = match disk::binned::binned_json(node_config, &query).await { - Ok(val) => { - let body = serde_json::to_string(&val)?; - response(StatusCode::OK).body(Body::from(body)) - }?, + Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("binned_json")))?, Err(e) => { if query.report_error() { response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?