Reenable test for plain event json data

This commit is contained in:
Dominik Werder
2022-12-01 16:10:43 +01:00
parent 8082271c2a
commit 74af61f7fb
18 changed files with 432 additions and 260 deletions

View File

@@ -1 +1,2 @@
pub mod binnedjson;
pub mod eventsjson;

View File

@@ -0,0 +1,81 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use chrono::{DateTime, Utc};
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::query::BinnedQuery;
use netpod::APP_JSON;
use netpod::{log::*, AggKind};
use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange};
use serde_json::Value as JsonValue;
use url::Url;
/// Integration test: requests time-binned data over HTTP for a scalar i32
/// channel on the `test-disk-databuffer` backend and checks the collected
/// JSON output.
///
/// Requires the test hosts to be running (`require_test_hosts_running`),
/// otherwise returns early with an error.
///
/// NOTE(review): the "inmem" comment and the expected len/anchor values look
/// copied from the events test — confirm they match the binned test dataset.
#[test]
fn binned_d0_json_00() -> Result<(), Error> {
    let fut = async {
        let rh = require_test_hosts_running()?;
        let cluster = &rh.cluster;
        let jsv = binned_d0_json(
            Channel {
                backend: "test-disk-databuffer".into(),
                name: "scalar-i32-be".into(),
                series: None,
            },
            "1970-01-01T00:20:04.000Z",
            "1970-01-01T00:20:37.000Z",
            6,
            cluster,
        )
        .await?;
        // Fixed typo in the log message: "Receveided" -> "Received".
        info!("Received a response json value: {jsv:?}");
        let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
        // inmem was meant just for functional test, ignores the requested time range
        assert_eq!(res.len(), 20);
        assert_eq!(res.ts_anchor_sec(), 0);
        Ok(())
    };
    taskrun::run(fut)
}
async fn binned_d0_json(
channel: Channel,
beg_date: &str,
end_date: &str,
bin_count: u32,
cluster: &Cluster,
) -> Result<JsonValue, Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
let query = BinnedQuery::new(channel, range, bin_count, AggKind::TimeWeightedScalar);
let hp = HostPort::from_node(node0);
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
trace!("{pretty}");
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
// TODO add timeout
debug!("time {} ms", ms);
Ok(res)
}

View File

