use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use std::pin::Pin; use std::task::{Context, Poll}; pub trait IntoBinnedXBins1 where SK: BinnedStreamKind, Self: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { type StreamOut; fn into_binned_x_bins_1(self) -> Self::StreamOut; } impl IntoBinnedXBins1 for S where SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { type StreamOut = IntoBinnedXBins1DefaultStream; fn into_binned_x_bins_1(self) -> Self::StreamOut { IntoBinnedXBins1DefaultStream { inp: self, _marker: std::marker::PhantomData::default(), } } } pub struct IntoBinnedXBins1DefaultStream where SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { inp: S, _marker: std::marker::PhantomData, } impl Stream for IntoBinnedXBins1DefaultStream where SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => match k { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } RangeCompletableItem::Data(item) => Ready(Some(Ok(StreamItem::DataItem( RangeCompletableItem::Data(item.into_agg()), )))), }, }, Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } }