WIP on lots of changes
@@ -4,9 +4,9 @@ Aggregation and binning support.

use super::eventchunker::EventFull;
use crate::agg::binnedt::AggregatableTdim;
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem;
use crate::eventchunker::EventChunkerItem;
use crate::binned::RangeCompletableItem;
use bytes::BytesMut;
use err::Error;
use futures_core::Stream;
@@ -19,6 +19,7 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

pub mod binnedt;
pub mod binnedt2;
pub mod binnedx;
pub mod eventbatch;
pub mod scalarbinbatch;
@@ -502,17 +503,11 @@ impl<S> Dim1F32Stream<S> {
    }
}

#[derive(Debug)]
pub enum Dim1F32StreamItem {
    Values(ValuesDim1),
    RangeComplete,
}

impl<S> Stream for Dim1F32Stream<S>
where
    S: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>> + Unpin,
    S: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Unpin,
{
    type Item = Result<StreamItem<Dim1F32StreamItem>, Error>;
    type Item = Result<StreamItem<RangeCompletableItem<ValuesDim1>>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
@@ -528,19 +523,20 @@ where
                let inst1 = Instant::now();
                let u = match k {
                    StreamItem::DataItem(item) => match item {
                        EventChunkerItem::Events(events) => match self.process_event_data(&events) {
                            Ok(k) => {
                                let ret = Dim1F32StreamItem::Values(k);
                                Ready(Some(Ok(StreamItem::DataItem(ret))))
                        RangeCompletableItem::RangeComplete => {
                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                        }
                        RangeCompletableItem::Data(item) => match self.process_event_data(&item) {
                            Ok(item) => {
                                let ret = RangeCompletableItem::Data(item);
                                let ret = StreamItem::DataItem(ret);
                                Ready(Some(Ok(ret)))
                            }
                            Err(e) => {
                                self.errored = true;
                                Ready(Some(Err(e)))
                            }
                        },
                        EventChunkerItem::RangeComplete => {
                            Ready(Some(Ok(StreamItem::DataItem(Dim1F32StreamItem::RangeComplete))))
                        }
                    },
                    StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                    StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
@@ -566,25 +562,14 @@ where
pub trait IntoDim1F32Stream {
    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
    where
        Self: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>> + Sized;
        Self: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Sized;
}

impl<T> IntoDim1F32Stream for T
where
    T: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>>,
    T: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>>,
{
    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
        Dim1F32Stream::new(self)
    }
}

impl AggregatableXdim1Bin for Dim1F32StreamItem {
    type Output = MinMaxAvgScalarEventBatchStreamItem;

    fn into_agg(self) -> Self::Output {
        match self {
            Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()),
            Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
        }
    }
}

@@ -23,8 +23,6 @@ pub trait AggregatableTdim: Sized {
    type Output: AggregatableXdim1Bin + AggregatableTdim;
    type Aggregator: AggregatorTdim<InputValue = Self>;
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
    fn is_range_complete(&self) -> bool;
    fn make_range_complete_item() -> Option<Self>;
}

pub trait IntoBinnedT {

disk/src/agg/binnedt2.rs (new file, 215 lines)
@@ -0,0 +1,215 @@
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::BinnedRange;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait AggregatorTdim2: Sized + Unpin {
    type InputValue;
    fn ends_before(&self, inp: &Self::InputValue) -> bool;
    fn ends_after(&self, inp: &Self::InputValue) -> bool;
    fn starts_after(&self, inp: &Self::InputValue) -> bool;
    fn ingest(&mut self, inp: &mut Self::InputValue);
    fn result(self) -> Vec<Self::InputValue>;
}

pub trait AggregatableTdim2: Sized {
    type Aggregator: AggregatorTdim2<InputValue = Self>;
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
    fn is_range_complete(&self) -> bool;
    fn make_range_complete_item() -> Option<Self>;
}

pub trait IntoBinnedT {
    type StreamOut: Stream;
    fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
}
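
The three ordering predicates on AggregatorTdim2 are what drive the bin cycling in the stream adapter below: ends_before lets an aggregator discard input that lies entirely before its bin, starts_after forces the bin to be cycled before the item is ingested, and ends_after forces a cycle right after ingest. A minimal sketch of an implementor pair, for orientation only; ToyBin, ToyAgg, and all their fields are hypothetical and not part of this commit:

pub struct ToyBin {
    ts1: u64,
    ts2: u64,
    avg: f32,
}

pub struct ToyAgg {
    ts1: u64,
    ts2: u64,
    sum: f32,
    count: u32,
}

impl AggregatorTdim2 for ToyAgg {
    type InputValue = ToyBin;

    // Input lies entirely before this bin; it can never contribute.
    fn ends_before(&self, inp: &Self::InputValue) -> bool {
        inp.ts2 <= self.ts1
    }

    // Input reaches past the end of this bin.
    fn ends_after(&self, inp: &Self::InputValue) -> bool {
        inp.ts2 > self.ts2
    }

    // Input begins only after this bin ends.
    fn starts_after(&self, inp: &Self::InputValue) -> bool {
        inp.ts1 >= self.ts2
    }

    fn ingest(&mut self, inp: &mut Self::InputValue) {
        self.sum += inp.avg;
        self.count += 1;
    }

    fn result(self) -> Vec<Self::InputValue> {
        let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 };
        vec![ToyBin { ts1: self.ts1, ts2: self.ts2, avg }]
    }
}

impl AggregatableTdim2 for ToyBin {
    type Aggregator = ToyAgg;

    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        ToyAgg { ts1, ts2, sum: 0.0, count: 0 }
    }

    fn is_range_complete(&self) -> bool {
        false
    }

    fn make_range_complete_item() -> Option<Self> {
        None
    }
}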

impl<S, I> IntoBinnedT for S
where
    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
    I: AggregatableTdim2 + Unpin,
    I::Aggregator: Unpin,
{
    type StreamOut = IntoBinnedTDefaultStream<S, I>;

    fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
        IntoBinnedTDefaultStream::new(self, spec)
    }
}

pub struct IntoBinnedTDefaultStream<S, I>
where
    S: Stream<Item = Result<StreamItem<I>, Error>>,
    I: AggregatableTdim2,
{
    inp: S,
    aggtor: Option<I::Aggregator>,
    spec: BinnedRange,
    curbin: u32,
    inp_completed: bool,
    all_bins_emitted: bool,
    range_complete_observed: bool,
    range_complete_emitted: bool,
    left: Option<Poll<Option<Result<StreamItem<I>, Error>>>>,
    errored: bool,
    completed: bool,
    tmp_agg_results: VecDeque<I>,
}

impl<S, I> IntoBinnedTDefaultStream<S, I>
where
    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
    I: AggregatableTdim2,
{
    pub fn new(inp: S, spec: BinnedRange) -> Self {
        let range = spec.get_range(0);
        Self {
            inp,
            aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
            spec,
            curbin: 0,
            inp_completed: false,
            all_bins_emitted: false,
            range_complete_observed: false,
            range_complete_emitted: false,
            left: None,
            errored: false,
            completed: false,
            tmp_agg_results: VecDeque::new(),
        }
    }

    fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<I>, Error>>> {
        if let Some(cur) = self.left.take() {
            cur
        } else if self.inp_completed {
            Poll::Ready(None)
        } else {
            let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
            inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
        }
    }

    fn cycle_current_bin(&mut self) {
        self.curbin += 1;
        let range = self.spec.get_range(self.curbin);
        let ret = self
            .aggtor
            .replace(I::aggregator_new_static(range.beg, range.end))
            // TODO handle None case, or remove Option if Agg is always present
            .unwrap()
            .result();
        self.tmp_agg_results = ret.into();
        if self.curbin >= self.spec.count as u32 {
            self.all_bins_emitted = true;
        }
    }

    fn handle(
        &mut self,
        cur: Poll<Option<Result<StreamItem<I>, Error>>>,
    ) -> Option<Poll<Option<Result<StreamItem<I>, Error>>>> {
        use Poll::*;
        match cur {
            Ready(Some(Ok(item))) => match item {
                StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
                StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
                StreamItem::DataItem(item) => {
                    if item.is_range_complete() {
                        self.range_complete_observed = true;
                        None
                    } else if self.all_bins_emitted {
                        // Just drop the item because we will not emit any more data.
                        // Could also at least gather some stats.
                        None
                    } else {
                        let ag = self.aggtor.as_mut().unwrap();
                        if ag.ends_before(&item) {
                            None
                        } else if ag.starts_after(&item) {
                            self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
                            self.cycle_current_bin();
                            // TODO cycle_current_bin enqueues the bin, can I return here instead?
                            None
                        } else {
                            let mut item = item;
                            ag.ingest(&mut item);
                            let item = item;
                            if ag.ends_after(&item) {
                                self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
                                self.cycle_current_bin();
                            }
                            // TODO cycle_current_bin enqueues the bin, can I return here instead?
                            None
                        }
                    }
                }
            },
            Ready(Some(Err(e))) => {
                self.errored = true;
                Some(Ready(Some(Err(e))))
            }
            Ready(None) => {
                self.inp_completed = true;
                if self.all_bins_emitted {
                    None
                } else {
                    self.cycle_current_bin();
                    // TODO cycle_current_bin enqueues the bin, can I return here instead?
                    None
                }
            }
            Pending => Some(Pending),
        }
    }
}

impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where
    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
    I: AggregatableTdim2 + Unpin,
    I::Aggregator: Unpin,
{
    type Item = Result<StreamItem<I>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        'outer: loop {
            break if self.completed {
                panic!("IntoBinnedTDefaultStream poll_next on completed");
            } else if self.errored {
                self.completed = true;
                Ready(None)
            } else if let Some(item) = self.tmp_agg_results.pop_front() {
                Ready(Some(Ok(StreamItem::DataItem(item))))
            } else if self.range_complete_emitted {
                self.completed = true;
                Ready(None)
            } else if self.inp_completed && self.all_bins_emitted {
                self.range_complete_emitted = true;
                if self.range_complete_observed {
                    if let Some(item) = I::make_range_complete_item() {
                        Ready(Some(Ok(StreamItem::DataItem(item))))
                    } else {
                        warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
                        continue 'outer;
                    }
                } else {
                    continue 'outer;
                }
            } else {
                let cur = self.cur(cx);
                match self.handle(cur) {
                    Some(item) => item,
                    None => continue 'outer,
                }
            };
        }
    }
}
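
Wiring this up is then a single adapter call on any stream of such items; a hedged usage sketch, reusing the hypothetical ToyBin from the sketch above and assuming a BinnedRange has already been constructed:

fn bin_toy_stream<S>(inp: S, spec: BinnedRange) -> impl Stream<Item = Result<StreamItem<ToyBin>, Error>>
where
    S: Stream<Item = Result<StreamItem<ToyBin>, Error>> + Unpin,
{
    // IntoBinnedTDefaultStream drains `inp`, ingests items into the current
    // bin, and emits one aggregated ToyBin per elapsed bin.
    inp.into_binned_t(spec)
}
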
@@ -1,5 +1,5 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::AggregatableXdim1Bin;
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
@@ -110,14 +110,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        false
    }

    fn make_range_complete_item() -> Option<Self> {
        None
    }
}

impl MinMaxAvgScalarEventBatch {
@@ -238,93 +230,3 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
        vec![v]
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarEventBatchStreamItem {
    Values(MinMaxAvgScalarEventBatch),
    RangeComplete,
}

impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
    type Output = MinMaxAvgScalarEventBatchStreamItem;

    fn into_agg(self) -> Self::Output {
        self
    }
}

impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
    type Output = MinMaxAvgScalarBinBatchStreamItem;
    type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator;

    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        //<Self as AggregatableTdim>::Aggregator::new(ts1, ts2)
        Self::Aggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        if let MinMaxAvgScalarEventBatchStreamItem::RangeComplete = self {
            true
        } else {
            false
        }
    }

    fn make_range_complete_item() -> Option<Self> {
        Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
    }
}

pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
    agg: MinMaxAvgScalarEventBatchAggregator,
}

impl MinMaxAvgScalarEventBatchStreamItemAggregator {
    pub fn new(ts1: u64, ts2: u64) -> Self {
        let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
        Self { agg }
    }
}

impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
    type InputValue = MinMaxAvgScalarEventBatchStreamItem;
    type OutputValue = MinMaxAvgScalarBinBatchStreamItem;

    fn ends_before(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
            _ => false,
        }
    }

    fn ends_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
            _ => false,
        }
    }

    fn starts_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
            _ => false,
        }
    }

    fn ingest(&mut self, inp: &mut Self::InputValue) {
        match inp {
            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
            MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(),
        }
    }

    fn result(self) -> Vec<Self::OutputValue> {
        let ret: Vec<_> = self
            .agg
            .result()
            .into_iter()
            .map(MinMaxAvgScalarBinBatchStreamItem::Values)
            .collect();
        ret
    }
}

@@ -1,7 +1,7 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::streams::Bins;
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::MakeBytesFrame;
use crate::binned::{MakeBytesFrame, RangeCompletableItem};
use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
@@ -199,14 +199,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        false
    }

    fn make_range_complete_item() -> Option<Self> {
        None
    }
}

impl Bins for MinMaxAvgScalarBinBatch {
@@ -310,97 +302,8 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarBinBatchStreamItem {
    Values(MinMaxAvgScalarBinBatch),
    RangeComplete,
}

impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
    type Output = MinMaxAvgScalarBinBatchStreamItem;
    type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;

    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        Self::Aggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        if let MinMaxAvgScalarBinBatchStreamItem::RangeComplete = self {
            true
        } else {
            false
        }
    }

    fn make_range_complete_item() -> Option<Self> {
        Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
    }
}

impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
    type Output = MinMaxAvgScalarBinBatchStreamItem;

    fn into_agg(self) -> Self::Output {
        self
    }
}

impl MakeBytesFrame for Result<MinMaxAvgScalarBinBatchStreamItem, Error> {
impl MakeBytesFrame for Result<RangeCompletableItem<MinMaxAvgScalarBinBatch>, Error> {
    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
        Ok(make_frame(self)?.freeze())
    }
}

pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
    agg: MinMaxAvgScalarBinBatchAggregator,
}

impl MinMaxAvgScalarBinBatchStreamItemAggregator {
    pub fn new(ts1: u64, ts2: u64) -> Self {
        let agg = <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
        Self { agg }
    }
}

impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
    type InputValue = MinMaxAvgScalarBinBatchStreamItem;
    type OutputValue = MinMaxAvgScalarBinBatchStreamItem;

    fn ends_before(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
            _ => false,
        }
    }

    fn ends_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
            _ => false,
        }
    }

    fn starts_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
            _ => false,
        }
    }

    fn ingest(&mut self, inp: &mut Self::InputValue) {
        match inp {
            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
            MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
        }
    }

    fn result(self) -> Vec<Self::OutputValue> {
        let ret: Vec<_> = self
            .agg
            .result()
            .into_iter()
            .map(MinMaxAvgScalarBinBatchStreamItem::Values)
            .collect();
        ret
    }
}

@@ -129,20 +129,4 @@ where
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        Self::Aggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        match self {
            Self::DataItem(item) => item.is_range_complete(),
            Self::Log(_) => false,
            Self::Stats(_) => false,
        }
    }

    // TODO refactor: is this necessary to have on the trait?
    fn make_range_complete_item() -> Option<Self> {
        match <T as AggregatableTdim>::make_range_complete_item() {
            Some(k) => Some(Self::DataItem(k)),
            None => None,
        }
    }
}

@@ -1,8 +1,9 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult};
use crate::agg::{AggregatableXdim1Bin, FitsInside};
use crate::binned::scalar::{adapter_to_stream_item, binned_stream};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::scalar::binned_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
use crate::cache::pbvfs::PreBinnedScalarItem;
use crate::cache::{BinnedQuery, MergedFromRemotes};
@@ -19,10 +20,14 @@ use netpod::{
    AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
};
use num_traits::Zero;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};

pub mod scalar;

@@ -31,12 +36,6 @@ pub struct BinnedStreamRes<I> {
    pub range: BinnedRange,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum BinnedScalarStreamItem {
    Values(MinMaxAvgScalarBinBatch),
    RangeComplete,
}

pub struct MinMaxAvgScalarBinBatchCollected {
    batch: MinMaxAvgScalarBinBatch,
    timed_out: bool,
@@ -55,22 +54,6 @@ impl MinMaxAvgScalarBinBatchCollected {
    }
}

impl Collectable for BinnedScalarStreamItem {
    type Collected = MinMaxAvgScalarBinBatchCollected;

    fn append_to(&mut self, collected: &mut Self::Collected) {
        use BinnedScalarStreamItem::*;
        match self {
            Values(item) => {
                append_to_min_max_avg_scalar_bin_batch(&mut collected.batch, item);
            }
            RangeComplete => {
                // TODO use some other batch type in order to raise the range complete flag.
            }
        }
    }
}

fn append_to_min_max_avg_scalar_bin_batch(batch: &mut MinMaxAvgScalarBinBatch, item: &mut MinMaxAvgScalarBinBatch) {
    batch.ts1s.append(&mut item.ts1s);
    batch.ts2s.append(&mut item.ts2s);
@@ -134,98 +117,12 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
    }
}

impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
        Ok(make_frame(self)?.freeze())
    }
}

impl AggregatableXdim1Bin for BinnedScalarStreamItem {
    // TODO does this already include all cases?
    type Output = BinnedScalarStreamItem;

    fn into_agg(self) -> Self::Output {
        todo!()
    }
}

pub struct BinnedScalarStreamItemAggregator {
    inner_agg: MinMaxAvgScalarBinBatchAggregator,
}

impl BinnedScalarStreamItemAggregator {
    pub fn new(ts1: u64, ts2: u64) -> Self {
        Self {
            inner_agg: MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2),
        }
    }
}

// TODO this could be some generic impl for all wrappers that can carry some AggregatableTdim variant.
impl AggregatorTdim for BinnedScalarStreamItemAggregator {
    type InputValue = BinnedScalarStreamItem;
    // TODO using the same type for the output, does this cover all cases?
    type OutputValue = BinnedScalarStreamItem;

    fn ends_before(&self, inp: &Self::InputValue) -> bool {
        match inp {
            Self::OutputValue::Values(item) => self.inner_agg.ends_before(item),
            Self::OutputValue::RangeComplete => false,
        }
    }

    fn ends_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            Self::OutputValue::Values(item) => self.inner_agg.ends_after(item),
            Self::OutputValue::RangeComplete => false,
        }
    }

    fn starts_after(&self, inp: &Self::InputValue) -> bool {
        match inp {
            Self::OutputValue::Values(item) => self.inner_agg.starts_after(item),
            Self::OutputValue::RangeComplete => false,
        }
    }

    fn ingest(&mut self, inp: &mut Self::InputValue) {
        match inp {
            Self::OutputValue::Values(item) => self.inner_agg.ingest(item),
            Self::OutputValue::RangeComplete => (),
        }
    }

    fn result(self) -> Vec<Self::OutputValue> {
        self.inner_agg
            .result()
            .into_iter()
            .map(|k| BinnedScalarStreamItem::Values(k))
            .collect()
    }
}

impl AggregatableTdim for BinnedScalarStreamItem {
    type Aggregator = BinnedScalarStreamItemAggregator;
    // TODO isn't this already defined in terms of the Aggregator?
    type Output = BinnedScalarStreamItem;

    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        BinnedScalarStreamItemAggregator::new(ts1, ts2)
    }

    fn is_range_complete(&self) -> bool {
        if let Self::RangeComplete = self {
            true
        } else {
            false
        }
    }

    fn make_range_complete_item() -> Option<Self> {
        Some(Self::RangeComplete)
    }
}

type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;

pub async fn binned_bytes_for_http(
@@ -394,9 +291,70 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
    Ok(serde_json::to_value(ret)?)
}

pub trait PreBinnedItem: Unpin {
pub struct ReadPbv<PBI>
where
    PBI: PreBinnedItem,
{
    buf: Vec<u8>,
    file: Option<File>,
    _mark: std::marker::PhantomData<PBI>,
}

impl<PBI> ReadPbv<PBI>
where
    PBI: PreBinnedItem,
{
    fn new(file: File) -> Self {
        Self {
            buf: vec![],
            file: Some(file),
            _mark: std::marker::PhantomData::default(),
        }
    }
}

impl<PBI> Future for ReadPbv<PBI>
where
    PBI: PreBinnedItem,
{
    type Output = Result<StreamItem<PBI>, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        // Use a fixed-size scratch buffer; the original passed an empty Vec,
        // which gives ReadBuf zero capacity and makes every read look like EOF.
        let mut buf = vec![0u8; 4096];
        let mut dst = ReadBuf::new(&mut buf);
        let fp = self.file.as_mut().unwrap();
        let f = Pin::new(fp);
        match File::poll_read(f, cx, &mut dst) {
            Ready(res) => match res {
                Ok(_) => {
                    if dst.filled().len() > 0 {
                        self.buf.extend_from_slice(dst.filled());
                        // Not done yet: poll_read returned Ready, so no waker was
                        // registered. Request another poll explicitly.
                        cx.waker().wake_by_ref();
                        Pending
                    } else {
                        match PBI::from_buf(&self.buf) {
                            Ok(item) => Ready(Ok(StreamItem::DataItem(item))),
                            Err(e) => Ready(Err(e)),
                        }
                    }
                }
                Err(e) => Ready(Err(e.into())),
            },
            Pending => Pending,
        }
    }
}

pub trait PreBinnedItem: Send + Serialize + DeserializeOwned + Unpin {
    type BinnedStreamItem: AggregatableTdim + Unpin + Send;
    fn into_binned_stream_item(self, fit_range: NanoRange) -> Option<Self::BinnedStreamItem>;
    fn make_range_complete() -> Self;
    fn read_pbv(file: File) -> Result<ReadPbv<Self>, Error>
    where
        Self: Sized;
    fn from_buf(buf: &[u8]) -> Result<Self, Error>
    where
        Self: Sized;
}
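
A hedged usage sketch of the two deserialization hooks; load_cached is a hypothetical helper, not code from this commit:

async fn load_cached<PBI: PreBinnedItem>(file: File) -> Result<StreamItem<PBI>, Error> {
    // ReadPbv drains the cache file, then decodes the payload via PBI::from_buf.
    PBI::read_pbv(file)?.await
}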

impl PreBinnedItem for PreBinnedScalarItem {
@@ -405,24 +363,99 @@ impl PreBinnedItem for PreBinnedScalarItem {
    fn into_binned_stream_item(self, fit_range: NanoRange) -> Option<Self::BinnedStreamItem> {
        match self {
            Self::RangeComplete => Some(Self::BinnedStreamItem::RangeComplete),
            Self::Batch(item) => {
                use super::agg::{Fits, FitsInside};
                match item.fits_inside(fit_range) {
                    Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => {
                        Some(Self::BinnedStreamItem::Values(item))
                    }
                    _ => None,
            Self::Batch(item) => match item.fits_inside(fit_range) {
                Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => {
                    Some(Self::BinnedStreamItem::Values(item))
                }
                }
                _ => None,
            },
        }
    }

    fn make_range_complete() -> Self {
        Self::RangeComplete
    }

    fn read_pbv(file: File) -> Result<ReadPbv<Self>, Error> {
        Ok(ReadPbv::new(file))
    }

    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
        let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
        Ok(Self::Batch(dec))
    }
}

pub trait XBinnedEventsStreamItem:
    Send + Serialize + DeserializeOwned + Unpin + Collectable + Collected + AggregatableTdim
{
    fn make_range_complete() -> Self;
}

impl Collected for MinMaxAvgScalarEventBatchStreamItem {
    // TODO for this case we don't have an expected number of events. Factor out into another trait?
    fn new(bin_count_exp: u32) -> Self {
        // TODO factor out the concept of RangeComplete into another trait layer:
        Self::Values(MinMaxAvgScalarEventBatch::empty())
    }
    fn timed_out(&mut self, k: bool) {}
}

impl Collectable for MinMaxAvgScalarEventBatchStreamItem {
    type Collected = MinMaxAvgScalarEventBatchStreamItem;
    fn append_to(&mut self, collected: &mut Self::Collected) {
        match self {
            Self::RangeComplete => {
                // TODO would be nicer to insert another type layer for the RangeComplete concept.
                panic!()
            }
            Self::Values(this) => match collected {
                Self::RangeComplete => {}
                Self::Values(coll) => {
                    // Append the incoming data (`this`) onto the collector; the
                    // original appended `coll` onto itself, which loses the batch.
                    coll.tss.append(&mut this.tss);
                    coll.mins.append(&mut this.mins);
                    coll.maxs.append(&mut this.maxs);
                    coll.avgs.append(&mut this.avgs);
                }
            },
        }
    }
}

impl XBinnedEventsStreamItem for MinMaxAvgScalarEventBatchStreamItem {
    fn make_range_complete() -> Self {
        Self::RangeComplete
    }
}

pub trait TBinned: Send + Serialize + DeserializeOwned + Unpin + Collectable + AggregatableTdim<Output = Self> {}

impl TBinned for MinMaxAvgScalarBinBatchStreamItem {}

impl Collected for MinMaxAvgScalarBinBatch {
    fn new(bin_count_exp: u32) -> Self {
        MinMaxAvgScalarBinBatch::empty()
    }
    fn timed_out(&mut self, k: bool) {}
}

impl Collectable for MinMaxAvgScalarBinBatch {
    type Collected = MinMaxAvgScalarBinBatch;
    fn append_to(&mut self, collected: &mut Self::Collected) {
        collected.ts1s.append(&mut self.ts1s);
        collected.ts2s.append(&mut self.ts2s);
        collected.counts.append(&mut self.counts);
        collected.mins.append(&mut self.mins);
        collected.maxs.append(&mut self.maxs);
        collected.avgs.append(&mut self.avgs);
    }
}

pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static {
    type BinnedStreamItem: MakeBytesFrame;
    type BinnedStreamType: Stream + Send + 'static;
    type Dummy: Default + Unpin + Send;
    type PreBinnedItem: PreBinnedItem + Send;
    type PreBinnedItem: PreBinnedItem;
    type XBinnedEvents;

    fn new_binned_from_prebinned(
        &self,
@@ -439,6 +472,8 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static {
        range: BinnedRange,
        node_config: &NodeConfigCached,
    ) -> Result<Self::BinnedStreamType, Error>;

    fn pbv_handle_fut2_item(item: StreamItem<Self::PreBinnedItem>) -> Option<StreamItem<Self::PreBinnedItem>>;
}

#[derive(Clone)]
@@ -458,11 +493,16 @@ impl BinnedStreamKindWave {
    }
}

pub enum RangeCompletableItem<T> {
    RangeComplete,
    Data(T),
}
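
RangeCompletableItem is the generic layer that replaces the per-type RangeComplete enum variants (Dim1F32StreamItem, PreBinnedScalarItem, the *StreamItem types). A sketch of how a consumer demultiplexes the layered item type; demux and its match-arm comments are illustrative, not code from this commit:

fn demux<T>(item: Result<StreamItem<RangeCompletableItem<T>>, Error>) -> Result<(), Error> {
    match item? {
        StreamItem::Log(_log) => { /* forward the log record */ }
        StreamItem::Stats(_stats) => { /* forward the stats record */ }
        StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
            // Raise the range-complete flag; there is no payload.
        }
        StreamItem::DataItem(RangeCompletableItem::Data(_data)) => {
            // Hand the typed payload to the actual sink.
        }
    }
    Ok(())
}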

impl BinnedStreamKind for BinnedStreamKindScalar {
    type BinnedStreamItem = Result<StreamItem<BinnedScalarStreamItem>, Error>;
    type BinnedStreamType = BinnedStream<Self::BinnedStreamItem>;
    type Dummy = u32;
    type PreBinnedItem = PreBinnedScalarItem;
    type XBinnedEvents = MinMaxAvgScalarEventBatch;

    fn new_binned_from_prebinned(
        &self,
@@ -491,9 +531,36 @@ impl BinnedStreamKind for BinnedStreamKindScalar {
        range: BinnedRange,
        node_config: &NodeConfigCached,
    ) -> Result<Self::BinnedStreamType, Error> {
        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone())
            .into_binned_t(range.clone())
            .map(adapter_to_stream_item);
        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
        // TODO use the binned2 instead
        let s = crate::agg::binnedt::IntoBinnedT::into_binned_t(s, range);
        let s = s.map(adapter_to_stream_item);
        Ok(BinnedStream::new(Box::pin(s))?)
    }

    fn pbv_handle_fut2_item(item: StreamItem<Self::PreBinnedItem>) -> Option<StreamItem<Self::PreBinnedItem>> {
        // TODO make this code work in this context:
        // Do I need more parameters here?
        /*Ok(item) => match item {
            StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
            StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
            StreamItem::DataItem(item) => match item {
                PreBinnedScalarItem::RangeComplete => {
                    self.range_complete_observed = true;
                    None
                }
                PreBinnedScalarItem::Batch(batch) => {
                    self.values.ts1s.extend(batch.ts1s.iter());
                    self.values.ts2s.extend(batch.ts2s.iter());
                    self.values.counts.extend(batch.counts.iter());
                    self.values.mins.extend(batch.mins.iter());
                    self.values.maxs.extend(batch.maxs.iter());
                    self.values.avgs.extend(batch.avgs.iter());
                    StreamItem::DataItem(PreBinnedScalarItem::Batch(batch))
                }
            },
        },*/
        err::todo();
        None
    }
}

@@ -1,35 +1,14 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, BinnedStreamRes};
use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem};
use crate::binnedstream::BinnedStream;
use crate::cache::BinnedQuery;
use crate::raw::EventsQuery;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};

pub fn adapter_to_stream_item(
    k: Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error>,
) -> Result<StreamItem<BinnedScalarStreamItem>, Error> {
    match k {
        Ok(k) => match k {
            StreamItem::Log(item) => Ok(StreamItem::Log(item)),
            StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
            StreamItem::DataItem(item) => match item {
                MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
                    Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))
                }
                MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
                    Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))
                }
            },
        },
        Err(e) => Err(e),
    }
}

pub async fn binned_stream<BK>(
    node_config: &NodeConfigCached,
    query: &BinnedQuery,

@@ -1,8 +1,8 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, PreBinnedItem};
use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream};
use crate::binned::{BinnedStreamKind, PreBinnedItem};
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
use crate::cache::{CacheUsage, PreBinnedQuery};
use crate::frame::makeframe::FrameType;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -33,6 +33,7 @@ where
impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK>
where
    BK: BinnedStreamKind,
    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
{
    pub fn new(
        patch_it: PreBinnedPatchIterator,
@@ -57,6 +58,7 @@ where
        let inp = futures_util::stream::iter(patches.into_iter())
            .map({
                let node_config = node_config.clone();
                let stream_kind = stream_kind.clone();
                move |patch| {
                    let query = PreBinnedQuery::new(
                        patch,

@@ -1,9 +1,8 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::BinnedStreamKind;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::cache::pbv::PreBinnedValueByteStream;
use crate::cache::pbvfs::PreBinnedScalarItem;
use crate::frame::makeframe::FrameType;
use crate::merge::MergedMinMaxAvgScalarStream;
use crate::raw::EventsQuery;
use bytes::Bytes;
@@ -24,8 +23,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tiny_keccak::Hasher;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio::io::{AsyncRead, ReadBuf};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

@@ -232,13 +230,14 @@ fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Err
// NOTE This answers a request for a single valid pre-binned patch.
// A user must first make sure that the grid spec is valid, and that this node is responsible for it.
// Otherwise it is an error.
pub fn pre_binned_bytes_for_http<BK>(
pub fn pre_binned_bytes_for_http<'a, BK>(
    node_config: &NodeConfigCached,
    query: &PreBinnedQuery,
    stream_kind: BK,
) -> Result<PreBinnedValueByteStream<BK>, Error>
where
    BK: BinnedStreamKind,
    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
{
    if query.channel.backend != node_config.node.backend {
        let err = Error::with_msg(format!(
@@ -319,22 +318,34 @@ impl AsyncRead for HttpBodyAsAsyncRead {
    }
}

type T001 = Pin<Box<dyn Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Send>>;
type T002 = Pin<Box<dyn Future<Output = Result<T001, Error>> + Send>>;
pub struct MergedFromRemotes {
    tcp_establish_futs: Vec<T002>,
    nodein: Vec<Option<T001>>,
    merged: Option<T001>,
type T001<T> = Pin<Box<dyn Stream<Item = Result<StreamItem<T>, Error>> + Send>>;
type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, Error>> + Send>>;

pub struct MergedFromRemotes<SK>
where
    SK: BinnedStreamKind,
{
    tcp_establish_futs: Vec<T002<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>,
    nodein: Vec<Option<T001<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>>,
    merged: Option<T001<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>,
    completed: bool,
    errored: bool,
}

impl MergedFromRemotes {
    pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
impl<SK> MergedFromRemotes<SK>
where
    SK: BinnedStreamKind,
{
    pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: SK) -> Self {
        let mut tcp_establish_futs = vec![];
        for node in &cluster.nodes {
            let f = super::raw::x_processed_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
            let f: T002 = Box::pin(f);
            let f = super::raw::x_processed_stream_from_node(
                evq.clone(),
                perf_opts.clone(),
                node.clone(),
                stream_kind.clone(),
            );
            let f: T002<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>> = Box::pin(f);
            tcp_establish_futs.push(f);
        }
        let n = tcp_establish_futs.len();
@@ -348,9 +359,11 @@ impl MergedFromRemotes {
    }
}

impl Stream for MergedFromRemotes {
    // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
impl<SK> Stream for MergedFromRemotes<SK>
where
    SK: BinnedStreamKind,
{
    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
@@ -402,7 +415,7 @@ impl Stream for MergedFromRemotes {
        if c1 == self.tcp_establish_futs.len() {
            debug!("MergedFromRemotes setting up merged stream");
            let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
            let s1 = MergedMinMaxAvgScalarStream::new(inps);
            let s1 = MergedMinMaxAvgScalarStream::<_, SK>::new(inps);
            self.merged = Some(Box::pin(s1));
        } else {
            debug!(
@@ -530,11 +543,3 @@ pub async fn write_pb_cache_min_max_avg_scalar(
        .await??;
    Ok(())
}

pub async fn read_pbv(mut file: File) -> Result<StreamItem<PreBinnedScalarItem>, Error> {
    let mut buf = vec![];
    file.read_to_end(&mut buf).await?;
    trace!("Read cached file len {}", buf.len());
    let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
    Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(dec)))
}

disk/src/cache/pbv.rs (112 lines)
@@ -1,10 +1,10 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, PreBinnedItem};
use crate::binned::{BinnedStreamKind, PreBinnedItem, RangeCompletableItem};
use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream};
use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery};
use crate::frame::makeframe::make_frame;
use crate::frame::makeframe::{make_frame, FrameType};
use crate::raw::EventsQuery;
use crate::streamlog::Streamlog;
use bytes::Bytes;
@@ -18,57 +18,63 @@ use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;

pub type PreBinnedValueByteStream<BK> = SCC<PreBinnedValueByteStreamInner<BK>>;

pub struct PreBinnedValueByteStreamInner<BK>
pub struct PreBinnedValueByteStreamInner<SK>
where
    BK: BinnedStreamKind,
    SK: BinnedStreamKind,
{
    inp: PreBinnedValueStream<BK>,
    inp: PreBinnedValueStream<SK>,
}

pub fn pre_binned_value_byte_stream_new<BK>(
pub fn pre_binned_value_byte_stream_new<SK>(
    query: &PreBinnedQuery,
    node_config: &NodeConfigCached,
    stream_kind: BK,
) -> PreBinnedValueByteStream<BK>
    stream_kind: SK,
) -> PreBinnedValueByteStream<SK>
where
    BK: BinnedStreamKind + Unpin,
    SK: BinnedStreamKind + Unpin,
    Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
{
    let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind);
    let s2 = PreBinnedValueByteStreamInner { inp: s1 };
    SCC::new(s2)
}

impl<BK> Stream for PreBinnedValueByteStreamInner<BK>
impl<SK> Stream for PreBinnedValueByteStreamInner<SK>
where
    BK: BinnedStreamKind + Unpin,
    SK: BinnedStreamKind + Unpin,
    Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
    PreBinnedValueStream<SK>: Unpin,
{
    type Item = Result<Bytes, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        match self.inp.poll_next_unpin(cx) {
            Ready(Some(item)) => match make_frame::<Result<StreamItem<PreBinnedScalarItem>, Error>>(&item) {
                Ok(buf) => Ready(Some(Ok(buf.freeze()))),
                Err(e) => Ready(Some(Err(e.into()))),
            },
            Ready(Some(item)) => {
                match make_frame::<Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>>(&item) {
                    Ok(buf) => Ready(Some(Ok(buf.freeze()))),
                    Err(e) => Ready(Some(Err(e.into()))),
                }
            }
            Ready(None) => Ready(None),
            Pending => Pending,
        }
    }
}

pub struct PreBinnedValueStream<BK>
pub struct PreBinnedValueStream<SK>
where
    BK: BinnedStreamKind,
    SK: BinnedStreamKind,
{
    query: PreBinnedQuery,
    node_config: NodeConfigCached,
    open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
    open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, std::io::Error>> + Send>>>,
    fut2:
        Option<Pin<Box<dyn Stream<Item = Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>>,
        Option<Pin<Box<dyn Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>>,
    read_from_cache: bool,
    cache_written: bool,
    data_complete: bool,
@@ -79,15 +85,18 @@ where
    streamlog: Streamlog,
    values: MinMaxAvgScalarBinBatch,
    write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
    read_cache_fut: Option<Pin<Box<dyn Future<Output = Result<StreamItem<PreBinnedScalarItem>, Error>> + Send>>>,
    stream_kind: BK,
    read_cache_fut: Option<
        Pin<Box<dyn Future<Output = Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>,
    >,
    stream_kind: SK,
}

impl<BK> PreBinnedValueStream<BK>
impl<SK> PreBinnedValueStream<SK>
where
    BK: BinnedStreamKind,
    SK: BinnedStreamKind,
    Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, err::Error>: FrameType,
{
    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: BK) -> Self {
    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self {
        Self {
            query,
            node_config: node_config.clone(),
@@ -130,22 +139,28 @@ where
            .ok_or(Error::with_msg("covering_range returns None"))
            .unwrap();
        let perf_opts = PerfOpts { inmem_bufcap: 512 };
        let s1 = MergedFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
        let s1 = s1.into_binned_t(range);
        let s1 = MergedFromRemotes::new(
            evq,
            perf_opts,
            self.node_config.node_config.cluster.clone(),
            self.stream_kind.clone(),
        );
        let s1 = IntoBinnedT::into_binned_t(s1, range);
        let s1 = s1.map(|item| {
            // TODO does this do anything?
            match item {
                Ok(item) => match item {
                    StreamItem::Log(item) => Ok(StreamItem::Log(item)),
                    StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
                    StreamItem::DataItem(item) => match item {
                    StreamItem::DataItem(item) => Ok(StreamItem::DataItem(item)),
                    /*StreamItem::DataItem(item) => match item {
                        MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
                            Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete))
                        }
                        MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
                            Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(item)))
                        }
                    },
                },*/
                },
                Err(e) => Err(e),
            }
@@ -153,7 +168,10 @@ where

        // TODO
        // In the above, we must introduce a trait to convert to the generic item type:
        self.fut2 = Some(Box::pin(s1));

        // TODO!!
        self.fut2 = Some(err::todoval());
        //self.fut2 = Some(Box::pin(s1));
    }

    fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
@@ -223,12 +241,12 @@ where
    }
}

impl<BK> Stream for PreBinnedValueStream<BK>
impl<SK> Stream for PreBinnedValueStream<SK>
where
    BK: BinnedStreamKind,
    SK: BinnedStreamKind + Unpin,
    Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, err::Error>: FrameType,
{
    // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
    type Item = Result<StreamItem<PreBinnedScalarItem>, Error>;
    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
@@ -283,7 +301,8 @@ where
        if self.cache_written {
            if self.range_complete_observed {
                self.range_complete_emitted = true;
                Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete))))
                let item = <<SK as BinnedStreamKind>::PreBinnedItem as PreBinnedItem>::make_range_complete();
                Ready(Some(Ok(StreamItem::DataItem(item))))
            } else {
                self.completed = true;
                Ready(None)
@@ -320,24 +339,9 @@ where
        } else if let Some(fut) = self.fut2.as_mut() {
            match fut.poll_next_unpin(cx) {
                Ready(Some(k)) => match k {
                    Ok(item) => match item {
                        StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                        StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                        StreamItem::DataItem(item) => match item {
                            PreBinnedScalarItem::RangeComplete => {
                                self.range_complete_observed = true;
                                continue 'outer;
                            }
                            PreBinnedScalarItem::Batch(batch) => {
                                self.values.ts1s.extend(batch.ts1s.iter());
                                self.values.ts2s.extend(batch.ts2s.iter());
                                self.values.counts.extend(batch.counts.iter());
                                self.values.mins.extend(batch.mins.iter());
                                self.values.maxs.extend(batch.maxs.iter());
                                self.values.avgs.extend(batch.avgs.iter());
                                Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(batch)))))
                            }
                        },
                    Ok(item) => match SK::pbv_handle_fut2_item(item) {
                        None => continue 'outer,
                        Some(item) => Ready(Some(Ok(item))),
                    },
                    Err(e) => {
                        self.errored = true;
@@ -357,7 +361,7 @@ where
                match item {
                    Ok(file) => {
                        self.read_from_cache = true;
                        let fut = super::read_pbv(file);
                        let fut = <SK as BinnedStreamKind>::PreBinnedItem::read_pbv(file)?;
                        self.read_cache_fut = Some(Box::pin(fut));
                        continue 'outer;
                    }

disk/src/cache/pbvfs.rs (7 lines)
@@ -3,7 +3,7 @@ use crate::agg::streams::StreamItem;
use crate::binned::BinnedStreamKind;
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::decode_frame;
use crate::frame::makeframe::{decode_frame, FrameType};
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt};
@@ -61,6 +61,7 @@ pub enum PreBinnedScalarItem {
impl<BK> Stream for PreBinnedScalarValueFetchedStream<BK>
where
    BK: BinnedStreamKind,
    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
{
    type Item = Result<StreamItem<BK::PreBinnedItem>, Error>;

@@ -79,7 +80,9 @@ where
                    StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                    StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                    StreamItem::DataItem(item) => {
                        match decode_frame::<Result<StreamItem<PreBinnedScalarItem>, Error>>(&item) {
                        match decode_frame::<Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, Error>>(
                            &item,
                        ) {
                            Ok(Ok(item)) => Ready(Some(Ok(item))),
                            Ok(Err(e)) => {
                                self.errored = true;

@@ -1,6 +1,6 @@
use crate::agg::streams::StreamItem;
use crate::dataopen::{open_files, OpenedFile};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem};
use crate::eventchunker::{EventChunker, EventChunkerConf};
use crate::file_content_stream;
use crate::streamlog::LogItem;
use err::Error;

@@ -1,4 +1,5 @@
use crate::agg::streams::{StatsItem, StreamItem};
use crate::binned::RangeCompletableItem;
use crate::{FileChunkRead, NeedMinBuffer};
use bitshuffle::bitshuffle_decompress;
use bytes::{Buf, BytesMut};
@@ -345,13 +346,8 @@ impl EventFull {
    }
}

pub enum EventChunkerItem {
    Events(EventFull),
    RangeComplete,
}

impl Stream for EventChunker {
    type Item = Result<StreamItem<EventChunkerItem>, Error>;
    type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
@@ -374,7 +370,7 @@ impl Stream for EventChunker {
        } else if self.final_stats_sent {
            self.sent_beyond_range = true;
            if self.seen_beyond_range {
                Ready(Some(Ok(StreamItem::DataItem(EventChunkerItem::RangeComplete))))
                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
            } else {
                continue 'outer;
            }
@@ -404,7 +400,7 @@ impl Stream for EventChunker {
        } else {
            let x = self.need_min;
            self.inp.set_need_min(x);
            let ret = StreamItem::DataItem(EventChunkerItem::Events(res.events));
            let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
            Ready(Some(Ok(ret)))
        }
    }

@@ -1,13 +1,11 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem;
use crate::binned::BinnedScalarStreamItem;
use crate::cache::pbvfs::PreBinnedScalarItem;
use crate::frame::inmem::InMemoryFrame;
use crate::raw::conn::RawConnOut;
use crate::raw::EventQueryJsonStringFrame;
use bytes::{BufMut, BytesMut};
use err::Error;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};

pub const INMEM_FRAME_HEAD: usize = 20;
pub const INMEM_FRAME_FOOT: usize = 4;
@@ -21,10 +19,6 @@ impl FrameType for EventQueryJsonStringFrame {
    const FRAME_TYPE_ID: u32 = 0x03;
}

impl FrameType for RawConnOut {
    const FRAME_TYPE_ID: u32 = 0x04;
}

impl FrameType for Result<StreamItem<BinnedScalarStreamItem>, Error> {
    const FRAME_TYPE_ID: u32 = 0x06;
}
@@ -37,6 +31,10 @@ impl FrameType for Result<StreamItem<PreBinnedScalarItem>, Error> {
    const FRAME_TYPE_ID: u32 = 0x08;
}

impl FrameType for Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error> {
    const FRAME_TYPE_ID: u32 = 0x09;
}
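
The FRAME_TYPE_ID constants are the wire-level discriminators that make_frame writes and decode_frame checks below. The impls for the removed item types disappear along with those types; a sketch of how the new layered item type from this commit would register, with a hypothetical ID value that is not taken from the source:

impl FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
    // 0x0a is illustrative; any stable, unused ID would do.
    const FRAME_TYPE_ID: u32 = 0x0a;
}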

pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where
    FT: FrameType + Serialize,
@@ -85,9 +83,9 @@ pub fn make_term_frame() -> BytesMut {
    buf
}

pub fn decode_frame<'a, FT>(frame: &'a InMemoryFrame) -> Result<FT, Error>
pub fn decode_frame<FT>(frame: &InMemoryFrame) -> Result<FT, Error>
where
    FT: FrameType + Deserialize<'a>,
    FT: FrameType + DeserializeOwned,
{
    if frame.encid() != 0x12121212 {
        return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));

@@ -1,5 +1,7 @@
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::agg::streams::{StatsItem, StreamItem};
use crate::agg::binnedt::AggregatableTdim;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem};
use crate::binned::{BinnedStreamKind, XBinnedEventsStreamItem};
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
@@ -10,16 +12,17 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct MergedMinMaxAvgScalarStream<S>
pub struct MergedMinMaxAvgScalarStream<S, SK>
where
    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>>,
    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
    SK: BinnedStreamKind,
{
    inps: Vec<S>,
    current: Vec<MergedMinMaxAvgScalarStreamCurVal>,
    current: Vec<MergedCurVal<<SK as BinnedStreamKind>::XBinnedEvents>>,
    ixs: Vec<usize>,
    errored: bool,
    completed: bool,
    batch: MinMaxAvgScalarEventBatch,
    batch: <SK as BinnedStreamKind>::XBinnedEvents,
    ts_last_emit: u64,
    range_complete_observed: Vec<bool>,
    range_complete_observed_all: bool,
@@ -30,23 +33,21 @@ where
    event_data_read_stats_items: VecDeque<EventDataReadStats>,
}

impl<S> MergedMinMaxAvgScalarStream<S>
impl<S, SK> MergedMinMaxAvgScalarStream<S, SK>
where
    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Unpin,
    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
    SK: BinnedStreamKind,
{
    pub fn new(inps: Vec<S>) -> Self {
        let n = inps.len();
        let current = (0..n)
            .into_iter()
            .map(|_| MergedMinMaxAvgScalarStreamCurVal::None)
            .collect();
        let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
        Self {
            inps,
            current: current,
            ixs: vec![0; n],
            errored: false,
            completed: false,
            batch: MinMaxAvgScalarEventBatch::empty(),
            batch: <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0),
            ts_last_emit: 0,
            range_complete_observed: vec![false; n],
            range_complete_observed_all: false,
@@ -63,7 +64,7 @@ where
        let mut pending = 0;
        for i1 in 0..self.inps.len() {
            match self.current[i1] {
                MergedMinMaxAvgScalarStreamCurVal::None => {
                MergedCurVal::None => {
                    'l1: loop {
                        break match self.inps[i1].poll_next_unpin(cx) {
                            Ready(Some(Ok(k))) => match k {
@@ -79,23 +80,23 @@ where
                                    }
                                    continue 'l1;
                                }
                                StreamItem::DataItem(item) => match item {
                                    MinMaxAvgScalarEventBatchStreamItem::Values(vals) => {
                                        self.ixs[i1] = 0;
                                        self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals);
                                    }
                                    MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {
                                StreamItem::DataItem(item) => {
                                    // TODO factor out the concept of RangeComplete into another trait layer.
                                    if item.is_range_complete() {
                                        self.range_complete_observed[i1] = true;
                                        let d = self.range_complete_observed.iter().filter(|&&k| k).count();
                                        if d == self.range_complete_observed.len() {
                                            self.range_complete_observed_all = true;
                                            debug!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d);
                                            debug!("MergedStream range_complete d {} COMPLETE", d);
                                        } else {
                                            trace!("MergedMinMaxAvgScalarStream range_complete d {}", d);
                                            trace!("MergedStream range_complete d {}", d);
                                        }
                                        continue 'l1;
                                    } else {
                                        self.ixs[i1] = 0;
                                        self.current[i1] = MergedCurVal::Val(item);
                                    }
                                },
                            }
                            },
                            Ready(Some(Err(e))) => {
                                // TODO emit this error, consider this stream as done, anything more to do here?

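The per-input bookkeeping stays: each input flips its range_complete_observed flag, and the merged stream counts flags to decide when the whole range is complete. That counting step in isolation:

    // Each input flips its flag on RangeComplete; the merged stream only
    // reports RangeComplete once every input has.
    fn all_range_complete(observed: &[bool]) -> bool {
        observed.iter().filter(|&&k| k).count() == observed.len()
    }
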
@@ -104,7 +105,7 @@ where
                                return Ready(Err(e));
                            }
                            Ready(None) => {
                                self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
                                self.current[i1] = MergedCurVal::Finish;
                            }
                            Pending => {
                                pending += 1;
@@ -123,17 +124,18 @@ where
    }
}

impl<S> Stream for MergedMinMaxAvgScalarStream<S>
impl<S, SK> Stream for MergedMinMaxAvgScalarStream<S, SK>
where
    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Unpin,
    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
    SK: BinnedStreamKind,
{
    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
    type Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        'outer: loop {
            break if self.completed {
                panic!("MergedMinMaxAvgScalarStream poll_next on completed");
                panic!("MergedStream poll_next on completed");
            } else if self.errored {
                self.completed = true;
                Ready(None)
@@ -149,7 +151,7 @@ where
                } else {
                    self.range_complete_observed_all_emitted = true;
                    Ready(Some(Ok(StreamItem::DataItem(
                        MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
                        <<SK as BinnedStreamKind>::XBinnedEvents as XBinnedEventsStreamItem>::make_range_complete(),
                    ))))
                }
            } else {
@@ -163,11 +165,11 @@ where
                let mut lowest_ix = usize::MAX;
                let mut lowest_ts = u64::MAX;
                for i1 in 0..self.inps.len() {
                    if let MergedMinMaxAvgScalarStreamCurVal::Val(val) = &self.current[i1] {
                    if let MergedCurVal::Val(val) = &self.current[i1] {
                        let u = self.ixs[i1];
                        if u >= val.tss.len() {
                            self.ixs[i1] = 0;
                            self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
                            self.current[i1] = MergedCurVal::None;
                            continue 'outer;
                        } else {
                            let ts = val.tss[u];

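The merge core is unchanged by the generics: every input holds a cursor into its current batch, and each pass emits from the input with the lowest pending timestamp. A standalone sketch of that selection over plain u64 timestamp slices:

    // Pick the input with the smallest current timestamp, as the merged
    // stream does over `self.current` / `self.ixs`. Returns None when all
    // cursors are exhausted.
    fn lowest_input(batches: &[Vec<u64>], ixs: &[usize]) -> Option<(usize, u64)> {
        let mut lowest_ix = usize::MAX;
        let mut lowest_ts = u64::MAX;
        for (i, tss) in batches.iter().enumerate() {
            if let Some(&ts) = tss.get(ixs[i]) {
                if ts < lowest_ts {
                    lowest_ts = ts;
                    lowest_ix = i;
                }
            }
        }
        if lowest_ix == usize::MAX {
            None
        } else {
            Some((lowest_ix, lowest_ts))
        }
    }

    fn main() {
        let batches = vec![vec![10, 30], vec![20, 40]];
        let ixs = vec![0, 0];
        assert_eq!(lowest_input(&batches, &ixs), Some((0, 10)));
    }
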
@@ -180,8 +182,10 @@ where
                }
                if lowest_ix == usize::MAX {
                    if self.batch.tss.len() != 0 {
                        let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
                        let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
                        //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
                        //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
                        let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
                        let ret = std::mem::replace(&mut self.batch, emp);
                        self.data_emit_complete = true;
                        Ready(Some(Ok(StreamItem::DataItem(ret))))
                    } else {
@@ -194,9 +198,7 @@ where
                    self.batch.tss.push(lowest_ts);
                    let rix = self.ixs[lowest_ix];
                    let z = match &self.current[lowest_ix] {
                        MergedMinMaxAvgScalarStreamCurVal::Val(k) => {
                            (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len())
                        }
                        MergedCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
                        _ => panic!(),
                    };
                    self.batch.mins.push(z.0);
@@ -205,11 +207,13 @@ where
                    self.ixs[lowest_ix] += 1;
                    if self.ixs[lowest_ix] >= z.3 {
                        self.ixs[lowest_ix] = 0;
                        self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None;
                        self.current[lowest_ix] = MergedCurVal::None;
                    }
                    if self.batch.tss.len() >= self.batch_size {
                        let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
                        let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
                        //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
                        //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
                        let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
                        let ret = std::mem::replace(&mut self.batch, emp);
                        Ready(Some(Ok(StreamItem::DataItem(ret))))
                    } else {
                        continue 'outer;
@@ -227,8 +231,8 @@ where
    }
}

enum MergedMinMaxAvgScalarStreamCurVal {
enum MergedCurVal<T> {
    None,
    Finish,
    Val(MinMaxAvgScalarEventBatch),
    Val(T),
}

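MergedCurVal<T> generalizes the per-input slot, and the paired Collected::new(0) call implies a constructor trait standing in for MinMaxAvgScalarEventBatch::empty(). A sketch of that assumed trait; the argument name and the BatchSketch payload are hypothetical:

    // Assumed shape of the Collected trait: the diff only shows that
    // `<X as Collected>::new(0)` produces an empty batch.
    trait Collected {
        fn new(bin_count_exp: u32) -> Self;
    }

    enum MergedCurVal<T> {
        None,
        Finish,
        Val(T),
    }

    // Hypothetical payload implementing the assumed trait.
    struct BatchSketch {
        tss: Vec<u64>,
    }

    impl Collected for BatchSketch {
        fn new(_bin_count_exp: u32) -> Self {
            BatchSketch { tss: Vec::new() }
        }
    }

    fn fresh_state(n: usize) -> Vec<MergedCurVal<BatchSketch>> {
        (0..n).map(|_| MergedCurVal::None).collect()
    }
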
@@ -5,11 +5,11 @@ Delivers event data (not yet time-binned) from local storage and provides client
to request such data from nodes.
*/

use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{make_frame, make_term_frame};
use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames;
use crate::raw::bffr::EventsFromFrames;
use err::Error;
use futures_core::Stream;
use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts};
@@ -36,11 +36,23 @@ pub struct EventsQuery {
#[derive(Serialize, Deserialize)]
pub struct EventQueryJsonStringFrame(String);

pub async fn x_processed_stream_from_node(
pub async fn x_processed_stream_from_node<SK>(
    query: EventsQuery,
    perf_opts: PerfOpts,
    node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Send>>, Error> {
    stream_kind: SK,
) -> Result<
    Pin<
        Box<
            dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>
                + Send,
        >,
    >,
    Error,
>
where
    SK: BinnedStreamKind,
{
    let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
    let qjs = serde_json::to_string(&query)?;
    let (netin, mut netout) = net.into_split();
@@ -51,7 +63,7 @@ pub async fn x_processed_stream_from_node(
    netout.flush().await?;
    netout.forget();
    let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
    let items = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames);
    let items = EventsFromFrames::new(frames, stream_kind);
    Ok(Box::pin(items))
}

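x_processed_stream_from_node now takes a stream_kind and returns a boxed stream whose item type is chosen by the kind. A sketch of consuming such a boxed stream; the item and error types are placeholders, since only the Result wrapper and the Pin<Box<dyn Stream>> shape matter here:

    use futures_core::Stream;
    use futures_util::StreamExt;
    use std::pin::Pin;

    // Drain a boxed item stream of the shape x_processed_stream_from_node
    // returns, counting the Ok items.
    async fn drain<T>(mut items: Pin<Box<dyn Stream<Item = Result<T, String>> + Send>>) -> usize {
        let mut n = 0;
        while let Some(item) = items.next().await {
            if item.is_ok() {
                n += 1;
            }
        }
        n
    }
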
@@ -1,8 +1,7 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::decode_frame;
use crate::raw::conn::RawConnOut;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -11,39 +10,44 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

pub struct MinMaxAvgScalarEventBatchStreamFromFrames<T>
pub struct EventsFromFrames<T, SK>
where
    T: AsyncRead + Unpin,
    SK: BinnedStreamKind,
{
    inp: InMemoryFrameAsyncReadStream<T>,
    errored: bool,
    completed: bool,
    stream_kind: SK,
}

impl<T> MinMaxAvgScalarEventBatchStreamFromFrames<T>
impl<T, SK> EventsFromFrames<T, SK>
where
    T: AsyncRead + Unpin,
    SK: BinnedStreamKind,
{
    pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
    pub fn new(inp: InMemoryFrameAsyncReadStream<T>, stream_kind: SK) -> Self {
        Self {
            inp,
            errored: false,
            completed: false,
            stream_kind,
        }
    }
}

impl<T> Stream for MinMaxAvgScalarEventBatchStreamFromFrames<T>
impl<T, SK> Stream for EventsFromFrames<T, SK>
where
    T: AsyncRead + Unpin,
    SK: BinnedStreamKind,
{
    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.completed {
                panic!("MinMaxAvgScalarEventBatchStreamFromFrames poll_next on completed");
                panic!("EventsFromFrames poll_next on completed");
            } else if self.errored {
                self.completed = true;
                Ready(None)
@@ -53,8 +57,7 @@ where
                    StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                    StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                    StreamItem::DataItem(frame) => {
                        type ExpectedType = RawConnOut;
                        match decode_frame::<ExpectedType>(&frame) {
                        match decode_frame::<<SK as BinnedStreamKind>::XBinnedEvents>(&frame) {
                            Ok(item) => match item {
                            Ok(item) => Ready(Some(Ok(item))),
                            Err(e) => {
@@ -64,7 +67,7 @@ where
                            },
                            Err(e) => {
                                error!(
                                    "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}",
                                    "EventsFromFrames ~~~~~~~~ ERROR on frame payload {}",
                                    frame.buf().len(),
                                );
                                self.errored = true;

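EventsFromFrames keeps the errored/completed discipline shared by all Stream impls in this commit: polling after completion panics, and an error fuses the stream on the following poll. That guard in isolation:

    // The errored/completed guard these Stream impls share.
    struct Guard {
        errored: bool,
        completed: bool,
    }

    impl Guard {
        // Returns None when the stream must report Ready(None).
        fn on_poll(&mut self) -> Option<()> {
            if self.completed {
                panic!("poll_next on completed");
            }
            if self.errored {
                self.completed = true;
                None
            } else {
                Some(()) // proceed to poll the inner stream
            }
        }
    }
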
@@ -1,7 +1,8 @@
use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem;
use crate::agg::IntoDim1F32Stream;
use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem};
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf;
@@ -11,30 +12,30 @@ use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{ByteSize, NodeConfigCached, PerfOpts};
use netpod::{AggKind, ByteSize, NodeConfigCached, PerfOpts};
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;

pub async fn raw_service(node_config: NodeConfigCached) -> Result<(), Error> {
pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
    let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
    let lis = tokio::net::TcpListener::bind(addr).await?;
    loop {
        match lis.accept().await {
            Ok((stream, addr)) => {
                taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone()));
                taskrun::spawn(events_conn_handler(stream, addr, node_config.clone()));
            }
            Err(e) => Err(e)?,
        }
    }
}

async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
    //use tracing_futures::Instrument;
    let span1 = span!(Level::INFO, "raw::raw_conn_handler");
    let r = raw_conn_handler_inner(stream, addr, &node_config)
    let r = events_conn_handler_inner(stream, addr, &node_config)
        .instrument(span1)
        .await;
    match r {
@@ -46,17 +47,16 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Node
    }
}

pub type RawConnOut = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;

async fn raw_conn_handler_inner(
async fn events_conn_handler_inner(
    stream: TcpStream,
    addr: SocketAddr,
    node_config: &NodeConfigCached,
) -> Result<(), Error> {
    match raw_conn_handler_inner_try(stream, addr, node_config).await {
    match events_conn_handler_inner_try(stream, addr, node_config).await {
        Ok(_) => (),
        Err(mut ce) => {
            let buf = make_frame::<RawConnOut>(&Err(ce.err))?;
            // TODO is it guaranteed to be compatible to serialize this way?
            let buf = make_frame::<Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>>(&Err(ce.err))?;
            match ce.netout.write_all(&buf).await {
                Ok(_) => (),
                Err(e) => return Err(e)?,
@@ -80,7 +80,7 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
    }
}

async fn raw_conn_handler_inner_try(
async fn events_conn_handler_inner_try(
    stream: TcpStream,
    addr: SocketAddr,
    node_config: &NodeConfigCached,
@@ -170,14 +170,40 @@ async fn raw_conn_handler_inner_try(
            Ok(_) => {}
            Err(_) => {}
        }
        match make_frame::<RawConnOut>(&item) {
            Ok(buf) => match netout.write_all(&buf).await {
                Ok(_) => {}
                Err(e) => return Err((e, netout))?,
            },
            Err(e) => {
                return Err((e, netout))?;
        match evq.agg_kind {
            AggKind::DimXBins1 => {
                match make_frame::<
                    Result<
                        StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>,
                        Error,
                    >,
                >(&item)
                {
                    Ok(buf) => match netout.write_all(&buf).await {
                        Ok(_) => {}
                        Err(e) => return Err((e, netout))?,
                    },
                    Err(e) => {
                        return Err((e, netout))?;
                    }
                }
            }
            // TODO define this case:
            AggKind::DimXBinsN(n1) => match make_frame::<
                Result<
                    StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>,
                    Error,
                >,
            >(err::todoval())
            {
                Ok(buf) => match netout.write_all(&buf).await {
                    Ok(_) => {}
                    Err(e) => return Err((e, netout))?,
                },
                Err(e) => {
                    return Err((e, netout))?;
                }
            },
        }
    }
    let buf = make_term_frame();

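Framing now dispatches on evq.agg_kind, with DimXBins1 serializing the item and DimXBinsN still a declared TODO (err::todoval()). A condensed sketch of the dispatch shape; the AggKind stand-in and byte-vector result are illustrative, not netpod's real definitions:

    // Stand-in for netpod::AggKind, reduced to the two arms in this diff.
    enum AggKind {
        DimXBins1,
        DimXBinsN(u32),
    }

    // Both arms would serialize to the same scalar frame type today;
    // the DimXBinsN case is explicitly not defined yet.
    fn frame_for(kind: &AggKind) -> Result<Vec<u8>, String> {
        match kind {
            AggKind::DimXBins1 => Ok(Vec::new()), // where make_frame(&item) would run
            AggKind::DimXBinsN(_n1) => Err("TODO define this case".into()),
        }
    }
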
@@ -1,7 +1,7 @@
use bytes::Bytes;
use disk::binned::BinnedStreamKindScalar;
use disk::cache::{BinnedQuery, PreBinnedQuery};
use disk::raw::conn::raw_service;
use disk::raw::conn::events_service;
use err::Error;
use future::Future;
use futures_core::Stream;
@@ -23,7 +23,7 @@ pub mod gather;

pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
    let node_config = node_config.clone();
    let rawjh = taskrun::spawn(raw_service(node_config.clone()));
    let rawjh = taskrun::spawn(events_service(node_config.clone()));
    use std::str::FromStr;
    let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
    let make_service = make_service_fn({