diff --git a/Cargo.toml b/Cargo.toml index 5dc344e..b564da5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" typetag = "0.2.18" -ciborium = "0.2.1" -bytes = "1.8" +ciborium = "0.2.2" +bytes = "1.10.1" arrayref = "0.3.6" crc32fast = "1.4.2" byteorder = "1.5.0" diff --git a/src/timebin.rs b/src/timebin.rs index c9e5b9d..c07b857 100644 --- a/src/timebin.rs +++ b/src/timebin.rs @@ -2,6 +2,7 @@ pub mod cached; pub mod fromevents; pub mod fromlayers; pub mod opts; +pub mod pbd2; mod basic; mod gapfill; diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index 60c1f23..574bb5c 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -7,7 +7,10 @@ use crate::timebin::gapfill::GapFill; use crate::timebin::grid::find_next_finer_bin_len; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StatsItem; +use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; use netpod::query::CacheUsage; @@ -21,11 +24,14 @@ use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +macro_rules! info_init { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) } + macro_rules! trace_init { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } autoerr::create_error_v1!( @@ -41,6 +47,7 @@ type BoxedInput = Pin> + Send>>; pub struct TimeBinnedFromLayers { inp: BoxedInput, + outbuf: VecDeque<::Item>, } impl TimeBinnedFromLayers { @@ -71,7 +78,30 @@ impl TimeBinnedFromLayers { binning_opts ); let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); - if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) { + if binning_opts.pbd_enable() { + let inp = futures_util::stream::iter([]); + let mut ret = Self { + inp: Box::pin(inp), + outbuf: VecDeque::new(), + }; + info_init!("pbd_enable"); + let item = LogItem::from_node(0, log::Level::TRACE, "test-log-item-trace".into()); + let item = StreamItem::Log(item); + ret.outbuf.push_back(Ok(item)); + let item = LogItem::from_node(0, log::Level::DEBUG, "test-log-item-debug".into()); + let item = StreamItem::Log(item); + ret.outbuf.push_back(Ok(item)); + let item = LogItem::from_node(0, log::Level::INFO, "test-log-item-info".into()); + let item = StreamItem::Log(item); + ret.outbuf.push_back(Ok(item)); + let item = LogItem::from_node(0, log::Level::WARN, "test-log-item-warn".into()); + let item = StreamItem::Log(item); + ret.outbuf.push_back(Ok(item)); + let item = StatsItem::Binning; + let item = StreamItem::Stats(item); + ret.outbuf.push_back(Ok(item)); + Ok(ret) + } else if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) { trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range); let inp = GapFill::new( "FromLayers-ongrid".into(), @@ -87,7 +117,10 @@ impl TimeBinnedFromLayers { cache_read_provider, events_read_provider.clone(), )?; - let ret = Self { inp: Box::pin(inp) }; + let ret = Self { + inp: Box::pin(inp), + outbuf: VecDeque::new(), + }; Ok(ret) } else { trace_init!( @@ -127,7 +160,10 @@ impl TimeBinnedFromLayers { events_read_provider.clone(), )?; let inp = BinnedBinsTimeweightStream::new(range, Box::pin(inp)); - let ret = Self { inp: Box::pin(inp) }; + let ret = Self { + inp: Box::pin(inp), + outbuf: VecDeque::new(), + }; Ok(ret) } None => { @@ -153,12 +189,18 @@ impl TimeBinnedFromLayers { do_time_weight, events_read_provider, )?; - let ret = Self { inp: Box::pin(inp) }; + let ret = Self { + inp: Box::pin(inp), + outbuf: VecDeque::new(), + }; trace_init!("{}::new setup from events", Self::type_name()); Ok(ret) } else { let inp = futures_util::stream::iter([]); - let ret = Self { inp: Box::pin(inp) }; + let ret = Self { + inp: Box::pin(inp), + outbuf: VecDeque::new(), + }; trace_init!("{}::new setup nothing", Self::type_name()); trace_init!("bin from events disabled on user request"); Ok(ret) @@ -174,10 +216,14 @@ impl Stream for TimeBinnedFromLayers { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(x)) => Ready(Some(x)), - Ready(None) => Ready(None), - Pending => Pending, + if let Some(x) = self.outbuf.pop_front() { + Ready(Some(x)) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => Ready(None), + Pending => Pending, + } } } } diff --git a/src/timebin/opts.rs b/src/timebin/opts.rs index e2c533e..86fbafc 100644 --- a/src/timebin/opts.rs +++ b/src/timebin/opts.rs @@ -7,6 +7,9 @@ pub struct BinningOptions { allow_from_events: bool, allow_from_prebinned: bool, allow_rebin: bool, + pbd_enable: bool, + pbd_rts_pbp_block: Vec>, + pbd_evs: bool, } impl BinningOptions { @@ -16,6 +19,9 @@ impl BinningOptions { allow_from_events: true, allow_from_prebinned: true, allow_rebin: true, + pbd_enable: false, + pbd_rts_pbp_block: Vec::new(), + pbd_evs: false, } } @@ -25,6 +31,9 @@ impl BinningOptions { allow_from_events: false, allow_from_prebinned: true, allow_rebin: true, + pbd_enable: false, + pbd_rts_pbp_block: Vec::new(), + pbd_evs: false, } } @@ -43,6 +52,18 @@ impl BinningOptions { pub fn allow_rebin(&self) -> bool { self.allow_rebin } + + pub fn pbd_enable(&self) -> bool { + self.pbd_enable.clone() + } + + pub fn pbd_rts_pbp_block(&self) -> Vec> { + self.pbd_rts_pbp_block.clone() + } + + pub fn pbd_evs(&self) -> bool { + self.pbd_evs.clone() + } } impl From<&BinnedQuery> for BinningOptions { @@ -53,6 +74,9 @@ impl From<&BinnedQuery> for BinningOptions { allow_from_events: value.allow_from_events().unwrap_or(true), allow_from_prebinned: value.allow_from_prebinned().unwrap_or(true), allow_rebin: value.allow_rebin().unwrap_or(true), + pbd_enable: value.pbd_enable().unwrap_or(false), + pbd_rts_pbp_block: value.pbd_rts_pbp_block().unwrap_or(Vec::new()), + pbd_evs: value.pbd_evs().unwrap_or(false), } } } diff --git a/src/timebin/pbd2.rs b/src/timebin/pbd2.rs new file mode 100644 index 0000000..8f9a322 --- /dev/null +++ b/src/timebin/pbd2.rs @@ -0,0 +1,2 @@ +pub mod binstream; +pub mod srctraits; diff --git a/src/timebin/pbd2/binstream.rs b/src/timebin/pbd2/binstream.rs new file mode 100644 index 0000000..c451b40 --- /dev/null +++ b/src/timebin/pbd2/binstream.rs @@ -0,0 +1,8 @@ +pub struct BinStream {} + +impl BinStream { + pub fn new() -> Self { + // BinWriteIndexRtStream + todo!() + } +} diff --git a/src/timebin/pbd2/srctraits.rs b/src/timebin/pbd2/srctraits.rs new file mode 100644 index 0000000..28dc3bd --- /dev/null +++ b/src/timebin/pbd2/srctraits.rs @@ -0,0 +1,15 @@ +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use series::msp::PrebinnedPartitioning; +use series::SeriesId; + +pub trait BinWriteIndexReaderMaker { + fn new( + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + pbp: PrebinnedPartitioning, + range: NanoRange, + // scyqueue: ScyllaQueue, + ); +} diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 27ff3fa..54bc134 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -24,6 +24,7 @@ use items_2::channelevents::ChannelEvents; use items_2::jsonbytes::CborBytes; use items_2::jsonbytes::JsonBytes; use items_2::merger::Merger; +use netpod::log; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; @@ -523,6 +524,17 @@ pub async fn timebinned_cbor_framed( let timeout_content_2 = timeout_content_base * 2 / 3; let mut coll = None; let mut last_emit = Instant::now(); + let log_items_level = match query.log_items() { + "trace" => log::Level::TRACE, + "debug" => log::Level::DEBUG, + "info" => log::Level::INFO, + "warn" => log::Level::WARN, + _ => log::Level::ERROR, + }; + let stats_items = match query.stats_items() { + Some(x) => true, + None => false, + }; let stream = stream .map(|x| Some(x)) .chain(futures_util::stream::iter([None])); @@ -547,14 +559,26 @@ pub async fn timebinned_cbor_framed( RangeCompletableItem::RangeComplete => None, }, StreamItem::Log(x) => { - debug!("{x:?}"); - // Some(serde_json::Value::String(format!("{x:?}"))) - None + if x.level <= log_items_level { + let mut buf = Vec::with_capacity(1024); + ciborium::into_writer(&x, &mut buf).expect("cbor serialize"); + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Some(Ok(item)) + } else { + None + } } StreamItem::Stats(x) => { - debug!("{x:?}"); - // Some(serde_json::Value::String(format!("{x:?}"))) - None + if stats_items { + let mut buf = Vec::with_capacity(1024); + ciborium::into_writer(&x, &mut buf).expect("cbor serialize"); + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Some(Ok(item)) + } else { + None + } } }, Err(e) => Some(Err(e)),