Collected json result test passes

This commit is contained in:
Dominik Werder
2021-06-09 20:03:58 +02:00
parent 1df36f3aeb
commit 28b4bb3e04
4 changed files with 252 additions and 78 deletions

View File

@@ -9,8 +9,8 @@ use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{
Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, StreamItem, ToJsonBytes,
ToJsonResult,
Appendable, Collectable, Collectable2, Collected, CollectionSpec2, CollectionSpecMaker2, Collector, StreamItem,
ToJsonBytes, ToJsonResult,
};
use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
@@ -89,20 +89,6 @@ impl Collected for MinMaxAvgScalarBinBatchCollected {
}
}
impl Collectable for MinMaxAvgScalarBinBatch {
type Collected = MinMaxAvgScalarBinBatchCollected;
fn append_to(&self, collected: &mut Self::Collected) {
let batch = &mut collected.batch;
batch.ts1s.extend_from_slice(&self.ts1s);
batch.ts2s.extend_from_slice(&self.ts2s);
batch.counts.extend_from_slice(&self.counts);
batch.mins.extend_from_slice(&self.mins);
batch.maxs.extend_from_slice(&self.maxs);
batch.avgs.extend_from_slice(&self.avgs);
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MinMaxAvgScalarBinBatchCollectedJsonResult {
#[serde(rename = "tsBinEdges")]
@@ -293,9 +279,8 @@ where
FrameType + Framable + DeserializeOwned,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: ToJsonResult + Framable,
{
let _ = ppp;
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?;
let s = PPP::convert(res.stream);
let s = ppp.convert(res.stream, res.bin_count);
let ret = BinnedResponseDyn {
stream: Box::pin(s),
bin_count: res.bin_count,
@@ -411,41 +396,135 @@ where
}
}
pub trait PipelinePostProcessA {
fn unused(&self);
}
pub trait PipelinePostProcessA {}
struct Ppp1 {}
struct MakeBoxedItems {}
impl PipelinePostProcessA for Ppp1 {
fn unused(&self) {
todo!()
}
}
impl PipelinePostProcessA for MakeBoxedItems {}
pub trait PipelinePostProcessB<T> {
fn convert(
&self,
inp: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
bin_count_exp: u32,
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>;
}
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for Ppp1
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for MakeBoxedItems
where
NTY: NumOps,
{
fn convert(
&self,
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
bin_count_exp: u32,
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
let s = StreamExt::map(inp, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
Box::pin(s)
}
}
struct CollectForJson {}
impl CollectForJson {
pub fn new() -> Self {
Self {}
}
}
impl PipelinePostProcessA for CollectForJson {}
pub struct JsonCollector {
fut: Pin<Box<dyn Future<Output = Result<serde_json::Value, Error>> + Send>>,
done: bool,
}
impl JsonCollector {
pub fn new<NTY>(inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>, bin_count_exp: u32) -> Self
where
NTY: NumOps + Serialize + 'static,
{
let fut = collect_all(inp, bin_count_exp);
let fut = Box::pin(fut);
Self { fut, done: false }
}
}
impl Framable for Sitemty<serde_json::Value> {
fn make_frame(&self) -> Result<BytesMut, Error> {
panic!()
}
}
impl ToJsonBytes for serde_json::Value {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_vec(self)?)
}
}
impl ToJsonResult for Sitemty<serde_json::Value> {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
match self {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())),
RangeCompletableItem::RangeComplete => Err(Error::with_msg("logic")),
},
StreamItem::Log(_) => Err(Error::with_msg("logic")),
StreamItem::Stats(_) => Err(Error::with_msg("logic")),
},
Err(_) => Err(Error::with_msg("logic")),
}
}
}
impl Stream for JsonCollector {
type Item = Box<dyn BinnedResponseItem>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.done {
Ready(None)
} else {
match self.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
self.done = true;
Ready(Some(item))
}
Ready(Err(e)) => {
let item = Err::<StreamItem<RangeCompletableItem<serde_json::Value>>, _>(e);
let item = Box::new(item) as Box<dyn BinnedResponseItem>;
Ready(Some(item))
}
Pending => Pending,
}
};
}
}
}
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for CollectForJson
where
NTY: NumOps,
{
fn convert(
&self,
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
bin_count_exp: u32,
) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
let s = JsonCollector::new(inp, bin_count_exp);
Box::pin(s)
}
}
pub async fn binned_bytes_for_http(
query: &BinnedQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline::<Ppp1>(query, Ppp1 {}, node_config).await?;
let pl = make_num_pipeline::<MakeBoxedItems>(query, MakeBoxedItems {}, node_config).await?;
let ret = pl.stream.map(|item| {
let fr = item.make_frame();
let fr = fr?;
@@ -527,13 +606,13 @@ impl Serialize for IsoDateTime {
}
}
pub async fn collect_all<S>(stream: S, collection_spec: Box<dyn CollectionSpec2>) -> Result<serde_json::Value, Error>
pub async fn collect_all<T, S>(stream: S, bin_count_exp: u32) -> Result<serde_json::Value, Error>
where
S: Stream<Item = Sitemty<Box<dyn Any>>> + Unpin,
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable,
{
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
//let mut main_item = <T as Collectable>::Collected::new(bin_count_exp);
let mut main_item = collection_spec.empty();
let mut collector = <T as Collectable>::new_collector(bin_count_exp);
let mut i1 = 0;
let mut stream = stream;
loop {
@@ -544,13 +623,7 @@ where
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
// TODO
// TODO
// TODO
//main_item.timed_out(true);
collector.set_timed_out();
None
}
}
@@ -562,10 +635,11 @@ where
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {}
RangeCompletableItem::RangeComplete => {
collector.set_range_complete();
}
RangeCompletableItem::Data(item) => {
main_item.append(&item);
//item.append_to(&mut main_item);
collector.ingest(&item);
i1 += 1;
}
},
@@ -579,31 +653,21 @@ where
None => break,
}
}
Ok(err::todoval())
}
// TODO remove
#[derive(Debug, Serialize, Deserialize)]
pub struct UnusedBinnedJsonResult {
ts_bin_edges: Vec<IsoDateTime>,
counts: Vec<u64>,
#[serde(skip_serializing_if = "Bool::is_false")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero")]
missing_bins: u64,
#[serde(skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
}
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
err::todoval()
/*
let pl = make_num_pipeline_for_entry(query, node_config).await?;
let collected = collect_all(pl.stream, pl.bin_count).await?;
let ret = ToJsonResult::to_json_result(&collected)?;
let ret = serde_json::to_value(ret)?;
let ret = serde_json::to_value(collector.result()?)?;
Ok(ret)
*/
}
pub async fn binned_json(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline(query, CollectForJson::new(), node_config).await?;
let ret = pl.stream.map(|item| {
let fr = item.to_json_result()?;
let buf = fr.to_json_bytes()?;
Ok(Bytes::from(buf))
});
Ok(Box::pin(ret))
}
pub struct ReadPbv<T>
@@ -767,7 +831,6 @@ pub trait TBinnedBins:
+ Send
+ Serialize
+ DeserializeOwned
+ Collectable
+ ReadableFromFile
+ FilterFittingInside
+ AggregatableTdim2
@@ -1063,6 +1126,105 @@ where
}
}
#[derive(Serialize)]
pub struct MinMaxAvgBinsCollectedResult<NTY> {
ts_bin_edges: Vec<IsoDateTime>,
counts: Vec<u64>,
mins: Vec<Option<NTY>>,
maxs: Vec<Option<NTY>>,
avgs: Vec<Option<f32>>,
#[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
continue_at: Option<IsoDateTime>,
}
pub struct MinMaxAvgBinsCollector<NTY> {
bin_count_exp: u32,
timed_out: bool,
range_complete: bool,
vals: MinMaxAvgBins<NTY>,
_m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgBinsCollector<NTY> {
pub fn new(bin_count_exp: u32) -> Self {
Self {
bin_count_exp,
timed_out: false,
range_complete: false,
vals: MinMaxAvgBins::<NTY>::empty(),
_m1: PhantomData,
}
}
}
impl<NTY> Collector for MinMaxAvgBinsCollector<NTY>
where
NTY: NumOps + Serialize,
{
type Input = MinMaxAvgBins<NTY>;
type Output = MinMaxAvgBinsCollectedResult<NTY>;
fn ingest(&mut self, src: &Self::Input) {
Appendable::append(&mut self.vals, src);
}
fn set_range_complete(&mut self) {
self.range_complete = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(self) -> Result<Self::Output, Error> {
let bin_count = self.vals.ts1s.len() as u32;
let mut tsa: Vec<_> = self
.vals
.ts1s
.iter()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect();
if let Some(&z) = self.vals.ts2s.last() {
tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
}
let tsa = tsa;
let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
match tsa.last() {
Some(k) => Some(k.clone()),
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
None
};
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
ts_bin_edges: tsa,
counts: vec![],
mins: self.vals.mins,
maxs: self.vals.maxs,
avgs: self.vals.avgs,
finalised_range: self.range_complete,
missing_bins: self.bin_count_exp - bin_count,
continue_at,
};
Ok(ret)
}
}
impl<NTY> Collectable for MinMaxAvgBins<NTY>
where
NTY: NumOps + Serialize,
{
type Collector = MinMaxAvgBinsCollector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
}
}
pub struct MinMaxAvgAggregator<NTY> {
range: NanoRange,
count: u32,