Factor types

This commit is contained in:
Dominik Werder
2021-05-26 08:29:29 +02:00
parent 9751660118
commit a76e86e623
9 changed files with 130 additions and 40 deletions

View File

@@ -8,7 +8,7 @@ use crate::binned::scalar::binned_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame;
use crate::frame::makeframe::{make_frame, FrameType};
use crate::raw::EventsQuery;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
@@ -332,7 +332,7 @@ where
impl<T> Future for ReadPbv<T>
where
T: ReadableFromFile,
T: ReadableFromFile + Unpin,
{
type Output = Result<StreamItem<RangeCompletableItem<T>>, Error>;
@@ -391,6 +391,45 @@ impl FilterFittingInside for MinMaxAvgScalarBinBatch {
}
}
/// Exposes the number of contained items (events or bins) in a batch.
pub trait WithLen {
/// Returns the number of items in this container.
fn len(&self) -> usize;
}
impl WithLen for MinMaxAvgScalarEventBatch {
// Event count is the length of the timestamp column `tss`
// (the min/max/avg columns are kept in lockstep with it).
fn len(&self) -> usize {
self.tss.len()
}
}
impl WithLen for MinMaxAvgScalarBinBatch {
// Bin count is the length of the bin-start timestamp column `ts1s`.
fn len(&self) -> usize {
self.ts1s.len()
}
}
/// Indexed access to per-item timestamps of a batch.
pub trait WithTimestamps {
/// Returns the timestamp of the item at index `ix`.
/// NOTE(review): implementations index directly, so this panics when
/// `ix >= len()` — callers are expected to stay in bounds.
fn ts(&self, ix: usize) -> u64;
}
impl WithTimestamps for MinMaxAvgScalarEventBatch {
// Timestamp of event `ix`; panics if `ix` is out of bounds.
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
/// Appends a single item, selected by index from another batch of the
/// same type, onto `self`. Used by the merge logic to copy one event at
/// a time without knowing the concrete column layout.
pub trait PushableIndex {
/// Copies item `ix` of `src` onto the end of `self`.
fn push_index(&mut self, src: &Self, ix: usize);
}
impl PushableIndex for MinMaxAvgScalarEventBatch {
// Copies event `ix` column-by-column so all four parallel vectors
// (tss/mins/maxs/avgs) stay in lockstep. Panics if `ix` is out of
// bounds for `src`.
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.mins.push(src.mins[ix]);
self.maxs.push(src.maxs[ix]);
self.avgs.push(src.avgs[ix]);
}
}
impl Collected for MinMaxAvgScalarEventBatch {
// TODO for this case we don't have an expected number of events. Factor out into another trait?
fn new(bin_count_exp: u32) -> Self {
@@ -415,8 +454,6 @@ impl Collectable for MinMaxAvgScalarEventBatch {
}
}
pub trait TBinned: Send + Serialize + DeserializeOwned + Unpin + Collectable + AggregatableTdim<Output = Self> {}
impl Collected for MinMaxAvgScalarBinBatch {
fn new(bin_count_exp: u32) -> Self {
MinMaxAvgScalarBinBatch::empty()
@@ -437,14 +474,21 @@ impl Collectable for MinMaxAvgScalarBinBatch {
}
pub trait XBinnedEvents:
Sized + Unpin + Send + Serialize + DeserializeOwned + Collectable + Collected + AggregatableTdim
Sized
+ Unpin
+ Send
+ Serialize
+ DeserializeOwned
+ Collectable
+ Collected
+ AggregatableTdim
+ WithLen
+ WithTimestamps
+ PushableIndex
{
fn frame_type() -> u32;
}
impl XBinnedEvents for MinMaxAvgScalarEventBatch {}
impl TBinnedBins for MinMaxAvgScalarBinBatch {}
pub trait TBinnedBins:
Sized
+ Unpin
@@ -456,10 +500,27 @@ pub trait TBinnedBins:
+ ReadableFromFile
+ FilterFittingInside
+ AggregatableTdim2
+ WithLen
{
fn frame_type() -> u32;
}
pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static {
impl XBinnedEvents for MinMaxAvgScalarEventBatch {
// Frame type id of the wire representation: frames carry the full
// Result<StreamItem<RangeCompletableItem<Self>>, Error>, so the id is
// taken from that type's FrameType impl, not from Self directly.
fn frame_type() -> u32 {
<Result<StreamItem<RangeCompletableItem<Self>>, Error> as FrameType>::FRAME_TYPE_ID
}
}
impl TBinnedBins for MinMaxAvgScalarBinBatch {
// Same scheme as the XBinnedEvents impl above for events: the frame
// type id comes from the wrapped Result<StreamItem<...>> wire type.
fn frame_type() -> u32 {
<Result<StreamItem<RangeCompletableItem<Self>>, Error> as FrameType>::FRAME_TYPE_ID
}
}
pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static
// TODO would it be better to express it here?
//where Result<StreamItem<RangeCompletableItem<Self::XBinnedEvents>>, Error>: FrameType,
{
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>>
+ Send
+ 'static;

View File

@@ -3,6 +3,7 @@ use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem};
use crate::binnedstream::BoxedStream;
use crate::cache::BinnedQuery;
use crate::frame::makeframe::FrameType;
use crate::raw::EventsQuery;
use err::Error;
use futures_core::Stream;

View File

@@ -76,7 +76,7 @@ where
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => {
match decode_frame::<Result<StreamItem<RangeCompletableItem<SK::TBinnedBins>>, Error>>(
&item,
&item, <Result<StreamItem<RangeCompletableItem<SK::TBinnedBins>>, Error> as FrameType>::FRAME_TYPE_ID,
) {
Ok(Ok(item)) => Ready(Some(Ok(item))),
Ok(Err(e)) => {

View File

@@ -1,7 +1,7 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::RangeCompletableItem;
use crate::binned::{RangeCompletableItem, XBinnedEvents};
use crate::frame::inmem::InMemoryFrame;
use crate::raw::EventQueryJsonStringFrame;
use bytes::{BufMut, BytesMut};
@@ -76,18 +76,17 @@ pub fn make_term_frame() -> BytesMut {
buf
}
pub fn decode_frame<FT>(frame: &InMemoryFrame) -> Result<FT, Error>
pub fn decode_frame<T>(frame: &InMemoryFrame, frame_type: u32) -> Result<T, Error>
where
FT: FrameType + DeserializeOwned,
T: DeserializeOwned,
{
if frame.encid() != 0x12121212 {
return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
}
if frame.tyid() != FT::FRAME_TYPE_ID {
if frame.tyid() != frame_type {
return Err(Error::with_msg(format!(
"type id mismatch expect {} found {:?}",
FT::FRAME_TYPE_ID,
frame
frame_type, frame
)));
}
if frame.len() as usize != frame.buf().len() {

View File

@@ -1,7 +1,7 @@
use crate::agg::binnedt::AggregatableTdim;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem};
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps};
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
@@ -166,12 +166,12 @@ where
for i1 in 0..self.inps.len() {
if let MergedCurVal::Val(val) = &self.current[i1] {
let u = self.ixs[i1];
if u >= val.tss.len() {
if u >= val.len() {
self.ixs[i1] = 0;
self.current[i1] = MergedCurVal::None;
continue 'outer;
} else {
let ts = val.tss[u];
let ts = val.ts(u);
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
@@ -180,40 +180,56 @@ where
}
}
if lowest_ix == usize::MAX {
if self.batch.tss.len() != 0 {
if self.batch.len() != 0 {
//let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
//let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
let ret = std::mem::replace(&mut self.batch, emp);
self.data_emit_complete = true;
Ready(Some(Ok(StreamItem::DataItem(ret))))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
self.data_emit_complete = true;
continue 'outer;
}
} else {
assert!(lowest_ts >= self.ts_last_emit);
let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
let mut local_batch = std::mem::replace(&mut self.batch, emp);
self.ts_last_emit = lowest_ts;
self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix];
let z = match &self.current[lowest_ix] {
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
local_batch.push_index(val, rix);
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
self.batch = local_batch;
self.ixs[lowest_ix] += 1;
let curlen = match &self.current[lowest_ix] {
MergedCurVal::Val(val) => val.len(),
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
};
if self.ixs[lowest_ix] >= curlen {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
//self.batch.tss.push(lowest_ts);
/*let z = match &self.current[lowest_ix] {
MergedCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
_ => panic!(),
};
self.batch.mins.push(z.0);
self.batch.maxs.push(z.1);
self.batch.avgs.push(z.2);
self.ixs[lowest_ix] += 1;
if self.ixs[lowest_ix] >= z.3 {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
if self.batch.tss.len() >= self.batch_size {
*/
if self.batch.len() >= self.batch_size {
//let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
//let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
let ret = std::mem::replace(&mut self.batch, emp);
Ready(Some(Ok(StreamItem::DataItem(ret))))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue 'outer;
}

View File

@@ -1,7 +1,7 @@
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::decode_frame;
use crate::frame::makeframe::{decode_frame, FrameType};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -40,6 +40,8 @@ impl<T, SK> Stream for EventsFromFrames<T, SK>
where
T: AsyncRead + Unpin,
SK: BinnedStreamKind,
// TODO see binned.rs better to express it on trait?
//Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>: FrameType,
{
type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>;
@@ -57,7 +59,15 @@ where
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(frame) => {
match decode_frame::<<SK as BinnedStreamKind>::XBinnedEvents>(&frame) {
match decode_frame::<
Result<
StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>,
Error,
>,
>(
&frame,
<<SK as BinnedStreamKind>::XBinnedEvents as XBinnedEvents>::frame_type(),
) {
Ok(item) => match item {
Ok(item) => Ready(Some(Ok(item))),
Err(e) => {

View File

@@ -8,7 +8,7 @@ use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame};
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType};
use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use err::Error;
use futures_util::StreamExt;
@@ -111,7 +111,8 @@ async fn events_conn_handler_inner_try(
error!("missing command frame");
return Err((Error::with_msg("missing command frame"), netout))?;
}
let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) {
let frame_type = <EventQueryJsonStringFrame as FrameType>::FRAME_TYPE_ID;
let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0], frame_type) {
Ok(k) => k,
Err(e) => return Err((e, netout).into()),
};

View File

@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::StreamItem;
use disk::binned::BinnedScalarStreamItem;
use disk::binned::RangeCompletableItem;
use disk::cache::CacheUsage;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::frame::makeframe::FrameType;
@@ -91,7 +92,7 @@ pub async fn get_binned(
None
}
StreamItem::DataItem(frame) => {
type ExpectedType = Result<StreamItem<BinnedScalarStreamItem>, Error>;
type ExpectedType = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>;
let type_id_exp = <ExpectedType as FrameType>::FRAME_TYPE_ID;
if frame.tyid() != type_id_exp {
error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp);

View File

@@ -1,8 +1,9 @@
use crate::spawn_test_hosts;
use bytes::BytesMut;
use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::StreamItem;
use disk::binned::BinnedScalarStreamItem;
use disk::binned::RangeCompletableItem;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::streamlog::Streamlog;
use err::Error;
@@ -162,7 +163,7 @@ where
None
}
StreamItem::DataItem(frame) => {
type ExpectedType = Result<StreamItem<BinnedScalarStreamItem>, Error>;
type ExpectedType = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>;
match bincode::deserialize::<ExpectedType>(frame.buf()) {
Ok(item) => match item {
Ok(item) => match item {