diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index 6961e0b..cc37b26 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "4.1.0" +version = "0.3.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index 32e1875..a12371c 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -1,15 +1,11 @@ +use crate::ErrConv; use err::Error; use netpod::log::*; -use netpod::{Channel, NodeConfigCached, ScalarType, Shape}; - -use crate::ErrConv; - -pub struct ChConf { - pub series: u64, - pub name: String, - pub scalar_type: ScalarType, - pub shape: Shape, -} +use netpod::ChConf; +use netpod::Channel; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::Shape; /// It is an unsolved question as to how we want to uniquely address channels. /// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases @@ -26,9 +22,11 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> channel.backend, ncc.node_config.cluster.backend ); } + let backend = channel.backend().into(); if channel.backend() == "test-inmem" { let ret = if channel.name() == "inmem-d0-i32" { let ret = ChConf { + backend: channel.backend().into(), series: 1, name: channel.name().into(), scalar_type: ScalarType::I32, @@ -46,6 +44,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> // TODO the series-ids here are just random. Need to integrate with better test setup. let ret = if channel.name() == "scalar-i32-be" { let ret = ChConf { + backend, series: 1, name: channel.name().into(), scalar_type: ScalarType::I32, @@ -54,6 +53,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Ok(ret) } else if channel.name() == "wave-f64-be-n21" { let ret = ChConf { + backend, series: 2, name: channel.name().into(), scalar_type: ScalarType::F64, @@ -62,6 +62,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Ok(ret) } else if channel.name() == "const-regular-scalar-i32-be" { let ret = ChConf { + backend, series: 3, name: channel.name().into(), scalar_type: ScalarType::I32, @@ -96,6 +97,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> // TODO can I get a slice from psql driver? let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; let ret = ChConf { + backend, series, name, scalar_type, @@ -127,6 +129,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> // TODO can I get a slice from psql driver? let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; let ret = ChConf { + backend, series, name, scalar_type, diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 79057c8..e40fb93 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -100,7 +100,7 @@ pub async fn search_channel_scylla( " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", " where channel ~* $1", - " limit 100," + " limit 100", )); let pgclient = crate::create_connection(pgconf).await?; let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; diff --git a/err/src/lib.rs b/err/src/lib.rs index 0cc8728..3d4e5a8 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -1,6 +1,4 @@ -/*! -Error handling and reporting. -*/ +//! Error handling and reporting. 
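For reference, the `ChConf` struct moved into `netpod` above now carries the backend explicitly, so every construction site must supply it. A minimal, self-contained sketch of the pattern used by the test-backend branches of `chconf_from_database` (the stand-in enums and the backend name are illustrative assumptions, not part of this diff):

```rust
// Illustrative stand-ins; the real netpod enums carry many more variants.
#[derive(Debug, Clone)]
enum ScalarType {
    I32,
    F64,
}

#[derive(Debug, Clone)]
enum Shape {
    Scalar,
    Wave(u32),
}

// Mirrors the ChConf struct that this diff moves from dbconn into netpod,
// now including the backend field.
#[derive(Debug, Clone)]
struct ChConf {
    backend: String,
    series: u64,
    name: String,
    scalar_type: ScalarType,
    shape: Shape,
}

// Hypothetical helper following the same pattern as the test-backend
// branches of chconf_from_database in the hunk above.
fn chconf_for_test_channel(backend: &str, name: &str) -> Option<ChConf> {
    let (series, scalar_type, shape) = match name {
        "scalar-i32-be" => (1, ScalarType::I32, Shape::Scalar),
        "wave-f64-be-n21" => (2, ScalarType::F64, Shape::Wave(21)),
        _ => return None,
    };
    Some(ChConf {
        backend: backend.into(),
        series,
        name: name.into(),
        scalar_type,
        shape,
    })
}

fn main() {
    let c = chconf_for_test_channel("test-backend", "wave-f64-be-n21").unwrap();
    println!("{} series={} {:?} {:?}", c.name, c.series, c.scalar_type, c.shape);
}
```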
 use serde::{Deserialize, Serialize};
 use std::array::TryFromSliceError;
@@ -22,15 +20,17 @@ pub enum Reason {
     IoError,
 }
 
-/**
-The common error type for this application.
-*/
+/// The common error type for this application.
 #[derive(Clone, PartialEq, Serialize, Deserialize)]
 pub struct Error {
     msg: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     trace_str: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     public_msg: Option<Vec<String>>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     reason: Option<Reason>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     parent: Option<Box<Error>>,
 }
diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml
index 613566b..150c7f9 100644
--- a/httpret/Cargo.toml
+++ b/httpret/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "httpret"
-version = "0.0.2"
+version = "0.3.5"
 authors = ["Dominik Werder "]
 edition = "2021"
diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs
index 21cd8c6..41bd921 100644
--- a/httpret/src/api4/binned.rs
+++ b/httpret/src/api4/binned.rs
@@ -37,7 +37,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
     span1.in_scope(|| {
         debug!("begin");
     });
-    let item = streams::timebinnedjson::timebinned_json(&query, &node_config.node_config.cluster)
+    let item = streams::timebinnedjson::timebinned_json(&query, &chconf, &node_config.node_config.cluster)
         .instrument(span1)
         .await?;
     let buf = serde_json::to_vec(&item)?;
diff --git a/httpret/src/api4/search.rs b/httpret/src/api4/search.rs
index 964a888..ece579d 100644
--- a/httpret/src/api4/search.rs
+++ b/httpret/src/api4/search.rs
@@ -42,7 +42,7 @@ impl ChannelSearchHandler {
                 Ok(response(StatusCode::OK).body(Body::from(buf))?)
             }
             Err(e) => {
-                warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
+                warn!("handle: got error from channel_search: {e:?}");
                 Ok(e.to_public_response())
             }
         }
diff --git a/httpret/src/api4/status.rs b/httpret/src/api4/status.rs
index 6e3624e..02b2d16 100644
--- a/httpret/src/api4/status.rs
+++ b/httpret/src/api4/status.rs
@@ -10,6 +10,7 @@ use netpod::NodeConfigCached;
 use netpod::NodeStatus;
 use netpod::NodeStatusArchiverAppliance;
 use netpod::TableSizes;
+use std::collections::VecDeque;
 use std::time::Duration;
 
 async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
@@ -94,13 +95,14 @@ impl StatusNodesRecursive {
         let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}"));
         let ret = NodeStatus {
             name: format!("{}:{}", node_config.node.host, node_config.node.port),
+            version: core::env!("CARGO_PKG_VERSION").into(),
             is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
             is_archiver_engine: node_config.node.channel_archiver.is_some(),
             is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
             database_size: Some(database_size),
             table_sizes: Some(table_sizes(node_config).await.map_err(Into::into)),
             archiver_appliance_status,
-            subs: None,
+            subs: VecDeque::new(),
         };
         Ok(ret)
     }
diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs
index 9e75536..9a82042 100644
--- a/httpret/src/channelconfig.rs
+++ b/httpret/src/channelconfig.rs
@@ -1,7 +1,6 @@
 use crate::err::Error;
 use crate::{response, ToPublicResponse};
 use dbconn::channelconfig::chconf_from_database;
-use dbconn::channelconfig::ChConf;
 use dbconn::create_connection;
 use futures_util::StreamExt;
 use http::{Method, Request, Response, StatusCode};
 use hyper::Body;
 use netpod::log::*;
 use netpod::query::prebinned::PreBinnedQuery;
 use netpod::query::{BinnedQuery,
PlainEventsQuery}; use netpod::timeunits::*; +use netpod::ChConf; use netpod::{Channel, ChannelConfigQuery, FromUrl, ScalarType, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; @@ -33,6 +33,7 @@ pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCache pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result { let ret = ChConf { + backend: q.channel().backend().into(), series: q .channel() .series() diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 7b8688b..aa90377 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -34,7 +34,6 @@ impl EventsHandler { } async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events req: {:?}", req); let accept_def = APP_JSON; let accept = req .headers() @@ -83,20 +82,19 @@ async fn plain_events_json( req: Request, node_config: &NodeConfigCached, ) -> Result, Error> { - debug!("httpret plain_events_json req: {:?}", req); + info!("httpret plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; - let chconf = chconf_from_events_json(&query, node_config).await.map_err(|e| { - error!("chconf_from_events_json {e:?}"); - e.add_public_msg(format!("Can not get channel information")) - })?; + let chconf = chconf_from_events_json(&query, node_config) + .await + .map_err(Error::from)?; // Update the series id since we don't require some unique identifier yet. let mut query = query; query.set_series_id(chconf.series); let query = query; // --- //let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); - let item = streams::plaineventsjson::plain_events_json(&query, &node_config.node_config.cluster).await; + let item = streams::plaineventsjson::plain_events_json(&query, &chconf, &node_config.node_config.cluster).await; let item = match item { Ok(item) => item, Err(e) => { diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index 5d435f0..98a9f84 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -178,7 +178,8 @@ pub async fn gather_get_json_generic( tags: Vec, nt: NT, ft: FT, - // TODO use deadline instead + // TODO use deadline instead. + // TODO Wait a bit longer compared to remote to receive partial results. timeout: Duration, ) -> Result where @@ -190,6 +191,8 @@ where + 'static, FT: Fn(Vec<(Tag, Result, Error>)>) -> Result, { + // TODO remove magic constant + let extra_timeout = Duration::from_millis(3000); if urls.len() != bodies.len() { return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies")); } @@ -222,14 +225,17 @@ where let tag2 = tag.clone(); let jh = tokio::spawn(async move { select! { - _ = sleep(timeout).fuse() => { + _ = sleep(timeout + extra_timeout).fuse() => { + error!("PROXY TIMEOUT"); Err(Error::with_msg_no_trace("timeout")) } res = { let client = Client::new(); client.request(req?).fuse() } => { + info!("received result in time"); let ret = nt(tag2, res?).await?; + info!("transformed result in time"); Ok(ret) } } diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index e833315..5ddc820 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -112,8 +112,8 @@ async fn proxy_http_service_inner( h.handle(req, ctx, &proxy_config).await } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) - } else if path == "/api/4/search/channel" { - Ok(api4::channel_search(req, proxy_config).await?) 
+    } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) {
+        h.handle(req, &proxy_config).await
     } else if path == "/api/4/events" {
         Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?)
     } else if path == "/api/4/status/connection/events" {
diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs
index bdc270d..373a20d 100644
--- a/httpret/src/proxy/api4.rs
+++ b/httpret/src/proxy/api4.rs
@@ -1,108 +1,147 @@
+use crate::bodystream::ToPublicResponse;
 use crate::err::Error;
-use crate::gather::{gather_get_json_generic, SubRes, Tag};
-use crate::{response, ReqCtx};
+use crate::gather::gather_get_json_generic;
+use crate::gather::SubRes;
+use crate::gather::Tag;
+use crate::response;
+use crate::ReqCtx;
 use futures_util::Future;
-use http::{header, Request, Response, StatusCode};
+use http::Method;
+use http::Request;
+use http::Response;
+use http::StatusCode;
 use hyper::Body;
-use itertools::Itertools;
 use netpod::log::*;
+use netpod::ChannelSearchQuery;
+use netpod::ChannelSearchResult;
 use netpod::NodeStatus;
+use netpod::NodeStatusSub;
+use netpod::ProxyConfig;
 use netpod::ACCEPT_ALL;
-use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
+use netpod::APP_JSON;
 use serde_json::Value as JsVal;
-use std::collections::BTreeMap;
+use std::collections::VecDeque;
 use std::pin::Pin;
 use std::time::Duration;
 use url::Url;
 
-pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+// TODO model channel search according to StatusNodesRecursive.
+// Make sure that backend handling is correct:
+// The aggregator asks all backends, except when the user specifies a backend,
+// in which case it should go only to the matching backends.
+// The aggregators and the leaf nodes behind them should likewise not depend on
+// the backend, but simply answer with all matches.
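The backend filter described in this TODO changes from an exact name match to a substring match (`pb.name.contains(b)`) in the new code below. A small standalone sketch of that predicate and its fan-out effect (the backend names here are made up):

```rust
// Sketch of the backend filter used when fanning out a search:
// if the query names a backend, only backends whose name contains it are
// queried; otherwise all configured backends are queried. Note this diff
// relaxes the old exact match `back == &k.name` to a substring match.
fn backend_matches(configured: &str, requested: Option<&str>) -> bool {
    match requested {
        Some(b) => configured.contains(b),
        None => true,
    }
}

fn main() {
    let backends = ["sf-databuffer", "sf-imagebuffer", "hipa-archive"];
    let requested = Some("sf");
    let targets: Vec<&str> = backends
        .iter()
        .copied()
        .filter(|name| backend_matches(name, requested))
        .collect();
    assert_eq!(targets, ["sf-databuffer", "sf-imagebuffer"]);
    println!("fan out to: {targets:?}");
}
```

Substring matching lets one requested name select a group of related backends, at the cost of possible over-matching.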
+ +pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> Result { let (head, _body) = req.into_parts(); - let vdef = header::HeaderValue::from_static(APP_JSON); - let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); - if v == APP_JSON || v == ACCEPT_ALL { - let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; - let query = ChannelSearchQuery::from_url(&inpurl)?; - let mut bodies = vec![]; - let urls = proxy_config - .backends - .iter() - .filter(|k| { - if let Some(back) = &query.backend { - back == &k.name - } else { - true - } - }) - .map(|pb| match Url::parse(&format!("{}/api/4/search/channel", pb.url)) { + let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; + let query = ChannelSearchQuery::from_url(&inpurl)?; + let mut urls = Vec::new(); + let mut tags = Vec::new(); + let mut bodies = Vec::new(); + for pb in &proxy_config.backends { + if if let Some(b) = &query.backend { + pb.name.contains(b) + } else { + true + } { + match Url::parse(&format!("{}/api/4/search/channel", pb.url)) { Ok(mut url) => { query.append_to_url(&mut url); - Ok(url) + tags.push(url.to_string()); + bodies.push(None); + urls.push(url); + } + Err(_) => return Err(Error::with_msg(format!("parse error for: {:?}", pb))), + } + } + } + let nt = |tag, res| { + let fut = async { + let body = hyper::body::to_bytes(res).await?; + info!("got a result {:?}", body); + let res: ChannelSearchResult = match serde_json::from_slice(&body) { + Ok(k) => k, + Err(_) => { + let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); } - Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))), - }) - .fold_ok(vec![], |mut a, x| { - a.push(x); - bodies.push(None); - a - })?; - let tags = urls.iter().map(|k| k.to_string()).collect(); - let nt = |tag, res| { - let fut = async { - let body = hyper::body::to_bytes(res).await?; - //info!("got a result {:?}", body); - let res: ChannelSearchResult = match serde_json::from_slice(&body) { - Ok(k) => k, - Err(_) => { - let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); - error!("{}", msg); - return Err(Error::with_msg_no_trace(msg)); - } - }; - let ret = SubRes { - tag, - status: StatusCode::OK, - val: res, - }; - Ok(ret) }; - Box::pin(fut) as Pin + Send>> + let ret = SubRes { + tag, + status: StatusCode::OK, + val: res, + }; + Ok(ret) }; - let ft = |all: Vec<(Tag, Result, Error>)>| { - let mut res = Vec::new(); - for (_tag, j) in all { - match j { - Ok(j) => { - for k in j.val.channels { - res.push(k); - } - } - Err(e) => { - warn!("{e}"); + Box::pin(fut) as Pin + Send>> + }; + let ft = |all: Vec<(Tag, Result, Error>)>| { + let mut res = Vec::new(); + for (_tag, j) in all { + match j { + Ok(j) => { + for k in j.val.channels { + res.push(k); } } + Err(e) => { + warn!("{e}"); + } } - let res = ChannelSearchResult { channels: res }; - let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?)) - .map_err(Error::from)?; - Ok(res) - }; - let ret = gather_get_json_generic( - http::Method::GET, - urls, - bodies, - tags, - nt, - ft, - Duration::from_millis(3000), - ) - .await?; - Ok(ret) - } else { - Ok(response(StatusCode::NOT_ACCEPTABLE) - .body(Body::from(format!("{:?}", proxy_config.name))) - .map_err(Error::from)?) 
+        }
+        let res = ChannelSearchResult { channels: res };
+        Ok(res)
+    };
+    let ret = gather_get_json_generic(
+        http::Method::GET,
+        urls,
+        bodies,
+        tags,
+        nt,
+        ft,
+        Duration::from_millis(3000),
+    )
+    .await?;
+    Ok(ret)
+}
+
+pub struct ChannelSearchAggHandler {}
+
+impl ChannelSearchAggHandler {
+    pub fn handler(req: &Request<Body>) -> Option<Self> {
+        if req.uri().path() == "/api/4/search/channel" {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+        if req.method() == Method::GET {
+            let accept_def = APP_JSON;
+            let accept = req
+                .headers()
+                .get(http::header::ACCEPT)
+                .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
+            if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
+                match channel_search(req, node_config).await {
+                    Ok(item) => {
+                        let buf = serde_json::to_vec(&item)?;
+                        Ok(response(StatusCode::OK).body(Body::from(buf))?)
+                    }
+                    Err(e) => {
+                        warn!("ChannelSearchAggHandler::handle: got error from channel_search: {e:?}");
+                        Ok(e.to_public_response())
+                    }
+                }
+            } else {
+                Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
+            }
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     }
 }
@@ -183,27 +222,33 @@ impl StatusNodesRecursive {
             Box::pin(fut) as Pin<Box<dyn Future<Output = Result<SubRes<JsVal>, Error>> + Send>>
         };
         let ft = |all: Vec<(Tag, Result<SubRes<JsVal>, Error>)>| {
-            let mut subs = BTreeMap::new();
+            let mut subs = VecDeque::new();
             for (tag, sr) in all {
                 match sr {
                     Ok(sr) => {
                         let s: Result<NodeStatus, err::Error> = serde_json::from_value(sr.val).map_err(err::Error::from);
-                        subs.insert(tag.0, s);
+                        let sub = NodeStatusSub { url: tag.0, status: s };
+                        subs.push_back(sub);
                     }
                     Err(e) => {
-                        subs.insert(tag.0, Err(err::Error::from(e)));
+                        let sub = NodeStatusSub {
+                            url: tag.0,
+                            status: Err(err::Error::from(e)),
+                        };
+                        subs.push_back(sub);
                     }
                 }
             }
             let ret = NodeStatus {
                 name: format!("{}:{}", proxy_config.name, proxy_config.port),
+                version: core::env!("CARGO_PKG_VERSION").into(),
                 is_sf_databuffer: false,
                 is_archiver_engine: false,
                 is_archiver_appliance: false,
                 database_size: None,
                 table_sizes: None,
                 archiver_appliance_status: None,
-                subs: Some(subs),
+                subs,
             };
             Ok(ret)
         };
diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs
index 4767985..5db322c 100644
--- a/items_0/src/collect_c.rs
+++ b/items_0/src/collect_c.rs
@@ -3,7 +3,10 @@ use crate::collect_s::ToJsonResult;
 use crate::AsAnyMut;
 use crate::AsAnyRef;
 use crate::Events;
+use crate::WithLen;
 use err::Error;
+use netpod::BinnedRange;
+use netpod::NanoRange;
 use std::fmt;
 
 pub trait Collector: fmt::Debug + Send {
@@ -11,14 +14,13 @@ pub trait Collector: fmt::Debug + Send {
     fn ingest(&mut self, item: &mut dyn Collectable);
     fn set_range_complete(&mut self);
     fn set_timed_out(&mut self);
-    fn result(&mut self) -> Result<Box<dyn Collected>, Error>;
+    fn result(&mut self, range: Option<NanoRange>, binrange: Option<BinnedRange>) -> Result<Box<dyn Collected>, Error>;
 }
 
-pub trait Collectable: fmt::Debug + AsAnyMut {
+pub trait Collectable: fmt::Debug + AsAnyMut + crate::WithLen {
     fn new_collector(&self) -> Box<dyn Collector>;
 }
 
-// TODO can this get removed?
pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} erased_serde::serialize_trait_object!(Collected); @@ -44,13 +46,19 @@ pub trait CollectorDyn: fmt::Debug + Send { fn set_timed_out(&mut self); - fn result(&mut self) -> Result, Error>; + fn result(&mut self, range: Option, binrange: Option) -> Result, Error>; } pub trait CollectableWithDefault: AsAnyMut { fn new_collector(&self) -> Box; } +impl crate::WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + impl Collectable for Box { fn new_collector(&self) -> Box { todo!() @@ -77,11 +85,21 @@ impl Collector for TimeBinnedCollector { todo!() } - fn result(&mut self) -> Result, Error> { + fn result( + &mut self, + _range: Option, + _binrange: Option, + ) -> Result, Error> { todo!() } } +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + impl Collectable for Box { fn new_collector(&self) -> Box { self.as_ref().new_collector() diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 6263d20..fec4518 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -1,6 +1,10 @@ use super::collect_c::Collected; -use crate::{AsAnyMut, AsAnyRef, WithLen}; +use crate::AsAnyMut; +use crate::AsAnyRef; +use crate::WithLen; use err::Error; +use netpod::BinnedRange; +use netpod::NanoRange; use serde::Serialize; use std::any::Any; use std::fmt; @@ -15,14 +19,18 @@ pub trait CollectorType: Send + Unpin + WithLen { fn set_timed_out(&mut self); // TODO use this crate's Error instead: - fn result(&mut self) -> Result; + fn result(&mut self, range: Option, binrange: Option) -> Result; } pub trait Collector: Send + Unpin + WithLen { fn ingest(&mut self, src: &mut dyn Collectable); fn set_range_complete(&mut self); fn set_timed_out(&mut self); - fn result(&mut self) -> Result, Error>; + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error>; } // TODO rename to `Typed` @@ -49,8 +57,12 @@ impl Collector for T { T::set_timed_out(self) } - fn result(&mut self) -> Result, Error> { - let ret = T::result(self)?; + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + let ret = T::result(self, range, binrange)?; Ok(Box::new(ret) as _) } } diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 43fd427..f351fe3 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -127,7 +127,7 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef /// Container of some form of events, for use as trait object. 
pub trait Events: - fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + Send + erased_serde::Serialize + fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + WithLen + Send + erased_serde::Serialize { fn as_time_binnable(&self) -> &dyn TimeBinnable; fn verify(&self) -> bool; diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index c801166..410c7c2 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -10,9 +10,9 @@ use items_0::TimeBins; use items_0::WithLen; use items_0::{AppendEmptyBin, AsAnyRef}; use items_0::{AsAnyMut, Empty}; -use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; +use netpod::{log::*, BinnedRange}; use num_traits::Zero; use serde::{Deserialize, Serialize}; use std::any::Any; @@ -386,8 +386,12 @@ impl CollectorType for BinsDim0Collector { self.timed_out = true; } - fn result(&mut self) -> Result { - let bin_count_exp = 0; + fn result(&mut self, _range: Option, binrange: Option) -> Result { + let bin_count_exp = if let Some(r) = &binrange { + r.bin_count() as u32 + } else { + 0 + }; let bin_count = self.vals.ts1s.len() as u32; let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { match self.vals.ts2s.back() { @@ -502,8 +506,12 @@ where CollectorType::set_timed_out(self) } - fn result(&mut self) -> Result, Error> { - match CollectorType::result(self) { + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + match CollectorType::result(self, range, binrange) { Ok(res) => Ok(Box::new(res)), Err(e) => Err(e.into()), } diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index 808176e..18f4b0a 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -10,9 +10,9 @@ use items_0::TimeBinned; use items_0::TimeBins; use items_0::WithLen; use items_0::{AppendEmptyBin, AsAnyMut, AsAnyRef}; -use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; +use netpod::{log::*, BinnedRange}; use num_traits::Zero; use serde::{Deserialize, Serialize}; use std::any::Any; @@ -387,8 +387,12 @@ impl CollectorType for BinsXbinDim0Collector { self.timed_out = true; } - fn result(&mut self) -> Result { - let bin_count_exp = 0; + fn result(&mut self, _range: Option, binrange: Option) -> Result { + let bin_count_exp = if let Some(r) = &binrange { + r.bin_count() as u32 + } else { + 0 + }; let bin_count = self.vals.ts1s.len() as u32; let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { match self.vals.ts2s.back() { @@ -503,8 +507,12 @@ where CollectorType::set_timed_out(self) } - fn result(&mut self) -> Result, Error> { - match CollectorType::result(self) { + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + match CollectorType::result(self, range, binrange) { Ok(res) => Ok(Box::new(res)), Err(e) => Err(e.into()), } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 65c94a4..980e541 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -8,6 +8,8 @@ use items_0::collect_s::Collector; use items_0::AsAnyMut; use items_0::AsAnyRef; use netpod::log::*; +use netpod::BinnedRange; +use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::any::Any; use std::fmt; @@ -669,7 +671,11 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { self.timed_out = true; } - fn result(&mut self) -> Result, err::Error> { + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, 
err::Error> { match self.coll.as_mut() { Some(coll) => { if self.range_complete { @@ -678,20 +684,25 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { if self.timed_out { coll.set_timed_out(); } - let res = coll.result()?; - //error!("fix output of ChannelEventsCollector [03ce6bc5a]"); - //err::todo(); - //let output = ChannelEventsCollectorOutput {}; + let res = coll.result(range, binrange)?; Ok(res) } None => { error!("nothing collected [caa8d2565]"); - todo!() + Err(err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]")) } } } } +impl items_0::WithLen for ChannelEvents { + fn len(&self) -> usize { + match self { + ChannelEvents::Events(k) => k.len(), + ChannelEvents::Status(_) => 1, + } + } +} impl items_0::collect_c::Collectable for ChannelEvents { fn new_collector(&self) -> Box { Box::new(ChannelEventsCollector::new()) diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index f86c1fb..19afaa9 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -5,9 +5,9 @@ use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinn use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::{AsAnyMut, AsAnyRef, Empty, Events, WithLen}; -use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; +use netpod::{log::*, BinnedRange}; use serde::{Deserialize, Serialize}; use std::any::Any; use std::collections::VecDeque; @@ -151,7 +151,7 @@ where #[derive(Debug)] pub struct EventsDim0Collector { vals: EventsDim0, - range_complete: bool, + range_final: bool, timed_out: bool, } @@ -159,7 +159,7 @@ impl EventsDim0Collector { pub fn new() -> Self { Self { vals: EventsDim0::empty(), - range_complete: false, + range_final: false, timed_out: false, } } @@ -186,7 +186,7 @@ pub struct EventsDim0CollectorOutput { #[serde(rename = "values")] values: VecDeque, #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] - range_complete: bool, + range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] @@ -220,7 +220,7 @@ impl EventsDim0CollectorOutput { } pub fn range_complete(&self) -> bool { - self.range_complete + self.range_final } pub fn timed_out(&self) -> bool { @@ -266,14 +266,14 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector Result { + fn result(&mut self, range: Option, _binrange: Option) -> Result { // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. 
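A standalone sketch of the continue-at rule from the comment above, assuming nanosecond timestamps and `netpod::timeunits::MS` = 1_000_000 ns; the hunks below implement the same rule as `IsoDateTime::from_u64(*ts + netpod::timeunits::MS)`:

```rust
// Assumed unit: one millisecond in nanoseconds, as in netpod::timeunits.
const MS: u64 = 1_000_000;

// If we timed out, hint the client to continue just after the last
// delivered event. With no events at all there is nothing to offer here;
// the collectors below fall back to the request range or emit a warning.
fn continue_at(timed_out: bool, last_event_ts: Option<u64>) -> Option<u64> {
    if !timed_out {
        return None;
    }
    last_event_ts.map(|ts| ts + MS)
}

fn main() {
    assert_eq!(continue_at(false, Some(1_000 * MS)), None);
    assert_eq!(continue_at(true, Some(1_000 * MS)), Some(1_001 * MS));
    assert_eq!(continue_at(true, None), None);
}
```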
@@ -281,13 +281,17 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector items_0::collect_s::CollectorType for EventsDim0Collector items_0::collect_c::Collector for EventsDim0Collector items_0::collect_s::CollectorType::set_timed_out(self) } - fn result(&mut self) -> Result, err::Error> { - match items_0::collect_s::CollectorType::result(self) { + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, err::Error> { + match items_0::collect_s::CollectorType::result(self, range, binrange) { Ok(x) => Ok(Box::new(x)), Err(e) => Err(e.into()), } @@ -970,7 +978,11 @@ impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn { todo!() } - fn result(&mut self) -> Result, err::Error> { + fn result( + &mut self, + _range: Option, + _binrange: Option, + ) -> Result, err::Error> { todo!() } } @@ -998,8 +1010,12 @@ impl items_0::collect_c::CollectorDyn for EventsDim0Collector Result, err::Error> { - items_0::collect_s::CollectorType::result(self) + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, err::Error> { + items_0::collect_s::CollectorType::result(self, range, binrange) .map(|x| Box::new(x) as _) .map_err(|e| e.into()) } diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index da57a42..efcf2eb 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -1,13 +1,13 @@ use crate::binsxbindim0::BinsXbinDim0; -use crate::RangeOverlapInfo; use crate::{pulse_offs_from_abs, ts_offs_from_abs}; +use crate::{IsoDateTime, RangeOverlapInfo}; use crate::{TimeBinnableType, TimeBinnableTypeAggregator}; use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::{AsAnyMut, WithLen}; use items_0::{AsAnyRef, Empty}; -use netpod::log::*; use netpod::NanoRange; +use netpod::{log::*, BinnedRange}; use serde::{Deserialize, Serialize}; use std::any::Any; use std::collections::VecDeque; @@ -359,10 +359,11 @@ pub struct EventsXbinDim0CollectorOutput { #[serde(rename = "avgs")] avgs: VecDeque, #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] - finalised_range: bool, + range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] timed_out: bool, - // TODO add continue-at + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, } impl AsAnyRef for EventsXbinDim0CollectorOutput @@ -398,14 +399,14 @@ impl items_0::collect_c::Collected for EventsXbinDim0CollectorOutput w #[derive(Debug)] pub struct EventsXbinDim0Collector { vals: EventsXbinDim0, - finalised_range: bool, + range_final: bool, timed_out: bool, } impl EventsXbinDim0Collector { pub fn new() -> Self { Self { - finalised_range: false, + range_final: false, timed_out: false, vals: EventsXbinDim0::empty(), } @@ -434,15 +435,32 @@ where } fn set_range_complete(&mut self) { - self.finalised_range = true; + self.range_final = true; } fn set_timed_out(&mut self) { self.timed_out = true; } - fn result(&mut self) -> Result { + fn result(&mut self, range: Option, _binrange: Option) -> Result { use std::mem::replace; + let continue_at = if self.timed_out { + if let Some(ts) = self.vals.tss.back() { + Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS)) + } else { + if let Some(range) = &range { + Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC)) + } else { + // TODO tricky: should yield again the original range begin? Leads to recursion. + // Range begin plus delta? + // Anyway, we don't have the range begin here. 
+ warn!("timed out without any result, can not yield a continue-at"); + None + } + } + } else { + None + }; let mins = replace(&mut self.vals.mins, VecDeque::new()); let maxs = replace(&mut self.vals.maxs, VecDeque::new()); let avgs = replace(&mut self.vals.avgs, VecDeque::new()); @@ -459,8 +477,9 @@ where mins, maxs, avgs, - finalised_range: self.finalised_range, + range_final: self.range_final, timed_out: self.timed_out, + continue_at, }; Ok(ret) } @@ -497,7 +516,11 @@ where todo!() } - fn result(&mut self) -> Result, Error> { + fn result( + &mut self, + _range: Option, + _binrange: Option, + ) -> Result, Error> { todo!() } } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index d3e3e92..579cf36 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -371,7 +371,8 @@ fn flush_binned( } } -// TODO remove +// TODO remove. +// Compare with items_2::test::bin01 pub async fn binned_collected( scalar_type: ScalarType, shape: Shape, @@ -477,7 +478,7 @@ pub async fn binned_collected( } match coll { Some(mut coll) => { - let res = coll.result().map_err(|e| format!("{e}"))?; + let res = coll.result(None, None).map_err(|e| format!("{e}"))?; tokio::time::sleep(Duration::from_millis(2000)).await; Ok(res) } diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 6fe4c4d..38f2452 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -298,7 +298,12 @@ fn bin01() { let mut stream = ChannelEventsMerger::new(vec![inp1, inp2]); let mut coll = None; let mut binner = None; - let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect(); + let range = NanoRange { + beg: SEC * 0, + end: SEC * 100, + }; + let binrange = BinnedRange::covering_range(range, 10).unwrap(); + let edges = binrange.edges(); // TODO implement continue-at [hcn2956jxhwsf] #[allow(unused)] let bin_count_exp = (edges.len() - 1) as u32; @@ -369,7 +374,7 @@ fn bin01() { } match coll { Some(mut coll) => { - let res = coll.result().map_err(|e| format!("{e}"))?; + let res = coll.result(None, Some(binrange.clone())).map_err(|e| format!("{e}"))?; //let res = res.to_json_result().map_err(|e| format!("{e}"))?; //let res = res.to_json_bytes().map_err(|e| format!("{e}"))?; eprintln!("res {res:?}"); diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 19aa77d..1e889ad 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -10,6 +10,7 @@ path = "src/netpod.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +humantime-serde = "1.1.1" async-channel = "1.6" bytes = "1.3" chrono = { version = "0.4.19", features = ["serde"] } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index c678756..209e975 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -11,7 +11,7 @@ use err::Error; use futures_util::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsVal; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::fmt; use std::iter::FromIterator; use std::net::SocketAddr; @@ -543,11 +543,28 @@ pub struct TableSizes { pub sizes: Vec<(String, String)>, } +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeStatusSub { + pub url: String, + pub status: Result, +} + +fn is_false(x: T) -> bool +where + T: std::borrow::Borrow, +{ + *x.borrow() == false +} + #[derive(Debug, Serialize, Deserialize)] pub struct NodeStatus { pub name: String, + pub version: String, + #[serde(default, skip_serializing_if = "is_false")] pub is_sf_databuffer: bool, + #[serde(default, skip_serializing_if = "is_false")] 
pub is_archiver_engine: bool, + #[serde(default, skip_serializing_if = "is_false")] pub is_archiver_appliance: bool, #[serde(default, skip_serializing_if = "Option::is_none")] pub database_size: Option>, @@ -555,8 +572,8 @@ pub struct NodeStatus { pub table_sizes: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub archiver_appliance_status: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub subs: Option>>, + #[serde(default, skip_serializing_if = "VecDeque::is_empty")] + pub subs: VecDeque, } // Describes a "channel" which is a time-series with a unique name within a "backend". @@ -601,12 +618,19 @@ impl FromUrl for Channel { .into(), name: pairs .get("channelName") - .ok_or(Error::with_public_msg("missing channelName"))? + //.ok_or(Error::with_public_msg("missing channelName"))? + .map(String::from) + .unwrap_or(String::new()) .into(), series: pairs .get("seriesId") .and_then(|x| x.parse::().map_or(None, |x| Some(x))), }; + if ret.name.is_empty() && ret.series.is_none() { + return Err(Error::with_public_msg(format!( + "Missing one of channelName or seriesId parameters." + ))); + } Ok(ret) } } @@ -615,7 +639,9 @@ impl AppendToUrl for Channel { fn append_to_url(&self, url: &mut Url) { let mut g = url.query_pairs_mut(); g.append_pair("backend", &self.backend); - g.append_pair("channelName", &self.name); + if self.name().len() > 0 { + g.append_pair("channelName", &self.name); + } if let Some(series) = self.series { g.append_pair("seriesId", &series.to_string()); } @@ -2088,7 +2114,7 @@ pub struct ChannelConfigResponse { Provide basic information about a channel, especially it's shape. Also, byte-order is important for clients that process the raw databuffer event data (python data_api3). */ -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChannelInfo { pub scalar_type: ScalarType, pub byte_order: Option, @@ -2096,6 +2122,15 @@ pub struct ChannelInfo { pub msg: serde_json::Value, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChConf { + pub backend: String, + pub series: u64, + pub name: String, + pub scalar_type: ScalarType, + pub shape: Shape, +} + pub fn f32_close(a: f32, b: f32) -> bool { if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) { true diff --git a/netpod/src/query.rs b/netpod/src/query.rs index ded8915..60b484e 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -3,11 +3,23 @@ pub mod datetime; pub mod prebinned; use crate::get_url_query_pairs; +use crate::is_false; use crate::log::*; -use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; -use chrono::{DateTime, TimeZone, Utc}; +use crate::AggKind; +use crate::AppendToUrl; +use crate::ByteSize; +use crate::Channel; +use crate::FromUrl; +use crate::HasBackend; +use crate::HasTimeout; +use crate::NanoRange; +use crate::ToNanos; +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; use err::Error; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::collections::BTreeMap; use std::fmt; use std::time::Duration; @@ -71,11 +83,19 @@ pub struct PlainEventsQuery { range: NanoRange, agg_kind: AggKind, timeout: Duration, + #[serde(default, skip_serializing_if = "Option::is_none")] events_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + event_delay: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] stream_batch_len: Option, + #[serde(default, 
skip_serializing_if = "is_false")] report_error: bool, + #[serde(default, skip_serializing_if = "is_false")] do_log: bool, + #[serde(default, skip_serializing_if = "is_false")] do_test_main_error: bool, + #[serde(default, skip_serializing_if = "is_false")] do_test_stream_error: bool, } @@ -94,6 +114,7 @@ impl PlainEventsQuery { agg_kind, timeout, events_max, + event_delay: None, stream_batch_len: None, report_error: false, do_log, @@ -126,8 +147,12 @@ impl PlainEventsQuery { self.timeout } - pub fn events_max(&self) -> Option { - self.events_max + pub fn events_max(&self) -> u64 { + self.events_max.unwrap_or(1024 * 1024) + } + + pub fn event_delay(&self) -> &Option { + &self.event_delay } pub fn do_log(&self) -> bool { @@ -196,6 +221,9 @@ impl FromUrl for PlainEventsQuery { events_max: pairs .get("eventsMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| { + k.parse::().map(|x| Duration::from_millis(x)).map(|k| Some(k)) + })?, stream_batch_len: pairs .get("streamBatchLen") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, @@ -242,10 +270,15 @@ impl AppendToUrl for PlainEventsQuery { if let Some(x) = self.events_max.as_ref() { g.append_pair("eventsMax", &format!("{}", x)); } + if let Some(x) = self.event_delay.as_ref() { + g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3)); + } if let Some(x) = self.stream_batch_len.as_ref() { g.append_pair("streamBatchLen", &format!("{}", x)); } - g.append_pair("doLog", &format!("{}", self.do_log)); + if self.do_log { + g.append_pair("doLog", &format!("{}", self.do_log)); + } } } @@ -427,7 +460,12 @@ impl AppendToUrl for BinnedQuery { { self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); - g.append_pair("cacheUsage", &self.cache_usage.to_string()); + match &self.cache_usage { + CacheUsage::Use => {} + _ => { + g.append_pair("cacheUsage", &self.cache_usage.to_string()); + } + } g.append_pair("binCount", &format!("{}", self.bin_count)); g.append_pair( "begDate", @@ -443,11 +481,16 @@ impl AppendToUrl for BinnedQuery { } { let mut g = url.query_pairs_mut(); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); + // TODO + //g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); + //g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); - g.append_pair("doLog", &format!("{}", self.do_log)); + if self.abort_after_bin_count > 0 { + g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); + } + if self.do_log { + g.append_pair("doLog", &format!("{}", self.do_log)); + } } } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index eff78ff..29b7367 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -81,9 +81,6 @@ async fn events_conn_handler_inner_try( error!("missing command frame len {}", frames.len()); return Err((Error::with_msg("missing command frame"), netout).into()); } - //if frames[1].tyid() != items::TERM_FRAME_TYPE_ID { - // return Err((Error::with_msg("input without term frame"), netout).into()); - //} let query_frame = &frames[0]; if query_frame.tyid() != items::EVENT_QUERY_JSON_STRING_FRAME { return Err((Error::with_msg("query frame wrong type"), netout).into()); @@ -184,24 +181,43 @@ async fn 
events_conn_handler_inner_try( scy, do_test_stream_error, ); - let stream = stream.map(|item| { - let item = match item { - Ok(item) => match item { - ChannelEvents::Events(item) => { - let item = ChannelEvents::Events(item); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - item - } - ChannelEvents::Status(item) => { - let item = ChannelEvents::Status(item); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - item + let stream = stream + .map(|item| match &item { + Ok(k) => match k { + ChannelEvents::Events(k) => { + let n = k.len(); + let d = evq.event_delay(); + (item, n, d.clone()) } + ChannelEvents::Status(_) => (item, 1, None), }, - Err(e) => Err(e), - }; - item - }); + Err(_) => (item, 1, None), + }) + .then(|(item, n, d)| async move { + if let Some(d) = d { + debug!("sleep {} times {:?}", n, d); + tokio::time::sleep(d).await; + } + item + }) + .map(|item| { + let item = match item { + Ok(item) => match item { + ChannelEvents::Events(item) => { + let item = ChannelEvents::Events(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + ChannelEvents::Status(item) => { + let item = ChannelEvents::Status(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + }, + Err(e) => Err(e), + }; + item + }); Box::pin(stream) } else if let Some(_) = &node_config.node.channel_archiver { let e = Error::with_msg_no_trace("archapp not built"); @@ -236,6 +252,7 @@ async fn events_conn_handler_inner_try( let item = item.make_frame(); match item { Ok(buf) => { + info!("write {} bytes", buf.len()); buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { Ok(_) => {} diff --git a/streams/src/collect.rs b/streams/src/collect.rs index a672e1b..f9810e7 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -2,7 +2,7 @@ use err::Error; use futures_util::{Stream, StreamExt}; use items::{RangeCompletableItem, Sitemty, StreamItem}; use items_0::collect_c::Collectable; -use netpod::log::*; +use netpod::{log::*, BinnedRange, NanoRange}; use std::fmt; use std::time::{Duration, Instant}; use tracing::Instrument; @@ -25,99 +25,120 @@ macro_rules! 
trace4 { ($($arg:tt)*) => (eprintln!($($arg)*)); } +async fn collect_in_span( + stream: S, + deadline: Instant, + events_max: u64, + range: Option, + binrange: Option, +) -> Result, Error> +where + S: Stream> + Unpin, + T: Collectable + items_0::WithLen + fmt::Debug, +{ + let mut collector: Option> = None; + let mut stream = stream; + let deadline = deadline.into(); + let mut range_complete = false; + let mut timed_out = false; + let mut total_duration = Duration::ZERO; + loop { + let item = match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(Some(k)) => k, + Ok(None) => break, + Err(_e) => { + warn!("collect_in_span time out"); + timed_out = true; + if let Some(coll) = collector.as_mut() { + coll.set_timed_out(); + } else { + warn!("Timeout but no collector yet"); + } + break; + } + }; + info!("collect_in_span see item"); + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + range_complete = true; + if let Some(coll) = collector.as_mut() { + coll.set_range_complete(); + } else { + warn!("Received RangeComplete but no collector yet"); + } + } + RangeCompletableItem::Data(mut item) => { + info!("collect_in_span sees {}", item.len()); + if collector.is_none() { + let c = item.new_collector(); + collector = Some(c); + } + let coll = collector.as_mut().unwrap(); + coll.ingest(&mut item); + if coll.len() as u64 >= events_max { + warn!("Reached events_max {} abort", events_max); + break; + } + } + }, + StreamItem::Log(item) => { + trace!("Log {:?}", item); + } + StreamItem::Stats(item) => { + trace!("Stats {:?}", item); + use items::StatsItem; + use netpod::DiskStats; + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + total_duration += k.duration; + } + }, + } + } + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + return Err(e); + } + } + } + let _ = range_complete; + let _ = timed_out; + let res = collector + .ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))? 
+ .result(range, binrange)?; + debug!("Total duration: {:?}", total_duration); + Ok(res) +} + pub async fn collect( stream: S, deadline: Instant, events_max: u64, + range: Option, + binrange: Option, ) -> Result, Error> where S: Stream> + Unpin, - T: Collectable + fmt::Debug, + T: Collectable + items_0::WithLen + fmt::Debug, { let span = tracing::span!(tracing::Level::TRACE, "collect"); - let fut = async { - let mut collector: Option> = None; - let mut stream = stream; - let deadline = deadline.into(); - let mut range_complete = false; - let mut total_duration = Duration::ZERO; - loop { - let item = match tokio::time::timeout_at(deadline, stream.next()).await { - Ok(Some(k)) => k, - Ok(None) => break, - Err(_e) => { - if let Some(coll) = collector.as_mut() { - coll.set_timed_out(); - } else { - warn!("Timeout but no collector yet"); - } - break; - } - }; - match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - range_complete = true; - if let Some(coll) = collector.as_mut() { - coll.set_range_complete(); - } else { - warn!("Received RangeComplete but no collector yet"); - } - } - RangeCompletableItem::Data(mut item) => { - if collector.is_none() { - let c = item.new_collector(); - collector = Some(c); - } - let coll = collector.as_mut().unwrap(); - coll.ingest(&mut item); - if coll.len() as u64 >= events_max { - warn!("Reached events_max {} abort", events_max); - break; - } - } - }, - StreamItem::Log(item) => { - trace!("Log {:?}", item); - } - StreamItem::Stats(item) => { - trace!("Stats {:?}", item); - use items::StatsItem; - use netpod::DiskStats; - match item { - // TODO factor and simplify the stats collection: - StatsItem::EventDataReadStats(_) => {} - StatsItem::RangeFilterStats(_) => {} - StatsItem::DiskStats(item) => match item { - DiskStats::OpenStats(k) => { - total_duration += k.duration; - } - DiskStats::SeekStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadExactStats(k) => { - total_duration += k.duration; - } - }, - } - } - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. - return Err(e); - } - } - } - let _ = range_complete; - let res = collector - .ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))? 
- .result()?; - debug!("Total duration: {:?}", total_duration); - Ok(res) - }; - fut.instrument(span).await + collect_in_span(stream, deadline, events_max, range, binrange) + .instrument(span) + .await } diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 5b8bf46..1c7ea87 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -62,7 +62,7 @@ where Ready(Ok(())) => { let n = buf.filled().len(); self.buf.wadv(n)?; - trace!("recv bytes {}", n); + debug!("recv bytes {}", n); Ready(Ok(n)) } Ready(Err(e)) => Ready(Err(e.into())), @@ -131,6 +131,7 @@ where return Err(e); } self.inp_bytes_consumed += lentot as u64; + debug!("parsed frame well len {}", len); let ret = InMemoryFrame { len, tyid, diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 8464950..9374b49 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -1,15 +1,28 @@ use crate::tcprawclient::open_tcp_streams; use err::Error; -#[allow(unused)] -use netpod::log::*; +use futures_util::stream; +use futures_util::StreamExt; +use items_2::channelevents::ChannelEvents; use netpod::query::PlainEventsQuery; +use netpod::ChConf; use netpod::Cluster; use serde_json::Value as JsonValue; +use std::time::Duration; use std::time::Instant; -pub async fn plain_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> Result { - let deadline = Instant::now() + query.timeout(); - let events_max = query.events_max().unwrap_or(1024 * 32); +pub async fn plain_events_json( + query: &PlainEventsQuery, + chconf: &ChConf, + cluster: &Cluster, +) -> Result { + // TODO remove magic constant + let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); + let events_max = query.events_max(); + let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = ChannelEvents::Events(empty); + let empty = items::sitem_data(empty); // TODO should be able to ask for data-events only, instead of mixed data and status events. 
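The seeding trick used in `plain_events_json` below (and likewise in `timebinned_json`) prepends one empty, correctly typed events item in front of the merged stream, so a collector exists and a well-formed empty result can be produced even when no backend delivers data. A toy model of the idea, using plain `Vec<i32>` batches in place of `ChannelEvents` (assumes the `tokio` macros/rt features and `futures-util`):

```rust
use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // The "empty" seed carries the type information; the real code chains
    // a sitem_data(ChannelEvents) in front of the merged TCP streams.
    let empty: Vec<i32> = Vec::new();
    let backends = stream::iter(vec![vec![1, 2], vec![3]]);
    let seeded = stream::iter([empty]).chain(backends);
    let batches: Vec<Vec<i32>> = seeded.collect().await;
    // Even with no backend data, the consumer still sees one (empty) batch
    // and can produce a well-formed, typed result.
    assert_eq!(batches, vec![vec![], vec![1, 2], vec![3]]);
}
```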
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?; //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?; @@ -23,8 +36,9 @@ pub async fn plain_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> R let stream = inp0.chain(inp1).chain(inp2); stream }; - let stream = { items_2::merger::Merger::new(inps, 512) }; - let collected = crate::collect::collect(stream, deadline, events_max).await?; + let stream = { items_2::merger::Merger::new(inps, 1) }; + let stream = stream::iter([empty]).chain(stream); + let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) } diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs index 243792e..d2850c2 100644 --- a/streams/src/test/collect.rs +++ b/streams/src/test/collect.rs @@ -15,7 +15,7 @@ fn collect_channel_events() -> Result<(), Error> { let stream = stream::iter(vec![sitem_data(evs0), sitem_data(evs1)]); let deadline = Instant::now() + Duration::from_millis(4000); let events_max = 10000; - let res = crate::collect::collect(stream, deadline, events_max).await?; + let res = crate::collect::collect(stream, deadline, events_max, None, None).await?; //eprintln!("collected result: {res:?}"); if let Some(res) = res.as_any_ref().downcast_ref::>() { eprintln!("Great, a match"); diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 6c59849..ad62074 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -1,19 +1,30 @@ use crate::tcprawclient::open_tcp_streams; use err::Error; +use futures_util::stream; use futures_util::StreamExt; +use items_2::channelevents::ChannelEvents; #[allow(unused)] use netpod::log::*; -use netpod::query::{BinnedQuery, PlainEventsQuery}; -use netpod::{BinnedRange, Cluster}; +use netpod::query::BinnedQuery; +use netpod::query::PlainEventsQuery; +use netpod::BinnedRange; +use netpod::ChConf; +use netpod::Cluster; use serde_json::Value as JsonValue; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; -pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result { +pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result { let binned_range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?; - let events_max = 10000; + let bins_max = 10000; let do_time_weight = query.agg_kind().do_time_weighted(); let timeout = Duration::from_millis(7500); let deadline = Instant::now() + timeout; + let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = ChannelEvents::Events(empty); + let empty = items::sitem_data(empty); let rawquery = PlainEventsQuery::new( query.channel().clone(), query.range().clone(), @@ -25,6 +36,7 @@ pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: let stream = { items_2::merger::Merger::new(inps, 1) }; + let stream = stream::iter([empty]).chain(stream); let stream = Box::pin(stream); let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline); if false { @@ -32,7 +44,7 
@@ pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result>> = stream.next().await; panic!() } - let collected = crate::collect::collect(stream, deadline, events_max).await?; + let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) }
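Finally, `timebinned_json` now threads `binned_range` into `collect`, so collectors can compare the bins they produced against the expected count. A rough standalone model of the edge and bin-count arithmetic, assuming an already bin-aligned range (the real `BinnedRange::covering_range` may widen the range to aligned edges):

```rust
const SEC: u64 = 1_000_000_000;

// Expected edges for an aligned range: bin_count bins -> bin_count + 1 edges.
fn edges(beg: u64, end: u64, bin_count: u64) -> Vec<u64> {
    let step = (end - beg) / bin_count;
    (0..=bin_count).map(|i| beg + i * step).collect()
}

fn main() {
    // Mirrors the test above: covering_range(0..100 s, 10) -> 11 edges.
    let e = edges(0, 100 * SEC, 10);
    assert_eq!(e.len(), 11);
    assert_eq!(e[1], 10 * SEC);
    // With bin_count_exp known, a collector that produced only 7 bins can
    // report 3 missing bins and a continueAt at the last produced edge.
    let produced = 7u64;
    let missing = (e.len() as u64 - 1).saturating_sub(produced);
    assert_eq!(missing, 3);
    println!("edges: {e:?}, missing bins: {missing}");
}
```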