diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 4cfd985..4a3c8f3 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -400,41 +400,12 @@ fn take_collector_result_cbor( } } -pub async fn timebinned_json_framed( - query: BinnedQuery, - ch_conf: ChannelTypeConfigGen, - ctx: &ReqCtx, - cache_read_provider: Arc, - events_read_provider: Arc, - timeout_provider: Box, -) -> Result { - let binned_range = query.covering_range()?; - // TODO derive better values, from query - let stream = timebinned_stream( - query.clone(), - binned_range.clone(), - ch_conf, - ctx, - cache_read_provider, - events_read_provider, - ) - .await?; - // let stream = timebinned_to_collectable(stream); - // TODO create a custom Stream adapter. - // Want to timeout only on data items: the user wants to wait for bins only a maximum time. - // But also, I want to coalesce. - let timeout_content_base = query - .timeout_content() - .unwrap_or(Duration::from_millis(1000)) - .min(Duration::from_millis(5000)) - .max(Duration::from_millis(100)); - let timeout_content_2 = timeout_content_base * 2 / 3; +pub fn timeoutable_collectable_stream_to_json_bytes( + stream: Pin>>>> + Send>>, + timeout_content_2: Duration, +) -> Pin> + Send>> { let mut coll = None; let mut last_emit = Instant::now(); - let stream = stream - .map(|x| Some(x)) - .chain(futures_util::stream::iter([None])); - let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream); let stream = stream.map(move |x| { match x { Some(x) => match x { @@ -455,8 +426,19 @@ pub async fn timebinned_json_framed( RangeCompletableItem::RangeComplete => None, }, StreamItem::Log(x) => { - debug!("{x:?}"); - // Some(serde_json::Value::String(format!("{x:?}"))) + if x.level == Level::ERROR { + error!("{}", x.msg); + } else if x.level == Level::WARN { + warn!("{}", x.msg); + } else if x.level == Level::INFO { + info!("{}", x.msg); + } else if x.level == Level::DEBUG { + debug!("{}", x.msg); + } else if x.level == Level::TRACE { + trace!("{}", x.msg); + } else { + trace!("{}", x.msg); + } None } StreamItem::Stats(x) => { @@ -497,7 +479,47 @@ pub async fn timebinned_json_framed( }); let stream = stream.filter_map(|x| futures_util::future::ready(x)); let stream = stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string())); - Ok(Box::pin(stream)) + let stream: Pin> + Send>> = + Box::pin(stream); + stream +} + +pub async fn timebinned_json_framed( + query: BinnedQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + cache_read_provider: Arc, + events_read_provider: Arc, + timeout_provider: Box, +) -> Result { + let binned_range = query.covering_range()?; + // TODO derive better values, from query + let stream = timebinned_stream( + query.clone(), + binned_range.clone(), + ch_conf, + ctx, + cache_read_provider, + events_read_provider, + ) + .await?; + // let stream = timebinned_to_collectable(stream); + // TODO create a custom Stream adapter. + // Want to timeout only on data items: the user wants to wait for bins only a maximum time. + // But also, I want to coalesce. + let timeout_content_base = query + .timeout_content() + .unwrap_or(Duration::from_millis(1000)) + .min(Duration::from_millis(5000)) + .max(Duration::from_millis(100)); + let timeout_content_2 = timeout_content_base * 2 / 3; + let stream = stream + .map(|x| Some(x)) + .chain(futures_util::stream::iter([None])); + let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream); + let stream = Box::pin(stream); + let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2); + Ok(stream) } pub async fn timebinned_cbor_framed(