diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 3a1c598..47ed4da 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -6,7 +6,6 @@ use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromR use crate::frame::makeframe::{make_frame, FrameType}; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; -use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 1ec3fb0..7323975 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,13 +1,69 @@ use crate::agg::streams::StreamItem; use crate::binned::pbv2::{pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueStream}; use crate::binned::query::PreBinnedQuery; -use crate::binned::{RangeCompletableItem, StreamKind}; +use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind}; use crate::cache::node_ix_for_patch; -use crate::frame::makeframe::FrameType; +use crate::decode::{ + BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, + LittleEndian, NumFromBytes, ProcAA, ProcBB, +}; +use crate::frame::makeframe::{Framable, FrameType}; +use crate::Sitemty; use err::Error; -use netpod::NodeConfigCached; +use futures_core::Stream; +use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape}; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use std::pin::Pin; -pub fn pre_binned_bytes_for_http( +// TODO instead of EventNodeProcessor, use a T-binning processor here +// TODO might also want another stateful processor which can run on the merged event stream, like smoothing. + +fn make_num_pipeline_nty_end_evs_enp( + event_value_shape: EVS, +) -> Pin> + Send>> +where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + ENP: EventsNodeProcessor>::Output>, + Sitemty<::Output>: Framable + 'static, + ::Output: 'static, +{ + err::todoval() +} + +fn make_num_pipeline_nty_end(shape: Shape) -> Pin> + Send>> +where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, +{ + match shape { + Shape::Scalar => make_num_pipeline_nty_end_evs_enp::>(EventValuesDim0Case::new()), + Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::>(EventValuesDim1Case::new(n)), + } +} + +macro_rules! match_end { + ($nty:ident, $end:expr, $shape:expr) => { + match $end { + ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape), + ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape), + } + }; +} + +fn make_num_pipeline( + scalar_type: ScalarType, + byte_order: ByteOrder, + shape: Shape, +) -> Pin> + Send>> { + match scalar_type { + ScalarType::I32 => match_end!(i32, byte_order, shape), + _ => todo!(), + } +} + +pub async fn pre_binned_bytes_for_http( node_config: &NodeConfigCached, query: &PreBinnedQuery, stream_kind: SK, @@ -33,8 +89,23 @@ where return Err(err); } + let channel_config = read_local_config(&query.channel(), &node_config.node).await?; + let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")), + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")), + MatchingConfigEntry::Entry(entry) => entry, + }; + let _shape = match entry.to_shape() { + Ok(k) => k, + Err(e) => return Err(e), + }; + // TODO enable if false { + // TODO + // Decide here analogue to conn in some steps which generic pipeline we use. + PreBinnedValueStream::new(query.clone(), node_config, stream_kind.clone()); err::todoval() } else { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 562f06b..f311500 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize}; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; +use tracing::Instrument; pub mod gather; pub mod search; @@ -365,30 +366,30 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result let (head, _body) = req.into_parts(); let query = PreBinnedQuery::from_request(&head)?; let desc = format!("pre-b-{}", query.patch().bin_t_len() / 1000000000); - let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); + let span1 = span!(Level::INFO, "httpret::prebinned_DISABLED", desc = &desc.as_str()); // TODO remove StreamKind let stream_kind = BinnedStreamKindScalar::new(); - span1.in_scope(|| { - let ret = match pre_binned_bytes_for_http(node_config, &query, stream_kind) { - Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped( - s, - format!( - "pre-b-{}-p-{}", - query.patch().bin_t_len() / 1000000000, - query.patch().patch_beg() / 1000000000, - ), - ))?, - Err(e) => { - if query.report_error() { - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? - } else { - error!("fn prebinned: {:?}", e); - response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? - } + //span1.in_scope(|| {}); + let fut = pre_binned_bytes_for_http(node_config, &query, stream_kind).instrument(span1); + let ret = match fut.await { + Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped( + s, + format!( + "pre-b-{}-p-{}", + query.patch().bin_t_len() / 1000000000, + query.patch().patch_beg() / 1000000000, + ), + ))?, + Err(e) => { + if query.report_error() { + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))? + } else { + error!("fn prebinned: {:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? } - }; - Ok(ret) - }) + } + }; + Ok(ret) } #[derive(Debug, Serialize, Deserialize)]