diff --git a/apidoc/src/bins.md b/apidoc/src/bins.md index 189fa89..5899410 100644 --- a/apidoc/src/bins.md +++ b/apidoc/src/bins.md @@ -3,7 +3,7 @@ Binned data can be fetched like this: ```bash -curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binWidth=" +curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binWidth=5m" ``` Parameters: @@ -15,7 +15,7 @@ Parameters: `1h` for 1 hour. - `binCount`: requested number of bins, can not be combined with `binWidth`. - `contentTimeout`: return the so-far computed results after the given timeout. - The streaming (e.g. `json-framed`) response will yield results in `contentTimeout` intervals. + When streaming content was requested (e.g. `json-framed`) the response will yield results in `contentTimeout` intervals. - `allowLargeResult=true` **DEPRECATED, will be rejected in the future** indicates that the client is prepared to accept also larger responses compared to what might be suitable for a typical browser. Please download large result sets as @@ -27,10 +27,41 @@ Note: it is an error to specify both `binWidth` and `binCount`. The server may return more than `binCount` bins, and it will choose a `binWidth` from a set of supported widths to best possibly match the requested width. +If the service was not able to complete a single bin within the timeout it returns HTTP 504. + +For each bin both edges are returned (ts1 and ts2) in order to maybe support sparse and varying-width +binned responses in the future. + +The json response contains timestamps in the form `TIMESTAMP = ANCHOR + MILLIS + NANOS`. Example response, truncated to 4 bins: ```json { + "tsAnchor": 1726394700, + "ts1Ms": [ + 0, + 300000, + 600000, + 900000 + ], + "ts2Ms": [ + 300000, + 600000, + 900000, + 1200000 + ], + "ts1Ns": [ + 0, + 0, + 0, + 0 + ], + "ts2Ns": [ + 0, + 0, + 0, + 0 + ], "avgs": [ 0.010802877135574818, 0.010565019212663174, @@ -54,41 +85,18 @@ Example response, truncated to 4 bins: 0.0040797283872962, 0.004329073242843151, 0.004934651777148247 - ], - "ts1Ms": [ - 0, - 300000, - 600000, - 900000 - ], - "ts1Ns": [ - 0, - 0, - 0, - 0 - ], - "ts2Ms": [ - 300000, - 600000, - 900000, - 1200000 - ], - "ts2Ns": [ - 0, - 0, - 0, - 0 - ], - "tsAnchor": 1726394700 + ] } ``` +Note: the fields `ts1Ns` and `ts2Ns` may be omitted in the response if they are all zero. + ## As framed JSON stream To download larger amounts data as JSON it is recommended to use the `json-framed` content encoding. Using this encoding, the server can send the requested events as a stream of json objects, where each -json object contains a batch of events. +json object contains a batch of bins. This content encoding is triggered via the `Accept: application/json-framed` header in the request. The returned body looks like: @@ -108,4 +116,4 @@ where each `[JSON-frame]` looks like: ``` Note: "data" objects are currently identified by the presence of the `ts1s` key. -There can be other types of objects, like keepalive, log or statistics. +There can be other types of objects, like keepalive, log messages or request metrics. diff --git a/apidoc/src/events.md b/apidoc/src/events.md index 47e79af..69162f8 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -31,6 +31,46 @@ the key `continueAt` which indicates that the response is incomplete and that th issue another request with `begDate` as given by `continueAt`. +By default, or explicitly with `Accept: application/json` the response will be a json object. + +Example response: +```json +{ + "tsAnchor": 1727336613, + "tsMs": [ + 3, + 13, + 23, + 33 + ], + "tsNs": [ + 80998, + 80999, + 81000, + 81001 + ], + "pulseAnchor": 22238080000, + "pulseOff": [ + 998, + 999, + 1000, + 1001 + ], + "values": [ + -0.005684227774617943, + -0.0056660833356960184, + -0.005697272133280464, + -0.005831689871131955 + ] +} +``` + +The timestamp of each event is `TIMESTAMP = ANCHOR + MILLIS + NANOS`. + +Note: the field `tsNs` may be omitted in the response if they are all zero. + + + ## Events as framed JSON stream To download larger amounts data as JSON it is recommended to use the `json-framed` content encoding. diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 107a6cd..a787245 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -14,6 +14,7 @@ use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; use httpclient::error_response; +use httpclient::error_status_response; use httpclient::not_found_response; use httpclient::IntoBody; use httpclient::Requ; @@ -25,6 +26,7 @@ use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::APP_JSON; use netpod::APP_JSON_FRAMED; use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; @@ -33,6 +35,7 @@ use query::api4::binned::BinnedQuery; use scyllaconn::bincache::ScyllaCacheReadProvider; use scyllaconn::worker::ScyllaQueue; use std::sync::Arc; +use streams::collect::CollectResult; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; @@ -184,8 +187,23 @@ async fn binned_json_single( .instrument(span1) .await .map_err(|e| Error::BinnedStream(e))?; - let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; - Ok(ret) + match item { + CollectResult::Some(item) => { + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(ToJsonBody::from(&item).into_body())?; + Ok(ret) + } + CollectResult::Timeout => { + let ret = error_status_response( + StatusCode::GATEWAY_TIMEOUT, + format!("no data within timeout"), + ctx.reqid(), + ); + Ok(ret) + } + } } async fn binned_json_framed( diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 6c20a36..816a574 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -19,6 +19,7 @@ use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; use httpclient::error_response; +use httpclient::error_status_response; use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamBody; @@ -37,6 +38,7 @@ use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; use std::sync::Arc; +use streams::collect::CollectResult; use streams::instrument::InstrumentStream; use tracing::Instrument; @@ -238,12 +240,24 @@ async fn plain_events_json( return Err(e.into()); } }; - let ret = response(StatusCode::OK) - .header(CONTENT_TYPE, APP_JSON) - .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) - .body(ToJsonBody::from(&item).into_body())?; - debug!("{self_name} response created"); - Ok(ret) + match item { + CollectResult::Some(item) => { + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(ToJsonBody::from(&item).into_body())?; + debug!("{self_name} response created"); + Ok(ret) + } + CollectResult::Timeout => { + let ret = error_status_response( + StatusCode::GATEWAY_TIMEOUT, + format!("no data within timeout"), + ctx.reqid(), + ); + Ok(ret) + } + } } fn bytes_chunks_to_framed(stream: S) -> impl Stream> diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 35ed5a0..883be89 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -285,7 +285,7 @@ items_0::impl_range_overlap_info_bins!(BinsDim0); impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { - error!("AppendEmptyBin::append_empty_bin should not get used"); + debug!("AppendEmptyBin::append_empty_bin should not get used"); self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.cnts.push_back(0); @@ -298,7 +298,7 @@ impl AppendEmptyBin for BinsDim0 { impl AppendAllFrom for BinsDim0 { fn append_all_from(&mut self, src: &mut Self) { - error!("AppendAllFrom::append_all_from should not get used"); + debug!("AppendAllFrom::append_all_from should not get used"); self.ts1s.extend(src.ts1s.drain(..)); self.ts2s.extend(src.ts2s.drain(..)); self.cnts.extend(src.cnts.drain(..)); diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 75b9d3d..ed0e65a 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -57,19 +57,19 @@ use std::fmt; use std::mem; #[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace2 { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_binning { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_binning { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] macro_rules! debug_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } @@ -588,6 +588,11 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { } self.count += 1; self.last_ts = ts; + if let Some(minmaxlst) = self.minmaxlst.as_mut() { + minmaxlst.2 = val.clone(); + } else { + self.minmaxlst = Some((val.clone(), val.clone(), val.clone())); + } } } } diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index ecb5e6b..e2b157d 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -14,7 +14,10 @@ use std::ops::Range; macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } @@ -41,7 +44,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = B::type_name(); - trace_ingest!( + trace_ingest_item!( "TimeBinner for {} ingest common_range_current {:?} item {:?}", self_name, binner.common_range_current(), @@ -56,20 +59,20 @@ impl TimeBinnerCommonV0Func { // Or consume the input data. loop { while item.starts_after(B::common_range_current(binner)) { - trace_ingest!("{self_name} ignore item and cycle starts_after"); + trace_ingest_item!("{self_name} ignore item and cycle starts_after"); TimeBinnerCommonV0Func::cycle(binner); if !B::common_has_more_range(binner) { debug!("{self_name} no more bin in edges after starts_after"); return; } } - // if item.ends_before(B::common_range_current(binner)) { - // trace_ingest_item!("{self_name} ignore item ends_before"); - // return; - // } + if item.ends_before(B::common_range_current(binner)) { + trace_ingest_item!("{self_name} ignore item ends_before"); + return; + } { if !B::common_has_more_range(binner) { - trace_ingest!("{self_name} no more bin in edges"); + trace_ingest_item!("{self_name} no more bin in edges"); return; } else { if let Some(item) = item @@ -78,10 +81,10 @@ impl TimeBinnerCommonV0Func { .downcast_mut::() { // TODO collect statistics associated with this request: - trace_ingest!("{self_name} FEED THE ITEM..."); + trace_ingest_item!("{self_name} FEED THE ITEM..."); TimeBinnerCommonV0Func::agg_ingest(binner, item); if item.ends_after(B::common_range_current(binner)) { - trace_ingest!( + trace_ingest_item!( "{self_name} FED ITEM, ENDS AFTER agg-range {:?}", B::common_range_current(binner) ); @@ -90,10 +93,10 @@ impl TimeBinnerCommonV0Func { warn!("{self_name} no more bin in edges after ingest and cycle"); return; } else { - trace_ingest!("{self_name} item fed, cycled, continue"); + trace_ingest_item!("{self_name} item fed, cycled, continue"); } } else { - trace_ingest!("{self_name} item fed, break"); + trace_ingest_item!("{self_name} item fed, break"); break; } } else { @@ -121,7 +124,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = B::type_name(); - trace_ingest!("{self_name}::push_in_progress push_empty {push_empty}"); + trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. @@ -145,7 +148,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = any::type_name::(); - trace_ingest!("{self_name}::cycle"); + trace_ingest_item!("{self_name}::cycle"); // TODO refactor this logic. let n = TimeBinnerCommonV0Trait::common_bins_ready_count(binner); TimeBinnerCommonV0Func::push_in_progress(binner, true); @@ -203,18 +206,19 @@ impl ChooseIndicesForTimeBinEvents { let mut k = tss.len(); for (i1, &ts) in tss.iter().enumerate() { if ts >= end { - trace_ingest!("{self_name} ingest {:6} {:20} AFTER", i1, ts); + trace_ingest_event!("{self_name} ingest {:6} {:20} AFTER", i1, ts); // TODO count all the ignored events for stats k = i1; break; } else if ts >= beg { - trace_ingest!("{self_name} ingest {:6} {:20} INSIDE", i1, ts); + trace_ingest_event!("{self_name} ingest {:6} {:20} INSIDE", i1, ts); } else { - trace_ingest!("{self_name} ingest {:6} {:20} BEFORE", i1, ts); + trace_ingest_event!("{self_name} ingest {:6} {:20} BEFORE", i1, ts); one_before = Some(i1); j = i1 + 1; } } + trace_ingest_item!("{self_name} chosen {one_before:?} {j:?} {k:?}"); (one_before, j, k) } } @@ -239,7 +243,7 @@ impl TimeAggregatorCommonV0Func { let self_name = B::type_name(); // TODO let items_seen = 777; - trace_ingest!( + trace_ingest_item!( "{self_name}::ingest_unweight item len {} items_seen {}", item.len(), items_seen @@ -266,7 +270,7 @@ impl TimeAggregatorCommonV0Func { let self_name = B::type_name(); // TODO let items_seen = 777; - trace_ingest!( + trace_ingest_item!( "{self_name}::ingest_time_weight item len {} items_seen {}", item.len(), items_seen diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index 857248c..22b4f05 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -41,6 +41,11 @@ macro_rules! trace4 { ($($arg:tt)*) => (eprintln!($($arg)*)); } +pub enum CollectResult { + Timeout, + Some(T), +} + pub struct Collect { inp: Pin>> + Send>>, events_max: u64, @@ -156,7 +161,7 @@ impl Collect { } impl Future for Collect { - type Output = Result, Error>; + type Output = Result>, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; @@ -177,14 +182,13 @@ impl Future for Collect { Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) { Ok(res) => { //info!("collect stats total duration: {:?}", total_duration); - Ready(Ok(res)) + Ready(Ok(CollectResult::Some(res))) } Err(e) => Ready(Err(e)), }, None => { - let e = Error::with_msg_no_trace(format!("no result because no collector was created")); - error!("{e}"); - Ready(Err(e)) + debug!("no result because no collector was created"); + Ready(Ok(CollectResult::Timeout)) } } } else { @@ -245,7 +249,7 @@ where info!("collect_in_span call set_timed_out"); coll.set_timed_out(); } else { - warn!("collect timeout but no collector yet"); + warn!("collect_in_span collect timeout but no collector yet"); } break; } @@ -258,11 +262,11 @@ where if let Some(coll) = collector.as_mut() { coll.set_range_complete(); } else { - warn!("collect received RangeComplete but no collector yet"); + warn!("collect_in_span received RangeComplete but no collector yet"); } } RangeCompletableItem::Data(mut item) => { - trace!("collect sees len {}", item.len()); + trace!("collect_in_span sees len {}", item.len()); if collector.is_none() { let c = item.new_collector(); collector = Some(c); @@ -278,10 +282,10 @@ where } }, StreamItem::Log(item) => { - trace!("collect log {:?}", item); + trace!("collect_in_span log {:?}", item); } StreamItem::Stats(item) => { - trace!("collect stats {:?}", item); + trace!("collect_in_span stats {:?}", item); match item { // TODO factor and simplify the stats collection: StatsItem::EventDataReadStats(_) => {} @@ -313,9 +317,9 @@ where let _ = range_complete; let _ = timed_out; let res = collector - .ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))? + .ok_or_else(|| Error::with_msg_no_trace(format!("no result, no collector created")))? .result(range, binrange)?; - info!("collect stats total duration: {:?}", total_duration); + info!("collect_in_span stats total duration: {:?}", total_duration); Ok(res) } diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index f2d2a07..d14fdd6 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -1,4 +1,5 @@ use crate::collect::Collect; +use crate::collect::CollectResult; use crate::firsterr::non_empty; use crate::firsterr::only_first_err; use crate::json_stream::events_stream_to_json_stream; @@ -34,7 +35,7 @@ pub async fn plain_events_json( ctx: &ReqCtx, _cluster: &Cluster, open_bytes: OpenBoxedBytesStreamsBox, -) -> Result { +) -> Result, Error> { debug!("plain_events_json evquery {:?}", evq); let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000)); @@ -93,9 +94,14 @@ pub async fn plain_events_json( .await .map_err(Error::Collect)?; debug!("plain_events_json collected"); - let jsval = serde_json::to_value(&collected)?; - debug!("plain_events_json json serialized"); - Ok(jsval) + if let CollectResult::Some(x) = collected { + let jsval = serde_json::to_value(&x)?; + debug!("plain_events_json json serialized"); + Ok(CollectResult::Some(jsval)) + } else { + debug!("plain_events_json timeout"); + Ok(CollectResult::Timeout) + } } pub async fn plain_events_json_stream( diff --git a/crates/streams/src/test/collect.rs b/crates/streams/src/test/collect.rs index a55658c..6edf1bb 100644 --- a/crates/streams/src/test/collect.rs +++ b/crates/streams/src/test/collect.rs @@ -1,4 +1,5 @@ use crate::collect::Collect; +use crate::collect::CollectResult; use crate::test::runfut; use crate::transform::build_event_transform; use crate::transform::build_time_binning_transform; @@ -11,8 +12,6 @@ use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; -use items_0::transform::EventStreamBox; -use items_0::transform::EventStreamTrait; use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; use items_2::streams::PlainEventStream; @@ -69,14 +68,18 @@ fn collect_channel_events_01() -> Result<(), Error> { let stream = TimeBinnableToCollectable::new(stream); let stream = Box::pin(stream); let res = Collect::new(stream, deadline, events_max, bytes_max, None, None).await?; - if let Some(res) = res.as_any_ref().downcast_ref::>() { - eprintln!("Great, a match"); - eprintln!("{res:?}"); - assert_eq!(res.len(), 40); + if let CollectResult::Some(res) = res { + if let Some(res) = res.as_any_ref().downcast_ref::>() { + eprintln!("Great, a match"); + eprintln!("{res:?}"); + assert_eq!(res.len(), 40); + } else { + return Err(Error::with_msg(format!("bad type of collected result"))); + } + Ok(()) } else { return Err(Error::with_msg(format!("bad type of collected result"))); } - Ok(()) }; runfut(fut) } @@ -110,14 +113,18 @@ fn collect_channel_events_pulse_id_diff() -> Result<(), Error> { let stream = TimeBinnableToCollectable::new(stream); let stream = Box::pin(stream); let res = Collect::new(stream, deadline, events_max, bytes_max, None, None).await?; - if let Some(res) = res.as_any_ref().downcast_ref::>() { - eprintln!("Great, a match"); - eprintln!("{res:?}"); - assert_eq!(res.len(), 40); + if let CollectResult::Some(res) = res { + if let Some(res) = res.as_any_ref().downcast_ref::>() { + eprintln!("Great, a match"); + eprintln!("{res:?}"); + assert_eq!(res.len(), 40); + } else { + return Err(Error::with_msg(format!("bad type of collected result"))); + } + Ok(()) } else { return Err(Error::with_msg(format!("bad type of collected result"))); } - Ok(()) }; runfut(fut) } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 2cfe4bb..ae3bb45 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -1,4 +1,5 @@ use crate::collect::Collect; +use crate::collect::CollectResult; use crate::json_stream::JsonBytes; use crate::json_stream::JsonStream; use crate::rangefilter2::RangeFilter2; @@ -316,7 +317,7 @@ async fn timebinned_stream( events_read_provider: Option>, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; - let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Use); + let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache); match ( ch_conf.series(), cache_usage.clone(), @@ -413,7 +414,7 @@ pub async fn timebinned_json( open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Option>, events_read_provider: Option>, -) -> Result { +) -> Result, Error> { let deadline = Instant::now() + query .timeout_content() @@ -438,19 +439,24 @@ pub async fn timebinned_json( let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); let collres = collected.await?; - info!("timebinned_json collected type_name {:?}", collres.type_name()); - let collres = if let Some(bins) = collres - .as_any_ref() - .downcast_ref::>() - { - warn!("unexpected binned enum"); - // bins.boxed_collected_with_enum_fix() - collres - } else { - collres - }; - let jsval = serde_json::to_value(&collres)?; - Ok(jsval) + match collres { + CollectResult::Some(collres) => { + let collres = if let Some(bins) = collres + .as_any_ref() + .downcast_ref::>() + { + debug!("unexpected binned enum"); + // bins.boxed_collected_with_enum_fix() + collres + } else { + debug!("timebinned_json collected type_name {:?}", collres.type_name()); + collres + }; + let jsval = serde_json::to_value(&collres)?; + Ok(CollectResult::Some(jsval)) + } + CollectResult::Timeout => Ok(CollectResult::Timeout), + } } fn take_collector_result(coll: &mut Box) -> Option {