Rebin to add zeros

This commit is contained in:
Dominik Werder
2025-06-06 14:59:14 +02:00
parent 5a64613727
commit 17c24d0c15

View File

@@ -57,10 +57,10 @@ use tracing::Instrument;
use tracing::Span;
use url::Url;
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); }
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "Api4BinnedV2"),
@@ -264,15 +264,19 @@ async fn binned_json_framed(
// let msg = format!("{}", res2.url.as_str());
// let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
// Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
todo!()
todo!("testpart=read_all_coarse disabled")
} else if res2.url.as_str().contains("testpart=frombinned") {
let binrange = res2
.query
.covering_range()?
.binned_range_time()
.ok_or_else(|| Error::BadRange)?;
let stream =
scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue, res2.cache_read_provider);
let stream = scyllaconn::binned2::frombinned::FromBinned::new(
series,
binrange.clone(),
scyqueue,
res2.cache_read_provider,
);
let stream = stream.map_err(Error::from);
// let msg = format!("{}", res2.url.as_str());
// let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
@@ -280,6 +284,28 @@ async fn binned_json_framed(
//
x
});
// use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight;
use items_0::streamitem::StreamItem;
use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
let mut rebinner = BinnedBinsTimeweightLazy::new(binrange).set_cnt_zero();
let stream = stream.filter_map(move |x| {
let ret = match x {
Ok(StreamItem::DataItem(x)) => match rebinner.ingest(&x) {
Ok(()) => match rebinner.output() {
Ok(Some(x)) => Some(Ok(items_0::streamitem::StreamItem::DataItem(x))),
Ok(None) => None,
Err(e) => Some(Err(e)),
},
Err(e) => Some(Err(e)),
},
Ok(StreamItem::Log(x)) => Some(Ok(StreamItem::Log(x))),
Ok(StreamItem::Stats(x)) => Some(Ok(StreamItem::Stats(x))),
Err(e) => Some(Err(e.into())),
};
futures_util::future::ready(ret)
});
let stream = stream.map(|item| {
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;