@@ -17,7 +17,7 @@ fn events_plain_json_00() -> Result<(), Error> {
let fut = async {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
events_plain_json(
let jsv = events_plain_json(
Channel {
backend: "test-inmem".into(),
name: "inmem-d0-i32".into(),
@@ -26,10 +26,13 @@ fn events_plain_json_00() -> Result<(), Error> {
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:20:10.000Z",
cluster,
true,
4,
)
.await?;
info!("Received a response json value: {jsv:?}");
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
// inmem was meant just for functional test, ignores the requested time range
assert_eq!(res.len(), 20);
assert_eq!(res.ts_anchor_sec(), 0);
Ok(())
};
taskrun::run(fut)
@@ -49,11 +52,10 @@ fn events_plain_json_01() -> Result<(), Error> {
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:13.000Z",
cluster,
true,
4,
)
.await?;
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv).unwrap();
info!("Received a response json value: {jsv:?}");
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv)?;
assert_eq!(res.ts_anchor_sec(), 1210);
assert_eq!(res.pulse_anchor(), 2420);
let exp = [2420., 2421., 2422., 2423., 2424., 2425.];
@@ -79,8 +81,6 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> {
"1970-01-03T23:59:55.000Z",
"1970-01-04T00:00:01.000Z",
cluster,
true,
4,
)
.await?;
let res: items_2::eventsdim0::EventsDim0CollectorOutput<i32> = serde_json::from_value(jsv).unwrap();
@@ -97,8 +97,6 @@ async fn events_plain_json(
beg_date: &str,
end_date: &str,
cluster: &Cluster,
_expect_range_complete: bool,
_expect_event_count: u64,
) -> Result<JsonValue, Error> {
let t1 = Utc::now();
let node0 = &cluster.nodes[0];
@@ -110,7 +108,7 @@ async fn events_plain_json(
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("get_plain_events get {}", url);
info!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
@@ -127,9 +125,7 @@ async fn events_plain_json(
let s = String::from_utf8_lossy(&buf);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
eprintln!("{pretty}");
// TODO assert more
trace!("{pretty}");
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
// TODO add timeout

1
httpret/src/api4.rs Normal file
View File

@@ -0,0 +1 @@
pub mod binned;

102
httpret/src/api4/binned.rs Normal file
View File

@@ -0,0 +1,102 @@
use crate::bodystream::{response, ToPublicResponse};
use crate::channelconfig::chconf_from_binned;
use crate::err::Error;
use crate::response_err;
use http::{Method, StatusCode};
use http::{Request, Response};
use hyper::Body;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
use tracing::Instrument;
use url::Url;
/// WIP handler producing the JSON response for `/api/4/binned`.
///
/// Parses a `BinnedQuery` from the URL, resolves the channel config to fill
/// in the series id, then (for now) delegates to the plain-events JSON path.
///
/// NOTE(review): the log message below still says "plain_events_json" —
/// presumably copied from the events handler; confirm before relying on logs.
async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
    info!("httpret plain_events_json req: {:?}", req);
    let (head, _body) = req.into_parts();
    // Query parse failures are surfaced to the client with a public message.
    let query = BinnedQuery::from_url(&url).map_err(|e| {
        let msg = format!("can not parse query: {}", e.msg());
        e.add_public_msg(msg)
    })?;
    let chconf = chconf_from_binned(&query, node_config).await?;
    // Update the series id since we don't require some unique identifier yet.
    let mut query = query;
    query.set_series_id(chconf.series);
    let query = query;
    // ---
    // Tracing span carrying the requested range (in seconds) and channel name.
    let span1 = span!(
        Level::INFO,
        "httpret::binned",
        beg = query.range().beg / SEC,
        end = query.range().end / SEC,
        ch = query.channel().name(),
    );
    span1.in_scope(|| {
        debug!("begin");
    });
    // NOTE(review): dead scaffolding — the match result is discarded and both
    // content-type branches are commented out, so every request falls through
    // to the JSON path below regardless of the Accept header.
    let _: Result<_, Error> = match head.headers.get(http::header::ACCEPT) {
        //Some(v) if v == APP_OCTET => binned_binary(query, chconf, &ctx, node_config).await,
        //Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, &ctx, node_config).await,
        _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
    };
    // TODO analogue to `streams::plaineventsjson::plain_events_json` create a function for binned json.
    // NOTE(review): called with an empty query string as a placeholder.
    let item = streams::plaineventsjson::plain_events_json("", &node_config.node_config.cluster)
        .instrument(span1)
        .await?;
    let buf = serde_json::to_vec(&item)?;
    let ret = response(StatusCode::OK).body(Body::from(buf))?;
    Ok(ret)
}
/// Entry point for `/api/4/binned`: dispatches on the request's Accept header.
///
/// JSON (or wildcard) requests are forwarded to `binned_json`; binary output
/// is not implemented yet, and any other Accept value is rejected — both via
/// `406 Not Acceptable`.
async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
    info!("req: {:?}", req);
    // A missing or unreadable Accept header is treated as "accept anything".
    let accept = match req.headers().get(http::header::ACCEPT) {
        Some(v) => v.to_str().unwrap_or(ACCEPT_ALL),
        None => ACCEPT_ALL,
    };
    // The incoming URI is relative; prefix a dummy scheme so Url::parse accepts it.
    let url = Url::parse(&format!("dummy:{}", req.uri()))
        .map_err(Error::from)
        .map_err(|e| e.add_public_msg(format!("Can not parse query url")))?;
    if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
        Ok(binned_json(url, req, node_config).await?)
    } else if accept == APP_OCTET {
        Ok(response_err(
            StatusCode::NOT_ACCEPTABLE,
            format!("binary binned data not yet available"),
        )?)
    } else {
        Ok(response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?)
    }
}
/// HTTP handler for the `/api/4/binned` endpoint.
pub struct BinnedHandler {}

impl BinnedHandler {
    /// Returns a handler iff the request path matches this endpoint.
    pub fn handler(req: &Request<Body>) -> Option<Self> {
        (req.uri().path() == "/api/4/binned").then(|| Self {})
    }

    /// Serves the request. Only GET is accepted; handler errors are converted
    /// into public-facing responses rather than propagated to the caller.
    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
        if req.method() != Method::GET {
            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
        }
        binned(req, node_config).await.or_else(|e| Ok(e.to_public_response()))
    }
}

View File

@@ -54,8 +54,7 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
.map_err(Error::from)
.map_err(|e| e.add_public_msg(format!("Can not parse query url")))?
};
// TODO format error.
if accept == APP_JSON || accept == ACCEPT_ALL {
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
Ok(plain_events_json(url, req, node_config).await?)
} else if accept == APP_OCTET {
Ok(plain_events_binary(url, req, node_config).await?)

View File

@@ -1,4 +1,5 @@
pub mod api1;
pub mod api4;
pub mod bodystream;
pub mod channel_status;
pub mod channelconfig;

View File

@@ -223,7 +223,6 @@ pub fn make_stats_frame(item: &StatsItem) -> Result<BytesMut, Error> {
}
pub fn make_range_complete_frame() -> Result<BytesMut, Error> {
warn!("make_range_complete_frame");
let enc = [];
let mut h = crc32fast::Hasher::new();
h.update(&enc);
@@ -315,7 +314,6 @@ where
};
Ok(T::from_stats(k))
} else if frame.tyid() == RANGE_COMPLETE_FRAME_TYPE_ID {
warn!("decode_frame SEE RANGE COMPLETE FRAME TYPE");
// There is currently no content in this variant.
Ok(T::from_range_complete())
} else {

View File

@@ -307,7 +307,6 @@ where
T: Sized + serde::Serialize + FrameType,
{
fn make_frame(&self) -> Result<BytesMut, Error> {
info!("-------- make_frame for Sitemty");
match self {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => {
let frame_type_id = k.frame_type_id();

View File

@@ -1 +0,0 @@

View File

@@ -1,6 +1,5 @@
pub mod binsdim0;
pub mod channelevents;
pub mod collect;
pub mod eventsdim0;
pub mod merger;
pub mod merger_cev;

View File

@@ -3,6 +3,7 @@ use futures_util::{Stream, StreamExt};
use items::sitem_data;
use items::{RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use std::collections::VecDeque;
use std::fmt;
use std::ops::ControlFlow;
use std::pin::Pin;
@@ -10,20 +11,20 @@ use std::task::{Context, Poll};
#[allow(unused)]
macro_rules! trace2 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
}
#[allow(unused)]
macro_rules! trace3 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
}
#[allow(unused)]
macro_rules! trace4 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
}
#[derive(Debug)]
@@ -58,10 +59,11 @@ pub struct Merger<T> {
out: Option<T>,
do_clear_out: bool,
out_max_len: usize,
range_complete: bool,
done: bool,
done2: bool,
done3: bool,
range_complete: Vec<bool>,
out_of_band_queue: VecDeque<Sitemty<T>>,
done_data: bool,
done_buffered: bool,
done_range_complete: bool,
complete: bool,
}
@@ -76,9 +78,10 @@ where
.field("items", &self.items)
.field("out_max_len", &self.out_max_len)
.field("range_complete", &self.range_complete)
.field("done", &self.done)
.field("done2", &self.done2)
.field("done3", &self.done3)
.field("out_of_band_queue", &self.out_of_band_queue.len())
.field("done_data", &self.done_data)
.field("done_buffered", &self.done_buffered)
.field("done_range_complete", &self.done_range_complete)
.finish()
}
}
@@ -95,10 +98,11 @@ where
out: None,
do_clear_out: false,
out_max_len,
range_complete: false,
done: false,
done2: false,
done3: false,
range_complete: vec![false; n],
out_of_band_queue: VecDeque::new(),
done_data: false,
done_buffered: false,
done_range_complete: false,
complete: false,
}
}
@@ -124,6 +128,7 @@ where
fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<ControlFlow<()>, Error> {
use ControlFlow::*;
trace4!("process");
let mut tslows = [None, None];
for (i1, itemopt) in self.items.iter_mut().enumerate() {
if let Some(item) = itemopt {
@@ -221,66 +226,63 @@ where
}
}
fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Error>> {
fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
trace4!("refill");
use ControlFlow::*;
use Poll::*;
let mut has_pending = false;
for i1 in 0..self.inps.len() {
let item = &self.items[i1];
if item.is_none() {
while let Some(inp) = &mut self.inps[i1] {
trace4!("refill while");
for i in 0..self.inps.len() {
if self.items[i].is_none() {
while let Some(inp) = self.inps[i].as_mut() {
match inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => {
trace!("--------------------- ChannelEvents::RangeComplete \n======================");
// TODO track range complete for all inputs, it's only complete if all inputs are complete.
self.range_complete = true;
eprintln!("TODO inp RangeComplete which does not fill slot");
}
RangeCompletableItem::Data(k) => {
self.items[i1] = Some(k);
break;
}
},
StreamItem::Log(_) => {
eprintln!("TODO inp Log which does not fill slot");
Ready(Some(Ok(k))) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::Data(k) => {
self.items[i] = Some(k);
trace4!("refilled {}", i);
}
StreamItem::Stats(_) => {
eprintln!("TODO inp Stats which does not fill slot");
RangeCompletableItem::RangeComplete => {
self.range_complete[i] = true;
debug!("Merger range_complete {:?}", self.range_complete);
continue;
}
},
StreamItem::Log(item) => {
// TODO limit queue length
self.out_of_band_queue.push_back(Ok(StreamItem::Log(item)));
continue;
}
StreamItem::Stats(item) => {
// TODO limit queue length
self.out_of_band_queue.push_back(Ok(StreamItem::Stats(item)));
continue;
}
},
Ready(Some(Err(e))) => {
self.inps[i] = None;
return Err(e.into());
}
Ready(Some(Err(e))) => return Break(Ready(e.into())),
Ready(None) => {
self.inps[i1] = None;
self.inps[i] = None;
}
Pending => {
has_pending = true;
}
}
break;
}
} else {
trace4!("refill inp {} has {}", i1, item.as_ref().unwrap().len());
}
}
if has_pending {
Break(Pending)
Ok(Pending)
} else {
Continue(())
Ok(Ready(()))
}
}
fn poll3(
mut self: Pin<&mut Self>,
cx: &mut Context,
has_pending: bool,
) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
use ControlFlow::*;
use Poll::*;
#[allow(unused)]
let ninps = self.inps.iter().filter(|a| a.is_some()).count();
let nitems = self.items.iter().filter(|a| a.is_some()).count();
let nitemsmissing = self
@@ -290,16 +292,11 @@ where
.filter(|(a, b)| a.is_some() && b.is_none())
.count();
trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}");
if ninps == 0 && nitems == 0 {
self.done = true;
if nitemsmissing != 0 {
let e = Error::from(format!("missing but no pending"));
Break(Ready(Some(Err(e))))
} else if nitems == 0 {
Break(Ready(None))
} else if nitemsmissing != 0 {
if !has_pending {
let e = Error::from(format!("missing but no pending"));
Break(Ready(Some(Err(e))))
} else {
Break(Pending)
}
} else {
match Self::process(Pin::new(&mut self), cx) {
Ok(Break(())) => {
@@ -332,9 +329,9 @@ where
use ControlFlow::*;
use Poll::*;
match Self::refill(Pin::new(&mut self), cx) {
Continue(()) => Self::poll3(self, cx, false),
Break(Pending) => Self::poll3(self, cx, true),
Break(Ready(e)) => Break(Ready(Some(Err(e)))),
Ok(Ready(())) => Self::poll3(self, cx),
Ok(Pending) => Break(Pending),
Err(e) => Break(Ready(Some(Err(e)))),
}
}
}
@@ -347,43 +344,44 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
const NAME: &str = "Merger_mergeable";
let span = span!(Level::TRACE, NAME);
let span = span!(Level::TRACE, "merger");
let _spanguard = span.enter();
loop {
trace3!("{NAME} poll");
trace3!("poll");
break if self.complete {
panic!("poll after complete");
} else if self.done3 {
} else if self.done_range_complete {
self.complete = true;
Ready(None)
} else if self.done2 {
self.done3 = true;
if self.range_complete {
warn!("TODO emit range complete only if all inputs signaled complete");
trace!("{NAME} emit RangeComplete");
} else if self.done_buffered {
self.done_range_complete = true;
if self.range_complete.iter().all(|x| *x) {
trace!("emit RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue;
}
} else if self.done {
self.done2 = true;
} else if self.done_data {
self.done_buffered = true;
if let Some(out) = self.out.take() {
Ready(Some(sitem_data(out)))
} else {
continue;
}
} else if let Some(item) = self.out_of_band_queue.pop_front() {
trace4!("emit out-of-band");
Ready(Some(item))
} else {
match Self::poll2(self.as_mut(), cx) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(k) => match k {
Ready(Some(Ok(item))) => Ready(Some(sitem_data(item))),
Ready(Some(Err(e))) => {
self.done = true;
self.done_data = true;
Ready(Some(Err(e.into())))
}
Ready(None) => {
self.done = true;
self.done_data = true;
continue;
}
Pending => Pending,

View File

@@ -57,35 +57,49 @@ async fn events_conn_handler_inner_try(
let (netin, mut netout) = stream.into_split();
let perf_opts = PerfOpts { inmem_bufcap: 512 };
let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let mut frames = vec![];
let mut frames = Vec::new();
while let Some(k) = h
.next()
.instrument(span!(Level::INFO, "events_conn_handler INPUT STREAM READ"))
.instrument(span!(Level::INFO, "events_conn_handler/query-input"))
.await
{
match k {
Ok(StreamItem::DataItem(item)) => {
info!("GOT FRAME: {:?}", item);
frames.push(item);
}
Ok(_) => {}
Ok(item) => {
debug!("ignored incoming frame {:?}", item);
}
Err(e) => {
return Err((e, netout))?;
return Err((e, netout).into());
}
}
}
debug!("events_conn_handler input frames received");
if frames.len() != 1 {
error!("missing command frame");
return Err((Error::with_msg("missing command frame"), netout))?;
error!("{:?}", frames);
error!("missing command frame len {}", frames.len());
return Err((Error::with_msg("missing command frame"), netout).into());
}
//if frames[1].tyid() != items::TERM_FRAME_TYPE_ID {
// return Err((Error::with_msg("input without term frame"), netout).into());
//}
let query_frame = &frames[0];
if query_frame.tyid() != items::EVENT_QUERY_JSON_STRING_FRAME {
return Err((Error::with_msg("query frame wrong type"), netout).into());
}
// TODO this does not need all variants of Sitemty.
let qitem = match decode_frame::<Sitemty<EventQueryJsonStringFrame>>(&frames[0]) {
let qitem = match decode_frame::<Sitemty<EventQueryJsonStringFrame>>(query_frame) {
Ok(k) => match k {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::Data(k) => k,
RangeCompletableItem::RangeComplete => panic!(),
RangeCompletableItem::RangeComplete => {
return Err((Error::with_msg("bad query item"), netout).into())
}
},
_ => panic!(),
_ => return Err((Error::with_msg("bad query item"), netout).into()),
},
Err(e) => return Err((e, netout).into()),
},
@@ -96,7 +110,7 @@ async fn events_conn_handler_inner_try(
Ok(k) => k,
Err(e) => {
error!("json parse error: {:?}", e);
return Err((Error::with_msg("json parse error"), netout))?;
return Err((Error::with_msg("json parse error"), netout).into());
}
};
info!("events_conn_handler_inner_try evq {:?}", evq);

View File

@@ -5,6 +5,7 @@ use items_0::collect_c::{Collectable, Collector};
use netpod::log::*;
use std::fmt;
use std::time::{Duration, Instant};
use tracing::Instrument;
#[allow(unused)]
macro_rules! trace2 {
@@ -33,88 +34,90 @@ where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + fmt::Debug,
{
let mut collector: Option<<T as Collectable>::Collector> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
if let Some(coll) = collector.as_mut() {
coll.set_timed_out();
} else {
eprintln!("TODO [861a95813]");
err::todo();
}
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
eprintln!("TODO [7cc0fca8f]");
err::todo();
}
let span = tracing::span!(tracing::Level::TRACE, "collect");
let fut = async {
let mut collector: Option<<T as Collectable>::Collector> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
if let Some(coll) = collector.as_mut() {
coll.set_timed_out();
} else {
warn!("Timeout but no collector yet");
}
RangeCompletableItem::Data(mut item) => {
eprintln!("COLLECTOR INGEST ITEM");
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("Received RangeComplete but no collector yet");
}
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
break;
RangeCompletableItem::Data(mut item) => {
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("Reached events_max {} abort", events_max);
break;
}
}
},
StreamItem::Log(item) => {
trace!("Log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("Stats {:?}", item);
use items::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
},
StreamItem::Log(item) => {
trace!("Log {:?}", item);
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(e);
}
StreamItem::Stats(item) => {
trace!("Stats {:?}", item);
use items::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
Err(e)?;
}
}
}
let _ = range_complete;
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))?
.result()?;
debug!("Total duration: {:?}", total_duration);
Ok(res)
let _ = range_complete;
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))?
.result()?;
debug!("Total duration: {:?}", total_duration);
Ok(res)
};
fut.instrument(span).await
}

View File

@@ -3,13 +3,19 @@ use bytes::Bytes;
use err::Error;
use futures_util::{pin_mut, Stream};
use items::inmem::InMemoryFrame;
use items::StreamItem;
use items::{StreamItem, TERM_FRAME_TYPE_ID};
use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use netpod::log::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
}
impl err::ToErr for crate::slidebuf::Error {
fn to_err(self) -> Error {
Error::with_msg_no_trace(format!("{self}"))
@@ -29,7 +35,6 @@ where
done: bool,
complete: bool,
inp_bytes_consumed: u64,
npoll: u64,
}
impl<T> InMemoryFrameAsyncReadStream<T>
@@ -44,12 +49,11 @@ where
done: false,
complete: false,
inp_bytes_consumed: 0,
npoll: 0,
}
}
fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
trace!("poll_upstream");
trace2!("poll_upstream");
use Poll::*;
let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?);
let inp = &mut self.inp;
@@ -58,7 +62,7 @@ where
Ready(Ok(())) => {
let n = buf.filled().len();
self.buf.wadv(n)?;
trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n);
trace!("recv bytes {}", n);
Ready(Ok(n))
}
Ready(Err(e)) => Ready(Err(e.into())),
@@ -66,12 +70,10 @@ where
}
}
// Try to parse a frame.
// Try to consume bytes to parse a frame.
// Update the need_min to the most current state.
// If successful, return item and number of bytes consumed.
// Must only be called when at least `need_min` bytes are available.
fn parse(&mut self) -> Result<Option<InMemoryFrame>, Error> {
trace!("parse");
let buf = self.buf.data();
if buf.len() < self.need_min {
return Err(Error::with_msg_no_trace("expect at least need_min"));
@@ -116,9 +118,6 @@ where
h.update(&buf[INMEM_FRAME_HEAD..p1]);
let payload_crc = h.finalize();
let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?);
//info!("len {}", len);
//info!("payload_crc_ind {}", payload_crc_ind);
//info!("frame_crc_ind {}", frame_crc_ind);
let payload_crc_match = payload_crc_exp == payload_crc;
let frame_crc_match = frame_crc_ind == frame_crc;
if !frame_crc_match || !payload_crc_match {
@@ -152,11 +151,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
trace!("poll");
self.npoll += 1;
if self.npoll > 2000 {
panic!()
}
let span = span!(Level::TRACE, "inmem");
let _spanguard = span.enter();
loop {
break if self.complete {
panic!("poll_next on complete")
@@ -171,11 +167,17 @@ where
let e = Error::with_msg_no_trace("enough bytes but nothing parsed");
Ready(Some(Err(e)))
} else {
debug!("not enough for parse, need to wait for more");
continue;
}
}
Ok(Some(item)) => Ready(Some(Ok(StreamItem::DataItem(item)))),
Ok(Some(item)) => {
if item.tyid() == TERM_FRAME_TYPE_ID {
self.done = true;
continue;
} else {
Ready(Some(Ok(StreamItem::DataItem(item))))
}
}
Err(e) => {
self.done = true;
Ready(Some(Err(e)))
@@ -184,7 +186,6 @@ where
} else {
match self.poll_upstream(cx) {
Ready(Ok(n1)) => {
debug!("read {n1}");
if n1 == 0 {
self.done = true;
continue;
@@ -197,10 +198,7 @@ where
self.done = true;
Ready(Some(Err(e)))
}
Pending => {
debug!("PENDING");
Pending
}
Pending => Pending,
}
};
}

View File

@@ -1,51 +1 @@
pub mod mergedstream;
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items::frame::make_frame;
use items::frame::make_term_frame;
use items::sitem_data;
use items::EventQueryJsonStringFrame;
use items::Sitemty;
use netpod::log::*;
use netpod::Cluster;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Opens one framed TCP event stream per cluster node.
///
/// For every node: connects to `host:port_raw`, writes the serialized `query`
/// as a JSON-string frame followed by a terminator frame, then wraps the read
/// half in `InMemoryFrameAsyncReadStream` and decodes items of type `T` via
/// `EventsFromFrames`. Returns the boxed per-node streams in node order.
pub async fn open_tcp_streams<Q, T>(query: Q, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
    Q: serde::Serialize,
    // Group bounds in new trait
    T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + 'static,
{
    // TODO when unit tests established, change to async connect:
    let mut streams = Vec::new();
    for node in &cluster.nodes {
        debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw);
        let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
        let qjs = serde_json::to_string(&query)?;
        let (netin, mut netout) = net.into_split();
        let item = EventQueryJsonStringFrame(qjs);
        let item = sitem_data(item);
        let buf = make_frame(&item)?;
        netout.write_all(&buf).await?;
        let buf = make_term_frame()?;
        netout.write_all(&buf).await?;
        netout.flush().await?;
        // Release the write half; only the read side is used from here on.
        netout.forget();
        // TODO for images, we need larger buffer capacity
        let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128);
        let stream = EventsFromFrames::<_, T>::new(frames);
        // Debug aid: log whenever a RangeComplete item passes through.
        let stream = stream.inspect(|x| {
            items::on_sitemty_range_complete!(x, warn!("RangeComplete SEEN IN RECEIVED TCP STREAM"));
        });
        streams.push(Box::pin(stream) as _);
    }
    Ok(streams)
}

View File

@@ -1,4 +1,4 @@
use crate::merge::open_tcp_streams;
use crate::tcprawclient::open_tcp_streams;
use bytes::Bytes;
use err::Error;
use futures_util::{Stream, StreamExt};
@@ -41,7 +41,7 @@ where
stream
};
let stream = { items_2::merger::Merger::new(inps, 1) };
let deadline = Instant::now() + Duration::from_millis(2000);
let deadline = Instant::now() + Duration::from_millis(8000);
let events_max = 100;
let collected = crate::collect::collect(stream, deadline, events_max).await?;
let jsval = serde_json::to_value(&collected)?;

