This commit is contained in:
Dominik Werder
2025-02-17 13:36:48 +01:00
parent b296206668
commit bf2a218f7b
10 changed files with 43 additions and 89 deletions

View File

@@ -32,6 +32,7 @@ query = { path = "../daqbuf-query", package = "daqbuf-query" }
items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" }
items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" }
parse = { path = "../daqbuf-parse", package = "daqbuf-parse" }
series = { path = "../daqbuf-series", package = "daqbuf-series" }
[dev-dependencies]
tokio = { version = "1", features = ["rt"] }

View File

@@ -97,26 +97,26 @@ where
StreamItem::Log(item) => {
match item.level {
Level::TRACE => {
trace!("{item:?}");
trace!("{:?}", item);
}
Level::DEBUG => {
debug!("{item:?}");
debug!("{:?}", item);
}
Level::INFO => {
info!("{item:?}");
info!("{:?}", item);
}
Level::WARN => {
warn!("{item:?}");
warn!("{:?}", item);
}
Level::ERROR => {
error!("{item:?}");
error!("{:?}", item);
}
}
let item = CborBytes::new(Bytes::new());
Ok(item)
}
StreamItem::Stats(item) => {
info!("{item:?}");
debug!("{:?}", item);
let item = CborBytes::new(Bytes::new());
Ok(item)
}

View File

@@ -4,7 +4,6 @@ use crate::timebin::cached::reader::CacheReadProvider;
use crate::timebin::cached::reader::CacheReading;
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::cached::reader::EventsReading;
use crate::timebin::cached::reader::PrebinnedPartitioning;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -18,6 +17,7 @@ use netpod::DtMs;
use netpod::ReqCtx;
use netpod::TsNano;
use query::api4::events::EventsSubQuery;
use series::msp::PrebinnedPartitioning;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

View File

@@ -27,7 +27,9 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let _spg = this.span.enter();
this.inp.poll_next_unpin(cx)
let spg = this.span.enter();
let ret = this.inp.poll_next_unpin(cx);
drop(spg);
ret
}
}

View File

