From 836d0248753973fdba3afb9261cc04e513669751 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Jun 2021 21:38:15 +0200 Subject: [PATCH] Remove StreamKind --- disk/src/agg.rs | 33 +--- disk/src/agg/binnedt4.rs | 12 +- disk/src/agg/enp.rs | 14 +- disk/src/agg/eventbatch.rs | 27 +--- disk/src/agg/scalarbinbatch.rs | 26 +--- disk/src/agg/streams.rs | 1 - disk/src/aggtest.rs | 11 +- disk/src/binned.rs | 195 ++---------------------- disk/src/binned/binnedfrompbv.rs | 3 +- disk/src/binned/pbv.rs | 10 +- disk/src/binned/prebinned.rs | 10 +- disk/src/binned/scalar.rs | 70 --------- disk/src/binnedstream.rs | 8 - disk/src/cache.rs | 124 +-------------- disk/src/cache/pbvfs.rs | 153 ------------------- disk/src/decode.rs | 23 ++- disk/src/merge.rs | 225 +--------------------------- disk/src/merge/mergedfromremotes.rs | 2 +- disk/src/raw.rs | 36 +---- disk/src/raw/conn.rs | 5 +- disk/src/raw/eventsfromframes.rs | 2 - httpret/src/lib.rs | 3 - 22 files changed, 58 insertions(+), 935 deletions(-) delete mode 100644 disk/src/binned/scalar.rs delete mode 100644 disk/src/cache/pbvfs.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index b7b326b..d82d8f4 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -1,21 +1,11 @@ -/*! -Aggregation and binning support. -*/ +//! Aggregation and binning support. -use super::eventchunker::EventFull; -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, StreamKind}; use bytes::BytesMut; use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; use netpod::NanoRange; use netpod::ScalarType; use serde::{Deserialize, Serialize}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; pub mod binnedt4; pub mod enp; @@ -23,24 +13,6 @@ pub mod eventbatch; pub mod scalarbinbatch; pub mod streams; -/// Batch of events with a scalar (zero dimensions) numeric value. -pub struct ValuesDim0 { - tss: Vec, - values: Vec>, -} - -impl std::fmt::Debug for ValuesDim0 { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "count {} tsA {:?} tsB {:?}", - self.tss.len(), - self.tss.first(), - self.tss.last() - ) - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct ValuesExtractStats { pub dur: Duration, @@ -110,6 +82,7 @@ impl NumEx for NumF32 { macro_rules! 
make_get_values { ($n:ident, $TY:ident, $FROM_BYTES:ident, $BY:expr) => { + #[allow(unused)] fn $n(decomp: &BytesMut, ty: &ScalarType) -> Result, Error> { let n1 = decomp.len(); if ty.bytes() as usize != $BY { diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs index 870c311..8489dcb 100644 --- a/disk/src/agg/binnedt4.rs +++ b/disk/src/agg/binnedt4.rs @@ -1,15 +1,6 @@ -use crate::agg::enp::XBinnedScalarEvents; -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::{ - FilterFittingInside, MinMaxAvgAggregator, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv, - ReadableFromFile, SingleXBinAggregator, -}; -use crate::decode::EventValues; -use crate::frame::makeframe::Framable; +use crate::binned::{FilterFittingInside, RangeCompletableItem, RangeOverlapInfo, ReadableFromFile}; use crate::Sitemty; -use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; @@ -19,7 +10,6 @@ use std::collections::VecDeque; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::fs::File; pub struct DefaultBinsTimeBinner { _m1: PhantomData, diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index c0ea46c..98c1d12 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -7,7 +7,6 @@ use crate::binned::{ use crate::decode::EventValues; use err::Error; use netpod::NanoRange; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use tokio::fs::File; @@ -23,7 +22,7 @@ where type Input = NTY; type Output = EventValues; - fn process(inp: EventValues) -> Self::Output { + fn process(_inp: EventValues) -> Self::Output { todo!() } } @@ -85,7 +84,7 @@ impl RangeOverlapInfo for XBinnedScalarEvents { } impl FilterFittingInside for XBinnedScalarEvents { - fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option { todo!() } } @@ -124,12 +123,13 @@ impl ReadableFromFile for XBinnedScalarEvents where NTY: NumOps, { - fn read_from_file(file: File) -> Result, Error> { - todo!() + fn read_from_file(_file: File) -> Result, Error> { + // TODO refactor types such that this impl is not needed. 
+ panic!() } - fn from_buf(buf: &[u8]) -> Result { - todo!() + fn from_buf(_buf: &[u8]) -> Result { + panic!() } } diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index ff00d9e..3ab4b0f 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,6 +1,5 @@ -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::{MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, StreamKind}; +use crate::binned::{MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -124,30 +123,6 @@ impl MinMaxAvgScalarEventBatch { } } -pub struct MinMaxAvgScalarEventBatchAggregator { - ts1: u64, - ts2: u64, - count: u64, - min: f32, - max: f32, - sum: f32, - sumc: u64, -} - -impl MinMaxAvgScalarEventBatchAggregator { - pub fn new(ts1: u64, ts2: u64) -> Self { - Self { - ts1, - ts2, - count: 0, - min: f32::MAX, - max: f32::MIN, - sum: f32::NAN, - sumc: 0, - } - } -} - impl MakeBytesFrame for Result>, Error> { fn make_bytes_frame(&self) -> Result { Ok(make_frame(self)?.freeze()) diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 3a871d0..27803ee 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,6 +1,6 @@ use crate::agg::streams::{Appendable, StreamItem, ToJsonBytes}; use crate::agg::{Fits, FitsInside}; -use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind}; +use crate::binned::{MakeBytesFrame, RangeCompletableItem}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -184,30 +184,6 @@ impl MinMaxAvgScalarBinBatch { } } -pub struct MinMaxAvgScalarBinBatchAggregator { - ts1: u64, - ts2: u64, - count: u64, - min: f32, - max: f32, - sum: f32, - sumc: u64, -} - -impl MinMaxAvgScalarBinBatchAggregator { - pub fn new(ts1: u64, ts2: u64) -> Self { - Self { - ts1, - ts2, - count: 0, - min: f32::MAX, - max: f32::MIN, - sum: 0f32, - sumc: 0, - } - } -} - impl MakeBytesFrame for Result>, Error> { fn make_bytes_frame(&self) -> Result { Ok(make_frame(self)?.freeze()) diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 8be6214..4262fbb 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -3,7 +3,6 @@ use crate::streamlog::LogItem; use err::Error; use netpod::EventDataReadStats; use serde::{Deserialize, Serialize}; -use std::any::Any; #[derive(Debug, Serialize, Deserialize)] pub enum StatsItem { diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 2dbea43..10a1ac0 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,10 +1,7 @@ -use crate::binned::BinnedStreamKindScalar; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; -use futures_util::StreamExt; use netpod::timeunits::*; -use netpod::{BinnedRange, ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; -use std::future::ready; +use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -50,7 +47,7 @@ async fn agg_x_dim_0_inner() { tb_file_count: 1, buffer_size: 1024 * 4, }; - let bin_count = 20; + let _bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; @@ -63,6 +60,7 @@ async fn agg_x_dim_0_inner() 
{ query.buffer_size as usize, event_chunker_conf, ); + let _ = fut1; // TODO add the binning and expectation and await the result. } @@ -98,7 +96,7 @@ async fn agg_x_dim_1_inner() { tb_file_count: 1, buffer_size: 17, }; - let bin_count = 10; + let _bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; @@ -111,5 +109,6 @@ async fn agg_x_dim_1_inner() { query.buffer_size as usize, event_chunker_conf, ); + let _ = fut1; // TODO add the binning and expectation and await the result. } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 30792e6..7aa3233 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,18 +1,17 @@ use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; -use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents}; +use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult}; use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; -use crate::binned::query::{BinnedQuery, PreBinnedQuery}; +use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; -use crate::cache::MergedFromRemotes; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, }; -use crate::frame::makeframe::{make_frame, Framable, FrameType, SubFrId}; +use crate::frame::makeframe::{Framable, FrameType, SubFrId}; use crate::merge::mergedfromremotes::MergedFromRemotes2; use crate::raw::EventsQuery; use crate::Sitemty; @@ -24,15 +23,13 @@ use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ - AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, - PreBinnedPatchRange, ScalarType, Shape, + BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, + ScalarType, Shape, }; use num_traits::{AsPrimitive, Bounded, Zero}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; -use serde_json::Map; -use std::any::Any; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -45,7 +42,6 @@ pub mod binnedfrompbv; pub mod pbv; pub mod prebinned; pub mod query; -pub mod scalar; pub struct BinnedStreamRes { pub binned_stream: BoxedStream>, Error>>, @@ -143,7 +139,6 @@ impl BinnedResponseItem for T where T: Send + ToJsonResult + Framable {} pub struct BinnedResponseDyn { stream: Pin> + Send>>, - bin_count: u32, } fn make_num_pipeline_nty_end_evs_enp( @@ -174,10 +169,7 @@ where { let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?; let s = ppp.convert(res.stream, res.bin_count); - let ret = BinnedResponseDyn { - stream: Box::pin(s), - bin_count: res.bin_count, - }; + let ret = BinnedResponseDyn { stream: Box::pin(s) }; Ok(ret) } @@ -267,10 +259,7 @@ where // TODO can I use the same binned_stream machinery to construct the matching empty result? // Need the requested range all with empty/nan values and zero counts. 
let s = futures_util::stream::empty(); - let ret = BinnedResponseDyn { - stream: Box::pin(s), - bin_count: 0, - }; + let ret = BinnedResponseDyn { stream: Box::pin(s) }; Ok(ret) } MatchingConfigEntry::Entry(entry) => { @@ -310,7 +299,7 @@ where fn convert( &self, inp: Pin>> + Send>>, - bin_count_exp: u32, + _bin_count_exp: u32, ) -> Pin> + Send>> { let s = StreamExt::map(inp, |item| Box::new(item) as Box); Box::pin(s) @@ -700,35 +689,6 @@ pub trait RangeOverlapInfo { fn starts_after(&self, range: NanoRange) -> bool; } -pub trait XBinnedEvents: - Sized + Unpin + Send + Serialize + DeserializeOwned + WithLen + WithTimestamps + PushableIndex + Appendable -where - SK: StreamKind, -{ - fn frame_type() -> u32; -} - -pub trait TBinnedBins: - Sized + Unpin + Send + Serialize + DeserializeOwned + ReadableFromFile + FilterFittingInside + WithLen + Appendable -{ - fn frame_type() -> u32; -} - -impl XBinnedEvents for MinMaxAvgScalarEventBatch -where - SK: StreamKind, -{ - fn frame_type() -> u32 { - >, Error> as FrameType>::FRAME_TYPE_ID - } -} - -impl TBinnedBins for MinMaxAvgScalarBinBatch { - fn frame_type() -> u32 { - >, Error> as FrameType>::FRAME_TYPE_ID - } -} - pub trait NumOps: Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned { @@ -938,11 +898,6 @@ impl MinMaxAvgBinsCollected { } } -pub struct MinMaxAvgBinsCollectionSpec { - bin_count_exp: u32, - _m1: PhantomData, -} - #[derive(Serialize)] pub struct MinMaxAvgBinsCollectedResult { ts_bin_edges: Vec, @@ -1042,7 +997,7 @@ where } } -pub struct MinMaxAvgAggregator { +pub struct EventValuesAggregator { range: NanoRange, count: u32, min: Option, @@ -1050,7 +1005,7 @@ pub struct MinMaxAvgAggregator { avg: Option, } -impl MinMaxAvgAggregator { +impl EventValuesAggregator { pub fn new(range: NanoRange) -> Self { Self { range, @@ -1062,8 +1017,7 @@ impl MinMaxAvgAggregator { } } -// TODO rename to EventValuesAggregator -impl TimeBinnableTypeAggregator for MinMaxAvgAggregator +impl TimeBinnableTypeAggregator for EventValuesAggregator where NTY: NumOps, { @@ -1074,7 +1028,8 @@ where &self.range } - fn ingest(&mut self, item: &Self::Input) { + fn ingest(&mut self, _item: &Self::Input) { + // TODO construct test case to hit this: todo!() } @@ -1085,10 +1040,9 @@ where pub struct MinMaxAvgBinsAggregator { range: NanoRange, - count: u32, + count: u64, min: Option, max: Option, - avg: Option, sum: f32, sumc: u32, } @@ -1097,11 +1051,10 @@ impl MinMaxAvgBinsAggregator { pub fn new(range: NanoRange) -> Self { Self { range, - // TODO: count events here? + // TODO: actually count events through the whole pipeline. 
count: 0, min: None, max: None, - avg: None, sum: 0f32, sumc: 0, } @@ -1175,8 +1128,7 @@ where Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], - // TODO - counts: vec![0], + counts: vec![self.count as u64], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], @@ -1184,123 +1136,8 @@ where } } -pub struct SingleXBinAggregator { - range: NanoRange, - count: u32, - min: Option, - max: Option, - avg: Option, -} - -impl SingleXBinAggregator { - pub fn new(range: NanoRange) -> Self { - Self { - range, - count: 0, - min: None, - max: None, - avg: None, - } - } -} - -pub trait StreamKind: Clone + Unpin + Send + Sync + 'static { - type TBinnedStreamType: Stream>, Error>> + Send; - type XBinnedEvents: XBinnedEvents; - type TBinnedBins: TBinnedBins; - type XBinnedToTBinnedAggregator; - type XBinnedToTBinnedStream: Stream>, Error>> - + Send; - - fn new_binned_from_prebinned( - &self, - query: &BinnedQuery, - range: BinnedRange, - pre_range: PreBinnedPatchRange, - node_config: &NodeConfigCached, - ) -> Result; - - fn new_binned_from_merged( - &self, - evq: EventsQuery, - perf_opts: PerfOpts, - range: BinnedRange, - node_config: &NodeConfigCached, - ) -> Result; - - fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream - where - S: Stream>, Error>> + Send + 'static; -} - -#[derive(Clone)] -pub struct BinnedStreamKindScalar {} - -#[derive(Clone)] -pub struct BinnedStreamKindWave {} - -impl BinnedStreamKindScalar { - pub fn new() -> Self { - Self {} - } -} - -impl BinnedStreamKindWave { - pub fn new() -> Self { - Self {} - } -} - #[derive(Debug, Serialize, Deserialize)] pub enum RangeCompletableItem { RangeComplete, Data(T), } - -pub struct Agg3 {} - -pub struct BinnedT3Stream {} - -impl Stream for BinnedT3Stream { - type Item = Sitemty; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - todo!() - } -} - -impl StreamKind for BinnedStreamKindScalar { - // TODO is this really needed? 
- type TBinnedStreamType = BoxedStream>, Error>>; - type XBinnedEvents = MinMaxAvgScalarEventBatch; - type TBinnedBins = MinMaxAvgScalarBinBatch; - type XBinnedToTBinnedAggregator = Agg3; - type XBinnedToTBinnedStream = BinnedT3Stream; - - fn new_binned_from_prebinned( - &self, - query: &BinnedQuery, - range: BinnedRange, - pre_range: PreBinnedPatchRange, - node_config: &NodeConfigCached, - ) -> Result { - err::todoval() - } - - fn new_binned_from_merged( - &self, - evq: EventsQuery, - perf_opts: PerfOpts, - range: BinnedRange, - node_config: &NodeConfigCached, - ) -> Result { - err::todoval() - } - - fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream - where - S: Stream>, Error>> + Send + 'static, - { - err::todoval() - } -} diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index f9332df..93b912a 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -1,4 +1,4 @@ -use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; +use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType}; use crate::agg::streams::StreamItem; use crate::binned::query::{CacheUsage, PreBinnedQuery}; use crate::binned::RangeCompletableItem; @@ -13,7 +13,6 @@ use http::{StatusCode, Uri}; use netpod::log::*; use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator}; use serde::de::DeserializeOwned; -use serde::Deserialize; use std::future::ready; use std::marker::PhantomData; use std::pin::Pin; diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 8918448..837b0ff 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -2,14 +2,10 @@ use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType}; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::binnedfrompbv::FetchedPreBinned; use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{ - BinnedStreamKindScalar, EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, - StreamKind, WithLen, -}; -use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; -use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, WithLen}; +use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; -use crate::frame::makeframe::{make_frame, FrameType}; +use crate::frame::makeframe::FrameType; use crate::merge::mergedfromremotes::MergedFromRemotes2; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index f34b0b9..e99d0b4 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,12 +1,9 @@ -use crate::agg::binnedt4::{DefaultBinsTimeBinner, TimeBinnableType}; +use crate::agg::binnedt4::TimeBinnableType; use crate::agg::enp::{Identity, WaveXBinner}; -use crate::agg::streams::{Appendable, StreamItem}; -// use crate::binned::pbv2::{ -// pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream, -// }; +use crate::agg::streams::Appendable; use crate::binned::pbv::PreBinnedValueStream; use crate::binned::query::PreBinnedQuery; -use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, 
ReadableFromFile, StreamKind}; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex}; use crate::cache::node_ix_for_patch; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, @@ -18,7 +15,6 @@ use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::streamext::SCC; use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs deleted file mode 100644 index 250a0f4..0000000 --- a/disk/src/binned/scalar.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::binned::query::BinnedQuery; -use crate::binned::{BinnedStreamRes, StreamKind}; -use crate::binnedstream::BoxedStream; -use crate::raw::EventsQuery; -use err::Error; -use netpod::log::*; -use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange}; - -// TODO can be removed when StreamKind no longer used. -pub async fn binned_stream( - node_config: &NodeConfigCached, - query: &BinnedQuery, - stream_kind: SK, -) -> Result, Error> -where - SK: StreamKind, -{ - if query.channel().backend != node_config.node.backend { - let err = Error::with_msg(format!( - "backend mismatch node: {} requested: {}", - node_config.node.backend, - query.channel().backend - )); - return Err(err); - } - let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg( - format!("binned_bytes_for_http BinnedRange::covering_range returned None"), - ))?; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - //let _shape = entry.to_shape()?; - match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { - Ok(Some(pre_range)) => { - info!("binned_bytes_for_http found pre_range: {:?}", pre_range); - if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { - let msg = format!( - "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", - pre_range, range - ); - return Err(Error::with_msg(msg)); - } - let s = SK::new_binned_from_prebinned(&stream_kind, query, range.clone(), pre_range, node_config)?; - let s = BoxedStream::new(Box::pin(s))?; - let ret = BinnedStreamRes { - binned_stream: s, - range, - }; - Ok(ret) - } - Ok(None) => { - info!( - "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", - range - ); - let evq = EventsQuery { - channel: query.channel().clone(), - range: query.range().clone(), - agg_kind: query.agg_kind().clone(), - }; - // TODO do I need to set up more transformations or binning to deliver the requested data? 
- let s = SK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?; - let s = BoxedStream::new(Box::pin(s))?; - let ret = BinnedStreamRes { - binned_stream: s, - range, - }; - Ok(ret) - } - Err(e) => Err(e), - } -} diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 7ee5cb9..cf6c2b7 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,14 +1,6 @@ -use crate::agg::streams::StreamItem; -use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{RangeCompletableItem, StreamKind}; -use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; -use crate::frame::makeframe::FrameType; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::log::*; -use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PreBinnedPatchIterator}; -use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 4815155..4c69f23 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,21 +1,13 @@ -use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, StreamKind}; -use crate::frame::makeframe::FrameType; -use crate::merge::MergedStream; -use crate::raw::{x_processed_stream_from_node, EventsQuery}; -use crate::Sitemty; use bytes::Bytes; use chrono::Utc; use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, StreamExt}; +use futures_util::pin_mut; use hyper::{Body, Response}; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PerfOpts, PreBinnedPatchCoord}; +use netpod::{AggKind, Channel, Cluster, NodeConfigCached, PreBinnedPatchCoord}; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; -use std::future::Future; use std::io; use std::path::PathBuf; use std::pin::Pin; @@ -23,8 +15,6 @@ use std::task::{Context, Poll}; use tiny_keccak::Hasher; use tokio::io::{AsyncRead, ReadBuf}; -pub mod pbvfs; - // TODO move to a better fitting module: pub struct HttpBodyAsAsyncRead { inp: Response, @@ -86,116 +76,6 @@ impl AsyncRead for HttpBodyAsAsyncRead { } } -type T001 = Pin, Error>> + Send>>; -type T002 = Pin, Error>> + Send>>; - -// TODO remove after refactoring. 
-pub struct MergedFromRemotes -where - SK: StreamKind, -{ - tcp_establish_futs: Vec::XBinnedEvents>>>, - nodein: Vec::XBinnedEvents>>>>, - merged: Option::XBinnedEvents>>>, - completed: bool, - errored: bool, -} - -impl MergedFromRemotes -where - SK: StreamKind, - Sitemty<::XBinnedEvents>: FrameType, -{ - pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: SK) -> Self { - let mut tcp_establish_futs = vec![]; - for node in &cluster.nodes { - let f = x_processed_stream_from_node(evq.clone(), perf_opts.clone(), node.clone(), stream_kind.clone()); - let f: T002::XBinnedEvents>> = Box::pin(f); - tcp_establish_futs.push(f); - } - let n = tcp_establish_futs.len(); - Self { - tcp_establish_futs, - nodein: (0..n).into_iter().map(|_| None).collect(), - merged: None, - completed: false, - errored: false, - } - } -} - -impl Stream for MergedFromRemotes -where - SK: StreamKind, -{ - type Item = Result::XBinnedEvents>>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("MergedFromRemotes poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else if let Some(fut) = &mut self.merged { - match fut.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => Ready(Some(Ok(k))), - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } else { - let mut pend = false; - let mut c1 = 0; - for i1 in 0..self.tcp_establish_futs.len() { - if self.nodein[i1].is_none() { - let f = &mut self.tcp_establish_futs[i1]; - pin_mut!(f); - match f.poll(cx) { - Ready(Ok(k)) => { - self.nodein[i1] = Some(k); - } - Ready(Err(e)) => { - self.errored = true; - return Ready(Some(Err(e))); - } - Pending => { - pend = true; - } - } - } else { - c1 += 1; - } - } - if pend { - Pending - } else { - if c1 == self.tcp_establish_futs.len() { - debug!("MergedFromRemotes setting up merged stream"); - let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::<_, SK>::new(inps); - self.merged = Some(Box::pin(s1)); - } else { - debug!( - "MergedFromRemotes raw / estab {} {}", - c1, - self.tcp_establish_futs.len() - ); - } - continue 'outer; - } - }; - } - } -} - pub struct BytesWrap {} impl From for Bytes { diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs deleted file mode 100644 index a2cbdd9..0000000 --- a/disk/src/cache/pbvfs.rs +++ /dev/null @@ -1,153 +0,0 @@ -use crate::agg::streams::StreamItem; -use crate::binned::query::PreBinnedQuery; -use crate::binned::{RangeCompletableItem, StreamKind}; -use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; -use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, FrameType}; -use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, FutureExt}; -use http::StatusCode; -use netpod::log::*; -use netpod::{NodeConfigCached, PerfOpts}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -// TODO remove when SK no longer needed. 
-pub struct PreBinnedScalarValueFetchedStream -where - SK: StreamKind, -{ - uri: http::Uri, - resfut: Option, - res: Option>, - errored: bool, - completed: bool, - _stream_kind: SK, -} - -impl PreBinnedScalarValueFetchedStream -where - SK: StreamKind, -{ - pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &SK) -> Result { - let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster); - let node = &node_config.node_config.cluster.nodes[nodeix as usize]; - let uri: hyper::Uri = format!( - "http://{}:{}/api/4/prebinned?{}", - node.host, - node.port, - query.make_query_string() - ) - .parse()?; - let ret = Self { - uri, - resfut: None, - res: None, - errored: false, - completed: false, - _stream_kind: stream_kind.clone(), - }; - Ok(ret) - } -} - -// TODO change name, is now generic: -impl Stream for PreBinnedScalarValueFetchedStream -where - SK: StreamKind, - Result>, err::Error>: FrameType, -{ - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else if let Some(res) = self.res.as_mut() { - pin_mut!(res); - match res.poll_next(cx) { - Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => { - match decode_frame::>, Error>>( - &item, - ) { - Ok(Ok(item)) => Ready(Some(Ok(item))), - Ok(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - }, - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } else if let Some(resfut) = self.resfut.as_mut() { - match resfut.poll_unpin(cx) { - Ready(res) => match res { - Ok(res) => { - if res.status() == StatusCode::OK { - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - self.res = Some(s2); - continue 'outer; - } else { - error!( - "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", - res - ); - let e = Error::with_msg(format!( - "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", - res - )); - self.errored = true; - Ready(Some(Err(e))) - } - } - Err(e) => { - error!("PreBinnedValueStream error in stream {:?}", e); - self.errored = true; - Ready(Some(Err(e.into()))) - } - }, - Pending => Pending, - } - } else { - match hyper::Request::builder() - .method(http::Method::GET) - .uri(&self.uri) - .body(hyper::Body::empty()) - { - Ok(req) => { - let client = hyper::Client::new(); - self.resfut = Some(client.request(req)); - continue 'outer; - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - } - }; - } - } -} diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 3985fd3..ecb3bea 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,20 +1,16 @@ -use crate::agg::binnedt4::{TimeBinnableType, TimeBinnableTypeAggregator}; +use crate::agg::binnedt4::TimeBinnableType; use crate::agg::enp::{Identity, WaveXBinner}; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::{ - 
EventsNodeProcessor, FilterFittingInside, MinMaxAvgAggregator, MinMaxAvgBins, MinMaxAvgBinsAggregator, NumOps, - PushableIndex, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, + EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, + RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventFull; -use crate::frame::makeframe::{make_frame, Framable}; -use bytes::BytesMut; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::NanoRange; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::mem::size_of; @@ -218,7 +214,7 @@ impl RangeOverlapInfo for EventValues { } impl FilterFittingInside for EventValues { - fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + fn filter_fitting_inside(self, _fit_range: NanoRange) -> Option { todo!() } } @@ -251,12 +247,13 @@ impl ReadableFromFile for EventValues where NTY: NumOps, { - fn read_from_file(file: File) -> Result, Error> { - todo!() + fn read_from_file(_file: File) -> Result, Error> { + // TODO refactor types such that this can be removed. + panic!() } - fn from_buf(buf: &[u8]) -> Result { - todo!() + fn from_buf(_buf: &[u8]) -> Result { + panic!() } } @@ -265,7 +262,7 @@ where NTY: NumOps, { type Output = MinMaxAvgBins; - type Aggregator = MinMaxAvgAggregator; + type Aggregator = EventValuesAggregator; fn aggregator(range: NanoRange) -> Self::Aggregator { Self::Aggregator::new(range) diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 7eaa54b..16c4f95 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,5 +1,5 @@ use crate::agg::streams::{Appendable, StatsItem, StreamItem}; -use crate::binned::{EventsNodeProcessor, PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps}; +use crate::binned::{EventsNodeProcessor, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; use crate::streamlog::LogItem; use crate::Sitemty; use err::Error; @@ -243,226 +243,3 @@ where } } } - -// TODO remove after refactor -pub struct MergedStream -where - S: Stream>, Error>> + Unpin, - SK: StreamKind, -{ - inps: Vec, - current: Vec::XBinnedEvents>>, - ixs: Vec, - errored: bool, - completed: bool, - batch: ::XBinnedEvents, - ts_last_emit: u64, - range_complete_observed: Vec, - range_complete_observed_all: bool, - range_complete_observed_all_emitted: bool, - data_emit_complete: bool, - batch_size: usize, - logitems: VecDeque, - event_data_read_stats_items: VecDeque, -} - -impl MergedStream -where - S: Stream>, Error>> + Unpin, - SK: StreamKind, -{ - pub fn new(inps: Vec) -> Self { - let n = inps.len(); - let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); - Self { - inps, - current: current, - ixs: vec![0; n], - errored: false, - completed: false, - batch: <::XBinnedEvents as Appendable>::empty(), - ts_last_emit: 0, - range_complete_observed: vec![false; n], - range_complete_observed_all: false, - range_complete_observed_all_emitted: false, - data_emit_complete: false, - batch_size: 64, - logitems: VecDeque::new(), - event_data_read_stats_items: VecDeque::new(), - } - } - - fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let mut pending = 0; - for i1 in 0..self.inps.len() { - match self.current[i1] { - MergedCurVal::None => { - 'l1: loop { - break match 
self.inps[i1].poll_next_unpin(cx) { - Ready(Some(Ok(k))) => match k { - StreamItem::Log(item) => { - self.logitems.push_back(item); - continue 'l1; - } - StreamItem::Stats(item) => { - match item { - StatsItem::EventDataReadStats(item) => { - self.event_data_read_stats_items.push_back(item); - } - } - continue 'l1; - } - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed[i1] = true; - let d = self.range_complete_observed.iter().filter(|&&k| k).count(); - if d == self.range_complete_observed.len() { - self.range_complete_observed_all = true; - debug!("MergedStream range_complete d {} COMPLETE", d); - } else { - trace!("MergedStream range_complete d {}", d); - } - continue 'l1; - } - RangeCompletableItem::Data(item) => { - self.ixs[i1] = 0; - self.current[i1] = MergedCurVal::Val(item); - } - }, - }, - Ready(Some(Err(e))) => { - // TODO emit this error, consider this stream as done, anything more to do here? - //self.current[i1] = CurVal::Err(e); - self.errored = true; - return Ready(Err(e)); - } - Ready(None) => { - self.current[i1] = MergedCurVal::Finish; - } - Pending => { - pending += 1; - } - }; - } - } - _ => (), - } - } - if pending > 0 { - Pending - } else { - Ready(Ok(())) - } - } -} - -impl Stream for MergedStream -where - S: Stream::XBinnedEvents>>, Error>> + Unpin, - SK: StreamKind, -{ - type Item = Result::XBinnedEvents>>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("MergedStream poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if let Some(item) = self.logitems.pop_front() { - Ready(Some(Ok(StreamItem::Log(item)))) - } else if let Some(item) = self.event_data_read_stats_items.pop_front() { - Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))))) - } else if self.data_emit_complete { - if self.range_complete_observed_all { - if self.range_complete_observed_all_emitted { - self.completed = true; - Ready(None) - } else { - self.range_complete_observed_all_emitted = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } - } else { - self.completed = true; - Ready(None) - } - } else { - // Can only run logic if all streams are either finished, errored or have some current value. 
- match self.replenish(cx) { - Ready(Ok(_)) => { - let mut lowest_ix = usize::MAX; - let mut lowest_ts = u64::MAX; - for i1 in 0..self.inps.len() { - if let MergedCurVal::Val(val) = &self.current[i1] { - let u = self.ixs[i1]; - if u >= val.len() { - self.ixs[i1] = 0; - self.current[i1] = MergedCurVal::None; - continue 'outer; - } else { - let ts = val.ts(u); - if ts < lowest_ts { - lowest_ix = i1; - lowest_ts = ts; - } - } - } - } - if lowest_ix == usize::MAX { - if self.batch.len() != 0 { - //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - let emp = <::XBinnedEvents as Appendable>::empty(); - let ret = std::mem::replace(&mut self.batch, emp); - self.data_emit_complete = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } else { - self.data_emit_complete = true; - continue 'outer; - } - } else { - assert!(lowest_ts >= self.ts_last_emit); - let emp = <::XBinnedEvents as Appendable>::empty(); - let mut local_batch = std::mem::replace(&mut self.batch, emp); - self.ts_last_emit = lowest_ts; - let rix = self.ixs[lowest_ix]; - match &self.current[lowest_ix] { - MergedCurVal::Val(val) => { - local_batch.push_index(val, rix); - } - MergedCurVal::None => panic!(), - MergedCurVal::Finish => panic!(), - } - self.batch = local_batch; - self.ixs[lowest_ix] += 1; - let curlen = match &self.current[lowest_ix] { - MergedCurVal::Val(val) => val.len(), - MergedCurVal::None => panic!(), - MergedCurVal::Finish => panic!(), - }; - if self.ixs[lowest_ix] >= curlen { - self.ixs[lowest_ix] = 0; - self.current[lowest_ix] = MergedCurVal::None; - } - if self.batch.len() >= self.batch_size { - let emp = <::XBinnedEvents as Appendable>::empty(); - let ret = std::mem::replace(&mut self.batch, emp); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } else { - continue 'outer; - } - } - } - Ready(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - }; - } - } -} diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index 13db6d7..812b323 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -1,7 +1,7 @@ use crate::agg::streams::Appendable; use crate::binned::{EventsNodeProcessor, PushableIndex}; use crate::frame::makeframe::FrameType; -use crate::merge::{MergedStream, MergedStream2}; +use crate::merge::MergedStream2; use crate::raw::{x_processed_stream_from_node2, EventsQuery}; use crate::Sitemty; use err::Error; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index d99cf28..a961577 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -6,7 +6,7 @@ to request such data from nodes. */ use crate::agg::streams::StreamItem; -use crate::binned::{EventsNodeProcessor, RangeCompletableItem, StreamKind}; +use crate::binned::{EventsNodeProcessor, RangeCompletableItem}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{make_frame, make_term_frame, FrameType}; use crate::raw::eventsfromframes::EventsFromFrames; @@ -37,40 +37,6 @@ pub struct EventsQuery { #[derive(Serialize, Deserialize)] pub struct EventQueryJsonStringFrame(String); -// TODO remove after refactor. 
-pub async fn x_processed_stream_from_node( - query: EventsQuery, - perf_opts: PerfOpts, - node: Node, - stream_kind: SK, -) -> Result< - Pin< - Box< - dyn Stream::XBinnedEvents>>, Error>> - + Send, - >, - >, - Error, -> -where - SK: StreamKind, - Result::XBinnedEvents>>, err::Error>: FrameType, -{ - let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - let qjs = serde_json::to_string(&query)?; - let (netin, mut netout) = net.into_split(); - let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; - netout.write_all(&buf).await?; - let buf = make_term_frame(); - netout.write_all(&buf).await?; - netout.flush().await?; - netout.forget(); - let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - //let items = EventsFromFrames::new(frames); - //Ok(Box::pin(items)) - Ok(err::todoval()) -} - pub async fn x_processed_stream_from_node2( query: EventsQuery, perf_opts: PerfOpts, diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 6cdc244..ae5d9f4 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,7 +1,6 @@ -use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKindScalar, EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind}; +use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem}; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, EventsDecodedStream, LittleEndian, NumFromBytes, @@ -9,7 +8,7 @@ use crate::decode::{ use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable, FrameType}; +use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use crate::Sitemty; use err::Error; diff --git a/disk/src/raw/eventsfromframes.rs b/disk/src/raw/eventsfromframes.rs index 992c5fc..a13412e 100644 --- a/disk/src/raw/eventsfromframes.rs +++ b/disk/src/raw/eventsfromframes.rs @@ -1,9 +1,7 @@ use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, StreamKind, XBinnedEvents}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, FrameType}; use crate::Sitemty; -use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 68a53ea..c7ee88d 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -2,7 +2,6 @@ use crate::gather::gather_get_json; use bytes::Bytes; use disk::binned::prebinned::pre_binned_bytes_for_http; use disk::binned::query::{BinnedQuery, PreBinnedQuery}; -use disk::binned::BinnedStreamKindScalar; use disk::raw::conn::events_service; use err::Error; use future::Future; @@ -364,8 +363,6 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result let query = PreBinnedQuery::from_request(&head)?; let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000); let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str()); - // TODO remove StreamKind - let stream_kind = BinnedStreamKindScalar::new(); //span1.in_scope(|| {}); let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1); let ret = match fut.await {
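--

Notes:

Everything that survives this patch speaks the same layered item protocol. From the impls visible above (for example `MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>` and the `type Item = Sitemty<...>` usages), `Sitemty<T>` appears to be an alias for exactly that nesting. A minimal stand-in, with simplified payload types (the real `Log` and `Stats` variants carry `streamlog::LogItem` and `EventDataReadStats`):

    #[derive(Debug)]
    enum StreamItem<T> {
        Log(String),   // stand-in for streamlog::LogItem
        Stats(u64),    // stand-in for StatsItem / EventDataReadStats
        DataItem(T),
    }

    #[derive(Debug)]
    enum RangeCompletableItem<T> {
        RangeComplete, // producer covered the whole requested range
        Data(T),
    }

    type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, String>;

    // Rule the merge code relies on: RangeComplete may be forwarded only
    // once every input has reported it.
    fn all_complete(seen: &[bool]) -> bool {
        seen.iter().all(|&k| k)
    }

    fn main() {
        let item: Sitemty<Vec<u64>> =
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(vec![1, 2])));
        println!("{:?}", item);
        assert!(!all_complete(&[true, false]));
        assert!(all_complete(&[true, true]));
    }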
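The deleted `StreamKind` trait bundled the event and bin types behind associated types; the replacement visible in `disk/src/binned.rs` dispatches statically instead: `make_num_pipeline_nty_end_evs_enp` is generic over the numeric type, endianness, event-value shape, and events-node processor, and a match over the channel config picks the monomorphized instance. A rough sketch of that dispatch shape, with illustrative names and only two scalar types (the real code also branches on `Shape` and the `EventValuesDim0Case`/`EventValuesDim1Case` types):

    use std::marker::PhantomData;

    enum ScalarType { F32, F64 }
    enum ByteOrder { LE, BE }

    trait Endianness { const NAME: &'static str; }
    struct LittleEndian;
    struct BigEndian;
    impl Endianness for LittleEndian { const NAME: &'static str = "LE"; }
    impl Endianness for BigEndian { const NAME: &'static str = "BE"; }

    // Stand-in for the event/bin pipeline; in the crate this is where
    // the events-node processor and time binner plug in.
    struct Pipeline<NTY, END>(PhantomData<(NTY, END)>);

    impl<NTY: Default + std::fmt::Debug, END: Endianness> Pipeline<NTY, END> {
        fn describe() -> String {
            format!("pipeline over {:?}-like values, byte order {}", NTY::default(), END::NAME)
        }
    }

    // Match once on the runtime config, then run fully monomorphized
    // code: no StreamKind trait object needed downstream.
    fn make_pipeline(st: ScalarType, bo: ByteOrder) -> String {
        match (st, bo) {
            (ScalarType::F32, ByteOrder::LE) => Pipeline::<f32, LittleEndian>::describe(),
            (ScalarType::F32, ByteOrder::BE) => Pipeline::<f32, BigEndian>::describe(),
            (ScalarType::F64, ByteOrder::LE) => Pipeline::<f64, LittleEndian>::describe(),
            (ScalarType::F64, ByteOrder::BE) => Pipeline::<f64, BigEndian>::describe(),
        }
    }

    fn main() {
        println!("{}", make_pipeline(ScalarType::F32, ByteOrder::LE));
    }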
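The removed `MergedStream` (superseded by `MergedStream2` in `disk/src/merge.rs`) is a k-way merge: each per-node input delivers batches sorted by timestamp, and the merger repeatedly emits the event with the lowest timestamp across all inputs. A self-contained sketch of that core loop over plain batches, leaving out the async replenish/pending machinery; the types are simplified stand-ins for the crate's own:

    #[derive(Debug, Default)]
    struct EventBatch {
        tss: Vec<u64>,
        values: Vec<f32>,
    }

    impl EventBatch {
        // Append the event at index `ix` of `src`, mirroring PushableIndex.
        fn push_index(&mut self, src: &EventBatch, ix: usize) {
            self.tss.push(src.tss[ix]);
            self.values.push(src.values[ix]);
        }
        fn len(&self) -> usize {
            self.tss.len()
        }
    }

    // Merge per-node batches (each sorted by timestamp) into one sorted batch.
    fn merge_batches(inputs: &[EventBatch]) -> EventBatch {
        let mut ixs = vec![0usize; inputs.len()];
        let mut out = EventBatch::default();
        loop {
            // Find the input whose next event has the lowest timestamp.
            let mut lowest: Option<(usize, u64)> = None;
            for (i, inp) in inputs.iter().enumerate() {
                if ixs[i] < inp.len() {
                    let ts = inp.tss[ixs[i]];
                    if lowest.map_or(true, |(_, lts)| ts < lts) {
                        lowest = Some((i, ts));
                    }
                }
            }
            match lowest {
                Some((i, _)) => {
                    out.push_index(&inputs[i], ixs[i]);
                    ixs[i] += 1;
                }
                None => return out,
            }
        }
    }

    fn main() {
        let a = EventBatch { tss: vec![1, 4, 7], values: vec![0.1, 0.4, 0.7] };
        let b = EventBatch { tss: vec![2, 3, 9], values: vec![0.2, 0.3, 0.9] };
        let merged = merge_batches(&[a, b]);
        assert_eq!(merged.tss, vec![1, 2, 3, 4, 7, 9]);
        println!("{:?}", merged);
    }

The real stream additionally buffers up to `batch_size` events before emitting, interleaves Log/Stats items, and only forwards RangeComplete once all inputs have reported it.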
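On the binning side, `MinMaxAvgAggregator` is renamed to `EventValuesAggregator`, and `MinMaxAvgBinsAggregator` now keeps `count: u64` and emits `counts: vec![self.count as u64]` instead of a hard-coded zero. The per-bin bookkeeping looks like the following sketch; the diff does not show `ingest` for the bins aggregator, so its body here (including the NaN handling via `sumc`) is an assumption, and `NanoRange` is a local stand-in for netpod's type:

    struct NanoRange { beg: u64, end: u64 }

    struct BinAgg {
        range: NanoRange,
        count: u64,
        min: Option<f32>,
        max: Option<f32>,
        sum: f32,
        sumc: u32,
    }

    impl BinAgg {
        fn new(range: NanoRange) -> Self {
            Self { range, count: 0, min: None, max: None, sum: 0.0, sumc: 0 }
        }

        // Ingest one event; only events inside this bin's range count.
        fn ingest(&mut self, ts: u64, v: f32) {
            if ts < self.range.beg || ts >= self.range.end {
                return;
            }
            self.count += 1;
            self.min = Some(self.min.map_or(v, |m| m.min(v)));
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
            if v.is_finite() {
                self.sum += v;
                self.sumc += 1;
            }
        }

        // Close the bin: average is sum / sumc, None for an empty bin.
        fn result(self) -> (u64, Option<f32>, Option<f32>, Option<f32>) {
            let avg = if self.sumc > 0 { Some(self.sum / self.sumc as f32) } else { None };
            (self.count, self.min, self.max, avg)
        }
    }

    fn main() {
        let mut agg = BinAgg::new(NanoRange { beg: 0, end: 100 });
        for (ts, v) in [(10, 1.0), (20, 3.0), (30, 2.0)] {
            agg.ingest(ts, v);
        }
        assert_eq!(agg.result(), (3, Some(1.0), Some(3.0), Some(2.0)));
    }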