This commit is contained in:
Dominik Werder
2021-06-06 13:32:41 +02:00
parent 7f2ccac690
commit df4d3b8e17
3 changed files with 97 additions and 26 deletions

View File

@@ -6,7 +6,6 @@ use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromR
use crate::frame::makeframe::{make_frame, FrameType};
use crate::raw::EventsQuery;
use crate::streamlog::Streamlog;
use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};

View File

@@ -1,13 +1,69 @@
use crate::agg::streams::StreamItem;
use crate::binned::pbv2::{pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueStream};
use crate::binned::query::PreBinnedQuery;
use crate::binned::{RangeCompletableItem, StreamKind};
use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind};
use crate::cache::node_ix_for_patch;
use crate::frame::makeframe::FrameType;
use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes, ProcAA, ProcBB,
};
use crate::frame::makeframe::{Framable, FrameType};
use crate::Sitemty;
use err::Error;
use netpod::NodeConfigCached;
use futures_core::Stream;
use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use std::pin::Pin;
pub fn pre_binned_bytes_for_http<SK>(
// TODO instead of EventNodeProcessor, use a T-binning processor here
// TODO might also want another stateful processor which can run on the merged event stream, like smoothing.
fn make_num_pipeline_nty_end_evs_enp<NTY, END, EVS, ENP>(
event_value_shape: EVS,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
<ENP as EventsNodeProcessor>::Output: 'static,
{
err::todoval()
}
fn make_num_pipeline_nty_end<NTY, END>(shape: Shape) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
{
match shape {
Shape::Scalar => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, ProcAA<NTY>>(EventValuesDim0Case::new()),
Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::<NTY, END, _, ProcBB<NTY>>(EventValuesDim1Case::new(n)),
}
}
macro_rules! match_end {
($nty:ident, $end:expr, $shape:expr) => {
match $end {
ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape),
ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape),
}
};
}
fn make_num_pipeline(
scalar_type: ScalarType,
byte_order: ByteOrder,
shape: Shape,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> {
match scalar_type {
ScalarType::I32 => match_end!(i32, byte_order, shape),
_ => todo!(),
}
}
pub async fn pre_binned_bytes_for_http<SK>(
node_config: &NodeConfigCached,
query: &PreBinnedQuery,
stream_kind: SK,
@@ -33,8 +89,23 @@ where
return Err(err);
}
let channel_config = read_local_config(&query.channel(), &node_config.node).await?;
let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?;
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")),
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")),
MatchingConfigEntry::Entry(entry) => entry,
};
let _shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e),
};
// TODO enable
if false {
// TODO
// Decide here analogue to conn in some steps which generic pipeline we use.
PreBinnedValueStream::new(query.clone(), node_config, stream_kind.clone());
err::todoval()
} else {

View File

@@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};
use std::{future, net, panic, pin, task};
use task::{Context, Poll};
use tracing::field::Empty;
use tracing::Instrument;
pub mod gather;
pub mod search;
@@ -365,30 +366,30 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
let (head, _body) = req.into_parts();
let query = PreBinnedQuery::from_request(&head)?;
let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000);
let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str());
// TODO remove StreamKind
let stream_kind = BinnedStreamKindScalar::new();
span1.in_scope(|| {
let ret = match pre_binned_bytes_for_http(node_config, &query, stream_kind) {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
s,
format!(
"pre-b-{}-p-{}",
query.patch().bin_t_len() / 1000000000,
query.patch().patch_beg() / 1000000000,
),
))?,
Err(e) => {
if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn prebinned: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
//span1.in_scope(|| {});
let fut = pre_binned_bytes_for_http(node_config, &query, stream_kind).instrument(span1);
let ret = match fut.await {
Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
s,
format!(
"pre-b-{}-p-{}",
query.patch().bin_t_len() / 1000000000,
query.patch().patch_beg() / 1000000000,
),
))?,
Err(e) => {
if query.report_error() {
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
} else {
error!("fn prebinned: {:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
}
};
Ok(ret)
})
}
};
Ok(ret)
}
#[derive(Debug, Serialize, Deserialize)]