From 0c43d5367e57a655f7c387ce17700c9854fe0b3c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 10 Jun 2021 17:37:40 +0200 Subject: [PATCH] First basic plain event fetch as binary --- daqbuffer/src/test.rs | 1 + daqbuffer/src/test/events.rs | 210 ++++++++++++++++++++++++++++ disk/src/binned.rs | 26 +++- disk/src/binned/pbv.rs | 4 +- disk/src/binned/query.rs | 87 +++++++++++- disk/src/channelexec.rs | 162 +++++++++++++++++++++ disk/src/lib.rs | 1 + disk/src/merge/mergedfromremotes.rs | 8 +- httpret/src/lib.rs | 18 ++- 9 files changed, 499 insertions(+), 18 deletions(-) create mode 100644 daqbuffer/src/test/events.rs create mode 100644 disk/src/channelexec.rs diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index 81f4552..e661948 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -20,6 +20,7 @@ use std::fmt::Debug; use std::future::ready; use tokio::io::AsyncRead; +pub mod events; pub mod json; #[test] diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs new file mode 100644 index 0000000..290ca83 --- /dev/null +++ b/daqbuffer/src/test/events.rs @@ -0,0 +1,210 @@ +use crate::nodes::require_test_hosts_running; +use chrono::{DateTime, Utc}; +use disk::agg::streams::{StatsItem, StreamItem}; +use disk::binned::query::PlainEventsQuery; +use disk::binned::{NumOps, RangeCompletableItem, WithLen}; +use disk::decode::EventValues; +use disk::frame::inmem::InMemoryFrameAsyncReadStream; +use disk::frame::makeframe::FrameType; +use disk::streamlog::Streamlog; +use disk::Sitemty; +use err::Error; +use futures_util::{StreamExt, TryStreamExt}; +use http::StatusCode; +use hyper::Body; +use netpod::log::*; +use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts}; +use std::fmt::Debug; +use std::future::ready; +use tokio::io::AsyncRead; + +#[test] +fn get_plain_events_0() { + taskrun::run(get_plain_events_0_inner()).unwrap(); +} + +async fn get_plain_events_0_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + if true { + get_plain_events::( + "scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:50.000Z", + cluster, + true, + 4, + ) + .await?; + } + Ok(()) +} + +async fn get_plain_events( + channel_name: &str, + beg_date: &str, + end_date: &str, + cluster: &Cluster, + _expect_range_complete: bool, + _expect_event_count: u64, +) -> Result +where + NTY: NumOps, +{ + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; + let channel_backend = "testbackend"; + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + let channel = Channel { + backend: channel_backend.into(), + name: channel_name.into(), + }; + let range = NanoRange::from_date_time(beg_date, end_date); + let query = PlainEventsQuery::new(channel, range); + let hp = HostPort::from_node(node0); + let url = query.url(&hp); + info!("get_plain_events get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url) + .header("accept", "application/octet-stream") + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + } + let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); + let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); + let res = consume_plain_events::(s2).await?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + info!("time {} ms", ms); + if !res.is_valid() { + Ok(res) + } else { + Ok(res) + } +} + +#[derive(Debug)] +pub struct EventsResponse { + event_count: u64, + err_item_count: u64, + data_item_count: u64, + bytes_read: u64, + range_complete_count: u64, + log_item_count: u64, + stats_item_count: u64, +} + +impl EventsResponse { + pub fn new() -> Self { + Self { + event_count: 0, + err_item_count: 0, + data_item_count: 0, + bytes_read: 0, + range_complete_count: 0, + log_item_count: 0, + stats_item_count: 0, + } + } + + pub fn is_valid(&self) -> bool { + if self.range_complete_count > 1 { + false + } else { + true + } + } +} + +async fn consume_plain_events(inp: InMemoryFrameAsyncReadStream) -> Result +where + NTY: NumOps, + T: AsyncRead + Unpin, +{ + let s1 = inp + .map_err(|e| error!("TEST GOT ERROR {:?}", e)) + .filter_map(|item| { + let g = match item { + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + None + } + StreamItem::Stats(item) => { + info!("Stats: {:?}", item); + None + } + StreamItem::DataItem(frame) => { + if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + error!("test receives unexpected tyid {:x}", frame.tyid()); + None + } else { + match bincode::deserialize::>>(frame.buf()) { + Ok(item) => match item { + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + Some(Ok(StreamItem::Log(item))) + } + item => { + info!("TEST GOT ITEM {:?}", item); + Some(Ok(item)) + } + }, + Err(e) => { + error!("TEST GOT ERROR FRAME: {:?}", e); + Some(Err(e)) + } + }, + Err(e) => { + error!("bincode error: {:?}", e); + Some(Err(e.into())) + } + } + } + } + }, + Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), + }; + ready(g) + }) + .fold(EventsResponse::new(), |mut a, k| { + let g = match k { + Ok(StreamItem::Log(_item)) => { + a.log_item_count += 1; + a + } + Ok(StreamItem::Stats(item)) => match item { + StatsItem::EventDataReadStats(item) => { + a.bytes_read += item.parsed_bytes; + a + } + }, + Ok(StreamItem::DataItem(item)) => match item { + RangeCompletableItem::RangeComplete => { + a.range_complete_count += 1; + a + } + RangeCompletableItem::Data(item) => { + a.data_item_count += 1; + a.event_count += WithLen::len(&item) as u64; + a + } + }, + Err(_e) => { + a.err_item_count += 1; + a + } + }; + ready(g) + }); + let ret = s1.await; + info!("result: {:?}", ret); + Ok(ret) +} diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 956fd9b..c5194b9 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -12,7 +12,7 @@ use crate::decode::{ LittleEndian, NumFromBytes, }; use crate::frame::makeframe::{Framable, FrameType, SubFrId}; -use crate::merge::mergedfromremotes::MergedFromRemotes2; +use crate::merge::mergedfromremotes::MergedFromRemotes; use crate::raw::EventsQuery; use crate::Sitemty; use bytes::{Bytes, BytesMut}; @@ -30,6 +30,7 @@ use num_traits::{AsPrimitive, Bounded, Zero}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; +use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -83,7 +84,6 @@ where format!("binned_bytes_for_http BinnedRange::covering_range returned None"), ))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - //let _shape = entry.to_shape()?; match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { Ok(Some(pre_range)) => { info!("binned_bytes_for_http found pre_range: {:?}", pre_range); @@ -121,7 +121,7 @@ where range: query.range().clone(), agg_kind: query.agg_kind().clone(), }; - let s = MergedFromRemotes2::::new(evq, perf_opts, node_config.node_config.cluster.clone()); + let s = MergedFromRemotes::::new(evq, perf_opts, node_config.node_config.cluster.clone()); let s = TBinnerStream::<_, ::Output>::new(s, range); let ret = BinnedResponseStat { stream: Box::pin(s), @@ -150,10 +150,11 @@ fn make_num_pipeline_nty_end_evs_enp( where PPP: PipelinePostProcessA, PPP: PipelinePostProcessB<<::Output as TimeBinnableType>::Output>, - NTY: NumOps + NumFromBytes + Serialize + 'static, + NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, + // TODO require these properties in general: ::Output: TimeBinnableType + PushableIndex + Appendable + 'static, <::Output as TimeBinnableType>::Output: TimeBinnableType::Output as TimeBinnableType>::Output> + Unpin, @@ -182,7 +183,7 @@ fn make_num_pipeline_nty_end( where PPP: PipelinePostProcessA, PPP: PipelinePostProcessB>, - NTY: NumOps + NumFromBytes + Serialize + 'static, + NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, { match shape { @@ -744,12 +745,23 @@ pub trait RangeOverlapInfo { } pub trait NumOps: - Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned + Sized + + Copy + + Send + + Unpin + + Debug + + Zero + + AsPrimitive + + Bounded + + PartialOrd + + SubFrId + + Serialize + + DeserializeOwned { } impl NumOps for T where - T: Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned + T: Send + Unpin + Debug + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned { } diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index aea64df..8599e79 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -6,7 +6,7 @@ use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletable use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; use crate::frame::makeframe::FrameType; -use crate::merge::mergedfromremotes::MergedFromRemotes2; +use crate::merge::mergedfromremotes::MergedFromRemotes; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; use crate::Sitemty; @@ -123,7 +123,7 @@ where let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s = MergedFromRemotes2::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let ret = TBinnerStream::<_, ::Output>::new(s, range); Ok(Box::pin(ret)) } diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 7189ae0..5fd6cfb 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -54,7 +54,7 @@ impl PreBinnedQuery { let disk_stats_every = disk_stats_every .parse() .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; - let ret = PreBinnedQuery { + let ret = Self { patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix), agg_kind: params .get("aggKind") @@ -177,8 +177,8 @@ pub struct BinnedQuery { } impl BinnedQuery { - pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> BinnedQuery { - BinnedQuery { + pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> Self { + Self { channel, range, bin_count, @@ -199,7 +199,7 @@ impl BinnedQuery { let disk_stats_every = disk_stats_every .parse() .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; - let ret = BinnedQuery { + let ret = Self { range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), @@ -320,3 +320,82 @@ fn channel_from_params(params: &BTreeMap) -> Result Self { + Self { + channel, + range, + report_error: false, + timeout: Duration::from_millis(2000), + } + } + + pub fn from_request(req: &http::request::Parts) -> Result { + let params = netpod::query_params(req.uri.query()); + let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?; + let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + let ret = Self { + range: NanoRange { + beg: beg_date.parse::>()?.to_nanos(), + end: end_date.parse::>()?.to_nanos(), + }, + channel: channel_from_params(¶ms)?, + report_error: params + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, + timeout: params + .get("timeout") + .map_or("2000", |k| k) + .parse::() + .map(|k| Duration::from_millis(k)) + .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, + }; + Ok(ret) + } + + pub fn range(&self) -> &NanoRange { + &self.range + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn report_error(&self) -> bool { + self.report_error + } + + pub fn timeout(&self) -> Duration { + self.timeout + } + + pub fn set_timeout(&mut self, k: Duration) { + self.timeout = k; + } + + pub fn url(&self, host: &HostPort) -> String { + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + format!( + "http://{}:{}/api/4/plain_events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", + host.host, + host.port, + self.channel.backend, + self.channel.name, + Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), + Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), + self.timeout.as_millis(), + ) + } +} diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs new file mode 100644 index 0000000..111a026 --- /dev/null +++ b/disk/src/channelexec.rs @@ -0,0 +1,162 @@ +use crate::agg::enp::Identity; +use crate::binned::NumOps; +use crate::decode::{ + BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, + LittleEndian, NumFromBytes, +}; +use crate::frame::makeframe::Framable; +use crate::merge::mergedfromremotes::MergedFromRemotes; +use crate::raw::EventsQuery; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use std::pin::Pin; + +pub trait ChannelExecFunction { + type Output; + + fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static; +} + +fn channel_exec_nty_end_evs_enp( + f: F, + byte_order: END, + event_value_shape: EVS, +) -> Result +where + F: ChannelExecFunction, + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, +{ + Ok(f.exec::(byte_order, event_value_shape)?) +} + +fn channel_exec_nty_end(f: F, byte_order: END, shape: Shape) -> Result +where + F: ChannelExecFunction, + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, +{ + match shape { + Shape::Scalar => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim0Case::new()), + Shape::Wave(n) => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim1Case::new(n)), + } +} + +macro_rules! match_end { + ($f:expr, $nty:ident, $end:expr, $shape:expr, $node_config:expr) => { + match $end { + ByteOrder::LE => channel_exec_nty_end::<_, $nty, _>($f, LittleEndian {}, $shape), + ByteOrder::BE => channel_exec_nty_end::<_, $nty, _>($f, BigEndian {}, $shape), + } + }; +} + +fn channel_exec_config( + f: F, + scalar_type: ScalarType, + byte_order: ByteOrder, + shape: Shape, + _node_config: &NodeConfigCached, +) -> Result +where + F: ChannelExecFunction, +{ + match scalar_type { + ScalarType::U8 => match_end!(f, u8, byte_order, shape, node_config), + ScalarType::U16 => match_end!(f, u16, byte_order, shape, node_config), + ScalarType::U32 => match_end!(f, u32, byte_order, shape, node_config), + ScalarType::U64 => match_end!(f, u64, byte_order, shape, node_config), + ScalarType::I8 => match_end!(f, i8, byte_order, shape, node_config), + ScalarType::I16 => match_end!(f, i16, byte_order, shape, node_config), + ScalarType::I32 => match_end!(f, i32, byte_order, shape, node_config), + ScalarType::I64 => match_end!(f, i64, byte_order, shape, node_config), + ScalarType::F32 => match_end!(f, f32, byte_order, shape, node_config), + ScalarType::F64 => match_end!(f, f64, byte_order, shape, node_config), + } +} + +pub async fn channel_exec( + f: F, + channel: &Channel, + range: &NanoRange, + node_config: &NodeConfigCached, +) -> Result +where + F: ChannelExecFunction, +{ + let channel_config = read_local_config(channel, &node_config.node).await?; + match extract_matching_config_entry(range, &channel_config)? { + MatchingConfigEntry::Multiple => Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::None => { + // TODO function needs to provide some default. + err::todoval() + } + MatchingConfigEntry::Entry(entry) => { + let ret = channel_exec_config( + f, + entry.scalar_type.clone(), + entry.byte_order.clone(), + entry.to_shape()?, + node_config, + )?; + Ok(ret) + } + } +} + +pub struct PlainEvents { + channel: Channel, + range: NanoRange, + agg_kind: AggKind, + node_config: NodeConfigCached, +} + +impl PlainEvents { + pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self { + Self { + channel, + range, + agg_kind: AggKind::DimXBins1, + node_config, + } + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn range(&self) -> &NanoRange { + &self.range + } +} + +impl ChannelExecFunction for PlainEvents { + type Output = Pin> + Send>>; + + fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + { + let _ = byte_order; + let _ = event_value_shape; + let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + let evq = EventsQuery { + channel: self.channel, + range: self.range, + agg_kind: self.agg_kind, + }; + let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); + let s = s.map(|item| Box::new(item) as Box); + Ok(Box::pin(s)) + } +} diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 63cbf59..b577827 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -24,6 +24,7 @@ pub mod binned; pub mod binnedstream; pub mod cache; pub mod channelconfig; +pub mod channelexec; pub mod dataopen; pub mod decode; pub mod eventblobs; diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index 812b323..cf3672b 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -15,7 +15,7 @@ use std::task::{Context, Poll}; type T001 = Pin> + Send>>; type T002 = Pin, Error>> + Send>>; -pub struct MergedFromRemotes2 +pub struct MergedFromRemotes where ENP: EventsNodeProcessor, { @@ -26,7 +26,7 @@ where errored: bool, } -impl MergedFromRemotes2 +impl MergedFromRemotes where ENP: EventsNodeProcessor + 'static, ::Output: 'static, @@ -51,7 +51,7 @@ where } } -impl Stream for MergedFromRemotes2 +impl Stream for MergedFromRemotes where ENP: EventsNodeProcessor + 'static, ::Output: PushableIndex + Appendable, @@ -62,7 +62,7 @@ where use Poll::*; 'outer: loop { break if self.completed { - panic!("MergedFromRemotes poll_next on completed"); + panic!("poll_next on completed"); } else if self.errored { self.completed = true; return Ready(None); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 8308477..2ca2005 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,7 +1,7 @@ use crate::gather::gather_get_json; use bytes::Bytes; use disk::binned::prebinned::pre_binned_bytes_for_http; -use disk::binned::query::{BinnedQuery, PreBinnedQuery}; +use disk::binned::query::{BinnedQuery, PlainEventsQuery, PreBinnedQuery}; use disk::raw::conn::events_service; use err::Error; use future::Future; @@ -169,6 +169,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/plain_events" { + if req.method() == Method::GET { + Ok(plain_events(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path.starts_with("/api/4/gather/") { if req.method() == Method::GET { Ok(gather_get_json(req, &node_config).await?) @@ -386,6 +392,16 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result Ok(ret) } +async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (head, _body) = req.into_parts(); + let query = PlainEventsQuery::from_request(&head)?; + let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?; + let s = s.map(|item| item.make_frame()); + let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; + Ok(ret) +} + #[derive(Debug, Serialize, Deserialize)] pub struct NodeStatus { database_size: u64,