View File

@@ -11,9 +11,11 @@ use err::Error;
use futures_util::Stream;
use items::eventfull::EventFull;
use items::frame::{make_frame, make_term_frame};
use items::sitem_data;
use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::Cluster;
use netpod::{Node, PerfOpts};
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
@@ -71,3 +73,34 @@ pub async fn x_processed_event_blobs_stream_from_node(
let items = EventsFromFrames::new(frames);
Ok(Box::pin(items))
}
/// A per-node stream of `Sitemty<T>` items, boxed for dynamic dispatch.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;

/// Opens one framed TCP event stream per cluster node.
///
/// For every node: connects to `host:port_raw`, writes the serialized `query`
/// as a JSON-string frame followed by a terminator frame, then wraps the read
/// half in `InMemoryFrameAsyncReadStream` and decodes items of type `T` via
/// `EventsFromFrames`. Returns the boxed per-node streams in node order.
pub async fn open_tcp_streams<Q, T>(query: Q, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
    Q: serde::Serialize,
    // Group bounds in new trait
    T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + 'static,
{
    // TODO when unit tests established, change to async connect:
    let mut streams = Vec::new();
    for node in &cluster.nodes {
        debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw);
        let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
        let qjs = serde_json::to_string(&query)?;
        let (netin, mut netout) = net.into_split();
        let item = EventQueryJsonStringFrame(qjs);
        let item = sitem_data(item);
        let buf = make_frame(&item)?;
        netout.write_all(&buf).await?;
        let buf = make_term_frame()?;
        netout.write_all(&buf).await?;
        netout.flush().await?;
        // Release the write half; only the read side is used from here on.
        netout.forget();
        // TODO for images, we need larger buffer capacity
        let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128);
        let stream = EventsFromFrames::<_, T>::new(frames);
        streams.push(Box::pin(stream) as _);
    }
    Ok(streams)
}