From 17c24d0c153fa4c5d444fc7f1421a27629a82123 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 6 Jun 2025 14:59:14 +0200 Subject: [PATCH] Rebin to add zeros --- crates/httpret/src/api4/binned_v2.rs | 40 +++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index 701beb1..656ae93 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -57,10 +57,10 @@ use tracing::Instrument; use tracing::Span; use url::Url; -macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } -macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } +macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "Api4BinnedV2"), @@ -264,15 +264,19 @@ async fn binned_json_framed( // let msg = format!("{}", res2.url.as_str()); // let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); // Box::pin(stream) as Pin + Send>> - todo!() + todo!("testpart=read_all_coarse disabled") } else if res2.url.as_str().contains("testpart=frombinned") { let binrange = res2 .query .covering_range()? .binned_range_time() .ok_or_else(|| Error::BadRange)?; - let stream = - scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue, res2.cache_read_provider); + let stream = scyllaconn::binned2::frombinned::FromBinned::new( + series, + binrange.clone(), + scyqueue, + res2.cache_read_provider, + ); let stream = stream.map_err(Error::from); // let msg = format!("{}", res2.url.as_str()); // let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); @@ -280,6 +284,28 @@ async fn binned_json_framed( // x }); + + // use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight; + use items_0::streamitem::StreamItem; + use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy; + let mut rebinner = BinnedBinsTimeweightLazy::new(binrange).set_cnt_zero(); + let stream = stream.filter_map(move |x| { + let ret = match x { + Ok(StreamItem::DataItem(x)) => match rebinner.ingest(&x) { + Ok(()) => match rebinner.output() { + Ok(Some(x)) => Some(Ok(items_0::streamitem::StreamItem::DataItem(x))), + Ok(None) => None, + Err(e) => Some(Err(e)), + }, + Err(e) => Some(Err(e)), + }, + Ok(StreamItem::Log(x)) => Some(Ok(StreamItem::Log(x))), + Ok(StreamItem::Stats(x)) => Some(Ok(StreamItem::Stats(x))), + Err(e) => Some(Err(e.into())), + }; + futures_util::future::ready(ret) + }); + let stream = stream.map(|item| { use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem;