WIP cache read

This commit is contained in:
Dominik Werder
2024-12-08 07:39:33 +01:00
parent 6368003376
commit 309c995ad7
9 changed files with 106 additions and 73 deletions

View File

@@ -22,6 +22,7 @@ http = "1"
http-body = "1"
http-body-util = "0.1.0"
thiserror = "=0.0.1"
autoerr = "0.0.3"
chrono = { version = "0.4.38", features = ["serde"] }
wasmer = { version = "5.0.1", default-features = false, features = ["sys", "cranelift"], optional = true }
netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }

View File

@@ -2,6 +2,7 @@ use crate::eventsplainreader::DummyCacheReadProvider;
use crate::log::*;
use crate::test::events_reader::TestEventsReader;
use crate::timebin::fromlayers::TimeBinnedFromLayers;
use crate::timebin::opts::BinningOptions;
use futures_util::StreamExt;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
@@ -38,6 +39,7 @@ async fn timebin_from_layers_inner() -> Result<(), Error> {
"basictest-f32",
));
let cache_usage = CacheUsage::Ignore;
let binning_opts: BinningOptions = todo!();
let transform_query = TransformQuery::default_time_binned();
let nano_range = NanoRange {
beg: 1000 * 1000 * 1000 * 1,
@@ -63,7 +65,7 @@ async fn timebin_from_layers_inner() -> Result<(), Error> {
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut stream = TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
binning_opts,
transform_query,
settings,
log_level.into(),
@@ -100,6 +102,7 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> {
"basictest-f32",
));
let cache_usage = CacheUsage::Ignore;
let binning_opts: BinningOptions = todo!();
let transform_query = TransformQuery::default_time_binned();
let nano_range = NanoRange {
beg: 1000 * 1000 * 1000 * 1,
@@ -125,7 +128,7 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> {
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut stream = TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
binning_opts,
transform_query,
settings,
log_level.into(),

View File

@@ -1,7 +1,7 @@
pub mod cached;
pub mod fromevents;
pub mod fromlayers;
pub mod timebin;
pub mod opts;
mod basic;
mod gapfill;

View File

@@ -1,11 +1,11 @@
use crate as streams;
use crate::log::*;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinsBoxed;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
@@ -17,17 +17,22 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinCachedReader")]
pub enum Error {
TodoImpl,
ChannelSend,
ChannelRecv,
Scylla(String),
}
autoerr::create_error_v1!(
name(Error, "BinCachedReader"),
enum variants {
TodoImpl,
ChannelSend,
ChannelRecv,
Scylla(String),
},
);
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
pub type BinsReadResErr = streams::timebin::cached::reader::Error;
pub type BinsReadRes = Result<Option<BinsBoxed>, BinsReadResErr>;
pub type BinsReadFutBoxed = Pin<Box<dyn Future<Output = BinsReadRes> + Send>>;
pub fn off_max() -> u64 {
1000
}
@@ -74,30 +79,17 @@ pub trait EventsReadProvider: Send + Sync {
}
pub struct CacheReading {
fut: Pin<
Box<
dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>>
+ Send,
>,
>,
fut: BinsReadFutBoxed,
}
impl CacheReading {
pub fn new(
fut: Pin<
Box<
dyn Future<
Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>,
> + Send,
>,
>,
) -> Self {
pub fn new(fut: BinsReadFutBoxed) -> Self {
Self { fut }
}
}
impl Future for CacheReading {
type Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>;
type Output = BinsReadRes;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)

View File

@@ -1,5 +1,6 @@
use super::cached::reader::CacheReadProvider;
use super::cached::reader::EventsReadProvider;
use super::opts::BinningOptions;
use crate::log::*;
use crate::timebin::fromevents::BinnedFromEvents;
use crate::timebin::gapfill::GapFill;
@@ -9,7 +10,6 @@ use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::query::CacheUsage;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::ChannelTypeConfigGen;
@@ -27,14 +27,14 @@ use std::task::Poll;
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TimeBinnedFromLayers")]
pub enum Error {
GapFill(#[from] super::gapfill::Error),
BinnedFromEvents(#[from] super::fromevents::Error),
#[error("FinerGridMismatch({0}, {1})")]
FinerGridMismatch(DtMs, DtMs),
}
autoerr::create_error_v1!(
name(Error, "TimeBinnedFromLayers"),
enum variants {
GapFill(#[from] super::gapfill::Error),
BinnedFromEvents(#[from] super::fromevents::Error),
FinerGridMismatch(DtMs, DtMs),
},
);
type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
@@ -49,7 +49,7 @@ impl TimeBinnedFromLayers {
pub fn new(
ch_conf: ChannelTypeConfigGen,
cache_usage: CacheUsage,
binning_opts: BinningOptions,
transform_query: TransformQuery,
sub: EventsSubQuerySettings,
log_level: String,
@@ -73,7 +73,7 @@ impl TimeBinnedFromLayers {
let inp = GapFill::new(
"FromLayers-ongrid".into(),
ch_conf.clone(),
cache_usage.clone(),
binning_opts.clone(),
transform_query.clone(),
sub.clone(),
log_level.clone(),
@@ -107,7 +107,7 @@ impl TimeBinnedFromLayers {
let inp = GapFill::new(
"FromLayers-finergrid".into(),
ch_conf.clone(),
cache_usage.clone(),
binning_opts.clone(),
transform_query.clone(),
sub.clone(),
log_level.clone(),

View File

@@ -1,5 +1,6 @@
use super::cached::reader::CacheReadProvider;
use super::cached::reader::EventsReadProvider;
use super::opts::BinningOptions;
use crate::log::*;
use crate::timebin::fromevents::BinnedFromEvents;
use futures_util::FutureExt;
@@ -11,7 +12,6 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
@@ -28,30 +28,27 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! debug_init { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[allow(unused)]
macro_rules! debug_setup { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[allow(unused)]
macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
macro_rules! trace_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinCachedGapFill")]
pub enum Error {
CacheReader(#[from] super::cached::reader::Error),
#[error("GapFromFiner({0}, {1}, {2})")]
GapFromFiner(TsNano, TsNano, DtMs),
#[error("MissingBegFromFiner({0}, {1}, {2})")]
MissingBegFromFiner(TsNano, TsNano, DtMs),
#[error("InputBeforeRange({0}, {1})")]
InputBeforeRange(NanoRange, BinnedRange<TsNano>),
EventsReader(#[from] super::fromevents::Error),
}
autoerr::create_error_v1!(
name(Error, "BinCachedGapFill"),
enum variants {
CacheReader(#[from] super::cached::reader::Error),
// #[error("GapFromFiner({0}, {1}, {2})")]
GapFromFiner(TsNano, TsNano, DtMs),
// #[error("MissingBegFromFiner({0}, {1}, {2})")]
MissingBegFromFiner(TsNano, TsNano, DtMs),
// #[error("InputBeforeRange({0}, {1})")]
InputBeforeRange(NanoRange, BinnedRange<TsNano>),
EventsReader(#[from] super::fromevents::Error),
},
);
type Input = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
@@ -60,7 +57,7 @@ type Input = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
pub struct GapFill {
dbgname: String,
ch_conf: ChannelTypeConfigGen,
cache_usage: CacheUsage,
binning_opts: BinningOptions,
transform_query: TransformQuery,
sub: EventsSubQuerySettings,
log_level: String,
@@ -90,7 +87,7 @@ impl GapFill {
pub fn new(
dbgname_parent: String,
ch_conf: ChannelTypeConfigGen,
cache_usage: CacheUsage,
binning_opts: BinningOptions,
transform_query: TransformQuery,
sub: EventsSubQuerySettings,
log_level: String,
@@ -103,7 +100,7 @@ impl GapFill {
) -> Result<Self, Error> {
let dbgname = format!("{}--[{}]", dbgname_parent, range);
debug_init!("new dbgname {}", dbgname);
let inp = if cache_usage.is_cache_read() {
let inp = if binning_opts.cache_usage().is_cache_read() {
let series = ch_conf.series().expect("series id for cache read");
let stream = super::cached::reader::CachedReader::new(
series,
@@ -122,7 +119,7 @@ impl GapFill {
let ret = Self {
dbgname,
ch_conf,
cache_usage,
binning_opts,
transform_query,
sub,
log_level,
@@ -173,13 +170,14 @@ impl GapFill {
.get_or_insert_with(|| bins.empty());
bins2.drain_into(dst.as_mut(), 0..bins2.len());
}
if self.cache_usage.is_cache_write() {
if self.binning_opts.cache_usage().is_cache_write() {
self.cache_write_intermediate()?;
} // TODO make sure that input does not send "made-up" empty future bins.
// On the other hand, if the request is over past range, but the channel was silent ever since?
// Then we should in principle know that from is-alive status checking.
// So, until then, allow made-up bins?
// Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way.
}
// TODO make sure that input does not send "made-up" empty future bins.
// On the other hand, if the request is over past range, but the channel was silent ever since?
// Then we should in principle know that from is-alive status checking.
// So, until then, allow made-up bins?
// Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way.
Ok(bins)
}
@@ -265,7 +263,7 @@ impl GapFill {
let inp_finer = GapFill::new(
self.dbgname.clone(),
self.ch_conf.clone(),
self.cache_usage.clone(),
self.binning_opts.clone(),
self.transform_query.clone(),
self.sub.clone(),
self.log_level.clone(),
@@ -338,6 +336,7 @@ impl GapFill {
fn cache_write_intermediate(self: Pin<&mut Self>) -> Result<(), Error> {
// TODO See cache_write_on_end
trace_cache!("maybe write to cache");
Ok(())
}
}
@@ -377,7 +376,7 @@ impl Stream for GapFill {
trace_handle!("{} RECV RANGE FINAL", self.dbgname);
self.inp_finer_range_final = true;
self.inp_finer_range_final_cnt += 1;
if self.cache_usage.is_cache_write() {
if self.binning_opts.cache_usage().is_cache_write() {
match self.as_mut().cache_write_on_end() {
Ok(()) => continue,
Err(e) => Ready(Some(sitem_err_from_string(e))),

40
src/timebin/opts.rs Normal file
View File

@@ -0,0 +1,40 @@
use netpod::query::CacheUsage;
use query::api4::binned::BinnedQuery;
#[derive(Debug, Clone)]
pub struct BinningOptions {
cache_usage: CacheUsage,
allow_from_events: bool,
allow_from_prebinned: bool,
allow_rebin: bool,
}
impl BinningOptions {
pub fn cache_usage(&self) -> &CacheUsage {
&self.cache_usage
}
pub fn allow_from_events(&self) -> bool {
self.allow_from_events
}
pub fn allow_from_prebinned(&self) -> bool {
self.allow_from_prebinned
}
pub fn allow_rebin(&self) -> bool {
self.allow_rebin
}
}
impl From<&BinnedQuery> for BinningOptions {
fn from(value: &BinnedQuery) -> Self {
let cache_usage = value.cache_usage().unwrap_or(CacheUsage::Ignore);
Self {
cache_usage,
allow_from_events: value.allow_from_events().unwrap_or(true),
allow_from_prebinned: value.allow_from_prebinned().unwrap_or(true),
allow_rebin: value.allow_rebin().unwrap_or(true),
}
}
}

View File

@@ -1 +0,0 @@

View File

@@ -265,7 +265,6 @@ async fn timebinned_stream(
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>>, Error> {
use netpod::query::CacheUsage;
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore);
let do_time_weight = true;
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
subgrids
@@ -277,7 +276,7 @@ async fn timebinned_stream(
};
let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
(&query).into(),
query.transform().clone(),
EventsSubQuerySettings::from(&query),
query.log_level().into(),