The 3 binned binary tests pass

This commit is contained in:
Dominik Werder
2021-06-09 18:14:02 +02:00
parent 98dbae02d5
commit 1df36f3aeb
9 changed files with 98 additions and 610 deletions

View File

@@ -5,8 +5,9 @@ use disk::agg::streams::{Bins, StatsItem, StreamItem};
use disk::binned::query::{BinnedQuery, CacheUsage};
use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen};
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
-use disk::frame::makeframe::FrameType;
+use disk::frame::makeframe::{FrameType, SubFrId};
use disk::streamlog::Streamlog;
+use disk::Sitemty;
use err::Error;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
@@ -14,6 +15,8 @@ use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::{AggKind, Channel, Cluster, Database, HostPort, NanoRange, Node, PerfOpts};
+use serde::de::DeserializeOwned;
+use std::fmt::Debug;
use std::future::ready;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncRead;
@@ -82,7 +85,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
if true {
-get_binned_channel(
+get_binned_channel::<f64>(
"wave-f64-be-n21",
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:30.000Z",
@@ -93,8 +96,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
)
.await?;
}
-if false {
-get_binned_channel(
+if true {
+get_binned_channel::<u16>(
"wave-u16-le-n77",
"1970-01-01T01:11:00.000Z",
"1970-01-01T01:35:00.000Z",
@@ -105,8 +108,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
)
.await?;
}
-if false {
-get_binned_channel(
+if true {
+get_binned_channel::<u16>(
"wave-u16-le-n77",
"1970-01-01T01:42:00.000Z",
"1970-01-01T03:55:00.000Z",
@@ -120,23 +123,23 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
Ok(())
}
-async fn get_binned_channel<S>(
+async fn get_binned_channel<NTY>(
channel_name: &str,
-beg_date: S,
-end_date: S,
+beg_date: &str,
+end_date: &str,
bin_count: u32,
cluster: &Cluster,
expect_range_complete: bool,
expect_bin_count: u64,
) -> Result<BinnedResponse, Error>
where
-S: AsRef<str>,
+NTY: Debug + SubFrId + DeserializeOwned,
{
let t1 = Utc::now();
let agg_kind = AggKind::DimXBins1;
let node0 = &cluster.nodes[0];
-let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
-let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
+let beg_date: DateTime<Utc> = beg_date.parse()?;
+let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = "testbackend";
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let channel = Channel {
@@ -161,7 +164,7 @@ where
}
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
-let res = consume_binned_response(s2).await?;
+let res = consume_binned_response::<NTY, _>(s2).await?;
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
@@ -209,8 +212,9 @@ impl BinnedResponse {
}
}
-async fn consume_binned_response<T>(inp: InMemoryFrameAsyncReadStream<T>) -> Result<BinnedResponse, Error>
+async fn consume_binned_response<NTY, T>(inp: InMemoryFrameAsyncReadStream<T>) -> Result<BinnedResponse, Error>
where
+NTY: Debug + SubFrId + DeserializeOwned,
T: AsyncRead + Unpin,
{
let s1 = inp
@@ -227,11 +231,10 @@ where
None
}
StreamItem::DataItem(frame) => {
-type ExpectedType = Result<StreamItem<RangeCompletableItem<MinMaxAvgBins<f64>>>, Error>;
-if frame.tyid() != <ExpectedType as FrameType>::FRAME_TYPE_ID {
+if frame.tyid() != <Sitemty<MinMaxAvgBins<NTY>> as FrameType>::FRAME_TYPE_ID {
error!("test receives unexpected tyid {:x}", frame.tyid());
}
-match bincode::deserialize::<ExpectedType>(frame.buf()) {
+match bincode::deserialize::<Sitemty<MinMaxAvgBins<NTY>>>(frame.buf()) {
Ok(item) => match item {
Ok(item) => match item {
StreamItem::Log(item) => {
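
The consume logic above gates deserialization on the frame's wire type id. A minimal standalone sketch of that pattern, with stand-in types for the crate's FrameType machinery (the id value is illustrative) and bincode 1.x for decoding:

use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

// Stand-ins for the crate's frame-type machinery; the id is illustrative.
trait FrameType {
    const FRAME_TYPE_ID: u32;
}

#[derive(Debug, Serialize, Deserialize)]
struct MinMaxAvgBins<NTY> {
    counts: Vec<u64>,
    mins: Vec<NTY>,
    maxs: Vec<NTY>,
}

impl FrameType for MinMaxAvgBins<f64> {
    const FRAME_TYPE_ID: u32 = 0x0700;
}

fn consume_frame<T: FrameType + DeserializeOwned>(tyid: u32, buf: &[u8]) -> Result<T, String> {
    // Reject frames whose wire type id does not match the expected payload
    // type, mirroring the tyid check in consume_binned_response above.
    if tyid != T::FRAME_TYPE_ID {
        return Err(format!("unexpected tyid {:x}", tyid));
    }
    bincode::deserialize(buf).map_err(|e| e.to_string())
}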

View File

@@ -33,6 +33,7 @@ pub trait Collectable {
fn append_to(&self, collected: &mut Self::Collected);
}
+// TODO can be removed?
pub trait Collectable2: Any {
fn as_any_ref(&self) -> &dyn Any;
fn append(&mut self, src: &dyn Any);
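
The Any-based append in Collectable2 relies on the standard downcast pattern; a toy illustration with a hypothetical accumulator type (not the crate's):

use std::any::Any;

struct Acc {
    sum: f64,
    count: u64,
}

impl Acc {
    fn as_any_ref(&self) -> &dyn Any {
        self
    }

    // Downcast the type-erased source and merge it only when the concrete
    // types match; a dyn-Any append has no type-level guarantee to lean on.
    fn append(&mut self, src: &dyn Any) {
        if let Some(other) = src.downcast_ref::<Acc>() {
            self.sum += other.sum;
            self.count += other.count;
        }
    }
}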

View File

@@ -15,7 +15,6 @@ use crate::agg::streams::{
use crate::agg::{Fits, FitsInside};
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binned::query::{BinnedQuery, PreBinnedQuery};
-use crate::binned::scalar::binned_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
use crate::cache::MergedFromRemotes;
use crate::decode::{
@@ -53,8 +52,6 @@ use tokio::io::{AsyncRead, ReadBuf};
pub mod binnedfrompbv;
pub mod pbv;
-// TODO get rid of whole pbv2 mod?
-pub mod pbv2;
pub mod prebinned;
pub mod query;
pub mod scalar;
@@ -261,34 +258,12 @@ where
}
}
-pub trait MakeFrame2 {
-fn make_frame_2(&self) -> Result<BytesMut, Error>;
-}
+pub trait BinnedResponseItem: Send + ToJsonResult + Framable {}
-impl<T> MakeFrame2 for Sitemty<T>
-where
-Sitemty<T>: Framable,
-{
-fn make_frame_2(&self) -> Result<BytesMut, Error> {
-todo!()
-}
-}
-pub trait DataFramable {
-fn make_data_frame(&self) -> Result<BytesMut, Error>;
-}
-pub trait BinnedResponseItem: Send + ToJsonResult + DataFramable {}
-impl<T> BinnedResponseItem for T
-where
-T: Send + ToJsonResult + DataFramable,
-Sitemty<T>: Framable,
-{
-}
+impl<T> BinnedResponseItem for T where T: Send + ToJsonResult + Framable {}
pub struct BinnedResponseDyn {
-stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>,
+stream: Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>,
bin_count: u32,
}
@@ -313,10 +288,12 @@ where
>: Framable,
// TODO require these things in general?
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
// TODO is this correct? why do I want the Output to be Framable?
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
FrameType + Framable + DeserializeOwned,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: ToJsonResult + DataFramable,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: ToJsonResult + Framable,
{
let _ = ppp;
let res = make_num_pipeline_nty_end_evs_enp_stat::<_, _, _, ENP>(event_value_shape, query, node_config)?;
let s = PPP::convert(res.stream);
let ret = BinnedResponseDyn {
@@ -373,12 +350,15 @@ fn make_num_pipeline_entry<PPP>(
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
+PPP: PipelinePostProcessB<MinMaxAvgBins<u16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
match scalar_type {
+ScalarType::U16 => match_end!(u16, byte_order, shape, query, ppp, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, query, ppp, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, ppp, node_config),
// TODO complete set
_ => todo!(),
}
}
@@ -390,6 +370,7 @@ async fn make_num_pipeline<PPP>(
) -> Result<BinnedResponseDyn, Error>
where
PPP: PipelinePostProcessA,
+PPP: PipelinePostProcessB<MinMaxAvgBins<u16>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<i32>>,
PPP: PipelinePostProcessB<MinMaxAvgBins<f64>>,
{
@@ -445,7 +426,7 @@ impl PipelinePostProcessA for Ppp1 {
pub trait PipelinePostProcessB<T> {
fn convert(
inp: Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>,
-) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>>;
+) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>>;
}
impl<NTY> PipelinePostProcessB<MinMaxAvgBins<NTY>> for Ppp1
@@ -454,20 +435,8 @@ where
{
fn convert(
inp: Pin<Box<dyn Stream<Item = Sitemty<MinMaxAvgBins<NTY>>> + Send>>,
-) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinnedResponseItem>>> + Send>> {
-let s = StreamExt::map(inp, |item| match item {
-Ok(item) => Ok(match item {
-StreamItem::DataItem(item) => StreamItem::DataItem(match item {
-RangeCompletableItem::Data(item) => {
-RangeCompletableItem::Data(Box::new(item) as Box<dyn BinnedResponseItem>)
-}
-RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
-}),
-StreamItem::Log(item) => StreamItem::Log(item),
-StreamItem::Stats(item) => StreamItem::Stats(item),
-}),
-Err(e) => Err(e),
-});
+) -> Pin<Box<dyn Stream<Item = Box<dyn BinnedResponseItem>> + Send>> {
+let s = StreamExt::map(inp, |item| Box::new(item) as Box<dyn BinnedResponseItem>);
Box::pin(s)
}
}
@@ -478,29 +447,7 @@ pub async fn binned_bytes_for_http(
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
let pl = make_num_pipeline::<Ppp1>(query, Ppp1 {}, node_config).await?;
let ret = pl.stream.map(|item| {
-// TODO
-// TODO
-// Even for the "common" frame types I need the type of the inner item because the serialization
-// depends on the full type. The representation of the "common" variants are not necessarily
-// the same for different inner type!
-// Therefore, need a "make frame" on the full Sitemty<Box<BinnedResponseItem>>
-let fr = match item {
-Ok(item) => match item {
-StreamItem::DataItem(item) => match item {
-RangeCompletableItem::Data(item) => item.make_data_frame(),
-RangeCompletableItem::RangeComplete => {
-make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))
-}
-},
-StreamItem::Log(item) => make_frame(&Ok(StreamItem::Log(item))),
-StreamItem::Stats(item) => make_frame(&Ok(StreamItem::Stats(item))),
-},
-Err(e) => make_frame(&Err(e)),
-};
+let fr = item.make_frame();
let fr = fr?;
Ok(fr.freeze())
});
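
The removed comments explain the motivation: the "common" variants are not encoded the same way for different inner types, so framing variant-by-variant at the call site is wrong. A sketch of the direction the new item.make_frame() call points at, with toy stand-ins for the crate's types:

use bytes::{BufMut, BytesMut};
use serde::Serialize;

#[derive(Serialize)]
enum StreamItem<T> {
    DataItem(T),
    Log(String),
}

// Toy mirror of the crate's alias.
type Sitemty<T> = Result<StreamItem<T>, String>;

trait Framable {
    fn make_frame(&self) -> Result<BytesMut, bincode::Error>;
}

// One impl over the whole Sitemty<T>: the serializer sees the full type, so
// Ok/Err and the StreamItem variants are encoded consistently with T instead
// of being framed per-variant at the call site.
impl<T: Serialize> Framable for Sitemty<T> {
    fn make_frame(&self) -> Result<BytesMut, bincode::Error> {
        let enc = bincode::serialize(self)?;
        let mut buf = BytesMut::with_capacity(enc.len() + 4);
        buf.put_u32_le(enc.len() as u32);
        buf.put_slice(&enc);
        Ok(buf)
    }
}
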
@@ -1057,25 +1004,7 @@ where
}
}
-impl<NTY> DataFramable for MinMaxAvgBins<NTY>
-where
-NTY: NumOps,
-Sitemty<Self>: FrameType,
-{
-fn make_data_frame(&self) -> Result<BytesMut, Error> {
-let item = Self {
-ts1s: self.ts1s.clone(),
-ts2s: self.ts2s.clone(),
-counts: self.counts.clone(),
-mins: self.mins.clone(),
-maxs: self.maxs.clone(),
-avgs: self.avgs.clone(),
-};
-make_frame(&Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))
-}
-}
-impl<NTY> ToJsonResult for MinMaxAvgBins<NTY>
+impl<NTY> ToJsonResult for Sitemty<MinMaxAvgBins<NTY>>
where
NTY: NumOps,
{

View File

@@ -236,11 +236,6 @@ where
ready(g)
}
});
// TODO TBinnerStream is for T-binning events.
// But here, we need to bin bins into bigger bins.
// The logic in TBinnerStream is actually the same I think..
// Reuse??
let inp = TBinnerStream::<_, TBT>::new(inp, range);
Ok(Self {
inp: Box::pin(inp),
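
The removed TODO observed that re-binning bins is the same fold as binning events. A toy version of that shared accumulation, with hypothetical bin fields:

#[derive(Debug, Clone)]
struct Bin {
    count: u64,
    min: f64,
    max: f64,
    sum: f64, // running sum, so averages can be recomputed after merging
}

// Folding a finer bin into a coarser one uses the same min/max/count/sum
// accumulation as folding a raw event into a bin, which is why a single
// TBinnerStream-style implementation can serve both.
fn merge_into(coarse: &mut Bin, fine: &Bin) {
    coarse.count += fine.count;
    coarse.min = coarse.min.min(fine.min);
    coarse.max = coarse.max.max(fine.max);
    coarse.sum += fine.sum;
}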

View File

@@ -1,5 +1,6 @@
use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType};
use crate::agg::streams::{Appendable, StreamItem};
+use crate::binned::binnedfrompbv::FetchedPreBinned;
use crate::binned::query::{CacheUsage, PreBinnedQuery};
use crate::binned::{
BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex,
@@ -18,6 +19,7 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
+use serde::de::DeserializeOwned;
use serde::Serialize;
use std::future::Future;
use std::io;
@@ -27,15 +29,12 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
//pub type SomeScc = netpod::streamext::SCC<u32>;
-pub struct PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
+pub struct PreBinnedValueStream<NTY, END, EVS, ENP>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
-ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output>,
{
query: PreBinnedQuery,
node_config: NodeConfigCached,
@@ -64,19 +63,19 @@ where
_m2: PhantomData<END>,
_m3: PhantomData<EVS>,
_m4: PhantomData<ENP>,
-_m5: PhantomData<ETB>,
}
-impl<NTY, END, EVS, ENP, ETB> PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
+impl<NTY, END, EVS, ENP> PreBinnedValueStream<NTY, END, EVS, ENP>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
-ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
// TODO is this needed:
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
-<ETB as EventsTimeBinner>::Output: Appendable,
+// TODO who exactly needs this DeserializeOwned?
+Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned,
{
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
Self {
@@ -100,7 +99,6 @@ where
_m2: PhantomData,
_m3: PhantomData,
_m4: PhantomData,
-_m5: PhantomData,
}
}
@@ -179,10 +177,12 @@ where
disk_stats_every.clone(),
report_error,
);
-// TODO copy and adapt PreBinnedScalarValueFetchedStream
-//PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind)
-let tmp: PreBinnedScalarValueFetchedStream<BinnedStreamKindScalar> = err::todoval();
-Ok(tmp)
+let ret =
+FetchedPreBinned::<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>::new(
+&query,
+&node_config,
+)?;
+Ok(ret)
}
})
.map(|k| {
@@ -193,7 +193,7 @@ where
s
})
.flatten();
-Err(err::todoval())
+Ok(Box::pin(s))
}
fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
@@ -211,16 +211,16 @@ where
}
}
-impl<NTY, END, EVS, ENP, ETB> Stream for PreBinnedValueStream<NTY, END, EVS, ENP, ETB>
+impl<NTY, END, EVS, ENP> Stream for PreBinnedValueStream<NTY, END, EVS, ENP>
where
NTY: NumOps + NumFromBytes<NTY, END> + Serialize + Unpin + 'static,
END: Endianness + Unpin + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + Unpin + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + Unpin + 'static,
-ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + Unpin + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
// TODO needed?
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
-<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
+Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned,
{
type Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>;
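
The repeated bound <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output is what replaced the separate ETB parameter: the binned output type is now derived from ENP instead of being passed in. Schematically, with toy traits and types mirroring only the shape of the bounds:

trait TimeBinnableType {
    // The time-binned form of this value type.
    type Output;
}

trait EventsNodeProcessor {
    type Output: TimeBinnableType;
}

struct WaveXBinner;
struct EventValues;
struct MinMaxAvgBins;

impl EventsNodeProcessor for WaveXBinner {
    type Output = EventValues;
}

impl TimeBinnableType for EventValues {
    type Output = MinMaxAvgBins;
}

// For ENP = WaveXBinner the projection resolves to MinMaxAvgBins, so the
// stream item type follows from ENP alone and a separate ETB is redundant.
fn _assert_resolves(
    _: <<WaveXBinner as EventsNodeProcessor>::Output as TimeBinnableType>::Output,
) {
}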

View File

@@ -1,424 +0,0 @@
use crate::agg::streams::{Appendable, StreamItem};
use crate::binned::query::{CacheUsage, PreBinnedQuery};
use crate::binned::{RangeCompletableItem, StreamKind, WithLen};
use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache};
use crate::frame::makeframe::{make_frame, FrameType};
use crate::raw::EventsQuery;
use crate::streamlog::Streamlog;
use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use netpod::log::*;
use netpod::streamext::SCC;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
use std::future::Future;
use std::io;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
pub type PreBinnedValueByteStream<SK> = SCC<PreBinnedValueByteStreamInner<SK>>;
pub struct PreBinnedValueByteStreamInner<SK>
where
SK: StreamKind,
{
inp: PreBinnedValueStream<SK>,
}
pub fn pre_binned_value_byte_stream_new<SK>(
query: &PreBinnedQuery,
node_config: &NodeConfigCached,
stream_kind: SK,
) -> PreBinnedValueByteStream<SK>
where
SK: StreamKind,
Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>: FrameType,
{
let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind);
let s2 = PreBinnedValueByteStreamInner { inp: s1 };
SCC::new(s2)
}
impl<SK> Stream for PreBinnedValueByteStreamInner<SK>
where
SK: StreamKind,
Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>: FrameType,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => {
match make_frame::<Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>>(
&item,
) {
Ok(buf) => Ready(Some(Ok(buf.freeze()))),
Err(e) => Ready(Some(Err(e.into()))),
}
}
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
pub struct PreBinnedValueStream<SK>
where
SK: StreamKind,
{
query: PreBinnedQuery,
node_config: NodeConfigCached,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, std::io::Error>> + Send>>>,
fut2: Option<
Pin<
Box<
dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>>
+ Send,
>,
>,
>,
read_from_cache: bool,
cache_written: bool,
data_complete: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
errored: bool,
completed: bool,
streamlog: Streamlog,
values: <SK as StreamKind>::TBinnedBins,
write_fut: Option<Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>>,
read_cache_fut: Option<
Pin<
Box<
dyn Future<
Output = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>,
> + Send,
>,
>,
>,
stream_kind: SK,
}
impl<SK> PreBinnedValueStream<SK>
where
SK: StreamKind,
Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>: FrameType,
{
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self {
Self {
query,
node_config: node_config.clone(),
open_check_local_file: None,
fut2: None,
read_from_cache: false,
cache_written: false,
data_complete: false,
range_complete_observed: false,
range_complete_emitted: false,
errored: false,
completed: false,
streamlog: Streamlog::new(node_config.ix as u32),
values: <<SK as StreamKind>::TBinnedBins as Appendable>::empty(),
write_fut: None,
read_cache_fut: None,
stream_kind,
}
}
// TODO handle errors also here via return type.
fn setup_merged_from_remotes(&mut self) {
let evq = EventsQuery {
channel: self.query.channel().clone(),
range: self.query.patch().patch_range(),
agg_kind: self.query.agg_kind().clone(),
};
if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {
error!(
"Patch length inconsistency {} {}",
self.query.patch().patch_t_len(),
self.query.patch().bin_t_len()
);
return;
}
// TODO do I need to set up more transformations or binning to deliver the requested data?
let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len();
let range = BinnedRange::covering_range(evq.range.clone(), count as u32)
.unwrap()
.ok_or(Error::with_msg("covering_range returns None"))
.unwrap();
let perf_opts = PerfOpts { inmem_bufcap: 512 };
// TODO remove whole mod after refactor
/*let s1 = MergedFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
self.stream_kind.clone(),
);
let s1 = <SK as StreamKind>::xbinned_to_tbinned(s1, range);
self.fut2 = Some(Box::pin(s1));*/
err::todo();
self.fut2 = None;
}
fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
let g = self.query.patch().bin_t_len();
let h = range.grid_spec.bin_t_len();
trace!(
"try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
g,
h,
g / h,
g % h,
range,
);
if g / h <= 1 {
error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
return;
}
if g / h > 1024 * 10 {
error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
return;
}
if g % h != 0 {
error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
return;
}
let node_config = self.node_config.clone();
let patch_it = PreBinnedPatchIterator::from_range(range);
let s = futures_util::stream::iter(patch_it)
.map({
let q2 = self.query.clone();
let disk_stats_every = self.query.disk_stats_every().clone();
let stream_kind = self.stream_kind.clone();
let report_error = self.query.report_error();
move |patch| {
let query = PreBinnedQuery::new(
patch,
q2.channel().clone(),
q2.agg_kind().clone(),
q2.cache_usage().clone(),
disk_stats_every.clone(),
report_error,
);
PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind)
}
})
.map(|k| {
let s: Pin<Box<dyn Stream<Item = _> + Send>> = match k {
Ok(k) => Box::pin(k),
Err(e) => Box::pin(futures_util::stream::iter(vec![Err(e)])),
};
s
})
.flatten();
self.fut2 = Some(Box::pin(s));
}
fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> {
let range = self.query.patch().patch_range();
match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) {
Ok(Some(range)) => {
self.setup_from_higher_res_prebinned(range);
}
Ok(None) => {
self.setup_merged_from_remotes();
}
Err(e) => return Err(e),
}
Ok(())
}
}
impl<SK> Stream for PreBinnedValueStream<SK>
where
SK: StreamKind + Unpin,
Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>: FrameType,
{
type Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, err::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("PreBinnedValueStream poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if let Some(item) = self.streamlog.pop() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(fut) = &mut self.write_fut {
match fut.poll_unpin(cx) {
Ready(item) => {
self.cache_written = true;
self.write_fut = None;
match item {
Ok(res) => {
self.streamlog
.append(Level::INFO, format!("cache file written bytes: {}", res.bytes));
continue 'outer;
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
}
}
}
Pending => Pending,
}
} else if let Some(fut) = &mut self.read_cache_fut {
match fut.poll_unpin(cx) {
Ready(item) => {
self.read_cache_fut = None;
match item {
Ok(item) => {
self.data_complete = true;
self.range_complete_observed = true;
Ready(Some(Ok(item)))
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
}
}
}
Pending => Pending,
}
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.data_complete {
if self.cache_written {
if self.range_complete_observed {
self.range_complete_emitted = true;
let item = RangeCompletableItem::RangeComplete;
Ready(Some(Ok(StreamItem::DataItem(item))))
} else {
self.completed = true;
Ready(None)
}
} else if self.read_from_cache {
self.cache_written = true;
continue 'outer;
} else {
match self.query.cache_usage() {
CacheUsage::Use | CacheUsage::Recreate => {
let msg = format!(
"write cache file query: {:?} bin count: {}",
self.query.patch(),
self.values.len(),
);
self.streamlog.append(Level::INFO, msg);
let values = std::mem::replace(
&mut self.values,
<<SK as StreamKind>::TBinnedBins as Appendable>::empty(),
);
let fut = write_pb_cache_min_max_avg_scalar(
values,
self.query.patch().clone(),
self.query.agg_kind().clone(),
self.query.channel().clone(),
self.node_config.clone(),
);
self.write_fut = Some(Box::pin(fut));
continue 'outer;
}
_ => {
self.cache_written = true;
continue 'outer;
}
}
}
} else if let Some(fut) = self.fut2.as_mut() {
match fut.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(item) => match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed = true;
continue 'outer;
}
RangeCompletableItem::Data(item) => {
self.values.append(&item);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
},
},
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.data_complete = true;
continue 'outer;
}
Pending => Pending,
}
} else if let Some(fut) = self.open_check_local_file.as_mut() {
match fut.poll_unpin(cx) {
Ready(item) => {
self.open_check_local_file = None;
match item {
Ok(file) => {
self.read_from_cache = true;
let fut = <<SK as StreamKind>::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?;
self.read_cache_fut = Some(Box::pin(fut));
continue 'outer;
}
Err(e) => match e.kind() {
// TODO other error kinds
io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
Ok(_) => {
if self.fut2.is_none() {
let e = Err(Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res failed"
)));
self.errored = true;
Ready(Some(e))
} else {
continue 'outer;
}
}
Err(e) => {
let e = Error::with_msg(format!(
"try_setup_fetch_prebinned_higher_res error: {:?}",
e
));
self.errored = true;
Ready(Some(Err(e)))
}
},
_ => {
error!("File I/O error: kind {:?} {:?}\n\n..............", e.kind(), e);
self.errored = true;
Ready(Some(Err(e.into())))
}
},
}
}
Pending => Pending,
}
} else {
let cfd = CacheFileDesc::new(
self.query.channel().clone(),
self.query.patch().clone(),
self.query.agg_kind().clone(),
);
let path = match self.query.cache_usage() {
CacheUsage::Use => cfd.path(&self.node_config),
_ => PathBuf::from("DOESNOTEXIST"),
};
let fut = async { OpenOptions::new().read(true).open(path).await };
self.open_check_local_file = Some(Box::pin(fut));
continue 'outer;
};
}
}
}
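
The deleted stream above drove its states from inside poll_next with a labeled break-if chain. The skeleton of that pattern in a self-contained toy:

use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

// Toy skeleton of the deleted poll_next state machine: loop over the flags,
// emit Ready/Pending per arm, and guard against polling after completion.
struct ToyStream {
    data_complete: bool,
    range_complete_emitted: bool,
    completed: bool,
}

impl Stream for ToyStream {
    type Item = Result<u32, String>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        'outer: loop {
            break if self.completed {
                panic!("poll_next on completed");
            } else if self.range_complete_emitted {
                self.completed = true;
                Poll::Ready(None)
            } else if self.data_complete {
                self.range_complete_emitted = true;
                Poll::Ready(Some(Ok(1)))
            } else {
                // State transition without yielding: loop again instead of
                // returning, exactly like the `continue 'outer` arms above.
                self.data_complete = true;
                continue 'outer;
            };
        }
    }
}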

View File

@@ -3,9 +3,10 @@ use crate::agg::binnedt4::{
};
use crate::agg::enp::{Identity, WaveXBinner};
use crate::agg::streams::{Appendable, StreamItem};
-use crate::binned::pbv2::{
-pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream,
-};
+// use crate::binned::pbv2::{
+// pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream,
+// };
+use crate::binned::pbv::PreBinnedValueStream;
use crate::binned::query::PreBinnedQuery;
use crate::binned::{
BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem,
@@ -25,12 +26,13 @@ use futures_util::StreamExt;
use netpod::streamext::SCC;
use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
+use serde::de::DeserializeOwned;
use serde::Serialize;
use std::pin::Pin;
-fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP, ETB>(
-query: PreBinnedQuery,
+fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
+_event_value_shape: EVS,
+query: PreBinnedQuery,
node_config: &NodeConfigCached,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
where
@@ -38,16 +40,12 @@ where
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output> + 'static,
-ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
-<ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
-Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
-Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: Framable,
+Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
+Framable + FrameType + DeserializeOwned,
{
-// TODO
-// Currently, this mod uses stuff from pbv2, therefore complete path:
-let ret = crate::binned::pbv::PreBinnedValueStream::<NTY, END, EVS, ENP, ETB>::new(query, node_config);
+let ret = PreBinnedValueStream::<NTY, END, EVS, ENP>::new(query, node_config);
let ret = StreamExt::map(ret, |item| Box::new(item) as Box<dyn Framable>);
Box::pin(ret)
}
@@ -62,20 +60,16 @@ where
END: Endianness + 'static,
{
match shape {
-Shape::Scalar => {
-make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>, DefaultScalarEventsTimeBinner<NTY>>(
-query,
-EventValuesDim0Case::new(),
-node_config,
-)
-}
-Shape::Wave(n) => {
-make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>, DefaultSingleXBinTimeBinner<NTY>>(
-query,
-EventValuesDim1Case::new(n),
-node_config,
-)
-}
+Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, Identity<NTY>>(
+EventValuesDim0Case::new(),
+query,
+node_config,
+),
+Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, WaveXBinner<NTY>>(
+EventValuesDim1Case::new(n),
+query,
+node_config,
+),
}
}
@@ -96,22 +90,24 @@ fn make_num_pipeline(
node_config: &NodeConfigCached,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> {
match scalar_type {
+ScalarType::U8 => match_end!(u8, byte_order, shape, query, node_config),
ScalarType::U16 => match_end!(u16, byte_order, shape, query, node_config),
+ScalarType::U32 => match_end!(u32, byte_order, shape, query, node_config),
+ScalarType::U64 => match_end!(u64, byte_order, shape, query, node_config),
+ScalarType::I8 => match_end!(i8, byte_order, shape, query, node_config),
+ScalarType::I16 => match_end!(i16, byte_order, shape, query, node_config),
ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config),
+ScalarType::I64 => match_end!(i64, byte_order, shape, query, node_config),
+ScalarType::F32 => match_end!(f64, byte_order, shape, query, node_config),
ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config),
_ => todo!(),
}
}
// TODO after the refactor, return direct value instead of boxed.
-pub async fn pre_binned_bytes_for_http<SK>(
+pub async fn pre_binned_bytes_for_http(
node_config: &NodeConfigCached,
query: &PreBinnedQuery,
-stream_kind: SK,
-) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error>
-where
-SK: StreamKind,
-Result<StreamItem<RangeCompletableItem<SK::TBinnedBins>>, err::Error>: FrameType,
-{
+) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
@@ -128,7 +124,6 @@ where
));
return Err(err);
}
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?;
let entry = match entry_res {
@@ -136,28 +131,17 @@ where
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")),
MatchingConfigEntry::Entry(entry) => entry,
};
-let _shape = match entry.to_shape() {
-Ok(k) => k,
-Err(e) => return Err(e),
-};
-if true {
-let ret = make_num_pipeline(
-entry.scalar_type.clone(),
-entry.byte_order.clone(),
-entry.to_shape().unwrap(),
-query.clone(),
-node_config,
-)
-.map(|item| match item.make_frame() {
-Ok(item) => Ok(item.freeze()),
-Err(e) => Err(e),
-});
-let ret = Box::pin(ret);
-Ok(ret)
-} else {
-let ret = pre_binned_value_byte_stream_new(query, node_config, stream_kind);
-let ret = Box::pin(ret);
-Ok(ret)
-}
+let ret = make_num_pipeline(
+entry.scalar_type.clone(),
+entry.byte_order.clone(),
+entry.to_shape()?,
+query.clone(),
+node_config,
+)
+.map(|item| match item.make_frame() {
+Ok(item) => Ok(item.freeze()),
+Err(e) => Err(e),
+});
+let ret = Box::pin(ret);
+Ok(ret)
}
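
make_num_pipeline's job above is monomorphization: pick a concrete NTY per ScalarType and instantiate one generic pipeline (the match_end! macro itself is not shown in this commit). The dispatch pattern in a self-contained toy:

#[derive(Debug)]
enum ScalarType {
    U16,
    I32,
    F64,
}

// One generic pipeline, monomorphized per concrete numeric type.
fn pipeline<NTY: std::fmt::Debug + Default>() -> String {
    format!("pipeline for {:?}", NTY::default())
}

// The match is the only place that knows the runtime scalar type; everything
// downstream works with the concrete NTY chosen here.
fn dispatch(scalar_type: ScalarType) -> String {
    match scalar_type {
        ScalarType::U16 => pipeline::<u16>(),
        ScalarType::I32 => pipeline::<i32>(),
        ScalarType::F64 => pipeline::<f64>(),
    }
}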

View File

@@ -478,7 +478,7 @@ pub fn raw_concat_channel_read_stream_timebin(
}
}
-type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
+pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
pub mod dtflags {
pub const COMPRESSION: u8 = 0x80;
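
With Sitemty now public, consumers can match on the nested layers directly; a toy mirror of the alias and a typical consumer match:

enum RangeCompletableItem<T> {
    RangeComplete,
    Data(T),
}

enum StreamItem<T> {
    DataItem(T),
    Log(String),
    Stats(u64),
}

// Toy mirror of the now-public alias.
type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, String>;

fn handle(item: Sitemty<Vec<f64>>) {
    match item {
        Ok(StreamItem::DataItem(RangeCompletableItem::Data(bins))) => {
            println!("got {} bins", bins.len())
        }
        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
            println!("range complete")
        }
        Ok(StreamItem::Log(msg)) => println!("log: {}", msg),
        Ok(StreamItem::Stats(n)) => println!("stats: {}", n),
        Err(e) => eprintln!("error: {}", e),
    }
}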

View File

@@ -370,7 +370,7 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
// TODO remove StreamKind
let stream_kind = BinnedStreamKindScalar::new();
//span1.in_scope(|| {});
-let fut = pre_binned_bytes_for_http(node_config, &query, stream_kind).instrument(span1);
+let fut = pre_binned_bytes_for_http(node_config, &query).instrument(span1);
let ret = match fut.await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
s,