From 74af61f7fbf937d19b407e6874ff52023f8b13cd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 1 Dec 2022 16:10:43 +0100 Subject: [PATCH] Reenable test for plain event json data --- daqbufp2/src/test/api4.rs | 1 + daqbufp2/src/test/api4/binnedjson.rs | 81 ++++++++++++++ daqbufp2/src/test/api4/eventsjson.rs | 24 ++--- httpret/src/api4.rs | 1 + httpret/src/api4/binned.rs | 102 ++++++++++++++++++ httpret/src/events.rs | 3 +- httpret/src/httpret.rs | 1 + items/src/frame.rs | 2 - items/src/items.rs | 1 - items_2/src/collect.rs | 1 - items_2/src/items_2.rs | 1 - items_2/src/merger.rs | 154 +++++++++++++------------- nodenet/src/conn.rs | 34 ++++-- streams/src/collect.rs | 155 ++++++++++++++------------- streams/src/frames/inmem.rs | 44 ++++---- streams/src/merge.rs | 50 --------- streams/src/plaineventsjson.rs | 4 +- streams/src/tcprawclient.rs | 33 ++++++ 18 files changed, 432 insertions(+), 260 deletions(-) create mode 100644 daqbufp2/src/test/api4/binnedjson.rs create mode 100644 httpret/src/api4.rs create mode 100644 httpret/src/api4/binned.rs delete mode 100644 items_2/src/collect.rs diff --git a/daqbufp2/src/test/api4.rs b/daqbufp2/src/test/api4.rs index c8a9d81..f72445b 100644 --- a/daqbufp2/src/test/api4.rs +++ b/daqbufp2/src/test/api4.rs @@ -1 +1,2 @@ +pub mod binnedjson; pub mod eventsjson; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs new file mode 100644 index 0000000..257981a --- /dev/null +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -0,0 +1,81 @@ +use crate::err::ErrConv; +use crate::nodes::require_test_hosts_running; +use chrono::{DateTime, Utc}; +use err::Error; +use http::StatusCode; +use hyper::Body; +use netpod::query::BinnedQuery; +use netpod::APP_JSON; +use netpod::{log::*, AggKind}; +use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange}; +use serde_json::Value as JsonValue; +use url::Url; + +#[test] +fn binned_d0_json_00() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = binned_d0_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:04.000Z", + "1970-01-01T00:20:37.000Z", + 6, + cluster, + ) + .await?; + info!("Receveided a response json value: {jsv:?}"); + let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.len(), 20); + assert_eq!(res.ts_anchor_sec(), 0); + Ok(()) + }; + taskrun::run(fut) +} + +async fn binned_d0_json( + channel: Channel, + beg_date: &str, + end_date: &str, + bin_count: u32, + cluster: &Cluster, +) -> Result { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; + let range = NanoRange::from_date_time(beg_date, end_date); + let query = BinnedQuery::new(channel, range, bin_count, AggKind::TimeWeightedScalar); + let hp = HostPort::from_node(node0); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; + info!("http get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::empty()) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); + } + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let s = String::from_utf8_lossy(&buf); + let res: JsonValue = serde_json::from_str(&s)?; + let pretty = serde_json::to_string_pretty(&res)?; + trace!("{pretty}"); + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + // TODO add timeout + debug!("time {} ms", ms); + Ok(res) +} diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index f28c71f..1a67457 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -17,7 +17,7 @@ fn events_plain_json_00() -> Result<(), Error> { let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; - events_plain_json( + let jsv = events_plain_json( Channel { backend: "test-inmem".into(), name: "inmem-d0-i32".into(), @@ -26,10 +26,13 @@ fn events_plain_json_00() -> Result<(), Error> { "1970-01-01T00:20:04.000Z", "1970-01-01T00:20:10.000Z", cluster, - true, - 4, ) .await?; + info!("Receveided a response json value: {jsv:?}"); + let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.len(), 20); + assert_eq!(res.ts_anchor_sec(), 0); Ok(()) }; taskrun::run(fut) @@ -49,11 +52,10 @@ fn events_plain_json_01() -> Result<(), Error> { "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:13.000Z", cluster, - true, - 4, ) .await?; - let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); + info!("Receveided a response json value: {jsv:?}"); + let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; assert_eq!(res.ts_anchor_sec(), 1210); assert_eq!(res.pulse_anchor(), 2420); let exp = [2420., 2421., 2422., 2423., 2424., 2425.]; @@ -79,8 +81,6 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> { "1970-01-03T23:59:55.000Z", "1970-01-04T00:00:01.000Z", cluster, - true, - 4, ) .await?; let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); @@ -97,8 +97,6 @@ async fn events_plain_json( beg_date: &str, end_date: &str, cluster: &Cluster, - _expect_range_complete: bool, - _expect_event_count: u64, ) -> Result { let t1 = Utc::now(); let node0 = &cluster.nodes[0]; @@ -110,7 +108,7 @@ async fn events_plain_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("get_plain_events get {}", url); + info!("http get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -127,9 +125,7 @@ async fn events_plain_json( let s = String::from_utf8_lossy(&buf); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; - eprintln!("{pretty}"); - - // TODO assert more + trace!("{pretty}"); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout diff --git a/httpret/src/api4.rs b/httpret/src/api4.rs new file mode 100644 index 0000000..a3b9424 --- /dev/null +++ b/httpret/src/api4.rs @@ -0,0 +1 @@ +pub mod binned; diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs new file mode 100644 index 0000000..8676ee2 --- /dev/null +++ b/httpret/src/api4/binned.rs @@ -0,0 +1,102 @@ +use crate::bodystream::{response, ToPublicResponse}; +use crate::channelconfig::chconf_from_binned; +use crate::err::Error; +use crate::response_err; +use http::{Method, StatusCode}; +use http::{Request, Response}; +use hyper::Body; +use netpod::log::*; +use netpod::query::BinnedQuery; +use netpod::timeunits::SEC; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use tracing::Instrument; +use url::Url; + +async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("httpret plain_events_json req: {:?}", req); + let (head, _body) = req.into_parts(); + let query = BinnedQuery::from_url(&url).map_err(|e| { + let msg = format!("can not parse query: {}", e.msg()); + e.add_public_msg(msg) + })?; + let chconf = chconf_from_binned(&query, node_config).await?; + // Update the series id since we don't require some unique identifier yet. + let mut query = query; + query.set_series_id(chconf.series); + let query = query; + // --- + + let span1 = span!( + Level::INFO, + "httpret::binned", + beg = query.range().beg / SEC, + end = query.range().end / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("begin"); + }); + let _: Result<_, Error> = match head.headers.get(http::header::ACCEPT) { + //Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await, + //Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await, + _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), + }; + + // TODO analogue to `streams::plaineventsjson::plain_events_json` create a function for binned json. + + let item = streams::plaineventsjson::plain_events_json("", &node_config.node_config.cluster) + .instrument(span1) + .await?; + let buf = serde_json::to_vec(&item)?; + let ret = response(StatusCode::OK).body(Body::from(buf))?; + Ok(ret) +} + +async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("req: {:?}", req); + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(ACCEPT_ALL, |k| k.to_str().unwrap_or(ACCEPT_ALL)); + let url = { + let s1 = format!("dummy:{}", req.uri()); + Url::parse(&s1) + .map_err(Error::from) + .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? + }; + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + Ok(binned_json(url, req, node_config).await?) + } else if accept == APP_OCTET { + Ok(response_err( + StatusCode::NOT_ACCEPTABLE, + format!("binary binned data not yet available"), + )?) + } else { + let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; + Ok(ret) + } +} + +pub struct BinnedHandler {} + +impl BinnedHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/binned" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + match binned(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(e.to_public_response()), + } + } +} diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 79a129c..7426513 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -54,8 +54,7 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res .map_err(Error::from) .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? }; - // TODO format error. - if accept == APP_JSON || accept == ACCEPT_ALL { + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { Ok(plain_events_json(url, req, node_config).await?) } else if accept == APP_OCTET { Ok(plain_events_binary(url, req, node_config).await?) diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 18b6098..af33654 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -1,4 +1,5 @@ pub mod api1; +pub mod api4; pub mod bodystream; pub mod channel_status; pub mod channelconfig; diff --git a/items/src/frame.rs b/items/src/frame.rs index 2a456fe..c018247 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -223,7 +223,6 @@ pub fn make_stats_frame(item: &StatsItem) -> Result { } pub fn make_range_complete_frame() -> Result { - warn!("make_range_complete_frame"); let enc = []; let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -315,7 +314,6 @@ where }; Ok(T::from_stats(k)) } else if frame.tyid() == RANGE_COMPLETE_FRAME_TYPE_ID { - warn!("decode_frame SEE RANGE COMPLETE FRAME TYPE"); // There is currently no content in this variant. Ok(T::from_range_complete()) } else { diff --git a/items/src/items.rs b/items/src/items.rs index 2c652c473..a964dd1 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -307,7 +307,6 @@ where T: Sized + serde::Serialize + FrameType, { fn make_frame(&self) -> Result { - info!("-------- make_frame for Sitemty"); match self { Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { let frame_type_id = k.frame_type_id(); diff --git a/items_2/src/collect.rs b/items_2/src/collect.rs deleted file mode 100644 index 8b13789..0000000 --- a/items_2/src/collect.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 2545a61..c2ea8b7 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -1,6 +1,5 @@ pub mod binsdim0; pub mod channelevents; -pub mod collect; pub mod eventsdim0; pub mod merger; pub mod merger_cev; diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 0f26efd..08cea99 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -3,6 +3,7 @@ use futures_util::{Stream, StreamExt}; use items::sitem_data; use items::{RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; +use std::collections::VecDeque; use std::fmt; use std::ops::ControlFlow; use std::pin::Pin; @@ -10,20 +11,20 @@ use std::task::{Context, Poll}; #[allow(unused)] macro_rules! trace2 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (); + ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (); + ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { - (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (); + ($($arg:tt)*) => (trace!($($arg)*)); } #[derive(Debug)] @@ -58,10 +59,11 @@ pub struct Merger { out: Option, do_clear_out: bool, out_max_len: usize, - range_complete: bool, - done: bool, - done2: bool, - done3: bool, + range_complete: Vec, + out_of_band_queue: VecDeque>, + done_data: bool, + done_buffered: bool, + done_range_complete: bool, complete: bool, } @@ -76,9 +78,10 @@ where .field("items", &self.items) .field("out_max_len", &self.out_max_len) .field("range_complete", &self.range_complete) - .field("done", &self.done) - .field("done2", &self.done2) - .field("done3", &self.done3) + .field("out_of_band_queue", &self.out_of_band_queue.len()) + .field("done_data", &self.done_data) + .field("done_buffered", &self.done_buffered) + .field("done_range_complete", &self.done_range_complete) .finish() } } @@ -95,10 +98,11 @@ where out: None, do_clear_out: false, out_max_len, - range_complete: false, - done: false, - done2: false, - done3: false, + range_complete: vec![false; n], + out_of_band_queue: VecDeque::new(), + done_data: false, + done_buffered: false, + done_range_complete: false, complete: false, } } @@ -124,6 +128,7 @@ where fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { use ControlFlow::*; + trace4!("process"); let mut tslows = [None, None]; for (i1, itemopt) in self.items.iter_mut().enumerate() { if let Some(item) = itemopt { @@ -221,66 +226,63 @@ where } } - fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow> { + fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { trace4!("refill"); - use ControlFlow::*; use Poll::*; let mut has_pending = false; - for i1 in 0..self.inps.len() { - let item = &self.items[i1]; - if item.is_none() { - while let Some(inp) = &mut self.inps[i1] { - trace4!("refill while"); + for i in 0..self.inps.len() { + if self.items[i].is_none() { + while let Some(inp) = self.inps[i].as_mut() { match inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => { - trace!("--------------------- ChannelEvents::RangeComplete \n======================"); - // TODO track range complete for all inputs, it's only complete if all inputs are complete. - self.range_complete = true; - eprintln!("TODO inp RangeComplete which does not fill slot"); - } - RangeCompletableItem::Data(k) => { - self.items[i1] = Some(k); - break; - } - }, - StreamItem::Log(_) => { - eprintln!("TODO inp Log which does not fill slot"); + Ready(Some(Ok(k))) => match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::Data(k) => { + self.items[i] = Some(k); + trace4!("refilled {}", i); } - StreamItem::Stats(_) => { - eprintln!("TODO inp Stats which does not fill slot"); + RangeCompletableItem::RangeComplete => { + self.range_complete[i] = true; + debug!("Merger range_complete {:?}", self.range_complete); + continue; } + }, + StreamItem::Log(item) => { + // TODO limit queue length + self.out_of_band_queue.push_back(Ok(StreamItem::Log(item))); + continue; } + StreamItem::Stats(item) => { + // TODO limit queue length + self.out_of_band_queue.push_back(Ok(StreamItem::Stats(item))); + continue; + } + }, + Ready(Some(Err(e))) => { + self.inps[i] = None; + return Err(e.into()); } - Ready(Some(Err(e))) => return Break(Ready(e.into())), Ready(None) => { - self.inps[i1] = None; + self.inps[i] = None; } Pending => { has_pending = true; } } + break; } - } else { - trace4!("refill inp {} has {}", i1, item.as_ref().unwrap().len()); } } if has_pending { - Break(Pending) + Ok(Pending) } else { - Continue(()) + Ok(Ready(())) } } - fn poll3( - mut self: Pin<&mut Self>, - cx: &mut Context, - has_pending: bool, - ) -> ControlFlow>>> { + fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { use ControlFlow::*; use Poll::*; + #[allow(unused)] let ninps = self.inps.iter().filter(|a| a.is_some()).count(); let nitems = self.items.iter().filter(|a| a.is_some()).count(); let nitemsmissing = self @@ -290,16 +292,11 @@ where .filter(|(a, b)| a.is_some() && b.is_none()) .count(); trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}"); - if ninps == 0 && nitems == 0 { - self.done = true; + if nitemsmissing != 0 { + let e = Error::from(format!("missing but no pending")); + Break(Ready(Some(Err(e)))) + } else if nitems == 0 { Break(Ready(None)) - } else if nitemsmissing != 0 { - if !has_pending { - let e = Error::from(format!("missing but no pending")); - Break(Ready(Some(Err(e)))) - } else { - Break(Pending) - } } else { match Self::process(Pin::new(&mut self), cx) { Ok(Break(())) => { @@ -332,9 +329,9 @@ where use ControlFlow::*; use Poll::*; match Self::refill(Pin::new(&mut self), cx) { - Continue(()) => Self::poll3(self, cx, false), - Break(Pending) => Self::poll3(self, cx, true), - Break(Ready(e)) => Break(Ready(Some(Err(e)))), + Ok(Ready(())) => Self::poll3(self, cx), + Ok(Pending) => Break(Pending), + Err(e) => Break(Ready(Some(Err(e)))), } } } @@ -347,43 +344,44 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - const NAME: &str = "Merger_mergeable"; - let span = span!(Level::TRACE, NAME); + let span = span!(Level::TRACE, "merger"); let _spanguard = span.enter(); loop { - trace3!("{NAME} poll"); + trace3!("poll"); break if self.complete { panic!("poll after complete"); - } else if self.done3 { + } else if self.done_range_complete { self.complete = true; Ready(None) - } else if self.done2 { - self.done3 = true; - if self.range_complete { - warn!("TODO emit range complete only if all inputs signaled complete"); - trace!("{NAME} emit RangeComplete"); + } else if self.done_buffered { + self.done_range_complete = true; + if self.range_complete.iter().all(|x| *x) { + trace!("emit RangeComplete"); Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue; } - } else if self.done { - self.done2 = true; + } else if self.done_data { + self.done_buffered = true; if let Some(out) = self.out.take() { Ready(Some(sitem_data(out))) } else { continue; } + } else if let Some(item) = self.out_of_band_queue.pop_front() { + trace4!("emit out-of-band"); + Ready(Some(item)) } else { match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { Ready(Some(Ok(item))) => Ready(Some(sitem_data(item))), Ready(Some(Err(e))) => { - self.done = true; + self.done_data = true; Ready(Some(Err(e.into()))) } Ready(None) => { - self.done = true; + self.done_data = true; continue; } Pending => Pending, diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index bcc16ff..02f52c4 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -57,35 +57,49 @@ async fn events_conn_handler_inner_try( let (netin, mut netout) = stream.into_split(); let perf_opts = PerfOpts { inmem_bufcap: 512 }; let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - let mut frames = vec![]; + let mut frames = Vec::new(); while let Some(k) = h .next() - .instrument(span!(Level::INFO, "events_conn_handler INPUT STREAM READ")) + .instrument(span!(Level::INFO, "events_conn_handler/query-input")) .await { match k { Ok(StreamItem::DataItem(item)) => { + info!("GOT FRAME: {:?}", item); frames.push(item); } - Ok(_) => {} + Ok(item) => { + debug!("ignored incoming frame {:?}", item); + } Err(e) => { - return Err((e, netout))?; + return Err((e, netout).into()); } } } + debug!("events_conn_handler input frames received"); if frames.len() != 1 { - error!("missing command frame"); - return Err((Error::with_msg("missing command frame"), netout))?; + error!("{:?}", frames); + error!("missing command frame len {}", frames.len()); + return Err((Error::with_msg("missing command frame"), netout).into()); + } + //if frames[1].tyid() != items::TERM_FRAME_TYPE_ID { + // return Err((Error::with_msg("input without term frame"), netout).into()); + //} + let query_frame = &frames[0]; + if query_frame.tyid() != items::EVENT_QUERY_JSON_STRING_FRAME { + return Err((Error::with_msg("query frame wrong type"), netout).into()); } // TODO this does not need all variants of Sitemty. - let qitem = match decode_frame::>(&frames[0]) { + let qitem = match decode_frame::>(query_frame) { Ok(k) => match k { Ok(k) => match k { StreamItem::DataItem(k) => match k { RangeCompletableItem::Data(k) => k, - RangeCompletableItem::RangeComplete => panic!(), + RangeCompletableItem::RangeComplete => { + return Err((Error::with_msg("bad query item"), netout).into()) + } }, - _ => panic!(), + _ => return Err((Error::with_msg("bad query item"), netout).into()), }, Err(e) => return Err((e, netout).into()), }, @@ -96,7 +110,7 @@ async fn events_conn_handler_inner_try( Ok(k) => k, Err(e) => { error!("json parse error: {:?}", e); - return Err((Error::with_msg("json parse error"), netout))?; + return Err((Error::with_msg("json parse error"), netout).into()); } }; info!("events_conn_handler_inner_try evq {:?}", evq); diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 10ca585..9d22589 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -5,6 +5,7 @@ use items_0::collect_c::{Collectable, Collector}; use netpod::log::*; use std::fmt; use std::time::{Duration, Instant}; +use tracing::Instrument; #[allow(unused)] macro_rules! trace2 { @@ -33,88 +34,90 @@ where S: Stream> + Unpin, T: Collectable + fmt::Debug, { - let mut collector: Option<::Collector> = None; - let mut stream = stream; - let deadline = deadline.into(); - let mut range_complete = false; - let mut total_duration = Duration::ZERO; - loop { - let item = match tokio::time::timeout_at(deadline, stream.next()).await { - Ok(Some(k)) => k, - Ok(None) => break, - Err(_e) => { - if let Some(coll) = collector.as_mut() { - coll.set_timed_out(); - } else { - eprintln!("TODO [861a95813]"); - err::todo(); - } - break; - } - }; - match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - range_complete = true; - if let Some(coll) = collector.as_mut() { - coll.set_range_complete(); - } else { - eprintln!("TODO [7cc0fca8f]"); - err::todo(); - } + let span = tracing::span!(tracing::Level::TRACE, "collect"); + let fut = async { + let mut collector: Option<::Collector> = None; + let mut stream = stream; + let deadline = deadline.into(); + let mut range_complete = false; + let mut total_duration = Duration::ZERO; + loop { + let item = match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(Some(k)) => k, + Ok(None) => break, + Err(_e) => { + if let Some(coll) = collector.as_mut() { + coll.set_timed_out(); + } else { + warn!("Timeout but no collector yet"); } - RangeCompletableItem::Data(mut item) => { - eprintln!("COLLECTOR INGEST ITEM"); - if collector.is_none() { - let c = item.new_collector(); - collector = Some(c); + break; + } + }; + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + range_complete = true; + if let Some(coll) = collector.as_mut() { + coll.set_range_complete(); + } else { + warn!("Received RangeComplete but no collector yet"); + } } - let coll = collector.as_mut().unwrap(); - coll.ingest(&mut item); - if coll.len() as u64 >= events_max { - break; + RangeCompletableItem::Data(mut item) => { + if collector.is_none() { + let c = item.new_collector(); + collector = Some(c); + } + let coll = collector.as_mut().unwrap(); + coll.ingest(&mut item); + if coll.len() as u64 >= events_max { + warn!("Reached events_max {} abort", events_max); + break; + } + } + }, + StreamItem::Log(item) => { + trace!("Log {:?}", item); + } + StreamItem::Stats(item) => { + trace!("Stats {:?}", item); + use items::StatsItem; + use netpod::DiskStats; + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + total_duration += k.duration; + } + }, } } }, - StreamItem::Log(item) => { - trace!("Log {:?}", item); + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + return Err(e); } - StreamItem::Stats(item) => { - trace!("Stats {:?}", item); - use items::StatsItem; - use netpod::DiskStats; - match item { - // TODO factor and simplify the stats collection: - StatsItem::EventDataReadStats(_) => {} - StatsItem::RangeFilterStats(_) => {} - StatsItem::DiskStats(item) => match item { - DiskStats::OpenStats(k) => { - total_duration += k.duration; - } - DiskStats::SeekStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadExactStats(k) => { - total_duration += k.duration; - } - }, - } - } - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. - Err(e)?; } } - } - let _ = range_complete; - let res = collector - .ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))? - .result()?; - debug!("Total duration: {:?}", total_duration); - Ok(res) + let _ = range_complete; + let res = collector + .ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))? + .result()?; + debug!("Total duration: {:?}", total_duration); + Ok(res) + }; + fut.instrument(span).await } diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 96d667d..e07d214 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -3,13 +3,19 @@ use bytes::Bytes; use err::Error; use futures_util::{pin_mut, Stream}; use items::inmem::InMemoryFrame; -use items::StreamItem; +use items::{StreamItem, TERM_FRAME_TYPE_ID}; use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use netpod::log::*; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => (); + ($($arg:tt)*) => (trace!($($arg)*)); +} + impl err::ToErr for crate::slidebuf::Error { fn to_err(self) -> Error { Error::with_msg_no_trace(format!("{self}")) @@ -29,7 +35,6 @@ where done: bool, complete: bool, inp_bytes_consumed: u64, - npoll: u64, } impl InMemoryFrameAsyncReadStream @@ -44,12 +49,11 @@ where done: false, complete: false, inp_bytes_consumed: 0, - npoll: 0, } } fn poll_upstream(&mut self, cx: &mut Context) -> Poll> { - trace!("poll_upstream"); + trace2!("poll_upstream"); use Poll::*; let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?); let inp = &mut self.inp; @@ -58,7 +62,7 @@ where Ready(Ok(())) => { let n = buf.filled().len(); self.buf.wadv(n)?; - trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n); + trace!("recv bytes {}", n); Ready(Ok(n)) } Ready(Err(e)) => Ready(Err(e.into())), @@ -66,12 +70,10 @@ where } } - // Try to parse a frame. + // Try to consume bytes to parse a frame. // Update the need_min to the most current state. - // If successful, return item and number of bytes consumed. // Must only be called when at least `need_min` bytes are available. fn parse(&mut self) -> Result, Error> { - trace!("parse"); let buf = self.buf.data(); if buf.len() < self.need_min { return Err(Error::with_msg_no_trace("expect at least need_min")); @@ -116,9 +118,6 @@ where h.update(&buf[INMEM_FRAME_HEAD..p1]); let payload_crc = h.finalize(); let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?); - //info!("len {}", len); - //info!("payload_crc_ind {}", payload_crc_ind); - //info!("frame_crc_ind {}", frame_crc_ind); let payload_crc_match = payload_crc_exp == payload_crc; let frame_crc_match = frame_crc_ind == frame_crc; if !frame_crc_match || !payload_crc_match { @@ -152,11 +151,8 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - trace!("poll"); - self.npoll += 1; - if self.npoll > 2000 { - panic!() - } + let span = span!(Level::TRACE, "inmem"); + let _spanguard = span.enter(); loop { break if self.complete { panic!("poll_next on complete") @@ -171,11 +167,17 @@ where let e = Error::with_msg_no_trace("enough bytes but nothing parsed"); Ready(Some(Err(e))) } else { - debug!("not enouh for parse, need to wait for more"); continue; } } - Ok(Some(item)) => Ready(Some(Ok(StreamItem::DataItem(item)))), + Ok(Some(item)) => { + if item.tyid() == TERM_FRAME_TYPE_ID { + self.done = true; + continue; + } else { + Ready(Some(Ok(StreamItem::DataItem(item)))) + } + } Err(e) => { self.done = true; Ready(Some(Err(e))) @@ -184,7 +186,6 @@ where } else { match self.poll_upstream(cx) { Ready(Ok(n1)) => { - debug!("read {n1}"); if n1 == 0 { self.done = true; continue; @@ -197,10 +198,7 @@ where self.done = true; Ready(Some(Err(e))) } - Pending => { - debug!("PENDING"); - Pending - } + Pending => Pending, } }; } diff --git a/streams/src/merge.rs b/streams/src/merge.rs index dae4cbe..695d506 100644 --- a/streams/src/merge.rs +++ b/streams/src/merge.rs @@ -1,51 +1 @@ pub mod mergedstream; - -use crate::frames::eventsfromframes::EventsFromFrames; -use crate::frames::inmem::InMemoryFrameAsyncReadStream; -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items::frame::make_frame; -use items::frame::make_term_frame; -use items::sitem_data; -use items::EventQueryJsonStringFrame; -use items::Sitemty; -use netpod::log::*; -use netpod::Cluster; -use std::pin::Pin; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; - -pub type BoxedStream = Pin> + Send>>; - -pub async fn open_tcp_streams(query: Q, cluster: &Cluster) -> Result>, Error> -where - Q: serde::Serialize, - // Group bounds in new trait - T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + 'static, -{ - // TODO when unit tests established, change to async connect: - let mut streams = Vec::new(); - for node in &cluster.nodes { - debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw); - let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - let qjs = serde_json::to_string(&query)?; - let (netin, mut netout) = net.into_split(); - let item = EventQueryJsonStringFrame(qjs); - let item = sitem_data(item); - let buf = make_frame(&item)?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; - netout.write_all(&buf).await?; - netout.flush().await?; - netout.forget(); - // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128); - let stream = EventsFromFrames::<_, T>::new(frames); - let stream = stream.inspect(|x| { - items::on_sitemty_range_complete!(x, warn!("RangeComplete SEEN IN RECEIVED TCP STREAM")); - }); - streams.push(Box::pin(stream) as _); - } - Ok(streams) -} diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 65818ff..367ce47 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -1,4 +1,4 @@ -use crate::merge::open_tcp_streams; +use crate::tcprawclient::open_tcp_streams; use bytes::Bytes; use err::Error; use futures_util::{Stream, StreamExt}; @@ -41,7 +41,7 @@ where stream }; let stream = { items_2::merger::Merger::new(inps, 1) }; - let deadline = Instant::now() + Duration::from_millis(2000); + let deadline = Instant::now() + Duration::from_millis(8000); let events_max = 100; let collected = crate::collect::collect(stream, deadline, events_max).await?; let jsval = serde_json::to_value(&collected)?; diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index d0e518a..86efe23 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -11,9 +11,11 @@ use err::Error; use futures_util::Stream; use items::eventfull::EventFull; use items::frame::{make_frame, make_term_frame}; +use items::sitem_data; use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; +use netpod::Cluster; use netpod::{Node, PerfOpts}; use std::pin::Pin; use tokio::io::AsyncWriteExt; @@ -71,3 +73,34 @@ pub async fn x_processed_event_blobs_stream_from_node( let items = EventsFromFrames::new(frames); Ok(Box::pin(items)) } + +pub type BoxedStream = Pin> + Send>>; + +pub async fn open_tcp_streams(query: Q, cluster: &Cluster) -> Result>, Error> +where + Q: serde::Serialize, + // Group bounds in new trait + T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + 'static, +{ + // TODO when unit tests established, change to async connect: + let mut streams = Vec::new(); + for node in &cluster.nodes { + debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw); + let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let qjs = serde_json::to_string(&query)?; + let (netin, mut netout) = net.into_split(); + let item = EventQueryJsonStringFrame(qjs); + let item = sitem_data(item); + let buf = make_frame(&item)?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + // TODO for images, we need larger buffer capacity + let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128); + let stream = EventsFromFrames::<_, T>::new(frames); + streams.push(Box::pin(stream) as _); + } + Ok(streams) +}