From a76e86e6239cf7c9acf1c40af756b1290e468795 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 May 2021 08:29:29 +0200 Subject: [PATCH] Factor types --- disk/src/binned.rs | 81 ++++++++++++++++++++++++++++++++----- disk/src/binned/scalar.rs | 1 + disk/src/cache/pbvfs.rs | 2 +- disk/src/frame/makeframe.rs | 11 +++-- disk/src/merge.rs | 44 +++++++++++++------- disk/src/raw/bffr.rs | 16 ++++++-- disk/src/raw/conn.rs | 5 ++- retrieval/src/client.rs | 5 ++- retrieval/src/test.rs | 5 ++- 9 files changed, 130 insertions(+), 40 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 430cd51..2a28e36 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -8,7 +8,7 @@ use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; -use crate::frame::makeframe::make_frame; +use crate::frame::makeframe::{make_frame, FrameType}; use crate::raw::EventsQuery; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -332,7 +332,7 @@ where impl Future for ReadPbv where - T: ReadableFromFile, + T: ReadableFromFile + Unpin, { type Output = Result>, Error>; @@ -391,6 +391,45 @@ impl FilterFittingInside for MinMaxAvgScalarBinBatch { } } +pub trait WithLen { + fn len(&self) -> usize; +} + +impl WithLen for MinMaxAvgScalarEventBatch { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl WithLen for MinMaxAvgScalarBinBatch { + fn len(&self) -> usize { + self.ts1s.len() + } +} + +pub trait WithTimestamps { + fn ts(&self, ix: usize) -> u64; +} + +impl WithTimestamps for MinMaxAvgScalarEventBatch { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + +pub trait PushableIndex { + fn push_index(&mut self, src: &Self, ix: usize); +} + +impl PushableIndex for MinMaxAvgScalarEventBatch { + fn push_index(&mut self, src: &Self, ix: usize) { + self.tss.push(src.tss[ix]); + self.mins.push(src.mins[ix]); + self.maxs.push(src.maxs[ix]); + self.avgs.push(src.avgs[ix]); + } +} + impl Collected for MinMaxAvgScalarEventBatch { // TODO for this case we don't have an expected number of events. Factor out into another trait? fn new(bin_count_exp: u32) -> Self { @@ -415,8 +454,6 @@ impl Collectable for MinMaxAvgScalarEventBatch { } } -pub trait TBinned: Send + Serialize + DeserializeOwned + Unpin + Collectable + AggregatableTdim {} - impl Collected for MinMaxAvgScalarBinBatch { fn new(bin_count_exp: u32) -> Self { MinMaxAvgScalarBinBatch::empty() @@ -437,14 +474,21 @@ impl Collectable for MinMaxAvgScalarBinBatch { } pub trait XBinnedEvents: - Sized + Unpin + Send + Serialize + DeserializeOwned + Collectable + Collected + AggregatableTdim + Sized + + Unpin + + Send + + Serialize + + DeserializeOwned + + Collectable + + Collected + + AggregatableTdim + + WithLen + + WithTimestamps + + PushableIndex { + fn frame_type() -> u32; } -impl XBinnedEvents for MinMaxAvgScalarEventBatch {} - -impl TBinnedBins for MinMaxAvgScalarBinBatch {} - pub trait TBinnedBins: Sized + Unpin @@ -456,10 +500,27 @@ pub trait TBinnedBins: + ReadableFromFile + FilterFittingInside + AggregatableTdim2 + + WithLen { + fn frame_type() -> u32; } -pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { +impl XBinnedEvents for MinMaxAvgScalarEventBatch { + fn frame_type() -> u32 { + >, Error> as FrameType>::FRAME_TYPE_ID + } +} + +impl TBinnedBins for MinMaxAvgScalarBinBatch { + fn frame_type() -> u32 { + >, Error> as FrameType>::FRAME_TYPE_ID + } +} + +pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static +// TODO would it be better to express it here? +//where Result>, Error>: FrameType, +{ type TBinnedStreamType: Stream>, Error>> + Send + 'static; diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index 03241f3..918c6bf 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -3,6 +3,7 @@ use crate::agg::streams::StreamItem; use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem}; use crate::binnedstream::BoxedStream; use crate::cache::BinnedQuery; +use crate::frame::makeframe::FrameType; use crate::raw::EventsQuery; use err::Error; use futures_core::Stream; diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 6521e1a..29932a6 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -76,7 +76,7 @@ where StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => { match decode_frame::>, Error>>( - &item, + &item, >, Error> as FrameType>::FRAME_TYPE_ID, ) { Ok(Ok(item)) => Ready(Some(Ok(item))), Ok(Err(e)) => { diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 6a82c4d..0fa1d5c 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,7 +1,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::RangeCompletableItem; +use crate::binned::{RangeCompletableItem, XBinnedEvents}; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; use bytes::{BufMut, BytesMut}; @@ -76,18 +76,17 @@ pub fn make_term_frame() -> BytesMut { buf } -pub fn decode_frame(frame: &InMemoryFrame) -> Result +pub fn decode_frame(frame: &InMemoryFrame, frame_type: u32) -> Result where - FT: FrameType + DeserializeOwned, + T: DeserializeOwned, { if frame.encid() != 0x12121212 { return Err(Error::with_msg(format!("unknown encoder id {:?}", frame))); } - if frame.tyid() != FT::FRAME_TYPE_ID { + if frame.tyid() != frame_type { return Err(Error::with_msg(format!( "type id mismatch expect {} found {:?}", - FT::FRAME_TYPE_ID, - frame + frame_type, frame ))); } if frame.len() as usize != frame.buf().len() { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 215f6da..417a8a8 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem}; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; @@ -166,12 +166,12 @@ where for i1 in 0..self.inps.len() { if let MergedCurVal::Val(val) = &self.current[i1] { let u = self.ixs[i1]; - if u >= val.tss.len() { + if u >= val.len() { self.ixs[i1] = 0; self.current[i1] = MergedCurVal::None; continue 'outer; } else { - let ts = val.tss[u]; + let ts = val.ts(u); if ts < lowest_ts { lowest_ix = i1; lowest_ts = ts; @@ -180,40 +180,56 @@ where } } if lowest_ix == usize::MAX { - if self.batch.tss.len() != 0 { + if self.batch.len() != 0 { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); let emp = <::XBinnedEvents as Collected>::new(0); let ret = std::mem::replace(&mut self.batch, emp); self.data_emit_complete = true; - Ready(Some(Ok(StreamItem::DataItem(ret)))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { self.data_emit_complete = true; continue 'outer; } } else { assert!(lowest_ts >= self.ts_last_emit); + let emp = <::XBinnedEvents as Collected>::new(0); + let mut local_batch = std::mem::replace(&mut self.batch, emp); self.ts_last_emit = lowest_ts; - self.batch.tss.push(lowest_ts); let rix = self.ixs[lowest_ix]; - let z = match &self.current[lowest_ix] { + match &self.current[lowest_ix] { + MergedCurVal::Val(val) => { + local_batch.push_index(val, rix); + } + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + } + self.batch = local_batch; + self.ixs[lowest_ix] += 1; + let curlen = match &self.current[lowest_ix] { + MergedCurVal::Val(val) => val.len(), + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + }; + if self.ixs[lowest_ix] >= curlen { + self.ixs[lowest_ix] = 0; + self.current[lowest_ix] = MergedCurVal::None; + } + //self.batch.tss.push(lowest_ts); + /*let z = match &self.current[lowest_ix] { MergedCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()), _ => panic!(), }; self.batch.mins.push(z.0); self.batch.maxs.push(z.1); self.batch.avgs.push(z.2); - self.ixs[lowest_ix] += 1; - if self.ixs[lowest_ix] >= z.3 { - self.ixs[lowest_ix] = 0; - self.current[lowest_ix] = MergedCurVal::None; - } - if self.batch.tss.len() >= self.batch_size { + */ + if self.batch.len() >= self.batch_size { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); let emp = <::XBinnedEvents as Collected>::new(0); let ret = std::mem::replace(&mut self.batch, emp); - Ready(Some(Ok(StreamItem::DataItem(ret)))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { continue 'outer; } diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 845d457..f816f84 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,7 +1,7 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::decode_frame; +use crate::frame::makeframe::{decode_frame, FrameType}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -40,6 +40,8 @@ impl Stream for EventsFromFrames where T: AsyncRead + Unpin, SK: BinnedStreamKind, + // TODO see binned.rs better to express it on trait? + //Result::XBinnedEvents>>, Error>: FrameType, { type Item = Result::XBinnedEvents>>, Error>; @@ -57,7 +59,15 @@ where StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(frame) => { - match decode_frame::<::XBinnedEvents>(&frame) { + match decode_frame::< + Result< + StreamItem::XBinnedEvents>>, + Error, + >, + >( + &frame, + <::XBinnedEvents as XBinnedEvents>::frame_type(), + ) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), Err(e) => { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index c85cb10..170db0c 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -8,7 +8,7 @@ use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame}; +use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use err::Error; use futures_util::StreamExt; @@ -111,7 +111,8 @@ async fn events_conn_handler_inner_try( error!("missing command frame"); return Err((Error::with_msg("missing command frame"), netout))?; } - let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) { + let frame_type = ::FRAME_TYPE_ID; + let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0], frame_type) { Ok(k) => k, Err(e) => return Err((e, netout).into()), }; diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 698c1a1..042384a 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; +use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::streams::StreamItem; -use disk::binned::BinnedScalarStreamItem; +use disk::binned::RangeCompletableItem; use disk::cache::CacheUsage; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::makeframe::FrameType; @@ -91,7 +92,7 @@ pub async fn get_binned( None } StreamItem::DataItem(frame) => { - type ExpectedType = Result, Error>; + type ExpectedType = Result>, Error>; let type_id_exp = ::FRAME_TYPE_ID; if frame.tyid() != type_id_exp { error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 99408c9..6465116 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,8 +1,9 @@ use crate::spawn_test_hosts; use bytes::BytesMut; use chrono::{DateTime, Utc}; +use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::streams::StreamItem; -use disk::binned::BinnedScalarStreamItem; +use disk::binned::RangeCompletableItem; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -162,7 +163,7 @@ where None } StreamItem::DataItem(frame) => { - type ExpectedType = Result, Error>; + type ExpectedType = Result>, Error>; match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => match item {