WIP new binned query

This commit is contained in:
Dominik Werder
2025-03-26 09:06:05 +01:00
parent 17f2c87c34
commit fb509b04ec
8 changed files with 137 additions and 17 deletions

View File

@@ -2,6 +2,7 @@ pub mod cached;
pub mod fromevents;
pub mod fromlayers;
pub mod opts;
pub mod pbd2;
mod basic;
mod gapfill;

View File

@@ -7,7 +7,10 @@ use crate::timebin::gapfill::GapFill;
use crate::timebin::grid::find_next_finer_bin_len;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::query::CacheUsage;
@@ -21,11 +24,14 @@ use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::transform::TransformQuery;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
macro_rules! info_init { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
autoerr::create_error_v1!(
@@ -41,6 +47,7 @@ type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
pub struct TimeBinnedFromLayers {
inp: BoxedInput,
outbuf: VecDeque<<Self as Stream>::Item>,
}
impl TimeBinnedFromLayers {
@@ -71,7 +78,30 @@ impl TimeBinnedFromLayers {
binning_opts
);
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) {
if binning_opts.pbd_enable() {
let inp = futures_util::stream::iter([]);
let mut ret = Self {
inp: Box::pin(inp),
outbuf: VecDeque::new(),
};
info_init!("pbd_enable");
let item = LogItem::from_node(0, log::Level::TRACE, "test-log-item-trace".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::DEBUG, "test-log-item-debug".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::INFO, "test-log-item-info".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::WARN, "test-log-item-warn".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = StatsItem::Binning;
let item = StreamItem::Stats(item);
ret.outbuf.push_back(Ok(item));
Ok(ret)
} else if cache_usage.is_cache_read() && bin_len_layers.contains(&bin_len) {
trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = GapFill::new(
"FromLayers-ongrid".into(),
@@ -87,7 +117,10 @@ impl TimeBinnedFromLayers {
cache_read_provider,
events_read_provider.clone(),
)?;
let ret = Self { inp: Box::pin(inp) };
let ret = Self {
inp: Box::pin(inp),
outbuf: VecDeque::new(),
};
Ok(ret)
} else {
trace_init!(
@@ -127,7 +160,10 @@ impl TimeBinnedFromLayers {
events_read_provider.clone(),
)?;
let inp = BinnedBinsTimeweightStream::new(range, Box::pin(inp));
let ret = Self { inp: Box::pin(inp) };
let ret = Self {
inp: Box::pin(inp),
outbuf: VecDeque::new(),
};
Ok(ret)
}
None => {
@@ -153,12 +189,18 @@ impl TimeBinnedFromLayers {
do_time_weight,
events_read_provider,
)?;
let ret = Self { inp: Box::pin(inp) };
let ret = Self {
inp: Box::pin(inp),
outbuf: VecDeque::new(),
};
trace_init!("{}::new setup from events", Self::type_name());
Ok(ret)
} else {
let inp = futures_util::stream::iter([]);
let ret = Self { inp: Box::pin(inp) };
let ret = Self {
inp: Box::pin(inp),
outbuf: VecDeque::new(),
};
trace_init!("{}::new setup nothing", Self::type_name());
trace_init!("bin from events disabled on user request");
Ok(ret)
@@ -174,10 +216,14 @@ impl Stream for TimeBinnedFromLayers {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(x)) => Ready(Some(x)),
Ready(None) => Ready(None),
Pending => Pending,
if let Some(x) = self.outbuf.pop_front() {
Ready(Some(x))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(x)) => Ready(Some(x)),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
}

View File

@@ -7,6 +7,9 @@ pub struct BinningOptions {
allow_from_events: bool,
allow_from_prebinned: bool,
allow_rebin: bool,
pbd_enable: bool,
pbd_rts_pbp_block: Vec<Vec<u8>>,
pbd_evs: bool,
}
impl BinningOptions {
@@ -16,6 +19,9 @@ impl BinningOptions {
allow_from_events: true,
allow_from_prebinned: true,
allow_rebin: true,
pbd_enable: false,
pbd_rts_pbp_block: Vec::new(),
pbd_evs: false,
}
}
@@ -25,6 +31,9 @@ impl BinningOptions {
allow_from_events: false,
allow_from_prebinned: true,
allow_rebin: true,
pbd_enable: false,
pbd_rts_pbp_block: Vec::new(),
pbd_evs: false,
}
}
@@ -43,6 +52,18 @@ impl BinningOptions {
pub fn allow_rebin(&self) -> bool {
self.allow_rebin
}
pub fn pbd_enable(&self) -> bool {
self.pbd_enable.clone()
}
pub fn pbd_rts_pbp_block(&self) -> Vec<Vec<u8>> {
self.pbd_rts_pbp_block.clone()
}
pub fn pbd_evs(&self) -> bool {
self.pbd_evs.clone()
}
}
impl From<&BinnedQuery> for BinningOptions {
@@ -53,6 +74,9 @@ impl From<&BinnedQuery> for BinningOptions {
allow_from_events: value.allow_from_events().unwrap_or(true),
allow_from_prebinned: value.allow_from_prebinned().unwrap_or(true),
allow_rebin: value.allow_rebin().unwrap_or(true),
pbd_enable: value.pbd_enable().unwrap_or(false),
pbd_rts_pbp_block: value.pbd_rts_pbp_block().unwrap_or(Vec::new()),
pbd_evs: value.pbd_evs().unwrap_or(false),
}
}
}

2
src/timebin/pbd2.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod binstream;
pub mod srctraits;

View File

@@ -0,0 +1,8 @@
pub struct BinStream {}
impl BinStream {
pub fn new() -> Self {
// BinWriteIndexRtStream
todo!()
}
}

View File

@@ -0,0 +1,15 @@
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use series::msp::PrebinnedPartitioning;
use series::SeriesId;
pub trait BinWriteIndexReaderMaker {
fn new(
rt1: RetentionTime,
rt2: RetentionTime,
series: SeriesId,
pbp: PrebinnedPartitioning,
range: NanoRange,
// scyqueue: ScyllaQueue,
);
}

View File

@@ -24,6 +24,7 @@ use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::CborBytes;
use items_2::jsonbytes::JsonBytes;
use items_2::merger::Merger;
use netpod::log;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::BinnedRangeEnum;
@@ -523,6 +524,17 @@ pub async fn timebinned_cbor_framed(
let timeout_content_2 = timeout_content_base * 2 / 3;
let mut coll = None;
let mut last_emit = Instant::now();
let log_items_level = match query.log_items() {
"trace" => log::Level::TRACE,
"debug" => log::Level::DEBUG,
"info" => log::Level::INFO,
"warn" => log::Level::WARN,
_ => log::Level::ERROR,
};
let stats_items = match query.stats_items() {
Some(x) => true,
None => false,
};
let stream = stream
.map(|x| Some(x))
.chain(futures_util::stream::iter([None]));
@@ -547,14 +559,26 @@ pub async fn timebinned_cbor_framed(
RangeCompletableItem::RangeComplete => None,
},
StreamItem::Log(x) => {
debug!("{x:?}");
// Some(serde_json::Value::String(format!("{x:?}")))
None
if x.level <= log_items_level {
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&x, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
Some(Ok(item))
} else {
None
}
}
StreamItem::Stats(x) => {
debug!("{x:?}");
// Some(serde_json::Value::String(format!("{x:?}")))
None
if stats_items {
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&x, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
Some(Ok(item))
} else {
None
}
}
},
Err(e) => Some(Err(e)),