diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index 8ed34bb..2a6444b 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -5,8 +5,9 @@ use disk::agg::streams::{Bins, StatsItem, StreamItem}; use disk::binned::query::{BinnedQuery, CacheUsage}; use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; -use disk::frame::makeframe::FrameType; +use disk::frame::makeframe::{FrameType, SubFrId}; use disk::streamlog::Streamlog; +use disk::Sitemty; use err::Error; use futures_util::StreamExt; use futures_util::TryStreamExt; @@ -14,6 +15,8 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::{AggKind, Channel, Cluster, Database, HostPort, NanoRange, Node, PerfOpts}; +use serde::de::DeserializeOwned; +use std::fmt::Debug; use std::future::ready; use std::sync::{Arc, Mutex}; use tokio::io::AsyncRead; @@ -82,7 +85,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; if true { - get_binned_channel( + get_binned_channel::( "wave-f64-be-n21", "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:30.000Z", @@ -93,8 +96,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> { ) .await?; } - if false { - get_binned_channel( + if true { + get_binned_channel::( "wave-u16-le-n77", "1970-01-01T01:11:00.000Z", "1970-01-01T01:35:00.000Z", @@ -105,8 +108,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> { ) .await?; } - if false { - get_binned_channel( + if true { + get_binned_channel::( "wave-u16-le-n77", "1970-01-01T01:42:00.000Z", "1970-01-01T03:55:00.000Z", @@ -120,23 +123,23 @@ async fn get_binned_binary_inner() -> Result<(), Error> { Ok(()) } -async fn get_binned_channel( +async fn get_binned_channel( channel_name: &str, - beg_date: S, - end_date: S, + beg_date: &str, + end_date: &str, bin_count: u32, cluster: &Cluster, expect_range_complete: bool, expect_bin_count: u64, ) -> Result where - S: AsRef, + NTY: Debug + SubFrId + DeserializeOwned, { let t1 = Utc::now(); let agg_kind = AggKind::DimXBins1; let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.as_ref().parse()?; - let end_date: DateTime = end_date.as_ref().parse()?; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; let channel_backend = "testbackend"; let perf_opts = PerfOpts { inmem_bufcap: 512 }; let channel = Channel { @@ -161,7 +164,7 @@ where } let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - let res = consume_binned_response(s2).await?; + let res = consume_binned_response::(s2).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); @@ -209,8 +212,9 @@ impl BinnedResponse { } } -async fn consume_binned_response(inp: InMemoryFrameAsyncReadStream) -> Result +async fn consume_binned_response(inp: InMemoryFrameAsyncReadStream) -> Result where + NTY: Debug + SubFrId + DeserializeOwned, T: AsyncRead + Unpin, { let s1 = inp @@ -227,11 +231,10 @@ where None } StreamItem::DataItem(frame) => { - type ExpectedType = Result>>, Error>; - if frame.tyid() != ::FRAME_TYPE_ID { + if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { error!("test receives unexpected tyid {:x}", frame.tyid()); } - match bincode::deserialize::(frame.buf()) { + match bincode::deserialize::>>(frame.buf()) { Ok(item) => match item { Ok(item) => match item { StreamItem::Log(item) => { diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index d474619..d13eb2c 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -33,6 +33,7 @@ pub trait Collectable { fn append_to(&self, collected: &mut Self::Collected); } +// TODO can be removed? pub trait Collectable2: Any { fn as_any_ref(&self) -> &dyn Any; fn append(&mut self, src: &dyn Any); diff --git a/disk/src/binned.rs b/disk/src/binned.rs index a1acb21..c0219b0 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -15,7 +15,6 @@ use crate::agg::streams::{ use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::{BinnedQuery, PreBinnedQuery}; -use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::cache::MergedFromRemotes; use crate::decode::{ @@ -53,8 +52,6 @@ use tokio::io::{AsyncRead, ReadBuf}; pub mod binnedfrompbv; pub mod pbv; -// TODO get rid of whole pbv2 mod? -pub mod pbv2; pub mod prebinned; pub mod query; pub mod scalar; @@ -261,34 +258,12 @@ where } } -pub trait MakeFrame2 { - fn make_frame_2(&self) -> Result; -} +pub trait BinnedResponseItem: Send + ToJsonResult + Framable {} -impl MakeFrame2 for Sitemty -where - Sitemty: Framable, -{ - fn make_frame_2(&self) -> Result { - todo!() - } -} - -pub trait DataFramable { - fn make_data_frame(&self) -> Result; -} - -pub trait BinnedResponseItem: Send + ToJsonResult + DataFramable {} - -impl BinnedResponseItem for T -where - T: Send + ToJsonResult + DataFramable, - Sitemty: Framable, -{ -} +impl BinnedResponseItem for T where T: Send + ToJsonResult + Framable {} pub struct BinnedResponseDyn { - stream: Pin>> + Send>>, + stream: Pin> + Send>>, bin_count: u32, } @@ -313,10 +288,12 @@ where >: Framable, // TODO require these things in general? Sitemty<::Output>: FrameType + Framable + 'static, + // TODO is this correct? why do I want the Output to be Framable? Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + Framable + DeserializeOwned, - <::Output as TimeBinnableType>::Output: ToJsonResult + DataFramable, + Sitemty<<::Output as TimeBinnableType>::Output>: ToJsonResult + Framable, { + let _ = ppp; let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?; let s = PPP::convert(res.stream); let ret = BinnedResponseDyn { @@ -373,12 +350,15 @@ fn make_num_pipeline_entry( ) -> Result where PPP: PipelinePostProcessA, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, { match scalar_type { + ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config), ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config), + // TODO complete set _ => todo!(), } } @@ -390,6 +370,7 @@ async fn make_num_pipeline( ) -> Result where PPP: PipelinePostProcessA, + PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, PPP: PipelinePostProcessB>, { @@ -445,7 +426,7 @@ impl PipelinePostProcessA for Ppp1 { pub trait PipelinePostProcessB { fn convert( inp: Pin> + Send>>, - ) -> Pin>> + Send>>; + ) -> Pin> + Send>>; } impl PipelinePostProcessB> for Ppp1 @@ -454,20 +435,8 @@ where { fn convert( inp: Pin>> + Send>>, - ) -> Pin>> + Send>> { - let s = StreamExt::map(inp, |item| match item { - Ok(item) => Ok(match item { - StreamItem::DataItem(item) => StreamItem::DataItem(match item { - RangeCompletableItem::Data(item) => { - RangeCompletableItem::Data(Box::new(item) as Box) - } - RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, - }), - StreamItem::Log(item) => StreamItem::Log(item), - StreamItem::Stats(item) => StreamItem::Stats(item), - }), - Err(e) => Err(e), - }); + ) -> Pin> + Send>> { + let s = StreamExt::map(inp, |item| Box::new(item) as Box); Box::pin(s) } } @@ -478,29 +447,7 @@ pub async fn binned_bytes_for_http( ) -> Result> + Send>>, Error> { let pl = make_num_pipeline::(query, Ppp1 {}, node_config).await?; let ret = pl.stream.map(|item| { - // TODO - - // TODO - - // Even for the "common" frame types I need the type of the inner item because the serialization - // depends on the full type. The representation of the "common" variants are not necessarily - // the same for different inner type! - - // Therefore, need a "make frame" on the full Sitemty> - - let fr = match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::Data(item) => item.make_data_frame(), - RangeCompletableItem::RangeComplete => { - make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) - } - }, - StreamItem::Log(item) => make_frame(&Ok(StreamItem::Log(item))), - StreamItem::Stats(item) => make_frame(&Ok(StreamItem::Stats(item))), - }, - Err(e) => make_frame(&Err(e)), - }; + let fr = item.make_frame(); let fr = fr?; Ok(fr.freeze()) }); @@ -1057,25 +1004,7 @@ where } } -impl DataFramable for MinMaxAvgBins -where - NTY: NumOps, - Sitemty: FrameType, -{ - fn make_data_frame(&self) -> Result { - let item = Self { - ts1s: self.ts1s.clone(), - ts2s: self.ts2s.clone(), - counts: self.counts.clone(), - mins: self.mins.clone(), - maxs: self.maxs.clone(), - avgs: self.avgs.clone(), - }; - make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))) - } -} - -impl ToJsonResult for MinMaxAvgBins +impl ToJsonResult for Sitemty> where NTY: NumOps, { diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 7292ac4..817f78a 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -236,11 +236,6 @@ where ready(g) } }); - - // TODO TBinnerStream is for T-binning events. - // But here, we need to bin bins into bigger bins. - // The logic in TBinnerStream is actually the same I think.. - // Reuse?? let inp = TBinnerStream::<_, TBT>::new(inp, range); Ok(Self { inp: Box::pin(inp), diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 8ded692..aa3afcd 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,5 +1,6 @@ use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType}; use crate::agg::streams::{Appendable, StreamItem}; +use crate::binned::binnedfrompbv::FetchedPreBinned; use crate::binned::query::{CacheUsage, PreBinnedQuery}; use crate::binned::{ BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, @@ -18,6 +19,7 @@ use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use serde::de::DeserializeOwned; use serde::Serialize; use std::future::Future; use std::io; @@ -27,15 +29,12 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; -//pub type SomeScc = netpod::streamext::SCC; - -pub struct PreBinnedValueStream +pub struct PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output>, - ETB: EventsTimeBinner::Output>, { query: PreBinnedQuery, node_config: NodeConfigCached, @@ -64,19 +63,19 @@ where _m2: PhantomData, _m3: PhantomData, _m4: PhantomData, - _m5: PhantomData, } -impl PreBinnedValueStream +impl PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - ETB: EventsTimeBinner::Output> + 'static, ::Output: PushableIndex + Appendable, + // TODO is this needed: Sitemty<::Output>: FrameType, - ::Output: Appendable, + // TODO who exactly needs this DeserializeOwned? + Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned, { pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self { Self { @@ -100,7 +99,6 @@ where _m2: PhantomData, _m3: PhantomData, _m4: PhantomData, - _m5: PhantomData, } } @@ -179,10 +177,12 @@ where disk_stats_every.clone(), report_error, ); - // TODO copy and adapt PreBinnedScalarValueFetchedStream - //PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) - let tmp: PreBinnedScalarValueFetchedStream = err::todoval(); - Ok(tmp) + let ret = + FetchedPreBinned::<<::Output as TimeBinnableType>::Output>::new( + &query, + &node_config, + )?; + Ok(ret) } }) .map(|k| { @@ -193,7 +193,7 @@ where s }) .flatten(); - Err(err::todoval()) + Ok(Box::pin(s)) } fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { @@ -211,16 +211,16 @@ where } } -impl Stream for PreBinnedValueStream +impl Stream for PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + Unpin + 'static, END: Endianness + Unpin + 'static, EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, ENP: EventsNodeProcessor>::Output> + Unpin + 'static, - ETB: EventsTimeBinner::Output> + Unpin + 'static, ::Output: PushableIndex + Appendable, + // TODO needed? Sitemty<::Output>: FrameType, - ::Output: Serialize + ReadableFromFile + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned, { type Item = Sitemty<<::Output as TimeBinnableType>::Output>; diff --git a/disk/src/binned/pbv2.rs b/disk/src/binned/pbv2.rs deleted file mode 100644 index 3e5e368..0000000 --- a/disk/src/binned/pbv2.rs +++ /dev/null @@ -1,424 +0,0 @@ -use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{RangeCompletableItem, StreamKind, WithLen}; -use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; -use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; -use crate::frame::makeframe::{make_frame, FrameType}; -use crate::raw::EventsQuery; -use crate::streamlog::Streamlog; -use bytes::Bytes; -use err::Error; -use futures_core::Stream; -use futures_util::{FutureExt, StreamExt}; -use netpod::log::*; -use netpod::streamext::SCC; -use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; -use std::future::Future; -use std::io; -use std::path::PathBuf; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::fs::{File, OpenOptions}; - -pub type PreBinnedValueByteStream = SCC>; - -pub struct PreBinnedValueByteStreamInner -where - SK: StreamKind, -{ - inp: PreBinnedValueStream, -} - -pub fn pre_binned_value_byte_stream_new( - query: &PreBinnedQuery, - node_config: &NodeConfigCached, - stream_kind: SK, -) -> PreBinnedValueByteStream -where - SK: StreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, -{ - let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind); - let s2 = PreBinnedValueByteStreamInner { inp: s1 }; - SCC::new(s2) -} - -impl Stream for PreBinnedValueByteStreamInner -where - SK: StreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => { - match make_frame::::TBinnedBins>>, err::Error>>( - &item, - ) { - Ok(buf) => Ready(Some(Ok(buf.freeze()))), - Err(e) => Ready(Some(Err(e.into()))), - } - } - Ready(None) => Ready(None), - Pending => Pending, - } - } -} - -pub struct PreBinnedValueStream -where - SK: StreamKind, -{ - query: PreBinnedQuery, - node_config: NodeConfigCached, - open_check_local_file: Option> + Send>>>, - fut2: Option< - Pin< - Box< - dyn Stream::TBinnedBins>>, err::Error>> - + Send, - >, - >, - >, - read_from_cache: bool, - cache_written: bool, - data_complete: bool, - range_complete_observed: bool, - range_complete_emitted: bool, - errored: bool, - completed: bool, - streamlog: Streamlog, - values: ::TBinnedBins, - write_fut: Option> + Send>>>, - read_cache_fut: Option< - Pin< - Box< - dyn Future< - Output = Result::TBinnedBins>>, err::Error>, - > + Send, - >, - >, - >, - stream_kind: SK, -} - -impl PreBinnedValueStream -where - SK: StreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, -{ - pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self { - Self { - query, - node_config: node_config.clone(), - open_check_local_file: None, - fut2: None, - read_from_cache: false, - cache_written: false, - data_complete: false, - range_complete_observed: false, - range_complete_emitted: false, - errored: false, - completed: false, - streamlog: Streamlog::new(node_config.ix as u32), - values: <::TBinnedBins as Appendable>::empty(), - write_fut: None, - read_cache_fut: None, - stream_kind, - } - } - - // TODO handle errors also here via return type. - fn setup_merged_from_remotes(&mut self) { - let evq = EventsQuery { - channel: self.query.channel().clone(), - range: self.query.patch().patch_range(), - agg_kind: self.query.agg_kind().clone(), - }; - if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 { - error!( - "Patch length inconsistency {} {}", - self.query.patch().patch_t_len(), - self.query.patch().bin_t_len() - ); - return; - } - // TODO do I need to set up more transformations or binning to deliver the requested data? - let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32) - .unwrap() - .ok_or(Error::with_msg("covering_range returns None")) - .unwrap(); - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - // TODO remove whole mod after refactor - /*let s1 = MergedFromRemotes::new( - evq, - perf_opts, - self.node_config.node_config.cluster.clone(), - self.stream_kind.clone(), - ); - let s1 = ::xbinned_to_tbinned(s1, range); - self.fut2 = Some(Box::pin(s1));*/ - err::todo(); - self.fut2 = None; - } - - fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { - let g = self.query.patch().bin_t_len(); - let h = range.grid_spec.bin_t_len(); - trace!( - "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}", - g, - h, - g / h, - g % h, - range, - ); - if g / h <= 1 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - if g / h > 1024 * 10 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - if g % h != 0 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - let node_config = self.node_config.clone(); - let patch_it = PreBinnedPatchIterator::from_range(range); - let s = futures_util::stream::iter(patch_it) - .map({ - let q2 = self.query.clone(); - let disk_stats_every = self.query.disk_stats_every().clone(); - let stream_kind = self.stream_kind.clone(); - let report_error = self.query.report_error(); - move |patch| { - let query = PreBinnedQuery::new( - patch, - q2.channel().clone(), - q2.agg_kind().clone(), - q2.cache_usage().clone(), - disk_stats_every.clone(), - report_error, - ); - PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) - } - }) - .map(|k| { - let s: Pin + Send>> = match k { - Ok(k) => Box::pin(k), - Err(e) => Box::pin(futures_util::stream::iter(vec![Err(e)])), - }; - s - }) - .flatten(); - self.fut2 = Some(Box::pin(s)); - } - - fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { - let range = self.query.patch().patch_range(); - match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { - Ok(Some(range)) => { - self.setup_from_higher_res_prebinned(range); - } - Ok(None) => { - self.setup_merged_from_remotes(); - } - Err(e) => return Err(e), - } - Ok(()) - } -} - -impl Stream for PreBinnedValueStream -where - SK: StreamKind + Unpin, - Result::TBinnedBins>>, err::Error>: FrameType, -{ - type Item = Result::TBinnedBins>>, err::Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("PreBinnedValueStream poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if let Some(item) = self.streamlog.pop() { - Ready(Some(Ok(StreamItem::Log(item)))) - } else if let Some(fut) = &mut self.write_fut { - match fut.poll_unpin(cx) { - Ready(item) => { - self.cache_written = true; - self.write_fut = None; - match item { - Ok(res) => { - self.streamlog - .append(Level::INFO, format!("cache file written bytes: {}", res.bytes)); - continue 'outer; - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => Pending, - } - } else if let Some(fut) = &mut self.read_cache_fut { - match fut.poll_unpin(cx) { - Ready(item) => { - self.read_cache_fut = None; - match item { - Ok(item) => { - self.data_complete = true; - self.range_complete_observed = true; - Ready(Some(Ok(item))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => Pending, - } - } else if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else if self.data_complete { - if self.cache_written { - if self.range_complete_observed { - self.range_complete_emitted = true; - let item = RangeCompletableItem::RangeComplete; - Ready(Some(Ok(StreamItem::DataItem(item)))) - } else { - self.completed = true; - Ready(None) - } - } else if self.read_from_cache { - self.cache_written = true; - continue 'outer; - } else { - match self.query.cache_usage() { - CacheUsage::Use | CacheUsage::Recreate => { - let msg = format!( - "write cache file query: {:?} bin count: {}", - self.query.patch(), - self.values.len(), - ); - self.streamlog.append(Level::INFO, msg); - let values = std::mem::replace( - &mut self.values, - <::TBinnedBins as Appendable>::empty(), - ); - let fut = write_pb_cache_min_max_avg_scalar( - values, - self.query.patch().clone(), - self.query.agg_kind().clone(), - self.query.channel().clone(), - self.node_config.clone(), - ); - self.write_fut = Some(Box::pin(fut)); - continue 'outer; - } - _ => { - self.cache_written = true; - continue 'outer; - } - } - } - } else if let Some(fut) = self.fut2.as_mut() { - match fut.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(item) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed = true; - continue 'outer; - } - RangeCompletableItem::Data(item) => { - self.values.append(&item); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } - }, - }, - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, - Ready(None) => { - self.data_complete = true; - continue 'outer; - } - Pending => Pending, - } - } else if let Some(fut) = self.open_check_local_file.as_mut() { - match fut.poll_unpin(cx) { - Ready(item) => { - self.open_check_local_file = None; - match item { - Ok(file) => { - self.read_from_cache = true; - let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; - self.read_cache_fut = Some(Box::pin(fut)); - continue 'outer; - } - Err(e) => match e.kind() { - // TODO other error kinds - io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() { - Ok(_) => { - if self.fut2.is_none() { - let e = Err(Error::with_msg(format!( - "try_setup_fetch_prebinned_higher_res failed" - ))); - self.errored = true; - Ready(Some(e)) - } else { - continue 'outer; - } - } - Err(e) => { - let e = Error::with_msg(format!( - "try_setup_fetch_prebinned_higher_res error: {:?}", - e - )); - self.errored = true; - Ready(Some(Err(e))) - } - }, - _ => { - error!("File I/O error: kind {:?} {:?}\n\n..............", e.kind(), e); - self.errored = true; - Ready(Some(Err(e.into()))) - } - }, - } - } - Pending => Pending, - } - } else { - let cfd = CacheFileDesc::new( - self.query.channel().clone(), - self.query.patch().clone(), - self.query.agg_kind().clone(), - ); - let path = match self.query.cache_usage() { - CacheUsage::Use => cfd.path(&self.node_config), - _ => PathBuf::from("DOESNOTEXIST"), - }; - let fut = async { OpenOptions::new().read(true).open(path).await }; - self.open_check_local_file = Some(Box::pin(fut)); - continue 'outer; - }; - } - } -} diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 4db5741..fcfb2a6 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -3,9 +3,10 @@ use crate::agg::binnedt4::{ }; use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::pbv2::{ - pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream, -}; +// use crate::binned::pbv2::{ +// pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream, +// }; +use crate::binned::pbv::PreBinnedValueStream; use crate::binned::query::PreBinnedQuery; use crate::binned::{ BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem, @@ -25,12 +26,13 @@ use futures_util::StreamExt; use netpod::streamext::SCC; use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use serde::de::DeserializeOwned; use serde::Serialize; use std::pin::Pin; -fn make_num_pipeline_nty_end_evs_enp( - query: PreBinnedQuery, +fn make_num_pipeline_nty_end_evs_enp( _event_value_shape: EVS, + query: PreBinnedQuery, node_config: &NodeConfigCached, ) -> Pin> + Send>> where @@ -38,16 +40,12 @@ where END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, - ETB: EventsTimeBinner::Output> + 'static, ::Output: PushableIndex + Appendable + 'static, - ::Output: Serialize + ReadableFromFile + 'static, Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<::Output>: Framable, - Sitemty<<::Output as TimeBinnableType>::Output>: Framable, + Sitemty<<::Output as TimeBinnableType>::Output>: + Framable + FrameType + DeserializeOwned, { - // TODO - // Currently, this mod uses stuff from pbv2, therefore complete path: - let ret = crate::binned::pbv::PreBinnedValueStream::::new(query, node_config); + let ret = PreBinnedValueStream::::new(query, node_config); let ret = StreamExt::map(ret, |item| Box::new(item) as Box); Box::pin(ret) } @@ -62,20 +60,16 @@ where END: Endianness + 'static, { match shape { - Shape::Scalar => { - make_num_pipeline_nty_end_evs_enp::, DefaultScalarEventsTimeBinner>( - query, - EventValuesDim0Case::new(), - node_config, - ) - } - Shape::Wave(n) => { - make_num_pipeline_nty_end_evs_enp::, DefaultSingleXBinTimeBinner>( - query, - EventValuesDim1Case::new(n), - node_config, - ) - } + Shape::Scalar => make_num_pipeline_nty_end_evs_enp::>( + EventValuesDim0Case::new(), + query, + node_config, + ), + Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::>( + EventValuesDim1Case::new(n), + query, + node_config, + ), } } @@ -96,22 +90,24 @@ fn make_num_pipeline( node_config: &NodeConfigCached, ) -> Pin> + Send>> { match scalar_type { + ScalarType::U8 => match_end!(u8, byte_order, shape, query, node_config), + ScalarType::U16 => match_end!(u16, byte_order, shape, query, node_config), + ScalarType::U32 => match_end!(u32, byte_order, shape, query, node_config), + ScalarType::U64 => match_end!(u64, byte_order, shape, query, node_config), + ScalarType::I8 => match_end!(i8, byte_order, shape, query, node_config), + ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config), ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config), + ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config), + ScalarType::F32 => match_end!(f64, byte_order, shape, query, node_config), ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config), - _ => todo!(), } } // TODO after the refactor, return direct value instead of boxed. -pub async fn pre_binned_bytes_for_http( +pub async fn pre_binned_bytes_for_http( node_config: &NodeConfigCached, query: &PreBinnedQuery, - stream_kind: SK, -) -> Result> + Send>>, Error> -where - SK: StreamKind, - Result>, err::Error>: FrameType, -{ +) -> Result> + Send>>, Error> { if query.channel().backend != node_config.node.backend { let err = Error::with_msg(format!( "backend mismatch node: {} requested: {}", @@ -128,7 +124,6 @@ where )); return Err(err); } - let channel_config = read_local_config(&query.channel(), &node_config.node).await?; let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?; let entry = match entry_res { @@ -136,28 +131,17 @@ where MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")), MatchingConfigEntry::Entry(entry) => entry, }; - let _shape = match entry.to_shape() { - Ok(k) => k, - Err(e) => return Err(e), - }; - - if true { - let ret = make_num_pipeline( - entry.scalar_type.clone(), - entry.byte_order.clone(), - entry.to_shape().unwrap(), - query.clone(), - node_config, - ) - .map(|item| match item.make_frame() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), - }); - let ret = Box::pin(ret); - Ok(ret) - } else { - let ret = pre_binned_value_byte_stream_new(query, node_config, stream_kind); - let ret = Box::pin(ret); - Ok(ret) - } + let ret = make_num_pipeline( + entry.scalar_type.clone(), + entry.byte_order.clone(), + entry.to_shape()?, + query.clone(), + node_config, + ) + .map(|item| match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + }); + let ret = Box::pin(ret); + Ok(ret) } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index a562a44..b8d9b13 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -478,7 +478,7 @@ pub fn raw_concat_channel_read_stream_timebin( } } -type Sitemty = Result>, Error>; +pub type Sitemty = Result>, Error>; pub mod dtflags { pub const COMPRESSION: u8 = 0x80; diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index c0d1344..93a0854 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -370,7 +370,7 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result // TODO remove StreamKind let stream_kind = BinnedStreamKindScalar::new(); //span1.in_scope(|| {}); - let fut = pre_binned_bytes_for_http(node_config, &query, stream_kind).instrument(span1); + let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1); let ret = match fut.await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped( s,