diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs index 290ca83..4c8bd8c 100644 --- a/daqbuffer/src/test/events.rs +++ b/daqbuffer/src/test/events.rs @@ -14,20 +14,21 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts}; +use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; use tokio::io::AsyncRead; #[test] -fn get_plain_events_0() { - taskrun::run(get_plain_events_0_inner()).unwrap(); +fn get_plain_events_binary_0() { + taskrun::run(get_plain_events_binary_0_inner()).unwrap(); } -async fn get_plain_events_0_inner() -> Result<(), Error> { +async fn get_plain_events_binary_0_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; if true { - get_plain_events::( + get_plain_events_binary::( "scalar-i32-be", "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:50.000Z", @@ -40,7 +41,7 @@ async fn get_plain_events_0_inner() -> Result<(), Error> { Ok(()) } -async fn get_plain_events( +async fn get_plain_events_binary( channel_name: &str, beg_date: &str, end_date: &str, @@ -78,7 +79,7 @@ where } let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - let res = consume_plain_events::(s2).await?; + let res = consume_plain_events_binary::(s2).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("time {} ms", ms); @@ -122,7 +123,7 @@ impl EventsResponse { } } -async fn consume_plain_events(inp: InMemoryFrameAsyncReadStream) -> Result +async fn consume_plain_events_binary(inp: InMemoryFrameAsyncReadStream) -> Result where NTY: NumOps, T: AsyncRead + Unpin, @@ -208,3 +209,65 @@ where info!("result: {:?}", ret); Ok(ret) } + +#[test] +fn get_plain_events_json_0() { + taskrun::run(get_plain_events_json_0_inner()).unwrap(); +} + +async fn get_plain_events_json_0_inner() -> Result<(), Error> { + let rh = 
require_test_hosts_running()?; + let cluster = &rh.cluster; + get_plain_events_json( + "scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:12.000Z", + cluster, + true, + 4, + ) + .await?; + Ok(()) +} + +async fn get_plain_events_json( + channel_name: &str, + beg_date: &str, + end_date: &str, + cluster: &Cluster, + _expect_range_complete: bool, + _expect_event_count: u64, +) -> Result<(), Error> { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; + let channel_backend = "testbackend"; + let channel = Channel { + backend: channel_backend.into(), + name: channel_name.into(), + }; + let range = NanoRange::from_date_time(beg_date, end_date); + let query = PlainEventsQuery::new(channel, range); + let hp = HostPort::from_node(node0); + let url = query.url(&hp); + info!("get_plain_events_json get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url) + .header("accept", "application/json") + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + } + let buf = hyper::body::to_bytes(res.into_body()).await?; + let s = String::from_utf8_lossy(&buf); + let res: JsonValue = serde_json::from_str(&s)?; + info!("GOT: {}", serde_json::to_string_pretty(&res)?); + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + info!("time {} ms", ms); + Ok(()) +} diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index fac8ee2..025258e 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -16,7 +16,7 @@ pub enum StreamItem { Stats(StatsItem), } -pub trait Collector: WithLen { +pub trait Collector: Send + Unpin + WithLen { type Input: Collectable; type Output: Serialize; fn ingest(&mut self, src: &Self::Input); diff --git a/disk/src/binned.rs 
b/disk/src/binned.rs index c5194b9..365b6a4 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -509,7 +509,7 @@ where } } -struct Bool {} +pub struct Bool {} impl Bool { pub fn is_false(x: &bool) -> bool { diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 5fd6cfb..42a80f6 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -388,7 +388,7 @@ impl PlainEventsQuery { pub fn url(&self, host: &HostPort) -> String { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; format!( - "http://{}:{}/api/4/plain_events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", + "http://{}:{}/api/4/plain_events_json?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", host.host, host.port, self.channel.backend, diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 111a026..cf47369 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -1,18 +1,24 @@ use crate::agg::enp::Identity; -use crate::binned::NumOps; +use crate::agg::streams::{Collectable, Collector, StreamItem}; +use crate::binned::{NumOps, RangeCompletableItem}; use crate::decode::{ - BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, + BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, }; use crate::frame::makeframe::Framable; use crate::merge::mergedfromremotes::MergedFromRemotes; use crate::raw::EventsQuery; +use crate::Sitemty; +use bytes::Bytes; use err::Error; use futures_core::Stream; +use futures_util::future::FutureExt; use futures_util::StreamExt; use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use serde_json::Value as JsonValue; use std::pin::Pin; +use std::time::Duration; pub trait ChannelExecFunction { type Output; @@ -21,7 
+27,8 @@ pub trait ChannelExecFunction { where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static; + EVS: EventValueShape + EventValueFromBytes + 'static, + EventValues: Collectable; } fn channel_exec_nty_end_evs_enp( @@ -34,6 +41,7 @@ where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, + EventValues: Collectable, { Ok(f.exec::(byte_order, event_value_shape)?) } @@ -43,6 +51,7 @@ where F: ChannelExecFunction, NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, + EventValues: Collectable, { match shape { Shape::Scalar => channel_exec_nty_end_evs_enp::<_, NTY, _, _>(f, byte_order, EventValuesDim0Case::new()), @@ -146,6 +155,7 @@ impl ChannelExecFunction for PlainEvents { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, + EventValues: Collectable, { let _ = byte_order; let _ = event_value_shape; @@ -160,3 +170,118 @@ impl ChannelExecFunction for PlainEvents { Ok(Box::pin(s)) } } + +pub struct PlainEventsJson { + channel: Channel, + range: NanoRange, + agg_kind: AggKind, + node_config: NodeConfigCached, +} + +impl PlainEventsJson { + pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self { + Self { + channel, + range, + agg_kind: AggKind::DimXBins1, + node_config, + } + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn range(&self) -> &NanoRange { + &self.range + } +} + +pub async fn collect_plain_events_json(stream: S, timeout: Duration) -> Result +where + S: Stream> + Unpin, + T: Collectable, +{ + let deadline = tokio::time::Instant::now() + timeout; + // TODO in general a Collector does not need to know about the expected number of bins. + // It would make more sense for some specific Collector kind to know. + // Therefore introduce finer grained types. 
+ let mut collector = ::new_collector(0); + let mut i1 = 0; + let mut stream = stream; + loop { + let item = if i1 == 0 { + stream.next().await + } else { + if false { + None + } else { + match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(k) => k, + Err(_) => { + collector.set_timed_out(); + None + } + } + } + }; + match item { + Some(item) => { + match item { + Ok(item) => match item { + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + collector.set_range_complete(); + } + RangeCompletableItem::Data(item) => { + collector.ingest(&item); + i1 += 1; + } + }, + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + Err(e)?; + } + }; + } + None => break, + } + } + let ret = serde_json::to_value(collector.result()?)?; + Ok(ret) +} + +impl ChannelExecFunction for PlainEventsJson { + type Output = Pin> + Send>>; + + fn exec(self, byte_order: END, event_value_shape: EVS) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + EventValues: Collectable, + EventValues: Collectable, + { + let _ = byte_order; + let _ = event_value_shape; + let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + let evq = EventsQuery { + channel: self.channel, + range: self.range, + agg_kind: self.agg_kind, + }; + let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); + // TODO take time out from query parameter. 
+ let f = collect_plain_events_json(s, Duration::from_millis(2000)); + //let s = s.map(|item| Box::new(item) as Box); + let f = FutureExt::map(f, |item| match item { + Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), + Err(e) => Err(e.into()), + }); + let s = futures_util::stream::once(f); + Ok(Box::pin(s)) + } +} diff --git a/disk/src/decode.rs b/disk/src/decode.rs index dce36c8..b9662e4 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,9 +1,9 @@ use crate::agg::binnedt::TimeBinnableType; use crate::agg::enp::{Identity, WaveXBinner}; -use crate::agg::streams::{Appendable, StreamItem}; +use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem}; use crate::agg::{Fits, FitsInside}; use crate::binned::{ - EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, + Bool, EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; use crate::eventblobs::EventBlobsComplete; @@ -297,6 +297,83 @@ where } } +pub struct EventValuesCollector { + vals: EventValues, + range_complete: bool, + timed_out: bool, +} + +impl EventValuesCollector { + pub fn new() -> Self { + Self { + vals: EventValues::empty(), + range_complete: false, + timed_out: false, + } + } +} + +impl WithLen for EventValuesCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +#[derive(Serialize)] +pub struct EventValuesCollectorOutput { + ts0: u64, + tsoff: Vec, + values: Vec, + #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] + range_complete: bool, + #[serde(skip_serializing_if = "Bool::is_false", rename = "timedOut")] + timed_out: bool, +} + +impl Collector for EventValuesCollector +where + NTY: NumOps, +{ + type Input = EventValues; + type Output = EventValuesCollectorOutput; + + fn ingest(&mut self, src: &Self::Input) { + self.vals.append(src); + 
} + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(self) -> Result { + let ts0 = self.vals.tss.first().map_or(0, |k| *k); + let tsoff = self.vals.tss.into_iter().map(|k| k - ts0).collect(); + let ret = Self::Output { + ts0, + tsoff, + values: self.vals.values, + range_complete: self.range_complete, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl Collectable for EventValues +where + NTY: NumOps, +{ + type Collector = EventValuesCollector; + + fn new_collector(_bin_count_exp: u32) -> Self::Collector { + Self::Collector::new() + } +} + pub struct EventsDecodedStream where NTY: NumOps + NumFromBytes, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 2ca2005..d026c4a 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -175,6 +175,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/plain_events_json" { + if req.method() == Method::GET { + Ok(plain_events_json(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path.starts_with("/api/4/gather/") { if req.method() == Method::GET { Ok(gather_get_json(req, &node_config).await?) 
@@ -402,6 +408,16 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res Ok(ret) } +async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (head, _body) = req.into_parts(); + let query = PlainEventsQuery::from_request(&head)?; + let op = + disk::channelexec::PlainEventsJson::new(query.channel().clone(), query.range().clone(), node_config.clone()); + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), node_config).await?; + let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; + Ok(ret) +} + #[derive(Debug, Serialize, Deserialize)] pub struct NodeStatus { database_size: u64,