Fix api 1 status request
This commit is contained in:
@@ -43,7 +43,8 @@ pub enum DataParseError {
|
|||||||
HeaderTooLarge,
|
HeaderTooLarge,
|
||||||
Utf8Error,
|
Utf8Error,
|
||||||
EventTooShort,
|
EventTooShort,
|
||||||
EventTooLong,
|
#[error("EventTooLong({0}, {1})")]
|
||||||
|
EventTooLong(Shape, u32),
|
||||||
TooManyBeforeRange,
|
TooManyBeforeRange,
|
||||||
EventWithOptional,
|
EventWithOptional,
|
||||||
BadTypeIndex,
|
BadTypeIndex,
|
||||||
@@ -261,10 +262,17 @@ impl EventChunker {
|
|||||||
if len < 20 {
|
if len < 20 {
|
||||||
return Err(DataParseError::EventTooShort);
|
return Err(DataParseError::EventTooShort);
|
||||||
}
|
}
|
||||||
match self.fetch_info.shape() {
|
let shape = self.fetch_info.shape();
|
||||||
Shape::Scalar if len > 1000 => return Err(DataParseError::EventTooLong),
|
match shape {
|
||||||
Shape::Wave(_) if len > 500000 * 8 => return Err(DataParseError::EventTooLong),
|
Shape::Scalar if len > 1024 * 64 => {
|
||||||
Shape::Image(_, _) if len > 3200 * 3200 * 8 => return Err(DataParseError::EventTooLong),
|
return Err(DataParseError::EventTooLong(shape.clone(), len as _))
|
||||||
|
}
|
||||||
|
Shape::Wave(_) if len > 1024 * 1024 * 32 => {
|
||||||
|
return Err(DataParseError::EventTooLong(shape.clone(), len as _))
|
||||||
|
}
|
||||||
|
Shape::Image(_, _) if len > 1024 * 1024 * 200 => {
|
||||||
|
return Err(DataParseError::EventTooLong(shape.clone(), len as _))
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
let len = len as u32;
|
let len = len as u32;
|
||||||
@@ -481,11 +489,12 @@ impl EventChunker {
|
|||||||
log_items.push(item);
|
log_items.push(item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(e) => {
|
||||||
self.discard_count_shape_derived_err += 1;
|
self.discard_count_shape_derived_err += 1;
|
||||||
ret.pop_back();
|
ret.pop_back();
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"shape_derived error {:?} {:?}",
|
"shape_derived error {} {:?} {:?}",
|
||||||
|
e,
|
||||||
self.fetch_info.scalar_type(),
|
self.fetch_info.scalar_type(),
|
||||||
self.fetch_info.shape(),
|
self.fetch_info.shape(),
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -227,15 +227,15 @@ pub enum Error {
|
|||||||
NoHostInUrl,
|
NoHostInUrl,
|
||||||
NoPortInUrl,
|
NoPortInUrl,
|
||||||
Connection,
|
Connection,
|
||||||
IO,
|
IO(std::io::Error),
|
||||||
Http,
|
Http,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for Error {}
|
impl std::error::Error for Error {}
|
||||||
|
|
||||||
impl From<std::io::Error> for Error {
|
impl From<std::io::Error> for Error {
|
||||||
fn from(_: std::io::Error) -> Self {
|
fn from(value: std::io::Error) -> Self {
|
||||||
Self::IO
|
Self::IO(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -269,6 +269,7 @@ pub struct HttpResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result<HttpResponse, Error> {
|
pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result<HttpResponse, Error> {
|
||||||
|
debug!("http_get {:?} {:?} {:?}", url, accept, ctx);
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
|
|||||||
+34
-19
@@ -3,6 +3,7 @@ use crate::body_string;
|
|||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use crate::gather::gather_get_json_generic;
|
use crate::gather::gather_get_json_generic;
|
||||||
use crate::gather::SubRes;
|
use crate::gather::SubRes;
|
||||||
|
use crate::requests::accepts_json_or_all;
|
||||||
use crate::response;
|
use crate::response;
|
||||||
use crate::ReqCtx;
|
use crate::ReqCtx;
|
||||||
use crate::Requ;
|
use crate::Requ;
|
||||||
@@ -16,8 +17,10 @@ use http::header;
|
|||||||
use http::Method;
|
use http::Method;
|
||||||
use http::Response;
|
use http::Response;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use httpclient::body_bytes;
|
||||||
use httpclient::body_stream;
|
use httpclient::body_stream;
|
||||||
use httpclient::connect_client;
|
use httpclient::connect_client;
|
||||||
|
use httpclient::error_status_response;
|
||||||
use httpclient::read_body_bytes;
|
use httpclient::read_body_bytes;
|
||||||
use httpclient::IntoBody;
|
use httpclient::IntoBody;
|
||||||
use httpclient::StreamResponse;
|
use httpclient::StreamResponse;
|
||||||
@@ -1045,7 +1048,7 @@ impl Api1EventsBinaryHandler {
|
|||||||
.parse()
|
.parse()
|
||||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?
|
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?
|
||||||
} else {
|
} else {
|
||||||
0
|
ncc.ix as _
|
||||||
};
|
};
|
||||||
let req_stat_id = format!("{}{:02}", reqctx.reqid_this(), nodeno);
|
let req_stat_id = format!("{}{:02}", reqctx.reqid_this(), nodeno);
|
||||||
info!("return req_stat_id {req_stat_id}");
|
info!("return req_stat_id {req_stat_id}");
|
||||||
@@ -1076,29 +1079,41 @@ impl RequestStatusHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
|
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
|
||||||
let (head, body) = req.into_parts();
|
let (head, body) = req.into_parts();
|
||||||
if head.method != Method::GET {
|
if head.method != Method::GET {
|
||||||
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
|
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
|
||||||
}
|
}
|
||||||
let accept = head
|
if accepts_json_or_all(&head.headers) {
|
||||||
.headers
|
let _body_data = read_body_bytes(body).await?;
|
||||||
.get(header::ACCEPT)
|
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
||||||
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
|
if status_id.len() == 8 {
|
||||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
|
debug!("RequestStatusHandler status_id {:?}", status_id);
|
||||||
.to_owned();
|
let status = crate::status_board().unwrap().status_as_json(status_id);
|
||||||
if accept != APP_JSON && accept != ACCEPT_ALL {
|
let s = serde_json::to_string(&status)?;
|
||||||
// TODO set the public error code and message and return Err(e).
|
let ret = response(StatusCode::OK).body(body_string(s))?;
|
||||||
let e = Error::with_public_msg_no_trace(format!("unsupported accept: {:?}", accept));
|
Ok(ret)
|
||||||
error!("{e}");
|
} else if status_id.len() == 10 {
|
||||||
|
let node_id =
|
||||||
|
u32::from_str_radix(&status_id[8..10], 16).map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||||
|
let status_id = status_id[0..8].to_string();
|
||||||
|
let node2 = ncc
|
||||||
|
.node_config
|
||||||
|
.cluster
|
||||||
|
.nodes
|
||||||
|
.get(node_id as usize)
|
||||||
|
.ok_or_else(|| Error::with_msg_no_trace(format!("node {node_id} unknown")))?;
|
||||||
|
let url = Url::parse(&format!("{}api/1/requestStatus/{}", node2.baseurl(), status_id))
|
||||||
|
.map_err(|_| Error::with_msg_no_trace(format!("bad url")))?;
|
||||||
|
let res = httpclient::http_get(url, APP_JSON, ctx).await?;
|
||||||
|
let ret = response(StatusCode::OK).body(body_bytes(res.body))?;
|
||||||
|
Ok(ret)
|
||||||
|
} else {
|
||||||
|
let ret = error_status_response(StatusCode::BAD_REQUEST, format!("bad status id requested"), "0000");
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
|
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
|
||||||
}
|
}
|
||||||
let _body_data = read_body_bytes(body).await?;
|
|
||||||
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
|
||||||
debug!("RequestStatusHandler status_id {:?}", status_id);
|
|
||||||
let status = crate::status_board().unwrap().status_as_json(status_id);
|
|
||||||
let s = serde_json::to_string(&status)?;
|
|
||||||
let ret = response(StatusCode::OK).body(body_string(s))?;
|
|
||||||
Ok(ret)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -446,7 +446,7 @@ async fn http_service_inner(
|
|||||||
} else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) {
|
} else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) {
|
||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
} else if let Some(h) = api1::RequestStatusHandler::handler(&req) {
|
} else if let Some(h) = api1::RequestStatusHandler::handler(&req) {
|
||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
} else if let Some(h) = api4::docs::DocsHandler::handler(&req) {
|
} else if let Some(h) = api4::docs::DocsHandler::handler(&req) {
|
||||||
Ok(h.handle(req, ctx).await?)
|
Ok(h.handle(req, ctx).await?)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -77,6 +77,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait BinningggContainerEventsDyn: fmt::Debug {
|
||||||
|
fn binned_events_timeweight_traitobj(&self) -> Box<dyn BinnedEventsTimeweightTrait>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait BinningggContainerBinsDyn: fmt::Debug {}
|
||||||
|
|
||||||
pub trait BinningggBinnerTy: fmt::Debug + Send {
|
pub trait BinningggBinnerTy: fmt::Debug + Send {
|
||||||
type Input: fmt::Debug;
|
type Input: fmt::Debug;
|
||||||
type Output: fmt::Debug;
|
type Output: fmt::Debug;
|
||||||
@@ -85,19 +91,6 @@ pub trait BinningggBinnerTy: fmt::Debug + Send {
|
|||||||
fn range_final(&mut self);
|
fn range_final(&mut self);
|
||||||
fn bins_ready_count(&self) -> usize;
|
fn bins_ready_count(&self) -> usize;
|
||||||
fn bins_ready(&mut self) -> Option<Self::Output>;
|
fn bins_ready(&mut self) -> Option<Self::Output>;
|
||||||
|
|
||||||
/// If there is a bin in progress with non-zero count, push it to the result set.
|
|
||||||
/// With push_empty == true, a bin in progress is pushed even if it contains no counts.
|
|
||||||
fn push_in_progress(&mut self, push_empty: bool);
|
|
||||||
|
|
||||||
/// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
|
|
||||||
/// to `push_in_progress` did not change the result count, as long as edges are left.
|
|
||||||
/// The next call to `Self::bins_ready_count` must return one higher count than before.
|
|
||||||
fn cycle(&mut self);
|
|
||||||
|
|
||||||
fn empty(&self) -> Option<Self::Output>;
|
|
||||||
|
|
||||||
fn append_empty_until_end(&mut self);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send {
|
pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send {
|
||||||
@@ -111,7 +104,12 @@ pub trait BinningggBinnerDyn: fmt::Debug + Send {
|
|||||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
|
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait BinningBinnableDyn: fmt::Debug + Send {}
|
pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send {
|
||||||
|
fn ingest(&mut self, evs_all: Box<dyn BinningggContainerEventsDyn>) -> Result<(), BinningggError>;
|
||||||
|
fn input_done_range_final(&mut self) -> Result<(), BinningggError>;
|
||||||
|
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
|
||||||
|
fn output(&mut self) -> Result<Box<dyn BinningggContainerBinsDyn>, BinningggError>;
|
||||||
|
}
|
||||||
|
|
||||||
/// Data in time-binned form.
|
/// Data in time-binned form.
|
||||||
pub trait TimeBinned: Any + TypeName + TimeBinnable + Resettable + Collectable + erased_serde::Serialize {
|
pub trait TimeBinned: Any + TypeName + TimeBinnable + Resettable + Collectable + erased_serde::Serialize {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use netpod::DtNano;
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Serialize + for<'a> Deserialize<'a> {}
|
pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {}
|
||||||
|
|
||||||
impl AggTimeWeightOutputAvg for u64 {}
|
impl AggTimeWeightOutputAvg for u64 {}
|
||||||
|
|
||||||
@@ -13,7 +13,7 @@ impl AggTimeWeightOutputAvg for f32 {}
|
|||||||
|
|
||||||
impl AggTimeWeightOutputAvg for f64 {}
|
impl AggTimeWeightOutputAvg for f64 {}
|
||||||
|
|
||||||
pub trait AggregatorTimeWeight<EVT>
|
pub trait AggregatorTimeWeight<EVT>: fmt::Debug + Send
|
||||||
where
|
where
|
||||||
EVT: EventValueType,
|
EVT: EventValueType,
|
||||||
{
|
{
|
||||||
@@ -23,6 +23,7 @@ where
|
|||||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg;
|
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct AggregatorNumeric {
|
pub struct AggregatorNumeric {
|
||||||
sum: f64,
|
sum: f64,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a
|
|||||||
fn pop_front(&mut self) -> Option<EVT>;
|
fn pop_front(&mut self) -> Option<EVT>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EventValueType: fmt::Debug + Clone + PartialOrd {
|
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send {
|
||||||
type Container: Container<Self>;
|
type Container: Container<Self>;
|
||||||
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
||||||
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
|
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use crate::binning::container_events::ContainerEvents;
|
|||||||
use crate::binning::container_events::ContainerEventsTakeUpTo;
|
use crate::binning::container_events::ContainerEventsTakeUpTo;
|
||||||
use crate::binning::container_events::EventSingle;
|
use crate::binning::container_events::EventSingle;
|
||||||
use crate::channelevents::ChannelEvents;
|
use crate::channelevents::ChannelEvents;
|
||||||
|
use core::fmt;
|
||||||
use err::thiserror;
|
use err::thiserror;
|
||||||
use err::ThisError;
|
use err::ThisError;
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
@@ -81,6 +82,7 @@ struct LstRef<'a, EVT>(&'a EventSingle<EVT>);
|
|||||||
|
|
||||||
struct LstMut<'a, EVT>(&'a mut EventSingle<EVT>);
|
struct LstMut<'a, EVT>(&'a mut EventSingle<EVT>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct InnerB<EVT>
|
struct InnerB<EVT>
|
||||||
where
|
where
|
||||||
EVT: EventValueType,
|
EVT: EventValueType,
|
||||||
@@ -243,6 +245,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct InnerA<EVT>
|
struct InnerA<EVT>
|
||||||
where
|
where
|
||||||
EVT: EventValueType,
|
EVT: EventValueType,
|
||||||
@@ -381,6 +384,20 @@ where
|
|||||||
out: ContainerBins<EVT>,
|
out: ContainerBins<EVT>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<EVT> fmt::Debug for BinnedEventsTimeweight<EVT>
|
||||||
|
where
|
||||||
|
EVT: EventValueType,
|
||||||
|
{
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt.debug_struct("BinnedEventsTimeweight")
|
||||||
|
.field("lst", &self.lst)
|
||||||
|
.field("range", &self.range)
|
||||||
|
.field("inner_a", &self.inner_a)
|
||||||
|
.field("out", &self.out)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<EVT> BinnedEventsTimeweight<EVT>
|
impl<EVT> BinnedEventsTimeweight<EVT>
|
||||||
where
|
where
|
||||||
EVT: EventValueType,
|
EVT: EventValueType,
|
||||||
|
|||||||
@@ -1,9 +1,14 @@
|
|||||||
|
use super::timeweight_events::BinnedEventsTimeweight;
|
||||||
|
use crate::binning::container_events::EventValueType;
|
||||||
use crate::channelevents::ChannelEvents;
|
use crate::channelevents::ChannelEvents;
|
||||||
use err::thiserror;
|
use err::thiserror;
|
||||||
use err::ThisError;
|
use err::ThisError;
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
use items_0::streamitem::Sitemty;
|
use items_0::streamitem::Sitemty;
|
||||||
|
use items_0::timebin::BinnedEventsTimeweightTrait;
|
||||||
use items_0::timebin::BinningggBinnerDyn;
|
use items_0::timebin::BinningggBinnerDyn;
|
||||||
|
use items_0::timebin::BinningggContainerBinsDyn;
|
||||||
|
use items_0::timebin::BinningggContainerEventsDyn;
|
||||||
use items_0::timebin::BinningggError;
|
use items_0::timebin::BinningggError;
|
||||||
use netpod::BinnedRange;
|
use netpod::BinnedRange;
|
||||||
use netpod::TsNano;
|
use netpod::TsNano;
|
||||||
@@ -17,37 +22,48 @@ pub enum Error {
|
|||||||
InnerDynMissing,
|
InnerDynMissing,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BinnedEventsTimeweightDyn {
|
#[derive(Debug)]
|
||||||
|
pub struct BinnedEventsTimeweightDynbox<EVT>
|
||||||
|
where
|
||||||
|
EVT: EventValueType,
|
||||||
|
{
|
||||||
range: BinnedRange<TsNano>,
|
range: BinnedRange<TsNano>,
|
||||||
binner: Option<Box<dyn BinningggBinnerDyn>>,
|
binner: BinnedEventsTimeweight<EVT>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BinnedEventsTimeweightDyn {
|
impl<EVT> BinnedEventsTimeweightDynbox<EVT>
|
||||||
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
where
|
||||||
Self { range, binner: None }
|
EVT: EventValueType + 'static,
|
||||||
|
{
|
||||||
|
pub fn new(range: BinnedRange<TsNano>) -> Box<dyn BinnedEventsTimeweightTrait> {
|
||||||
|
let ret = Self {
|
||||||
|
binner: BinnedEventsTimeweight::new(range.clone()),
|
||||||
|
range,
|
||||||
|
};
|
||||||
|
Box::new(ret)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn ingest(&mut self, mut evs_all: ContainerEventsDyn) -> Result<(), BinningggError> {
|
impl<EVT> BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox<EVT>
|
||||||
TODO;
|
where
|
||||||
|
EVT: EventValueType,
|
||||||
|
{
|
||||||
|
fn ingest(&mut self, evs_all: Box<dyn BinningggContainerEventsDyn>) -> Result<(), BinningggError> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||||
self.binner
|
// self.binner.input_done_range_final()
|
||||||
.as_mut()
|
todo!()
|
||||||
.ok_or(Error::InnerDynMissing)?
|
|
||||||
.input_done_range_final()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||||
self.binner
|
// self.binner.input_done_range_open()
|
||||||
.as_mut()
|
todo!()
|
||||||
.ok_or(Error::InnerDynMissing)?
|
|
||||||
.input_done_range_open()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn output(&mut self) -> ContainerBinsDyn {
|
fn output(&mut self) -> Result<Box<dyn BinningggContainerBinsDyn>, BinningggError> {
|
||||||
TODO;
|
// self.binner.output()
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -63,3 +79,38 @@ impl Stream for BinnedEventsTimeweightStream {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct BinnedEventsTimeweightLazy {
|
||||||
|
range: BinnedRange<TsNano>,
|
||||||
|
binned_events: Option<Box<dyn BinnedEventsTimeweightTrait>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BinnedEventsTimeweightLazy {
|
||||||
|
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
||||||
|
Self {
|
||||||
|
range,
|
||||||
|
binned_events: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
|
||||||
|
fn ingest(&mut self, evs_all: Box<dyn BinningggContainerEventsDyn>) -> Result<(), BinningggError> {
|
||||||
|
// TODO the container must provide a method to create the dyn binner.
|
||||||
|
let binned_events = self.binned_events.get_or_insert_with(|| todo!());
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn output(&mut self) -> Result<Box<dyn BinningggContainerBinsDyn>, BinningggError> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ impl Container<EnumVariant> for EnumVariantContainer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct EnumVariantAggregatorTimeWeight {
|
pub struct EnumVariantAggregatorTimeWeight {
|
||||||
sum: f32,
|
sum: f32,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,12 +50,12 @@ pub mod log_macros {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod log2 {
|
pub mod log {
|
||||||
pub use tracing::{self, event, span, Level};
|
pub use tracing::{self, event, span, Level};
|
||||||
pub use tracing::{debug, error, info, trace, warn};
|
pub use tracing::{debug, error, info, trace, warn};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod log {
|
pub mod log_ {
|
||||||
pub use crate::{debug, error, info, trace, warn};
|
pub use crate::{debug, error, info, trace, warn};
|
||||||
pub use tracing::{self, event, span, Level};
|
pub use tracing::{self, event, span, Level};
|
||||||
}
|
}
|
||||||
@@ -1337,6 +1337,12 @@ pub enum Shape {
|
|||||||
Image(u32, u32),
|
Image(u32, u32),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Shape {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt::Debug::fmt(self, fmt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mod serde_shape {
|
mod serde_shape {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user