Clean up, collect with timeout

This commit is contained in:
Dominik Werder
2022-12-19 14:09:37 +01:00
parent 64233b0ccb
commit 646ec38b3c
32 changed files with 622 additions and 321 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "4.1.0"
version = "0.3.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1,15 +1,11 @@
use crate::ErrConv;
use err::Error;
use netpod::log::*;
use netpod::{Channel, NodeConfigCached, ScalarType, Shape};
use crate::ErrConv;
pub struct ChConf {
pub series: u64,
pub name: String,
pub scalar_type: ScalarType,
pub shape: Shape,
}
use netpod::ChConf;
use netpod::Channel;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
/// It is an unsolved question as to how we want to uniquely address channels.
/// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases
@@ -26,9 +22,11 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
channel.backend, ncc.node_config.cluster.backend
);
}
let backend = channel.backend().into();
if channel.backend() == "test-inmem" {
let ret = if channel.name() == "inmem-d0-i32" {
let ret = ChConf {
backend: channel.backend().into(),
series: 1,
name: channel.name().into(),
scalar_type: ScalarType::I32,
@@ -46,6 +44,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
// TODO the series-ids here are just random. Need to integrate with better test setup.
let ret = if channel.name() == "scalar-i32-be" {
let ret = ChConf {
backend,
series: 1,
name: channel.name().into(),
scalar_type: ScalarType::I32,
@@ -54,6 +53,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
Ok(ret)
} else if channel.name() == "wave-f64-be-n21" {
let ret = ChConf {
backend,
series: 2,
name: channel.name().into(),
scalar_type: ScalarType::F64,
@@ -62,6 +62,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
Ok(ret)
} else if channel.name() == "const-regular-scalar-i32-be" {
let ret = ChConf {
backend,
series: 3,
name: channel.name().into(),
scalar_type: ScalarType::I32,
@@ -96,6 +97,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?;
let ret = ChConf {
backend,
series,
name,
scalar_type,
@@ -127,6 +129,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3))?;
let ret = ChConf {
backend,
series,
name,
scalar_type,

View File

@@ -100,7 +100,7 @@ pub async fn search_channel_scylla(
" series, facility, channel, scalar_type, shape_dims",
" from series_by_channel",
" where channel ~* $1",
" limit 100,"
" limit 100",
));
let pgclient = crate::create_connection(pgconf).await?;
let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;

View File

@@ -1,6 +1,4 @@
/*!
Error handling and reporting.
*/
//! Error handling and reporting.
use serde::{Deserialize, Serialize};
use std::array::TryFromSliceError;
@@ -22,15 +20,17 @@ pub enum Reason {
IoError,
}
/**
The common error type for this application.
*/
/// The common error type for this application.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct Error {
msg: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
trace_str: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
public_msg: Option<Vec<String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<Reason>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent: Option<Box<Error>>,
}

View File

@@ -1,6 +1,6 @@
[package]
name = "httpret"
version = "0.0.2"
version = "0.3.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -37,7 +37,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
span1.in_scope(|| {
debug!("begin");
});
let item = streams::timebinnedjson::timebinned_json(&query, &node_config.node_config.cluster)
let item = streams::timebinnedjson::timebinned_json(&query, &chconf, &node_config.node_config.cluster)
.instrument(span1)
.await?;
let buf = serde_json::to_vec(&item)?;

View File

@@ -42,7 +42,7 @@ impl ChannelSearchHandler {
Ok(response(StatusCode::OK).body(Body::from(buf))?)
}
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
warn!("handle: got error from channel_search: {e:?}");
Ok(e.to_public_response())
}
}

View File

@@ -10,6 +10,7 @@ use netpod::NodeConfigCached;
use netpod::NodeStatus;
use netpod::NodeStatusArchiverAppliance;
use netpod::TableSizes;
use std::collections::VecDeque;
use std::time::Duration;
async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
@@ -94,13 +95,14 @@ impl StatusNodesRecursive {
let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}"));
let ret = NodeStatus {
name: format!("{}:{}", node_config.node.host, node_config.node.port),
version: core::env!("CARGO_PKG_VERSION").into(),
is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
is_archiver_engine: node_config.node.channel_archiver.is_some(),
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
database_size: Some(database_size),
table_sizes: Some(table_sizes(node_config).await.map_err(Into::into)),
archiver_appliance_status,
subs: None,
subs: VecDeque::new(),
};
Ok(ret)
}

View File

@@ -1,7 +1,6 @@
use crate::err::Error;
use crate::{response, ToPublicResponse};
use dbconn::channelconfig::chconf_from_database;
use dbconn::channelconfig::ChConf;
use dbconn::create_connection;
use futures_util::StreamExt;
use http::{Method, Request, Response, StatusCode};
@@ -11,6 +10,7 @@ use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
use netpod::query::{BinnedQuery, PlainEventsQuery};
use netpod::timeunits::*;
use netpod::ChConf;
use netpod::{Channel, ChannelConfigQuery, FromUrl, ScalarType, Shape};
use netpod::{ChannelConfigResponse, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON};
@@ -33,6 +33,7 @@ pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCache
pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result<ChConf, Error> {
let ret = ChConf {
backend: q.channel().backend().into(),
series: q
.channel()
.series()

View File

@@ -34,7 +34,6 @@ impl EventsHandler {
}
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("httpret plain_events req: {:?}", req);
let accept_def = APP_JSON;
let accept = req
.headers()
@@ -83,20 +82,19 @@ async fn plain_events_json(
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
debug!("httpret plain_events_json req: {:?}", req);
info!("httpret plain_events_json req: {:?}", req);
let (_head, _body) = req.into_parts();
let query = PlainEventsQuery::from_url(&url)?;
let chconf = chconf_from_events_json(&query, node_config).await.map_err(|e| {
error!("chconf_from_events_json {e:?}");
e.add_public_msg(format!("Can not get channel information"))
})?;
let chconf = chconf_from_events_json(&query, node_config)
.await
.map_err(Error::from)?;
// Update the series id since we don't require some unique identifier yet.
let mut query = query;
query.set_series_id(chconf.series);
let query = query;
// ---
//let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain);
let item = streams::plaineventsjson::plain_events_json(&query, &node_config.node_config.cluster).await;
let item = streams::plaineventsjson::plain_events_json(&query, &chconf, &node_config.node_config.cluster).await;
let item = match item {
Ok(item) => item,
Err(e) => {

View File

@@ -178,7 +178,8 @@ pub async fn gather_get_json_generic<SM, NT, FT, OUT>(
tags: Vec<String>,
nt: NT,
ft: FT,
// TODO use deadline instead
// TODO use deadline instead.
// TODO Wait a bit longer compared to remote to receive partial results.
timeout: Duration,
) -> Result<OUT, Error>
where
@@ -190,6 +191,8 @@ where
+ 'static,
FT: Fn(Vec<(Tag, Result<SubRes<SM>, Error>)>) -> Result<OUT, Error>,
{
// TODO remove magic constant
let extra_timeout = Duration::from_millis(3000);
if urls.len() != bodies.len() {
return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies"));
}
@@ -222,14 +225,17 @@ where
let tag2 = tag.clone();
let jh = tokio::spawn(async move {
select! {
_ = sleep(timeout).fuse() => {
_ = sleep(timeout + extra_timeout).fuse() => {
error!("PROXY TIMEOUT");
Err(Error::with_msg_no_trace("timeout"))
}
res = {
let client = Client::new();
client.request(req?).fuse()
} => {
info!("received result in time");
let ret = nt(tag2, res?).await?;
info!("transformed result in time");
Ok(ret)
}
}

View File

@@ -112,8 +112,8 @@ async fn proxy_http_service_inner(
h.handle(req, ctx, &proxy_config).await
} else if path == "/api/4/backends" {
Ok(backends(req, proxy_config).await?)
} else if path == "/api/4/search/channel" {
Ok(api4::channel_search(req, proxy_config).await?)
} else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) {
h.handle(req, &proxy_config).await
} else if path == "/api/4/events" {
Ok(proxy_single_backend_query::<PlainEventsQuery>(req, ctx, proxy_config).await?)
} else if path == "/api/4/status/connection/events" {

View File

@@ -1,108 +1,147 @@
use crate::bodystream::ToPublicResponse;
use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes, Tag};
use crate::{response, ReqCtx};
use crate::gather::gather_get_json_generic;
use crate::gather::SubRes;
use crate::gather::Tag;
use crate::response;
use crate::ReqCtx;
use futures_util::Future;
use http::{header, Request, Response, StatusCode};
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use itertools::Itertools;
use netpod::log::*;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::NodeStatus;
use netpod::NodeStatusSub;
use netpod::ProxyConfig;
use netpod::ACCEPT_ALL;
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use netpod::APP_JSON;
use serde_json::Value as JsVal;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::pin::Pin;
use std::time::Duration;
use url::Url;
pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
// TODO model channel search according to StatusNodesRecursive.
// Make sure that backend handling is correct:
// The aggregator asks all backends, except if the user specifies some backend
// in which case it should only go to the matching backends.
// The aggregators and leaf nodes behind should as well not depend on backend,
// but simply answer all matching.
pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<ChannelSearchResult, Error> {
let (head, _body) = req.into_parts();
let vdef = header::HeaderValue::from_static(APP_JSON);
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
if v == APP_JSON || v == ACCEPT_ALL {
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
let query = ChannelSearchQuery::from_url(&inpurl)?;
let mut bodies = vec![];
let urls = proxy_config
.backends
.iter()
.filter(|k| {
if let Some(back) = &query.backend {
back == &k.name
} else {
true
}
})
.map(|pb| match Url::parse(&format!("{}/api/4/search/channel", pb.url)) {
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
let query = ChannelSearchQuery::from_url(&inpurl)?;
let mut urls = Vec::new();
let mut tags = Vec::new();
let mut bodies = Vec::new();
for pb in &proxy_config.backends {
if if let Some(b) = &query.backend {
pb.name.contains(b)
} else {
true
} {
match Url::parse(&format!("{}/api/4/search/channel", pb.url)) {
Ok(mut url) => {
query.append_to_url(&mut url);
Ok(url)
tags.push(url.to_string());
bodies.push(None);
urls.push(url);
}
Err(_) => return Err(Error::with_msg(format!("parse error for: {:?}", pb))),
}
}
}
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
info!("got a result {:?}", body);
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => {
let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body));
error!("{}", msg);
return Err(Error::with_msg_no_trace(msg));
}
Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))),
})
.fold_ok(vec![], |mut a, x| {
a.push(x);
bodies.push(None);
a
})?;
let tags = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
//info!("got a result {:?}", body);
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => {
let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body));
error!("{}", msg);
return Err(Error::with_msg_no_trace(msg));
}
};
let ret = SubRes {
tag,
status: StatusCode::OK,
val: res,
};
Ok(ret)
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
let ret = SubRes {
tag,
status: StatusCode::OK,
val: res,
};
Ok(ret)
};
let ft = |all: Vec<(Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = Vec::new();
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
res.push(k);
}
}
Err(e) => {
warn!("{e}");
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<(Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = Vec::new();
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
res.push(k);
}
}
Err(e) => {
warn!("{e}");
}
}
let res = ChannelSearchResult { channels: res };
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))
.map_err(Error::from)?;
Ok(res)
};
let ret = gather_get_json_generic(
http::Method::GET,
urls,
bodies,
tags,
nt,
ft,
Duration::from_millis(3000),
)
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE)
.body(Body::from(format!("{:?}", proxy_config.name)))
.map_err(Error::from)?)
}
let res = ChannelSearchResult { channels: res };
Ok(res)
};
let ret = gather_get_json_generic(
http::Method::GET,
urls,
bodies,
tags,
nt,
ft,
Duration::from_millis(3000),
)
.await?;
Ok(ret)
}
pub struct ChannelSearchAggHandler {}
impl ChannelSearchAggHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/search/channel" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &ProxyConfig) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match channel_search(req, node_config).await {
Ok(item) => {
let buf = serde_json::to_vec(&item)?;
Ok(response(StatusCode::OK).body(Body::from(buf))?)
}
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
Ok(e.to_public_response())
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
}
@@ -183,27 +222,33 @@ impl StatusNodesRecursive {
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<(Tag, Result<SubRes<JsVal>, Error>)>| {
let mut subs = BTreeMap::new();
let mut subs = VecDeque::new();
for (tag, sr) in all {
match sr {
Ok(sr) => {
let s: Result<NodeStatus, _> = serde_json::from_value(sr.val).map_err(err::Error::from);
subs.insert(tag.0, s);
let sub = NodeStatusSub { url: tag.0, status: s };
subs.push_back(sub);
}
Err(e) => {
subs.insert(tag.0, Err(err::Error::from(e)));
let sub = NodeStatusSub {
url: tag.0,
status: Err(err::Error::from(e)),
};
subs.push_back(sub);
}
}
}
let ret = NodeStatus {
name: format!("{}:{}", proxy_config.name, proxy_config.port),
version: core::env!("CARGO_PKG_VERSION").into(),
is_sf_databuffer: false,
is_archiver_engine: false,
is_archiver_appliance: false,
database_size: None,
table_sizes: None,
archiver_appliance_status: None,
subs: Some(subs),
subs,
};
Ok(ret)
};

View File

@@ -3,7 +3,10 @@ use crate::collect_s::ToJsonResult;
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::Events;
use crate::WithLen;
use err::Error;
use netpod::BinnedRange;
use netpod::NanoRange;
use std::fmt;
pub trait Collector: fmt::Debug + Send {
@@ -11,14 +14,13 @@ pub trait Collector: fmt::Debug + Send {
fn ingest(&mut self, item: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Box<dyn Collected>, Error>;
fn result(&mut self, range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Box<dyn Collected>, Error>;
}
pub trait Collectable: fmt::Debug + AsAnyMut {
pub trait Collectable: fmt::Debug + AsAnyMut + crate::WithLen {
fn new_collector(&self) -> Box<dyn Collector>;
}
// TODO can this get removed?
pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {}
erased_serde::serialize_trait_object!(Collected);
@@ -44,13 +46,19 @@ pub trait CollectorDyn: fmt::Debug + Send {
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Box<dyn Collected>, Error>;
fn result(&mut self, range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Box<dyn Collected>, Error>;
}
pub trait CollectableWithDefault: AsAnyMut {
fn new_collector(&self) -> Box<dyn CollectorDyn>;
}
impl crate::WithLen for Box<dyn Events> {
fn len(&self) -> usize {
self.as_ref().len()
}
}
impl Collectable for Box<dyn Events> {
fn new_collector(&self) -> Box<dyn Collector> {
todo!()
@@ -77,11 +85,21 @@ impl Collector for TimeBinnedCollector {
todo!()
}
fn result(&mut self) -> Result<Box<dyn Collected>, Error> {
fn result(
&mut self,
_range: Option<NanoRange>,
_binrange: Option<BinnedRange>,
) -> Result<Box<dyn Collected>, Error> {
todo!()
}
}
impl WithLen for Box<dyn crate::TimeBinned> {
fn len(&self) -> usize {
self.as_ref().len()
}
}
impl Collectable for Box<dyn crate::TimeBinned> {
fn new_collector(&self) -> Box<dyn Collector> {
self.as_ref().new_collector()

View File

@@ -1,6 +1,10 @@
use super::collect_c::Collected;
use crate::{AsAnyMut, AsAnyRef, WithLen};
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::WithLen;
use err::Error;
use netpod::BinnedRange;
use netpod::NanoRange;
use serde::Serialize;
use std::any::Any;
use std::fmt;
@@ -15,14 +19,18 @@ pub trait CollectorType: Send + Unpin + WithLen {
fn set_timed_out(&mut self);
// TODO use this crate's Error instead:
fn result(&mut self) -> Result<Self::Output, Error>;
fn result(&mut self, range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Self::Output, Error>;
}
pub trait Collector: Send + Unpin + WithLen {
fn ingest(&mut self, src: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, Error>;
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn ToJsonResult>, Error>;
}
// TODO rename to `Typed`
@@ -49,8 +57,12 @@ impl<T: CollectorType + 'static> Collector for T {
T::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, Error> {
let ret = T::result(self)?;
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn ToJsonResult>, Error> {
let ret = T::result(self, range, binrange)?;
Ok(Box::new(ret) as _)
}
}

View File

@@ -127,7 +127,7 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef
/// Container of some form of events, for use as trait object.
pub trait Events:
fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + Send + erased_serde::Serialize
fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + WithLen + Send + erased_serde::Serialize
{
fn as_time_binnable(&self) -> &dyn TimeBinnable;
fn verify(&self) -> bool;

View File

@@ -10,9 +10,9 @@ use items_0::TimeBins;
use items_0::WithLen;
use items_0::{AppendEmptyBin, AsAnyRef};
use items_0::{AsAnyMut, Empty};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use netpod::{log::*, BinnedRange};
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::any::Any;
@@ -386,8 +386,12 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
let bin_count_exp = 0;
fn result(&mut self, _range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Self::Output, Error> {
let bin_count_exp = if let Some(r) = &binrange {
r.bin_count() as u32
} else {
0
};
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
@@ -502,8 +506,12 @@ where
CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self) {
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self, range, binrange) {
Ok(res) => Ok(Box::new(res)),
Err(e) => Err(e.into()),
}

View File

@@ -10,9 +10,9 @@ use items_0::TimeBinned;
use items_0::TimeBins;
use items_0::WithLen;
use items_0::{AppendEmptyBin, AsAnyMut, AsAnyRef};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use netpod::{log::*, BinnedRange};
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::any::Any;
@@ -387,8 +387,12 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
let bin_count_exp = 0;
fn result(&mut self, _range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Self::Output, Error> {
let bin_count_exp = if let Some(r) = &binrange {
r.bin_count() as u32
} else {
0
};
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
@@ -503,8 +507,12 @@ where
CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self) {
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
match CollectorType::result(self, range, binrange) {
Ok(res) => Ok(Box::new(res)),
Err(e) => Err(e.into()),
}

View File

@@ -8,6 +8,8 @@ use items_0::collect_s::Collector;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::fmt;
@@ -669,7 +671,11 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
self.timed_out = true;
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match self.coll.as_mut() {
Some(coll) => {
if self.range_complete {
@@ -678,20 +684,25 @@ impl items_0::collect_c::Collector for ChannelEventsCollector {
if self.timed_out {
coll.set_timed_out();
}
let res = coll.result()?;
//error!("fix output of ChannelEventsCollector [03ce6bc5a]");
//err::todo();
//let output = ChannelEventsCollectorOutput {};
let res = coll.result(range, binrange)?;
Ok(res)
}
None => {
error!("nothing collected [caa8d2565]");
todo!()
Err(err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]"))
}
}
}
}
impl items_0::WithLen for ChannelEvents {
fn len(&self) -> usize {
match self {
ChannelEvents::Events(k) => k.len(),
ChannelEvents::Status(_) => 1,
}
}
}
impl items_0::collect_c::Collectable for ChannelEvents {
fn new_collector(&self) -> Box<dyn items_0::collect_c::Collector> {
Box::new(ChannelEventsCollector::new())

View File

@@ -5,9 +5,9 @@ use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinn
use err::Error;
use items_0::scalar_ops::ScalarOps;
use items_0::{AsAnyMut, AsAnyRef, Empty, Events, WithLen};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use netpod::{log::*, BinnedRange};
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
@@ -151,7 +151,7 @@ where
#[derive(Debug)]
pub struct EventsDim0Collector<NTY> {
vals: EventsDim0<NTY>,
range_complete: bool,
range_final: bool,
timed_out: bool,
}
@@ -159,7 +159,7 @@ impl<NTY> EventsDim0Collector<NTY> {
pub fn new() -> Self {
Self {
vals: EventsDim0::empty(),
range_complete: false,
range_final: false,
timed_out: false,
}
}
@@ -186,7 +186,7 @@ pub struct EventsDim0CollectorOutput<NTY> {
#[serde(rename = "values")]
values: VecDeque<NTY>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
range_complete: bool,
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
@@ -220,7 +220,7 @@ impl<NTY: ScalarOps> EventsDim0CollectorOutput<NTY> {
}
pub fn range_complete(&self) -> bool {
self.range_complete
self.range_final
}
pub fn timed_out(&self) -> bool {
@@ -266,14 +266,14 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
}
fn set_range_complete(&mut self) {
self.range_complete = true;
self.range_final = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRange>) -> Result<Self::Output, Error> {
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
@@ -281,13 +281,17 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
// can parse and handle.
let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
Some(IsoDateTime::from_u64(*ts))
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
} else {
// TODO tricky: should yield again the original range begin? Leads to recursion.
// Range begin plus delta?
// Anyway, we don't have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
None
if let Some(range) = &range {
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
} else {
// TODO tricky: should yield again the original range begin? Leads to recursion.
// Range begin plus delta?
// Anyway, we don't have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
None
}
}
} else {
None
@@ -303,7 +307,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
pulse_anchor,
pulse_off: pulse_off,
values: mem::replace(&mut self.vals.values, VecDeque::new()),
range_complete: self.range_complete,
range_final: self.range_final,
timed_out: self.timed_out,
continue_at,
};
@@ -340,8 +344,12 @@ impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY>
items_0::collect_s::CollectorType::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match items_0::collect_s::CollectorType::result(self) {
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
match items_0::collect_s::CollectorType::result(self, range, binrange) {
Ok(x) => Ok(Box::new(x)),
Err(e) => Err(e.into()),
}
@@ -970,7 +978,11 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn {
todo!()
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
fn result(
&mut self,
_range: Option<NanoRange>,
_binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
todo!()
}
}
@@ -998,8 +1010,12 @@ impl<NTY: ScalarOps> items_0::collect_c::CollectorDyn for EventsDim0Collector<NT
items_0::collect_s::CollectorType::set_timed_out(self);
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
items_0::collect_s::CollectorType::result(self)
fn result(
&mut self,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, err::Error> {
items_0::collect_s::CollectorType::result(self, range, binrange)
.map(|x| Box::new(x) as _)
.map_err(|e| e.into())
}

View File

@@ -1,13 +1,13 @@
use crate::binsxbindim0::BinsXbinDim0;
use crate::RangeOverlapInfo;
use crate::{pulse_offs_from_abs, ts_offs_from_abs};
use crate::{IsoDateTime, RangeOverlapInfo};
use crate::{TimeBinnableType, TimeBinnableTypeAggregator};
use err::Error;
use items_0::scalar_ops::ScalarOps;
use items_0::{AsAnyMut, WithLen};
use items_0::{AsAnyRef, Empty};
use netpod::log::*;
use netpod::NanoRange;
use netpod::{log::*, BinnedRange};
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
@@ -359,10 +359,11 @@ pub struct EventsXbinDim0CollectorOutput<NTY> {
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
// TODO add continue-at
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
}
impl<NTY> AsAnyRef for EventsXbinDim0CollectorOutput<NTY>
@@ -398,14 +399,14 @@ impl<NTY> items_0::collect_c::Collected for EventsXbinDim0CollectorOutput<NTY> w
#[derive(Debug)]
pub struct EventsXbinDim0Collector<NTY> {
vals: EventsXbinDim0<NTY>,
finalised_range: bool,
range_final: bool,
timed_out: bool,
}
impl<NTY> EventsXbinDim0Collector<NTY> {
pub fn new() -> Self {
Self {
finalised_range: false,
range_final: false,
timed_out: false,
vals: EventsXbinDim0::empty(),
}
@@ -434,15 +435,32 @@ where
}
fn set_range_complete(&mut self) {
self.finalised_range = true;
self.range_final = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRange>) -> Result<Self::Output, Error> {
use std::mem::replace;
let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
} else {
if let Some(range) = &range {
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
} else {
// TODO tricky: should yield again the original range begin? Leads to recursion.
// Range begin plus delta?
// Anyway, we don't have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
None
}
}
} else {
None
};
let mins = replace(&mut self.vals.mins, VecDeque::new());
let maxs = replace(&mut self.vals.maxs, VecDeque::new());
let avgs = replace(&mut self.vals.avgs, VecDeque::new());
@@ -459,8 +477,9 @@ where
mins,
maxs,
avgs,
finalised_range: self.finalised_range,
range_final: self.range_final,
timed_out: self.timed_out,
continue_at,
};
Ok(ret)
}
@@ -497,7 +516,11 @@ where
todo!()
}
fn result(&mut self) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
fn result(
&mut self,
_range: Option<NanoRange>,
_binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error> {
todo!()
}
}

View File

@@ -371,7 +371,8 @@ fn flush_binned(
}
}
// TODO remove
// TODO remove.
// Compare with items_2::test::bin01
pub async fn binned_collected(
scalar_type: ScalarType,
shape: Shape,
@@ -477,7 +478,7 @@ pub async fn binned_collected(
}
match coll {
Some(mut coll) => {
let res = coll.result().map_err(|e| format!("{e}"))?;
let res = coll.result(None, None).map_err(|e| format!("{e}"))?;
tokio::time::sleep(Duration::from_millis(2000)).await;
Ok(res)
}

View File

@@ -298,7 +298,12 @@ fn bin01() {
let mut stream = ChannelEventsMerger::new(vec![inp1, inp2]);
let mut coll = None;
let mut binner = None;
let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect();
let range = NanoRange {
beg: SEC * 0,
end: SEC * 100,
};
let binrange = BinnedRange::covering_range(range, 10).unwrap();
let edges = binrange.edges();
// TODO implement continue-at [hcn2956jxhwsf]
#[allow(unused)]
let bin_count_exp = (edges.len() - 1) as u32;
@@ -369,7 +374,7 @@ fn bin01() {
}
match coll {
Some(mut coll) => {
let res = coll.result().map_err(|e| format!("{e}"))?;
let res = coll.result(None, Some(binrange.clone())).map_err(|e| format!("{e}"))?;
//let res = res.to_json_result().map_err(|e| format!("{e}"))?;
//let res = res.to_json_bytes().map_err(|e| format!("{e}"))?;
eprintln!("res {res:?}");

View File

@@ -10,6 +10,7 @@ path = "src/netpod.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
humantime-serde = "1.1.1"
async-channel = "1.6"
bytes = "1.3"
chrono = { version = "0.4.19", features = ["serde"] }

View File

@@ -11,7 +11,7 @@ use err::Error;
use futures_util::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::fmt;
use std::iter::FromIterator;
use std::net::SocketAddr;
@@ -543,11 +543,28 @@ pub struct TableSizes {
pub sizes: Vec<(String, String)>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeStatusSub {
pub url: String,
pub status: Result<NodeStatus, Error>,
}
fn is_false<T>(x: T) -> bool
where
T: std::borrow::Borrow<bool>,
{
*x.borrow() == false
}
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeStatus {
pub name: String,
pub version: String,
#[serde(default, skip_serializing_if = "is_false")]
pub is_sf_databuffer: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub is_archiver_engine: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub is_archiver_appliance: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database_size: Option<Result<u64, String>>,
@@ -555,8 +572,8 @@ pub struct NodeStatus {
pub table_sizes: Option<Result<TableSizes, Error>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub archiver_appliance_status: Option<NodeStatusArchiverAppliance>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subs: Option<BTreeMap<String, Result<NodeStatus, Error>>>,
#[serde(default, skip_serializing_if = "VecDeque::is_empty")]
pub subs: VecDeque<NodeStatusSub>,
}
// Describes a "channel" which is a time-series with a unique name within a "backend".
@@ -601,12 +618,19 @@ impl FromUrl for Channel {
.into(),
name: pairs
.get("channelName")
.ok_or(Error::with_public_msg("missing channelName"))?
//.ok_or(Error::with_public_msg("missing channelName"))?
.map(String::from)
.unwrap_or(String::new())
.into(),
series: pairs
.get("seriesId")
.and_then(|x| x.parse::<u64>().map_or(None, |x| Some(x))),
};
if ret.name.is_empty() && ret.series.is_none() {
return Err(Error::with_public_msg(format!(
"Missing one of channelName or seriesId parameters."
)));
}
Ok(ret)
}
}
@@ -615,7 +639,9 @@ impl AppendToUrl for Channel {
fn append_to_url(&self, url: &mut Url) {
let mut g = url.query_pairs_mut();
g.append_pair("backend", &self.backend);
g.append_pair("channelName", &self.name);
if self.name().len() > 0 {
g.append_pair("channelName", &self.name);
}
if let Some(series) = self.series {
g.append_pair("seriesId", &series.to_string());
}
@@ -2088,7 +2114,7 @@ pub struct ChannelConfigResponse {
Provide basic information about a channel, especially it's shape.
Also, byte-order is important for clients that process the raw databuffer event data (python data_api3).
*/
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelInfo {
pub scalar_type: ScalarType,
pub byte_order: Option<ByteOrder>,
@@ -2096,6 +2122,15 @@ pub struct ChannelInfo {
pub msg: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChConf {
pub backend: String,
pub series: u64,
pub name: String,
pub scalar_type: ScalarType,
pub shape: Shape,
}
pub fn f32_close(a: f32, b: f32) -> bool {
if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) {
true

View File

@@ -3,11 +3,23 @@ pub mod datetime;
pub mod prebinned;
use crate::get_url_query_pairs;
use crate::is_false;
use crate::log::*;
use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos};
use chrono::{DateTime, TimeZone, Utc};
use crate::AggKind;
use crate::AppendToUrl;
use crate::ByteSize;
use crate::Channel;
use crate::FromUrl;
use crate::HasBackend;
use crate::HasTimeout;
use crate::NanoRange;
use crate::ToNanos;
use chrono::DateTime;
use chrono::TimeZone;
use chrono::Utc;
use err::Error;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt;
use std::time::Duration;
@@ -71,11 +83,19 @@ pub struct PlainEventsQuery {
range: NanoRange,
agg_kind: AggKind,
timeout: Duration,
#[serde(default, skip_serializing_if = "Option::is_none")]
events_max: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
event_delay: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")]
stream_batch_len: Option<usize>,
#[serde(default, skip_serializing_if = "is_false")]
report_error: bool,
#[serde(default, skip_serializing_if = "is_false")]
do_log: bool,
#[serde(default, skip_serializing_if = "is_false")]
do_test_main_error: bool,
#[serde(default, skip_serializing_if = "is_false")]
do_test_stream_error: bool,
}
@@ -94,6 +114,7 @@ impl PlainEventsQuery {
agg_kind,
timeout,
events_max,
event_delay: None,
stream_batch_len: None,
report_error: false,
do_log,
@@ -126,8 +147,12 @@ impl PlainEventsQuery {
self.timeout
}
pub fn events_max(&self) -> Option<u64> {
self.events_max
pub fn events_max(&self) -> u64 {
self.events_max.unwrap_or(1024 * 1024)
}
pub fn event_delay(&self) -> &Option<Duration> {
&self.event_delay
}
pub fn do_log(&self) -> bool {
@@ -196,6 +221,9 @@ impl FromUrl for PlainEventsQuery {
events_max: pairs
.get("eventsMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| {
k.parse::<u64>().map(|x| Duration::from_millis(x)).map(|k| Some(k))
})?,
stream_batch_len: pairs
.get("streamBatchLen")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
@@ -242,10 +270,15 @@ impl AppendToUrl for PlainEventsQuery {
if let Some(x) = self.events_max.as_ref() {
g.append_pair("eventsMax", &format!("{}", x));
}
if let Some(x) = self.event_delay.as_ref() {
g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3));
}
if let Some(x) = self.stream_batch_len.as_ref() {
g.append_pair("streamBatchLen", &format!("{}", x));
}
g.append_pair("doLog", &format!("{}", self.do_log));
if self.do_log {
g.append_pair("doLog", &format!("{}", self.do_log));
}
}
}
@@ -427,7 +460,12 @@ impl AppendToUrl for BinnedQuery {
{
self.channel.append_to_url(url);
let mut g = url.query_pairs_mut();
g.append_pair("cacheUsage", &self.cache_usage.to_string());
match &self.cache_usage {
CacheUsage::Use => {}
_ => {
g.append_pair("cacheUsage", &self.cache_usage.to_string());
}
}
g.append_pair("binCount", &format!("{}", self.bin_count));
g.append_pair(
"begDate",
@@ -443,11 +481,16 @@ impl AppendToUrl for BinnedQuery {
}
{
let mut g = url.query_pairs_mut();
g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
// TODO
//g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
//g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
g.append_pair("doLog", &format!("{}", self.do_log));
if self.abort_after_bin_count > 0 {
g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
}
if self.do_log {
g.append_pair("doLog", &format!("{}", self.do_log));
}
}
}
}

View File

@@ -81,9 +81,6 @@ async fn events_conn_handler_inner_try(
error!("missing command frame len {}", frames.len());
return Err((Error::with_msg("missing command frame"), netout).into());
}
//if frames[1].tyid() != items::TERM_FRAME_TYPE_ID {
// return Err((Error::with_msg("input without term frame"), netout).into());
//}
let query_frame = &frames[0];
if query_frame.tyid() != items::EVENT_QUERY_JSON_STRING_FRAME {
return Err((Error::with_msg("query frame wrong type"), netout).into());
@@ -184,24 +181,43 @@ async fn events_conn_handler_inner_try(
scy,
do_test_stream_error,
);
let stream = stream.map(|item| {
let item = match item {
Ok(item) => match item {
ChannelEvents::Events(item) => {
let item = ChannelEvents::Events(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
ChannelEvents::Status(item) => {
let item = ChannelEvents::Status(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
let stream = stream
.map(|item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {
let n = k.len();
let d = evq.event_delay();
(item, n, d.clone())
}
ChannelEvents::Status(_) => (item, 1, None),
},
Err(e) => Err(e),
};
item
});
Err(_) => (item, 1, None),
})
.then(|(item, n, d)| async move {
if let Some(d) = d {
debug!("sleep {} times {:?}", n, d);
tokio::time::sleep(d).await;
}
item
})
.map(|item| {
let item = match item {
Ok(item) => match item {
ChannelEvents::Events(item) => {
let item = ChannelEvents::Events(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
ChannelEvents::Status(item) => {
let item = ChannelEvents::Status(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
},
Err(e) => Err(e),
};
item
});
Box::pin(stream)
} else if let Some(_) = &node_config.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
@@ -236,6 +252,7 @@ async fn events_conn_handler_inner_try(
let item = item.make_frame();
match item {
Ok(buf) => {
info!("write {} bytes", buf.len());
buf_len_histo.ingest(buf.len() as u32);
match netout.write_all(&buf).await {
Ok(_) => {}

View File

@@ -2,7 +2,7 @@ use err::Error;
use futures_util::{Stream, StreamExt};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use items_0::collect_c::Collectable;
use netpod::log::*;
use netpod::{log::*, BinnedRange, NanoRange};
use std::fmt;
use std::time::{Duration, Instant};
use tracing::Instrument;
@@ -25,99 +25,120 @@ macro_rules! trace4 {
($($arg:tt)*) => (eprintln!($($arg)*));
}
async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + items_0::WithLen + fmt::Debug,
{
let mut collector: Option<Box<dyn items_0::collect_c::Collector>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut timed_out = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
warn!("collect_in_span time out");
timed_out = true;
if let Some(coll) = collector.as_mut() {
coll.set_timed_out();
} else {
warn!("Timeout but no collector yet");
}
break;
}
};
info!("collect_in_span see item");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("Received RangeComplete but no collector yet");
}
}
RangeCompletableItem::Data(mut item) => {
info!("collect_in_span sees {}", item.len());
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("Reached events_max {} abort", events_max);
break;
}
}
},
StreamItem::Log(item) => {
trace!("Log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("Stats {:?}", item);
use items::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(e);
}
}
}
let _ = range_complete;
let _ = timed_out;
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))?
.result(range, binrange)?;
debug!("Total duration: {:?}", total_duration);
Ok(res)
}
pub async fn collect<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<NanoRange>,
binrange: Option<BinnedRange>,
) -> Result<Box<dyn items_0::collect_c::Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + fmt::Debug,
T: Collectable + items_0::WithLen + fmt::Debug,
{
let span = tracing::span!(tracing::Level::TRACE, "collect");
let fut = async {
let mut collector: Option<Box<dyn items_0::collect_c::Collector>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
if let Some(coll) = collector.as_mut() {
coll.set_timed_out();
} else {
warn!("Timeout but no collector yet");
}
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("Received RangeComplete but no collector yet");
}
}
RangeCompletableItem::Data(mut item) => {
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("Reached events_max {} abort", events_max);
break;
}
}
},
StreamItem::Log(item) => {
trace!("Log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("Stats {:?}", item);
use items::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(e);
}
}
}
let _ = range_complete;
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))?
.result()?;
debug!("Total duration: {:?}", total_duration);
Ok(res)
};
fut.instrument(span).await
collect_in_span(stream, deadline, events_max, range, binrange)
.instrument(span)
.await
}

View File

@@ -62,7 +62,7 @@ where
Ready(Ok(())) => {
let n = buf.filled().len();
self.buf.wadv(n)?;
trace!("recv bytes {}", n);
debug!("recv bytes {}", n);
Ready(Ok(n))
}
Ready(Err(e)) => Ready(Err(e.into())),
@@ -131,6 +131,7 @@ where
return Err(e);
}
self.inp_bytes_consumed += lentot as u64;
debug!("parsed frame well len {}", len);
let ret = InMemoryFrame {
len,
tyid,

View File

@@ -1,15 +1,28 @@
use crate::tcprawclient::open_tcp_streams;
use err::Error;
#[allow(unused)]
use netpod::log::*;
use futures_util::stream;
use futures_util::StreamExt;
use items_2::channelevents::ChannelEvents;
use netpod::query::PlainEventsQuery;
use netpod::ChConf;
use netpod::Cluster;
use serde_json::Value as JsonValue;
use std::time::Duration;
use std::time::Instant;
pub async fn plain_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> Result<JsonValue, Error> {
let deadline = Instant::now() + query.timeout();
let events_max = query.events_max().unwrap_or(1024 * 32);
pub async fn plain_events_json(
query: &PlainEventsQuery,
chconf: &ChConf,
cluster: &Cluster,
) -> Result<JsonValue, Error> {
// TODO remove magic constant
let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000);
let events_max = query.events_max();
let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let empty = ChannelEvents::Events(empty);
let empty = items::sitem_data(empty);
// TODO should be able to ask for data-events only, instead of mixed data and status events.
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?;
//let inps = open_tcp_streams::<_, Box<dyn items_2::Events>>(&query, cluster).await?;
@@ -23,8 +36,9 @@ pub async fn plain_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> R
let stream = inp0.chain(inp1).chain(inp2);
stream
};
let stream = { items_2::merger::Merger::new(inps, 512) };
let collected = crate::collect::collect(stream, deadline, events_max).await?;
let stream = { items_2::merger::Merger::new(inps, 1) };
let stream = stream::iter([empty]).chain(stream);
let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}

View File

@@ -15,7 +15,7 @@ fn collect_channel_events() -> Result<(), Error> {
let stream = stream::iter(vec![sitem_data(evs0), sitem_data(evs1)]);
let deadline = Instant::now() + Duration::from_millis(4000);
let events_max = 10000;
let res = crate::collect::collect(stream, deadline, events_max).await?;
let res = crate::collect::collect(stream, deadline, events_max, None, None).await?;
//eprintln!("collected result: {res:?}");
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
eprintln!("Great, a match");

View File

@@ -1,19 +1,30 @@
use crate::tcprawclient::open_tcp_streams;
use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_2::channelevents::ChannelEvents;
#[allow(unused)]
use netpod::log::*;
use netpod::query::{BinnedQuery, PlainEventsQuery};
use netpod::{BinnedRange, Cluster};
use netpod::query::BinnedQuery;
use netpod::query::PlainEventsQuery;
use netpod::BinnedRange;
use netpod::ChConf;
use netpod::Cluster;
use serde_json::Value as JsonValue;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::time::Instant;
pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result<JsonValue, Error> {
pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result<JsonValue, Error> {
let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?;
let events_max = 10000;
let bins_max = 10000;
let do_time_weight = query.agg_kind().do_time_weighted();
let timeout = Duration::from_millis(7500);
let deadline = Instant::now() + timeout;
let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind());
let empty = ChannelEvents::Events(empty);
let empty = items::sitem_data(empty);
let rawquery = PlainEventsQuery::new(
query.channel().clone(),
query.range().clone(),
@@ -25,6 +36,7 @@ pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result<J
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = { items_2::merger::Merger::new(inps, 1) };
let stream = stream::iter([empty]).chain(stream);
let stream = Box::pin(stream);
let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
if false {
@@ -32,7 +44,7 @@ pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result<J
let _: Option<items::Sitemty<Box<dyn items_0::TimeBinned>>> = stream.next().await;
panic!()
}
let collected = crate::collect::collect(stream, deadline, events_max).await?;
let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}