From 365cdcf2d61d96c39b5f301ed180072963e6f0a6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Oct 2024 23:40:56 +0200 Subject: [PATCH] Fix api 1 status request --- crates/disk/src/eventchunker.rs | 23 +++-- crates/httpclient/src/httpclient.rs | 7 +- crates/httpret/src/api1.rs | 53 +++++++---- crates/httpret/src/httpret.rs | 2 +- crates/items_0/src/timebin.rs | 26 +++--- crates/items_2/src/binning/aggregator.rs | 5 +- .../items_2/src/binning/container_events.rs | 2 +- .../binning/timeweight/timeweight_events.rs | 17 ++++ .../timeweight/timeweight_events_dyn.rs | 89 +++++++++++++++---- crates/items_2/src/binning/valuetype.rs | 1 + crates/netpod/src/netpod.rs | 10 ++- 11 files changed, 167 insertions(+), 68 deletions(-) diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index cf00616..6f3edcf 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -43,7 +43,8 @@ pub enum DataParseError { HeaderTooLarge, Utf8Error, EventTooShort, - EventTooLong, + #[error("EventTooLong({0}, {1})")] + EventTooLong(Shape, u32), TooManyBeforeRange, EventWithOptional, BadTypeIndex, @@ -261,10 +262,17 @@ impl EventChunker { if len < 20 { return Err(DataParseError::EventTooShort); } - match self.fetch_info.shape() { - Shape::Scalar if len > 1000 => return Err(DataParseError::EventTooLong), - Shape::Wave(_) if len > 500000 * 8 => return Err(DataParseError::EventTooLong), - Shape::Image(_, _) if len > 3200 * 3200 * 8 => return Err(DataParseError::EventTooLong), + let shape = self.fetch_info.shape(); + match shape { + Shape::Scalar if len > 1024 * 64 => { + return Err(DataParseError::EventTooLong(shape.clone(), len as _)) + } + Shape::Wave(_) if len > 1024 * 1024 * 32 => { + return Err(DataParseError::EventTooLong(shape.clone(), len as _)) + } + Shape::Image(_, _) if len > 1024 * 1024 * 200 => { + return Err(DataParseError::EventTooLong(shape.clone(), len as _)) + } _ => {} } let len = len as u32; @@ -481,11 +489,12 @@ impl EventChunker { log_items.push(item); } } - Err(_) => { + Err(e) => { self.discard_count_shape_derived_err += 1; ret.pop_back(); let msg = format!( - "shape_derived error {:?} {:?}", + "shape_derived error {} {:?} {:?}", + e, self.fetch_info.scalar_type(), self.fetch_info.shape(), ); diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 5ef00b9..6294cfb 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -227,15 +227,15 @@ pub enum Error { NoHostInUrl, NoPortInUrl, Connection, - IO, + IO(std::io::Error), Http, } impl std::error::Error for Error {} impl From for Error { - fn from(_: std::io::Error) -> Self { - Self::IO + fn from(value: std::io::Error) -> Self { + Self::IO(value) } } @@ -269,6 +269,7 @@ pub struct HttpResponse { } pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result { + debug!("http_get {:?} {:?} {:?}", url, accept, ctx); let req = Request::builder() .method(http::Method::GET) .uri(url.to_string()) diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index d59462c..0f41a17 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -3,6 +3,7 @@ use crate::body_string; use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; +use crate::requests::accepts_json_or_all; use crate::response; use crate::ReqCtx; use crate::Requ; @@ -16,8 +17,10 @@ use http::header; use http::Method; use http::Response; use http::StatusCode; +use httpclient::body_bytes; use httpclient::body_stream; use httpclient::connect_client; +use httpclient::error_status_response; use httpclient::read_body_bytes; use httpclient::IntoBody; use httpclient::StreamResponse; @@ -1045,7 +1048,7 @@ impl Api1EventsBinaryHandler { .parse() .map_err(|e| Error::with_msg_no_trace(format!("{e}")))? } else { - 0 + ncc.ix as _ }; let req_stat_id = format!("{}{:02}", reqctx.reqid_this(), nodeno); info!("return req_stat_id {req_stat_id}"); @@ -1076,29 +1079,41 @@ impl RequestStatusHandler { } } - pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result { + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { let (head, body) = req.into_parts(); if head.method != Method::GET { return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } - let accept = head - .headers - .get(header::ACCEPT) - .map_or(Ok(ACCEPT_ALL), |k| k.to_str()) - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? - .to_owned(); - if accept != APP_JSON && accept != ACCEPT_ALL { - // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg_no_trace(format!("unsupported accept: {:?}", accept)); - error!("{e}"); + if accepts_json_or_all(&head.headers) { + let _body_data = read_body_bytes(body).await?; + let status_id = &head.uri.path()[Self::path_prefix().len()..]; + if status_id.len() == 8 { + debug!("RequestStatusHandler status_id {:?}", status_id); + let status = crate::status_board().unwrap().status_as_json(status_id); + let s = serde_json::to_string(&status)?; + let ret = response(StatusCode::OK).body(body_string(s))?; + Ok(ret) + } else if status_id.len() == 10 { + let node_id = + u32::from_str_radix(&status_id[8..10], 16).map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let status_id = status_id[0..8].to_string(); + let node2 = ncc + .node_config + .cluster + .nodes + .get(node_id as usize) + .ok_or_else(|| Error::with_msg_no_trace(format!("node {node_id} unknown")))?; + let url = Url::parse(&format!("{}api/1/requestStatus/{}", node2.baseurl(), status_id)) + .map_err(|_| Error::with_msg_no_trace(format!("bad url")))?; + let res = httpclient::http_get(url, APP_JSON, ctx).await?; + let ret = response(StatusCode::OK).body(body_bytes(res.body))?; + Ok(ret) + } else { + let ret = error_status_response(StatusCode::BAD_REQUEST, format!("bad status id requested"), "0000"); + Ok(ret) + } + } else { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let _body_data = read_body_bytes(body).await?; - let status_id = &head.uri.path()[Self::path_prefix().len()..]; - debug!("RequestStatusHandler status_id {:?}", status_id); - let status = crate::status_board().unwrap().status_as_json(status_id); - let s = serde_json::to_string(&status)?; - let ret = response(StatusCode::OK).body(body_string(s))?; - Ok(ret) } } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 1dec158..5c292c9 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -446,7 +446,7 @@ async fn http_service_inner( } else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api1::RequestStatusHandler::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = api4::docs::DocsHandler::handler(&req) { Ok(h.handle(req, ctx).await?) } else { diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 0f0e61a..0d15004 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -77,6 +77,12 @@ where } } +pub trait BinningggContainerEventsDyn: fmt::Debug { + fn binned_events_timeweight_traitobj(&self) -> Box; +} + +pub trait BinningggContainerBinsDyn: fmt::Debug {} + pub trait BinningggBinnerTy: fmt::Debug + Send { type Input: fmt::Debug; type Output: fmt::Debug; @@ -85,19 +91,6 @@ pub trait BinningggBinnerTy: fmt::Debug + Send { fn range_final(&mut self); fn bins_ready_count(&self) -> usize; fn bins_ready(&mut self) -> Option; - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); - - fn empty(&self) -> Option; - - fn append_empty_until_end(&mut self); } pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send { @@ -111,7 +104,12 @@ pub trait BinningggBinnerDyn: fmt::Debug + Send { fn input_done_range_open(&mut self) -> Result<(), BinningggError>; } -pub trait BinningBinnableDyn: fmt::Debug + Send {} +pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send { + fn ingest(&mut self, evs_all: Box) -> Result<(), BinningggError>; + fn input_done_range_final(&mut self) -> Result<(), BinningggError>; + fn input_done_range_open(&mut self) -> Result<(), BinningggError>; + fn output(&mut self) -> Result, BinningggError>; +} /// Data in time-binned form. pub trait TimeBinned: Any + TypeName + TimeBinnable + Resettable + Collectable + erased_serde::Serialize { diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index c05e306..ec3279f 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -5,7 +5,7 @@ use netpod::DtNano; use serde::Deserialize; use serde::Serialize; -pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Serialize + for<'a> Deserialize<'a> {} +pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {} impl AggTimeWeightOutputAvg for u64 {} @@ -13,7 +13,7 @@ impl AggTimeWeightOutputAvg for f32 {} impl AggTimeWeightOutputAvg for f64 {} -pub trait AggregatorTimeWeight +pub trait AggregatorTimeWeight: fmt::Debug + Send where EVT: EventValueType, { @@ -23,6 +23,7 @@ where fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg; } +#[derive(Debug)] pub struct AggregatorNumeric { sum: f64, } diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 58fbadb..3d8fc6c 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -27,7 +27,7 @@ pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a fn pop_front(&mut self) -> Option; } -pub trait EventValueType: fmt::Debug + Clone + PartialOrd { +pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index 45dc5df..a5f8c75 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -6,6 +6,7 @@ use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; use crate::channelevents::ChannelEvents; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Stream; @@ -81,6 +82,7 @@ struct LstRef<'a, EVT>(&'a EventSingle); struct LstMut<'a, EVT>(&'a mut EventSingle); +#[derive(Debug)] struct InnerB where EVT: EventValueType, @@ -243,6 +245,7 @@ where } } +#[derive(Debug)] struct InnerA where EVT: EventValueType, @@ -381,6 +384,20 @@ where out: ContainerBins, } +impl fmt::Debug for BinnedEventsTimeweight +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BinnedEventsTimeweight") + .field("lst", &self.lst) + .field("range", &self.range) + .field("inner_a", &self.inner_a) + .field("out", &self.out) + .finish() + } +} + impl BinnedEventsTimeweight where EVT: EventValueType, diff --git a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs index b78d8c5..3d30552 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs @@ -1,9 +1,14 @@ +use super::timeweight_events::BinnedEventsTimeweight; +use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; use err::thiserror; use err::ThisError; use futures_util::Stream; use items_0::streamitem::Sitemty; +use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinningggBinnerDyn; +use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinningggContainerEventsDyn; use items_0::timebin::BinningggError; use netpod::BinnedRange; use netpod::TsNano; @@ -17,37 +22,48 @@ pub enum Error { InnerDynMissing, } -pub struct BinnedEventsTimeweightDyn { +#[derive(Debug)] +pub struct BinnedEventsTimeweightDynbox +where + EVT: EventValueType, +{ range: BinnedRange, - binner: Option>, + binner: BinnedEventsTimeweight, } -impl BinnedEventsTimeweightDyn { - pub fn new(range: BinnedRange) -> Self { - Self { range, binner: None } +impl BinnedEventsTimeweightDynbox +where + EVT: EventValueType + 'static, +{ + pub fn new(range: BinnedRange) -> Box { + let ret = Self { + binner: BinnedEventsTimeweight::new(range.clone()), + range, + }; + Box::new(ret) } +} - pub fn ingest(&mut self, mut evs_all: ContainerEventsDyn) -> Result<(), BinningggError> { - TODO; +impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox +where + EVT: EventValueType, +{ + fn ingest(&mut self, evs_all: Box) -> Result<(), BinningggError> { todo!() } - pub fn input_done_range_final(&mut self) -> Result<(), BinningggError> { - self.binner - .as_mut() - .ok_or(Error::InnerDynMissing)? - .input_done_range_final() + fn input_done_range_final(&mut self) -> Result<(), BinningggError> { + // self.binner.input_done_range_final() + todo!() } - pub fn input_done_range_open(&mut self) -> Result<(), BinningggError> { - self.binner - .as_mut() - .ok_or(Error::InnerDynMissing)? - .input_done_range_open() + fn input_done_range_open(&mut self) -> Result<(), BinningggError> { + // self.binner.input_done_range_open() + todo!() } - pub fn output(&mut self) -> ContainerBinsDyn { - TODO; + fn output(&mut self) -> Result, BinningggError> { + // self.binner.output() todo!() } } @@ -63,3 +79,38 @@ impl Stream for BinnedEventsTimeweightStream { todo!() } } + +#[derive(Debug)] +pub struct BinnedEventsTimeweightLazy { + range: BinnedRange, + binned_events: Option>, +} + +impl BinnedEventsTimeweightLazy { + pub fn new(range: BinnedRange) -> Self { + Self { + range, + binned_events: None, + } + } +} + +impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { + fn ingest(&mut self, evs_all: Box) -> Result<(), BinningggError> { + // TODO the container must provide a method to create the dyn binner. + let binned_events = self.binned_events.get_or_insert_with(|| todo!()); + todo!() + } + + fn input_done_range_final(&mut self) -> Result<(), BinningggError> { + todo!() + } + + fn input_done_range_open(&mut self) -> Result<(), BinningggError> { + todo!() + } + + fn output(&mut self) -> Result, BinningggError> { + todo!() + } +} diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index de626a1..aec2e29 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -49,6 +49,7 @@ impl Container for EnumVariantContainer { } } +#[derive(Debug)] pub struct EnumVariantAggregatorTimeWeight { sum: f32, } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 6df1394..bb85163 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -50,12 +50,12 @@ pub mod log_macros { } } -pub mod log2 { +pub mod log { pub use tracing::{self, event, span, Level}; pub use tracing::{debug, error, info, trace, warn}; } -pub mod log { +pub mod log_ { pub use crate::{debug, error, info, trace, warn}; pub use tracing::{self, event, span, Level}; } @@ -1337,6 +1337,12 @@ pub enum Shape { Image(u32, u32), } +impl fmt::Display for Shape { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + mod serde_shape { use super::*;