@@ -20,17 +20,18 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_inp { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) }
macro_rules! trace_inp { ($det:expr, $($arg:expr),*) => ( if false && $det { trace!($($arg),*); } ) }
macro_rules! trace_init { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_init { ($det:expr, $($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Rangefilter")]
pub enum Error {
DrainUnclean,
}
autoerr::create_error_v1!(
name(Error, "Rangefilter"),
enum variants {
DrainUnclean,
},
);
pub struct RangeFilter2<INP, ITY>
where
@@ -167,7 +168,7 @@ where
}
None => {
// TODO keep stats about this case
trace_emit!(self.trdet, "drain into to keep one before",);
trace_emit!(self.trdet, "drain into to keep one before");
let n = item.len();
match item.drain_into_new(n.max(1) - 1..n) {
DrainIntoNewResult::Done(keep) => {

View File

@@ -10,6 +10,7 @@ use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
use query::api4::events::EventsSubQuery;
use series::msp::PrebinnedPartitioning;
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
@@ -24,7 +25,7 @@ autoerr::create_error_v1!(
ChannelSend,
ChannelRecv,
Scylla(String),
PrebinnedPartitioningInvalid,
PrebinnedPartitioningInvalid(#[from] series::msp::Error),
},
);
@@ -34,67 +35,6 @@ pub type BinsReadResErr = streams::timebin::cached::reader::Error;
pub type BinsReadRes = Result<Option<BinsBoxed>, BinsReadResErr>;
pub type BinsReadFutBoxed = Pin<Box<dyn Future<Output = BinsReadRes> + Send>>;
pub enum PrebinnedPartitioning {
Sec1,
Sec10,
Min1,
Min10,
Hour1,
}
impl PrebinnedPartitioning {
pub fn bin_len(&self) -> DtMs {
use PrebinnedPartitioning::*;
match self {
Sec1 => DtMs::from_ms_u64(1000 * 1),
Sec10 => DtMs::from_ms_u64(1000 * 10),
Min1 => DtMs::from_ms_u64(1000 * 60 * 1),
Min10 => DtMs::from_ms_u64(1000 * 60 * 10),
Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 1),
}
}
pub fn msp_div(&self) -> DtMs {
use PrebinnedPartitioning::*;
match self {
Sec1 => DtMs::from_ms_u64(1000 * 60 * 10),
Sec10 => DtMs::from_ms_u64(1000 * 60 * 60 * 2),
Min1 => DtMs::from_ms_u64(1000 * 60 * 60 * 8),
Min10 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4),
Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28),
}
}
pub fn off_max(&self) -> u32 {
self.msp_div().ms() as u32 / self.bin_len().ms() as u32
}
pub fn clamp_off(&self, off: u32) -> u32 {
self.off_max().min(off)
}
}
impl TryFrom<DtMs> for PrebinnedPartitioning {
type Error = Error;
fn try_from(value: DtMs) -> Result<Self, Self::Error> {
use PrebinnedPartitioning::*;
if value == DtMs::from_ms_u64(1000) {
Ok(Sec1)
} else if value == DtMs::from_ms_u64(1000 * 10) {
Ok(Sec10)
} else if value == DtMs::from_ms_u64(1000 * 60) {
Ok(Min1)
} else if value == DtMs::from_ms_u64(1000 * 60 * 10) {
Ok(Min10)
} else if value == DtMs::from_ms_u64(1000 * 60 * 60) {
Ok(Hour1)
} else {
Err(Error::PrebinnedPartitioningInvalid)
}
}
}
pub struct EventsReading {
stream: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
}

View File

@@ -14,7 +14,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) }
autoerr::create_error_v1!(
name(Error, "ReadingBinnedFromEvents"),
@@ -35,6 +35,7 @@ impl BinnedFromEvents {
do_time_weight: bool,
read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Self, Error> {
trace_emit!("new");
if !evq.range().is_time() {
return Err(Error::ExpectTimerange);
}

View File

@@ -10,6 +10,7 @@ use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::query::CacheUsage;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::ChannelTypeConfigGen;
@@ -50,6 +51,7 @@ impl TimeBinnedFromLayers {
pub fn new(
ch_conf: ChannelTypeConfigGen,
binning_opts: BinningOptions,
cache_usage: CacheUsage,
transform_query: TransformQuery,
sub: EventsSubQuerySettings,
log_level: String,
@@ -69,7 +71,7 @@ impl TimeBinnedFromLayers {
binning_opts
);
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) {
trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = GapFill::new(
"FromLayers-ongrid".into(),
@@ -93,7 +95,12 @@ impl TimeBinnedFromLayers {
Self::type_name(),
range
);
match find_next_finer_bin_len(bin_len, &bin_len_layers) {
let x = if cache_usage.is_cache_read() {
find_next_finer_bin_len(bin_len, &bin_len_layers)
} else {
None
};
match x {
Some(finer) => {
if bin_len.ms() % finer.ms() != 0 {
return Err(Error::FinerGridMismatch(bin_len, finer));
@@ -153,6 +160,7 @@ impl TimeBinnedFromLayers {
let inp = futures_util::stream::iter([]);
let ret = Self { inp: Box::pin(inp) };
trace_init!("{}::new setup nothing", Self::type_name());
info!("bin from events disabled on user request");
Ok(ret)
}
}

View File

@@ -12,7 +12,7 @@ pub struct BinningOptions {
impl BinningOptions {
pub fn default() -> Self {
Self {
cache_usage: CacheUsage::Read,
cache_usage: CacheUsage::default(),
allow_from_events: true,
allow_from_prebinned: true,
allow_rebin: true,
@@ -21,7 +21,7 @@ impl BinningOptions {
pub fn testing_no_events() -> Self {
Self {
cache_usage: CacheUsage::Read,
cache_usage: CacheUsage::default(),
allow_from_events: false,
allow_from_prebinned: true,
allow_rebin: true,
@@ -47,7 +47,7 @@ impl BinningOptions {
impl From<&BinnedQuery> for BinningOptions {
fn from(value: &BinnedQuery) -> Self {
let cache_usage = value.cache_usage().unwrap_or(CacheUsage::Ignore);
let cache_usage = value.cache_usage().unwrap_or(CacheUsage::default());
Self {
cache_usage,
allow_from_events: value.allow_from_events().unwrap_or(true),

View File

@@ -280,6 +280,7 @@ async fn timebinned_stream(
let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new(
ch_conf,
(&query).into(),
query.cache_usage().unwrap_or(Default::default()),
query.transform().clone(),
EventsSubQuerySettings::from(&query),
query.log_level().into(),
@@ -291,7 +292,6 @@ async fn timebinned_stream(
events_read_provider,
)?;
let stream = stream.map(|item| {
// use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
on_sitemty_data!(item, |mut x: BinsBoxed| {
x.fix_numerics();
@@ -541,10 +541,11 @@ pub async fn timebinned_cbor_framed(
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(mut item) => {
let tsnow = Instant::now();
let coll = coll.get_or_insert_with(|| item.new_collector());
coll.ingest(&mut item);
if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 {
last_emit = Instant::now();
if coll.len() >= 128 || tsnow >= last_emit + timeout_content_2 {
last_emit = tsnow;
take_collector_result_cbor(coll).map(|x| Ok(x))
} else {
None