From bf2a218f7b1d25a839a8a3e135c361a3f7dd45ad Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 17 Feb 2025 13:36:48 +0100 Subject: [PATCH] Refactor --- Cargo.toml | 1 + src/cbor_stream.rs | 12 +++---- src/eventsplainreader.rs | 2 +- src/instrument.rs | 6 ++-- src/rangefilter2.rs | 19 ++++++----- src/timebin/cached/reader.rs | 64 ++---------------------------------- src/timebin/fromevents.rs | 3 +- src/timebin/fromlayers.rs | 12 +++++-- src/timebin/opts.rs | 6 ++-- src/timebinnedjson.rs | 7 ++-- 10 files changed, 43 insertions(+), 89 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7dd8692..5dc344e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ query = { path = "../daqbuf-query", package = "daqbuf-query" } items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" } parse = { path = "../daqbuf-parse", package = "daqbuf-parse" } +series = { path = "../daqbuf-series", package = "daqbuf-series" } [dev-dependencies] tokio = { version = "1", features = ["rt"] } diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index e0f8fc2..4aa630a 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -97,26 +97,26 @@ where StreamItem::Log(item) => { match item.level { Level::TRACE => { - trace!("{item:?}"); + trace!("{:?}", item); } Level::DEBUG => { - debug!("{item:?}"); + debug!("{:?}", item); } Level::INFO => { - info!("{item:?}"); + info!("{:?}", item); } Level::WARN => { - warn!("{item:?}"); + warn!("{:?}", item); } Level::ERROR => { - error!("{item:?}"); + error!("{:?}", item); } } let item = CborBytes::new(Bytes::new()); Ok(item) } StreamItem::Stats(item) => { - info!("{item:?}"); + debug!("{:?}", item); let item = CborBytes::new(Bytes::new()); Ok(item) } diff --git a/src/eventsplainreader.rs b/src/eventsplainreader.rs index 8cc2fb1..82bf7ab 100644 --- a/src/eventsplainreader.rs +++ b/src/eventsplainreader.rs @@ -4,7 +4,6 @@ use crate::timebin::cached::reader::CacheReadProvider; use crate::timebin::cached::reader::CacheReading; use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::cached::reader::EventsReading; -use crate::timebin::cached::reader::PrebinnedPartitioning; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -18,6 +17,7 @@ use netpod::DtMs; use netpod::ReqCtx; use netpod::TsNano; use query::api4::events::EventsSubQuery; +use series::msp::PrebinnedPartitioning; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; diff --git a/src/instrument.rs b/src/instrument.rs index 28c7bf6..4f99f61 100644 --- a/src/instrument.rs +++ b/src/instrument.rs @@ -27,7 +27,9 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); - let _spg = this.span.enter(); - this.inp.poll_next_unpin(cx) + let spg = this.span.enter(); + let ret = this.inp.poll_next_unpin(cx); + drop(spg); + ret } } diff --git a/src/rangefilter2.rs b/src/rangefilter2.rs index 3dfc077..fcd9a0c 100644 --- a/src/rangefilter2.rs +++ b/src/rangefilter2.rs @@ -20,17 +20,18 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_inp { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) } +macro_rules! trace_inp { ($det:expr, $($arg:expr),*) => ( if false && $det { trace!($($arg),*); } ) } -macro_rules! trace_init { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_init { ($det:expr, $($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Rangefilter")] -pub enum Error { - DrainUnclean, -} +autoerr::create_error_v1!( + name(Error, "Rangefilter"), + enum variants { + DrainUnclean, + }, +); pub struct RangeFilter2 where @@ -167,7 +168,7 @@ where } None => { // TODO keep stats about this case - trace_emit!(self.trdet, "drain into to keep one before",); + trace_emit!(self.trdet, "drain into to keep one before"); let n = item.len(); match item.drain_into_new(n.max(1) - 1..n) { DrainIntoNewResult::Done(keep) => { diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index 17359b6..e646d15 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -10,6 +10,7 @@ use netpod::BinnedRange; use netpod::DtMs; use netpod::TsNano; use query::api4::events::EventsSubQuery; +use series::msp::PrebinnedPartitioning; use std::future::Future; use std::ops::Range; use std::pin::Pin; @@ -24,7 +25,7 @@ autoerr::create_error_v1!( ChannelSend, ChannelRecv, Scylla(String), - PrebinnedPartitioningInvalid, + PrebinnedPartitioningInvalid(#[from] series::msp::Error), }, ); @@ -34,67 +35,6 @@ pub type BinsReadResErr = streams::timebin::cached::reader::Error; pub type BinsReadRes = Result, BinsReadResErr>; pub type BinsReadFutBoxed = Pin + Send>>; -pub enum PrebinnedPartitioning { - Sec1, - Sec10, - Min1, - Min10, - Hour1, -} - -impl PrebinnedPartitioning { - pub fn bin_len(&self) -> DtMs { - use PrebinnedPartitioning::*; - match self { - Sec1 => DtMs::from_ms_u64(1000 * 1), - Sec10 => DtMs::from_ms_u64(1000 * 10), - Min1 => DtMs::from_ms_u64(1000 * 60 * 1), - Min10 => DtMs::from_ms_u64(1000 * 60 * 10), - Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 1), - } - } - - pub fn msp_div(&self) -> DtMs { - use PrebinnedPartitioning::*; - match self { - Sec1 => DtMs::from_ms_u64(1000 * 60 * 10), - Sec10 => DtMs::from_ms_u64(1000 * 60 * 60 * 2), - Min1 => DtMs::from_ms_u64(1000 * 60 * 60 * 8), - Min10 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4), - Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28), - } - } - - pub fn off_max(&self) -> u32 { - self.msp_div().ms() as u32 / self.bin_len().ms() as u32 - } - - pub fn clamp_off(&self, off: u32) -> u32 { - self.off_max().min(off) - } -} - -impl TryFrom for PrebinnedPartitioning { - type Error = Error; - - fn try_from(value: DtMs) -> Result { - use PrebinnedPartitioning::*; - if value == DtMs::from_ms_u64(1000) { - Ok(Sec1) - } else if value == DtMs::from_ms_u64(1000 * 10) { - Ok(Sec10) - } else if value == DtMs::from_ms_u64(1000 * 60) { - Ok(Min1) - } else if value == DtMs::from_ms_u64(1000 * 60 * 10) { - Ok(Min10) - } else if value == DtMs::from_ms_u64(1000 * 60 * 60) { - Ok(Hour1) - } else { - Err(Error::PrebinnedPartitioningInvalid) - } - } -} - pub struct EventsReading { stream: Pin> + Send>>, } diff --git a/src/timebin/fromevents.rs b/src/timebin/fromevents.rs index d506345..8821bf0 100644 --- a/src/timebin/fromevents.rs +++ b/src/timebin/fromevents.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) } autoerr::create_error_v1!( name(Error, "ReadingBinnedFromEvents"), @@ -35,6 +35,7 @@ impl BinnedFromEvents { do_time_weight: bool, read_provider: Arc, ) -> Result { + trace_emit!("new"); if !evq.range().is_time() { return Err(Error::ExpectTimerange); } diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index b20ce9c..c752d8b 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -10,6 +10,7 @@ use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; +use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::ChannelTypeConfigGen; @@ -50,6 +51,7 @@ impl TimeBinnedFromLayers { pub fn new( ch_conf: ChannelTypeConfigGen, binning_opts: BinningOptions, + cache_usage: CacheUsage, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -69,7 +71,7 @@ impl TimeBinnedFromLayers { binning_opts ); let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); - if bin_len_layers.contains(&bin_len) { + if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) { trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range); let inp = GapFill::new( "FromLayers-ongrid".into(), @@ -93,7 +95,12 @@ impl TimeBinnedFromLayers { Self::type_name(), range ); - match find_next_finer_bin_len(bin_len, &bin_len_layers) { + let x = if cache_usage.is_cache_read() { + find_next_finer_bin_len(bin_len, &bin_len_layers) + } else { + None + }; + match x { Some(finer) => { if bin_len.ms() % finer.ms() != 0 { return Err(Error::FinerGridMismatch(bin_len, finer)); @@ -153,6 +160,7 @@ impl TimeBinnedFromLayers { let inp = futures_util::stream::iter([]); let ret = Self { inp: Box::pin(inp) }; trace_init!("{}::new setup nothing", Self::type_name()); + info!("bin from events disabled on user request"); Ok(ret) } } diff --git a/src/timebin/opts.rs b/src/timebin/opts.rs index 40b37f5..e2c533e 100644 --- a/src/timebin/opts.rs +++ b/src/timebin/opts.rs @@ -12,7 +12,7 @@ pub struct BinningOptions { impl BinningOptions { pub fn default() -> Self { Self { - cache_usage: CacheUsage::Read, + cache_usage: CacheUsage::default(), allow_from_events: true, allow_from_prebinned: true, allow_rebin: true, @@ -21,7 +21,7 @@ impl BinningOptions { pub fn testing_no_events() -> Self { Self { - cache_usage: CacheUsage::Read, + cache_usage: CacheUsage::default(), allow_from_events: false, allow_from_prebinned: true, allow_rebin: true, @@ -47,7 +47,7 @@ impl BinningOptions { impl From<&BinnedQuery> for BinningOptions { fn from(value: &BinnedQuery) -> Self { - let cache_usage = value.cache_usage().unwrap_or(CacheUsage::Ignore); + let cache_usage = value.cache_usage().unwrap_or(CacheUsage::default()); Self { cache_usage, allow_from_events: value.allow_from_events().unwrap_or(true), diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 95d2cac..ef10846 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -280,6 +280,7 @@ async fn timebinned_stream( let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new( ch_conf, (&query).into(), + query.cache_usage().unwrap_or(Default::default()), query.transform().clone(), EventsSubQuerySettings::from(&query), query.log_level().into(), @@ -291,7 +292,6 @@ async fn timebinned_stream( events_read_provider, )?; let stream = stream.map(|item| { - // use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; on_sitemty_data!(item, |mut x: BinsBoxed| { x.fix_numerics(); @@ -541,10 +541,11 @@ pub async fn timebinned_cbor_framed( Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(mut item) => { + let tsnow = Instant::now(); let coll = coll.get_or_insert_with(|| item.new_collector()); coll.ingest(&mut item); - if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { - last_emit = Instant::now(); + if coll.len() >= 128 || tsnow >= last_emit + timeout_content_2 { + last_emit = tsnow; take_collector_result_cbor(coll).map(|x| Ok(x)) } else { None