From 778264eb300734c764ac619df02e7f15d619632c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Jul 2025 23:57:29 +0200 Subject: [PATCH] Choose workaround at query time --- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/src/api4/binned.rs | 34 +++- crates/httpret/src/api4/binned_v2.rs | 15 +- crates/httpret/src/api4/binwriteindex.rs | 23 ++- crates/httpret/src/channelconfig.rs | 36 +++- crates/nodenet/src/scylla.rs | 1 + crates/scyllaconn/src/bincache.rs | 32 +++- .../scyllaconn/src/binned2/binnedrtbinlen.rs | 14 +- .../scyllaconn/src/binned2/binnedrtmsplsps.rs | 9 +- crates/scyllaconn/src/binned2/frombinned.rs | 9 +- crates/scyllaconn/src/binned2/msplspiter.rs | 6 +- crates/scyllaconn/src/binned3.rs | 3 + crates/scyllaconn/src/binned3/index_entry.rs | 22 +++ .../scyllaconn/src/binned3/layeredatgrid.rs | 46 +++++ crates/scyllaconn/src/binned3/layeredtop.rs | 177 ++++++++++++++++++ crates/scyllaconn/src/binwriteindex.rs | 8 +- crates/scyllaconn/src/binwriteindex/bwxcmb.rs | 19 +- .../src/binwriteindex/read_all_coarse.rs | 25 ++- crates/scyllaconn/src/events2/events.rs | 29 ++- crates/scyllaconn/src/events2/msp.rs | 51 ++++- crates/scyllaconn/src/events2/prepare.rs | 59 ++++-- crates/scyllaconn/src/lib.rs | 1 + crates/scyllaconn/src/worker.rs | 104 +++++++--- 23 files changed, 622 insertions(+), 103 deletions(-) create mode 100644 crates/scyllaconn/src/binned3.rs create mode 100644 crates/scyllaconn/src/binned3/index_entry.rs create mode 100644 crates/scyllaconn/src/binned3/layeredatgrid.rs create mode 100644 crates/scyllaconn/src/binned3/layeredtop.rs diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 983ca41..214c794 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqretrieve" -version = "0.5.5" +version = "0.5.6-aa.2" authors = ["Dominik Werder "] edition = "2024" diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 6b008ca..514c702 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -28,6 +28,7 @@ use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::UseScylla6Workarounds; use netpod::APP_CBOR_FRAMED; use netpod::APP_JSON; use netpod::APP_JSON_FRAMED; @@ -190,6 +191,7 @@ async fn binned_instrumented( fn make_read_provider( chname: &str, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: Option, open_bytes: Pin>, ctx: &ReqCtx, @@ -214,7 +216,7 @@ fn make_read_provider( let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu)) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -276,8 +278,14 @@ async fn binned_json_framed( .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); - let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc); + let (events_read_provider, cache_read_provider) = make_read_provider( + ch_conf.name(), + res2.query.use_scylla6_workarounds().into(), + res2.scyqueue, + open_bytes, + ctx, + ncc, + ); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let stream = streams::timebinnedjson::timebinned_json_framed( res2.query, @@ -308,8 +316,14 @@ async fn binned_cbor_framed( .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); - let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc); + let (events_read_provider, cache_read_provider) = make_read_provider( + ch_conf.name(), + res2.query.use_scylla6_workarounds().into(), + res2.scyqueue, + open_bytes, + ctx, + ncc, + ); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let stream = streams::timebinnedjson::timebinned_cbor_framed( res2.query, @@ -353,8 +367,14 @@ impl<'a> HandleRes2<'a> { .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); - let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc); + let (events_read_provider, cache_read_provider) = make_read_provider( + ch_conf.name(), + query.use_scylla6_workarounds().into(), + scyqueue.clone(), + open_bytes, + ctx, + ncc, + ); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let ret = Self { logspan, diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index 82be1cd..420bf9a 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -32,6 +32,7 @@ use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::UseScylla6Workarounds; use netpod::APP_CBOR_FRAMED; use netpod::APP_JSON; use netpod::APP_JSON_FRAMED; @@ -208,6 +209,7 @@ async fn binned_instrumented( fn make_read_provider( chname: &str, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: Option, open_bytes: Pin>, ctx: &ReqCtx, @@ -232,7 +234,7 @@ fn make_read_provider( let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu)) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -274,6 +276,7 @@ async fn binned_json_framed( let stream = scyllaconn::binned2::frombinned::FromBinned::new( series, binrange.clone(), + res2.query.use_scylla6_workarounds().into(), scyqueue, res2.cache_read_provider, ); @@ -383,8 +386,14 @@ impl<'a> HandleRes2<'a> { .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); - let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc); + let (events_read_provider, cache_read_provider) = make_read_provider( + ch_conf.name(), + query.use_scylla6_workarounds().into(), + scyqueue.clone(), + open_bytes, + ctx, + ncc, + ); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let ret = Self { logspan, diff --git a/crates/httpret/src/api4/binwriteindex.rs b/crates/httpret/src/api4/binwriteindex.rs index 7fc2152..67aef5e 100644 --- a/crates/httpret/src/api4/binwriteindex.rs +++ b/crates/httpret/src/api4/binwriteindex.rs @@ -31,6 +31,7 @@ use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::UseScylla6Workarounds; use netpod::APP_CBOR_FRAMED; use netpod::APP_JSON; use netpod::APP_JSON_FRAMED; @@ -53,10 +54,10 @@ use tracing::Instrument; use tracing::Span; use url::Url; -macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } -macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } +macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "Api4BinWriteIndex"), @@ -200,6 +201,7 @@ async fn binned_instrumented( fn make_read_provider( chname: &str, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: Option, open_bytes: Pin>, ctx: &ReqCtx, @@ -224,7 +226,7 @@ fn make_read_provider( let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu)) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -254,6 +256,7 @@ async fn binned_json_single( SeriesId::new(res2.ch_conf.series().unwrap()), pbp.clone(), res2.query.range().to_time().unwrap(), + res2.query.use_scylla6_workarounds().into(), res2.scyqueue.clone().unwrap(), ); while let Some(x) = stream.next().await { @@ -294,8 +297,14 @@ impl<'a> HandleRes2<'a> { .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); - let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc); + let (events_read_provider, cache_read_provider) = make_read_provider( + ch_conf.name(), + query.use_scylla6_workarounds().into(), + scyqueue.clone(), + open_bytes, + ctx, + ncc, + ); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let ret = Self { logspan, diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 0fd01d7..cbc2a54 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -605,6 +605,8 @@ impl IocForChannel { pub struct ScyllaSeriesTsMspQuery { channel: SfDbChannel, range: SeriesRange, + #[serde(default, skip_serializing_if = "Option::is_none")] + use_scylla6_workarounds: Option, } impl FromUrl for ScyllaSeriesTsMspQuery { @@ -624,7 +626,11 @@ impl FromUrl for ScyllaSeriesTsMspQuery { } else { return Err(Error::MissingTimerange); }; - Ok(Self { channel, range }) + Ok(Self { + channel, + range, + use_scylla6_workarounds: pairs.get("use_scylla6_workarounds").and_then(|x| x.parse().ok()), + }) } } @@ -693,10 +699,14 @@ impl ScyllaSeriesTsMsp { use scyllaconn::SeriesId; let sid = SeriesId::new(chconf.series()); let scyqueue = shared_res.scyqueue.clone().unwrap(); - let mut st_ts_msp_ms = Vec::new(); - let mut msp_stream = - scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); + let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new( + RetentionTime::Short, + sid, + (&q.range).into(), + q.use_scylla6_workarounds.into(), + scyqueue.clone(), + ); use chrono::TimeZone; while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); @@ -706,8 +716,13 @@ impl ScyllaSeriesTsMsp { } let mut mt_ts_msp_ms = Vec::new(); - let mut msp_stream = - scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); + let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new( + RetentionTime::Medium, + sid, + (&q.range).into(), + q.use_scylla6_workarounds.into(), + scyqueue.clone(), + ); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); @@ -716,8 +731,13 @@ impl ScyllaSeriesTsMsp { } let mut lt_ts_msp_ms = Vec::new(); - let mut msp_stream = - scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); + let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new( + RetentionTime::Long, + sid, + (&q.range).into(), + q.use_scylla6_workarounds.into(), + scyqueue.clone(), + ); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index a66dbf8..d066e5d 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -43,6 +43,7 @@ pub async fn scylla_channel_event_stream( evq.need_one_before_range(), evq.need_value_data(), evq.settings().scylla_read_queue_len(), + evq.use_scylla6_workarounds().into(), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { trace!("========= SOLO {rt:?} ====================="); diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index ef580f2..8c39b1b 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -6,6 +6,7 @@ use items_0::merge::MergeableTy; use items_2::binning::container_bins::ContainerBins; use netpod::DtMs; use netpod::TsNano; +use netpod::UseScylla6Workarounds; use netpod::ttl::RetentionTime; use std::ops::Range; use streams::timebin::cached::reader::BinsReadRes; @@ -17,13 +18,14 @@ async fn scylla_read_prebinned_f32( bin_len: DtMs, msp: u64, offs: Range, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, ) -> BinsReadRes { let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; let mut res = Vec::new(); for rt in rts { let x = scyqueue - .read_prebinned_f32(rt, series, bin_len, msp, offs.clone()) + .read_prebinned_f32(rt, series, bin_len, msp, offs.clone(), use_scylla6_workarounds.clone()) .await?; res.push(x); } @@ -68,12 +70,16 @@ async fn scylla_read_prebinned_f32( } pub struct ScyllaPrebinnedReadProvider { + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, } impl ScyllaPrebinnedReadProvider { - pub fn new(scyqueue: ScyllaQueue) -> Self { - Self { scyqueue } + pub fn new(use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue) -> Self { + Self { + use_scylla6_workarounds, + scyqueue, + } } } @@ -86,17 +92,26 @@ impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider { offs: Range, ) -> streams::timebin::cached::reader::CacheReading { // let fut = async { todo!("TODO impl scylla cache read") }; - let fut = scylla_read_prebinned_f32(series, bin_len, msp, offs, self.scyqueue.clone()); + let fut = scylla_read_prebinned_f32( + series, + bin_len, + msp, + offs, + self.use_scylla6_workarounds.clone(), + self.scyqueue.clone(), + ); streams::timebin::cached::reader::CacheReading::new(Box::pin(fut)) } } +// TODO remove? pub async fn worker_read( rt: RetentionTime, series: u64, bin_len: DtMs, msp: u64, offs: core::ops::Range, + use_scylla6_workarounds: UseScylla6Workarounds, stmts: &StmtsEvents, scy: &ScySession, ) -> Result, streams::timebin::cached::reader::Error> { @@ -110,7 +125,14 @@ pub async fn worker_read( offs.end as i32, ); let res = scy - .execute_iter(stmts.rt(&rt).prebinned_f32().clone(), params) + .execute_iter( + stmts + .cache_bypass(*use_scylla6_workarounds) + .rt(&rt) + .prebinned_f32() + .clone(), + params, + ) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; let mut it = res diff --git a/crates/scyllaconn/src/binned2/binnedrtbinlen.rs b/crates/scyllaconn/src/binned2/binnedrtbinlen.rs index cf8c419..a118dab 100644 --- a/crates/scyllaconn/src/binned2/binnedrtbinlen.rs +++ b/crates/scyllaconn/src/binned2/binnedrtbinlen.rs @@ -3,6 +3,7 @@ use daqbuf_series::SeriesId; use daqbuf_series::msp::PrebinnedPartitioning; use netpod::BinnedRange; use netpod::TsNano; +use netpod::UseScylla6Workarounds; use netpod::ttl::RetentionTime; /* @@ -16,6 +17,7 @@ pub struct BinnedRtBinlenStream { rt: RetentionTime, pbp: PrebinnedPartitioning, range: BinnedRange, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, } @@ -25,6 +27,7 @@ impl BinnedRtBinlenStream { rt: RetentionTime, pbp: PrebinnedPartitioning, range: BinnedRange, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, ) -> Self { Self { @@ -32,6 +35,7 @@ impl BinnedRtBinlenStream { rt, pbp, range, + use_scylla6_workarounds, scyqueue, } } @@ -42,6 +46,14 @@ impl BinnedRtBinlenStream { let msp = todo!(); let binlen = todo!(); let lsps = todo!(); - super::binnedrtmsplsps::BinnedRtMspLsps::new(series, rt, msp, binlen, lsps, self.scyqueue.clone()); + super::binnedrtmsplsps::BinnedRtMspLsps::new( + series, + rt, + msp, + binlen, + lsps, + self.use_scylla6_workarounds.clone(), + self.scyqueue.clone(), + ); } } diff --git a/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs index 1ef0212..3e49243 100644 --- a/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs +++ b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs @@ -12,13 +12,11 @@ use daqbuf_series::msp::LspU32; use daqbuf_series::msp::MspU32; use items_2::binning::container_bins::ContainerBins; use netpod::DtMs; +use netpod::UseScylla6Workarounds; use netpod::ttl::RetentionTime; use std::fmt; use std::pin::Pin; -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } - autoerr::create_error_v1!( name(Error, "BinnedRtMsp"), enum variants { @@ -43,6 +41,7 @@ pub struct BinnedRtMspLsps { msp: MspU32, lsps: (LspU32, LspU32), binlen: DtMs, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, fut: Option, } @@ -54,6 +53,7 @@ impl BinnedRtMspLsps { msp: MspU32, binlen: DtMs, lsps: (LspU32, LspU32), + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, ) -> Self { Self { @@ -62,6 +62,7 @@ impl BinnedRtMspLsps { msp, lsps, binlen, + use_scylla6_workarounds, scyqueue, fut: None, } @@ -75,7 +76,7 @@ impl BinnedRtMspLsps { let offs = self.lsps.0.to_u32()..self.lsps.1.to_u32(); // SAFETY we only use scyqueue while we self are alive. let scyqueue = unsafe { &*(&self.scyqueue as *const ScyllaQueue) }; - let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs); + let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs, self.use_scylla6_workarounds.clone()); let fut = Box::pin(fut); Some(fut) } diff --git a/crates/scyllaconn/src/binned2/frombinned.rs b/crates/scyllaconn/src/binned2/frombinned.rs index f771c14..506bfe0 100644 --- a/crates/scyllaconn/src/binned2/frombinned.rs +++ b/crates/scyllaconn/src/binned2/frombinned.rs @@ -19,6 +19,7 @@ use netpod::BinnedRange; use netpod::DtMs; use netpod::TsMs; use netpod::TsNano; +use netpod::UseScylla6Workarounds; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; use serde::Serialize; @@ -74,11 +75,17 @@ impl FromBinned { pub fn new( series: SeriesId, binrange: BinnedRange, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: &ScyllaQueue, cache_read_provider: Arc, ) -> Self { let state_a = StateA::ReadAllCoarse( - ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()), + ReadAllCoarse::new( + series, + binrange.to_nano_range(), + use_scylla6_workarounds, + scyqueue.clone(), + ), def(), ); Self { diff --git a/crates/scyllaconn/src/binned2/msplspiter.rs b/crates/scyllaconn/src/binned2/msplspiter.rs index 345f996..5bb5751 100644 --- a/crates/scyllaconn/src/binned2/msplspiter.rs +++ b/crates/scyllaconn/src/binned2/msplspiter.rs @@ -12,10 +12,8 @@ pub struct MspLspItem { #[derive(Debug)] pub struct MspLspIter { - #[allow(unused)] range: NanoRange, pbp: PrebinnedPartitioning, - #[allow(unused)] mins: (u32, u32), maxs: (u32, u32), curs: (u32, u32), @@ -37,6 +35,10 @@ impl MspLspIter { } } + pub fn range(&self) -> NanoRange { + self.range.clone() + } + pub fn mins(&self) -> (MspU32, LspU32) { (MspU32(self.mins.0), LspU32(self.mins.1)) } diff --git a/crates/scyllaconn/src/binned3.rs b/crates/scyllaconn/src/binned3.rs new file mode 100644 index 0000000..249d7a5 --- /dev/null +++ b/crates/scyllaconn/src/binned3.rs @@ -0,0 +1,3 @@ +pub mod index_entry; +pub mod layeredatgrid; +pub mod layeredtop; diff --git a/crates/scyllaconn/src/binned3/index_entry.rs b/crates/scyllaconn/src/binned3/index_entry.rs new file mode 100644 index 0000000..a4c0902 --- /dev/null +++ b/crates/scyllaconn/src/binned3/index_entry.rs @@ -0,0 +1,22 @@ +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use netpod::DtMs; +use std::collections::VecDeque; + +#[derive(Debug)] +pub struct IndexEntry { + pub pbp: PrebinnedPartitioning, + pub msp: MspU32, + pub lsp: LspU32, + pub binlen: DtMs, +} + +pub fn check_good_order(v: &VecDeque) -> bool { + for (a, b) in v.iter().skip(1).zip(v.iter()) { + if a.binlen < b.binlen { + return false; + } + } + true +} diff --git a/crates/scyllaconn/src/binned3/layeredatgrid.rs b/crates/scyllaconn/src/binned3/layeredatgrid.rs new file mode 100644 index 0000000..2980e25 --- /dev/null +++ b/crates/scyllaconn/src/binned3/layeredatgrid.rs @@ -0,0 +1,46 @@ +use crate::binned3::index_entry::IndexEntry; +use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use futures_util::Stream; +use items_0::streamitem::Sitemty3; +use netpod::BinnedRange; +use netpod::TsNano; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +autoerr::create_error_v1!( + name(Error, "BinReadLayeredAtGrid"), + enum variants { + IndexEntriesOrderBad, + }, +); + +fn def() -> T { + Default::default() +} + +pub struct BinReadLayeredAtGrid {} + +impl BinReadLayeredAtGrid { + pub fn new( + series: SeriesId, + binrange: BinnedRange, + index_entries: VecDeque, + scyqueue: &ScyllaQueue, + ) -> Result { + if !crate::binned3::index_entry::check_good_order(&index_entries) { + return Err(Error::IndexEntriesOrderBad); + } + todo!() + } +} + +impl Stream for BinReadLayeredAtGrid { + type Item = Sitemty3<(), Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/scyllaconn/src/binned3/layeredtop.rs b/crates/scyllaconn/src/binned3/layeredtop.rs new file mode 100644 index 0000000..ad2f1ad --- /dev/null +++ b/crates/scyllaconn/src/binned3/layeredtop.rs @@ -0,0 +1,177 @@ +use crate::binned2::msplspiter::MspLspIter; +use crate::binned3::index_entry::IndexEntry; +use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use futures_util::Stream; +use items_0::streamitem::LogItem; +use items_0::streamitem::Sitemty3; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::TsNano; +use netpod::UseScylla6Workarounds; +use netpod::ttl::RetentionTime; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +autoerr::create_error_v1!( + name(Error, "BinReadLayeredTop"), + enum variants { + StateModFn, + Worker(#[from] crate::worker::Error), + }, +); + +fn def() -> T { + Default::default() +} + +type FetchingIndexFutRes = Result, Error>; + +async fn fetch_index_entries( + series: SeriesId, + msp: MspU32, + lsp: LspU32, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: &ScyllaQueue, +) -> FetchingIndexFutRes { + let rt = RetentionTime::Long; + let pbp = PrebinnedPartitioning::Day1; + match scyqueue + .bin_write_index_read( + rt, + series, + pbp.clone(), + msp, + lsp.to_u32(), + 1 + lsp.to_u32(), + use_scylla6_workarounds, + ) + .await + { + Ok(x) => { + let mut a = VecDeque::new(); + for e in x { + let y = super::index_entry::IndexEntry { + pbp: pbp.clone(), + msp, + lsp, + binlen: DtMs::from_ms_u64(e.binlen as _), + }; + a.push_back(y); + } + Ok(a) + } + Err(e) => Err(e.into()), + } +} + +struct FetchingIndex { + fut: Pin + Send>>, +} + +enum StateModFn { + State(Box () + Send>), +} + +struct Common { + series: SeriesId, + msplspiter: MspLspIter, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: ScyllaQueue, + logoutbuf: VecDeque, +} + +enum State { + StartNextDay1, + FetchingIndex(FetchingIndex), + DataDone, +} + +pub struct BinReadLayeredTop { + state: State, + common: Common, +} + +impl BinReadLayeredTop { + pub fn new( + series: SeriesId, + binrange: BinnedRange, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: ScyllaQueue, + ) -> Result { + // TODO + // Compute (PBP) the list of MSP and LSP-ranges to query for Day1 index entries. + let msplspiter = MspLspIter::new_covering(binrange.full_range(), PrebinnedPartitioning::Day1); + // Ask scyqueue to fetch the Day1 index entries for all rt for the given series and range. + let ret = Self { + state: State::StartNextDay1, + common: Common { + series, + msplspiter, + use_scylla6_workarounds, + scyqueue, + logoutbuf: VecDeque::new(), + }, + }; + Ok(ret) + } + + fn start_next_day1(common: &mut Common) -> StateModFn { + match common.msplspiter.next() { + Some(x) => { + let series = common.series.clone(); + // SAFETY future must not outlive self. + let scyqueue = unsafe { netpod::extltref(&common.scyqueue) }; + // self.state = State::FetchingIndex(FetchingIndex { + // fut: Box::pin(fetch_index_entries(common.series.clone(), x.0, x.1, scyqueue)), + // }); + let st = State::FetchingIndex(FetchingIndex { + fut: Box::pin(fetch_index_entries( + common.series.clone(), + x.0, + x.1, + common.use_scylla6_workarounds.clone(), + scyqueue, + )), + }); + let y = move |state: &mut State| { + *state = st; + }; + let x = StateModFn::State(Box::new(y)); + x + } + None => { + // self.state = State::DataDone; + todo!() + } + } + } +} + +impl Stream for BinReadLayeredTop { + type Item = Sitemty3<(), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &mut self.state { + State::StartNextDay1 => { + // + match Self::start_next_day1(&mut self.common) { + StateModFn::State(f) => { + f(&mut self.state); + continue; + } + _ => Ready(Some(Err(Error::StateModFn))), + } + } + _ => todo!(), + }; + } + } +} diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs index d9c382b..53e6eb1 100644 --- a/crates/scyllaconn/src/binwriteindex.rs +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -14,6 +14,7 @@ use items_0::streamitem::StreamItem; use items_0::streamitem::sitem3_data; use log::log_item_emit as lg; use netpod::DtMs; +use netpod::UseScylla6Workarounds; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; use std::collections::VecDeque; @@ -65,6 +66,7 @@ pub struct BinWriteIndexRtStream { lsp_min: u32, msp_end: u32, lsp_end: u32, + use_scylla6_workarounds: UseScylla6Workarounds, fut1: Option, logbuf: VecDeque, } @@ -79,6 +81,7 @@ impl BinWriteIndexRtStream { series: SeriesId, pbp: PrebinnedPartitioning, range: NanoRange, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue, ) -> Self { lg::info!("============================ log item emitted from binwriteindex.rs"); @@ -104,6 +107,7 @@ impl BinWriteIndexRtStream { lsp_min: lsp_beg, msp_end, lsp_end, + use_scylla6_workarounds, fut1: None, logbuf: Default::default(), } @@ -117,10 +121,11 @@ impl BinWriteIndexRtStream { msp: u32, lsp_min: u32, lsp_max: u32, + use_scylla6_workarounds: UseScylla6Workarounds, ) -> Result<(u32, u32, u32, VecDeque), crate::worker::Error> { debug!("make_next_query_fut msp {} lsp {} {}", msp, lsp_min, lsp_max); let res = scyqueue - .bin_write_index_read(rt1, series, pbp, MspU32(msp), lsp_min, lsp_max) + .bin_write_index_read(rt1, series, pbp, MspU32(msp), lsp_min, lsp_max, use_scylla6_workarounds) .await?; Ok((msp, lsp_min, lsp_max, res)) } @@ -150,6 +155,7 @@ impl BinWriteIndexRtStream { msp, lsp_min, lsp_max, + self.use_scylla6_workarounds.clone(), ); Some(Fut1(Box::pin(fut))) } else { diff --git a/crates/scyllaconn/src/binwriteindex/bwxcmb.rs b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs index 00267b9..9589f72 100644 --- a/crates/scyllaconn/src/binwriteindex/bwxcmb.rs +++ b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs @@ -1,10 +1,11 @@ use super::BinWriteIndexRtStream; use crate::worker::ScyllaQueue; -use daqbuf_series::msp::PrebinnedPartitioning; use daqbuf_series::SeriesId; +use daqbuf_series::msp::PrebinnedPartitioning; use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; +use netpod::UseScylla6Workarounds; use netpod::log; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; @@ -14,7 +15,7 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "BinWriteIndexStream"), @@ -43,6 +44,7 @@ enum InpSt { #[derive(Debug)] pub struct BinWriteIndexStream { rtss: VecDeque, + use_scylla6_workarounds: UseScylla6Workarounds, } impl BinWriteIndexStream { @@ -50,7 +52,12 @@ impl BinWriteIndexStream { std::any::type_name::() } - pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self { + pub fn new( + series: SeriesId, + range: NanoRange, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: ScyllaQueue, + ) -> Self { debug!("{}::new", Self::type_name()); let mut rtss = VecDeque::new(); let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; @@ -60,11 +67,15 @@ impl BinWriteIndexStream { series.clone(), PrebinnedPartitioning::Day1, range.clone(), + use_scylla6_workarounds.clone(), scyqueue.clone(), ); rtss.push_back(InpSt::Polling(s)); } - BinWriteIndexStream { rtss } + BinWriteIndexStream { + rtss, + use_scylla6_workarounds, + } } fn abort(&mut self) { diff --git a/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs index b23f060..534aca4 100644 --- a/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs +++ b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs @@ -10,7 +10,7 @@ use futures_util::TryStreamExt; use items_0::streamitem::Sitemty3; use items_0::streamitem::sitem3_data; use netpod::DtMs; -use netpod::log; +use netpod::UseScylla6Workarounds; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; use std::collections::VecDeque; @@ -18,8 +18,8 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "BinIndexReadAllCoarse"), @@ -32,6 +32,7 @@ autoerr::create_error_v1!( async fn read_all_coarse( series: SeriesId, range: NanoRange, + use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: &ScyllaQueue, ) -> Result, Error> { let rts = { @@ -41,7 +42,14 @@ async fn read_all_coarse( let mut ret = VecDeque::new(); for rt in rts { let pbp = PrebinnedPartitioning::Day1; - let mut stream = BinWriteIndexRtStream::new(rt.clone(), series, pbp, range.clone(), scyqueue.clone()); + let mut stream = BinWriteIndexRtStream::new( + rt.clone(), + series, + pbp, + range.clone(), + use_scylla6_workarounds.clone(), + scyqueue.clone(), + ); while let Some(x) = stream.try_next().await? { match x.into_data() { Ok(x) => { @@ -77,11 +85,16 @@ pub struct ReadAllCoarse { } impl ReadAllCoarse { - pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self { + pub fn new( + series: SeriesId, + range: NanoRange, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: ScyllaQueue, + ) -> Self { let scyqueue = Box::new(scyqueue); let fut = { let scyqueue = unsafe { &*(scyqueue.as_ref() as *const ScyllaQueue) }; - read_all_coarse(series, range, scyqueue) + read_all_coarse(series, range, use_scylla6_workarounds, scyqueue) }; Self { scyqueue, diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 424aa96..9d8f6ed 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -27,6 +27,7 @@ use netpod::Shape; use netpod::TsMs; use netpod::TsMsVecFmt; use netpod::TsNano; +use netpod::UseScylla6Workarounds; use netpod::log; use netpod::ttl::RetentionTime; use scylla::client::pager::QueryPager; @@ -46,6 +47,8 @@ macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ) } macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ) } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) } + macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } @@ -69,14 +72,21 @@ pub struct EventReadOpts { with_values: bool, one_before: bool, qucap: u32, + use_scylla6_workarounds: UseScylla6Workarounds, } impl EventReadOpts { - pub fn new(one_before: bool, with_values: bool, qucap: Option) -> Self { + pub fn new( + one_before: bool, + with_values: bool, + qucap: Option, + use_scylla6_workarounds: UseScylla6Workarounds, + ) -> Self { Self { one_before, with_values, qucap: qucap.unwrap_or(6), + use_scylla6_workarounds, } } @@ -379,7 +389,13 @@ impl EventsStreamRt { ) -> Self { trace_init!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}"); let series = SeriesId::new(ch_conf.series()); - let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone()); + let msp_inp = crate::events2::msp::MspStreamRt::new( + rt.clone(), + series, + range.clone(), + readopts.use_scylla6_workarounds.clone(), + scyqueue.clone(), + ); Self { qucap: readopts.qucap as usize, rt, @@ -878,6 +894,7 @@ async fn read_next_values_3_fwd( table_name ); let qu = stmts + .cache_bypass(*opts.readopts.use_scylla6_workarounds) .rt(&opts.rt) .lsp(!opts.fwd, with_values) .shape(val_ty_dyn.is_valueblob()) @@ -926,6 +943,7 @@ async fn read_lsp_all( opts.range.beg().delta(opts.ts_msp.ns()) }; let mut qu = stmts + .cache_bypass(*opts.readopts.use_scylla6_workarounds) .rt(&opts.rt) .lsp_all() .shape(opts.val_ty_dyn.is_valueblob()) @@ -964,6 +982,9 @@ async fn read_next_values_3_bck( if n > 1024 * 200 { log::info!("{n} lsp in msp {msp}", msp = opts.ts_msp) // TODO metrics + } else if n > 1024 * 20 { + log::debug!("{n} lsp in msp {msp}", msp = opts.ts_msp) + // TODO metrics } } jobtrace.add_event_now(ReadEventKind::ReadEventsLspAllDone); @@ -974,6 +995,9 @@ async fn read_next_values_3_bck( return Ok((ret,)); }; let val_ty_dyn = &opts.val_ty_dyn; + if *opts.readopts.use_scylla6_workarounds == false { + info!("{selfname} NO WORKAROUND"); + } trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name()); let series = opts.series; let ts_msp = opts.ts_msp; @@ -981,6 +1005,7 @@ async fn read_next_values_3_bck( let with_values = opts.readopts.with_values(); trace_fetch!("{selfname} ts_msp {} lsp {} {}", ts_msp.fmt(), lsp, table_name); let qu = stmts + .cache_bypass(*opts.readopts.use_scylla6_workarounds) .rt(&opts.rt) .lsp(false, with_values) // TODO diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 165efd5..6812672 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -8,6 +8,7 @@ use futures_util::Stream; use futures_util::TryStreamExt; use netpod::TsMs; use netpod::TsMsVecFmt; +use netpod::UseScylla6Workarounds; use netpod::log; use netpod::ttl::RetentionTime; use scylla::client::session::Session; @@ -92,23 +93,40 @@ pub struct MspStreamRt { out: VecDeque, scyqueue: ScyllaQueue, do_trace_detail: bool, + use_scylla6_workarounds: UseScylla6Workarounds, } impl MspStreamRt { - pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self { + pub fn new( + rt: RetentionTime, + series: SeriesId, + range: ScyllaSeriesRange, + use_scylla6_workarounds: UseScylla6Workarounds, + scyqueue: ScyllaQueue, + ) -> Self { let fut_bck = { let scyqueue = scyqueue.clone(); let rt = rt.clone(); let series = series.clone(); let range = range.clone(); - async move { scyqueue.find_ts_msp(rt, series.id(), range, true).await } + let use_scylla6_workarounds = use_scylla6_workarounds.clone(); + async move { + scyqueue + .find_ts_msp(rt, series, range, true, use_scylla6_workarounds) + .await + } }; let fut_fwd = { let scyqueue = scyqueue.clone(); let rt = rt.clone(); let series = series.clone(); let range = range.clone(); - async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } + let use_scylla6_workarounds = use_scylla6_workarounds.clone(); + async move { + scyqueue + .find_ts_msp(rt, series, range, false, use_scylla6_workarounds) + .await + } }; let do_trace_detail = daqbuf_series::dbg::dbg_series(series.clone()); trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE"); @@ -124,6 +142,7 @@ impl MspStreamRt { out: VecDeque::new(), scyqueue, do_trace_detail, + use_scylla6_workarounds, } } @@ -144,7 +163,12 @@ impl MspStreamRt { let scyqueue = self.scyqueue.clone(); let rt = self.rt.clone(); let series = self.series.clone(); - async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } + let use_scylla6_workarounds = self.use_scylla6_workarounds.clone(); + async move { + scyqueue + .find_ts_msp(rt, series, range, false, use_scylla6_workarounds) + .await + } }; Resolvable::Future(Box::pin(fut_fwd)) } @@ -301,6 +325,7 @@ pub async fn find_ts_msp( series: u64, range: ScyllaSeriesRange, bck: bool, + use_scylla6_workarounds: UseScylla6Workarounds, stmts: &StmtsEvents, scy: &Session, ) -> Result, Error> { @@ -312,9 +337,13 @@ pub async fn find_ts_msp( bck ); if bck { - find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await + if *use_scylla6_workarounds { + find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await + } else { + find_ts_msp_bck(rt, series, range, stmts, scy).await + } } else { - find_ts_msp_fwd(rt, series, range, stmts, scy).await + find_ts_msp_fwd(rt, series, range, use_scylla6_workarounds, stmts, scy).await } } @@ -322,6 +351,7 @@ async fn find_ts_msp_fwd( rt: &RetentionTime, series: u64, range: ScyllaSeriesRange, + use_scylla6_workarounds: UseScylla6Workarounds, stmts: &StmtsEvents, scy: &Session, ) -> Result, Error> { @@ -331,7 +361,10 @@ async fn find_ts_msp_fwd( let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); log_fetch_result!("{selfname} {:?}", params); let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) + .execute_iter( + stmts.cache_bypass(*use_scylla6_workarounds).rt(rt).ts_msp_fwd().clone(), + params, + ) .await? .rows_stream::<(i64,)>()?; while let Some(row) = res.try_next().await? { @@ -354,7 +387,7 @@ async fn find_ts_msp_bck( let params = (series as i64, range.beg().ms() as i64); log_fetch_result!("{selfname} {:?}", params); let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params) + .execute_iter(stmts.cache_bypass(false).rt(rt).ts_msp_bck().clone(), params) .await? .rows_stream::<(i64,)>()?; while let Some(row) = res.try_next().await? { @@ -379,7 +412,7 @@ async fn find_ts_msp_bck_workaround( let params = (series as i64, 0 as i64, i64::MAX); log_fetch_result!("{selfname} {:?}", params); let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_bck_workaround().clone(), params) + .execute_iter(stmts.cache_bypass(true).rt(rt).ts_msp_bck_workaround().clone(), params) .await? .rows_stream::<(i64,)>()?; let mut c = 0; diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index fdcf0a2..0e2fa02 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -2,9 +2,9 @@ use netpod::ttl::RetentionTime; use scylla::client::session::Session; use scylla::statement::prepared::PreparedStatement; -macro_rules! info_prepare { - ($($arg:tt)*) => { log::info!("prepare cql {}", format_args!($($arg)*)); }; -} +macro_rules! log_prepare { ($($arg:tt)*) => { log::debug!("prepare cql {}", format_args!($($arg)*)); }; } + +macro_rules! trace_scy6 { ($($arg:tt)*) => { log::info!("{}", format_args!($($arg)*)); }; } autoerr::create_error_v1!( name(Error, "ScyllaPrepare"), @@ -141,6 +141,7 @@ impl StmtsEventsRt { } pub fn ts_msp_bck(&self) -> &PreparedStatement { + trace_scy6!("StmtsEventsRt ORDER DESC"); &self.ts_msp_bck } @@ -190,7 +191,7 @@ async fn make_msp_dir( select_cond, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -211,7 +212,7 @@ async fn make_msp_fwd_for_bck_workaround( select_cond, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -235,7 +236,7 @@ async fn make_lsp_all_shape_st( stname, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -273,7 +274,7 @@ async fn make_lsp_all_shape( table_name, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; qu }, @@ -317,7 +318,7 @@ async fn make_lsp( select_cond, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -373,7 +374,7 @@ async fn make_lsp_shape( table_name, query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; qu }, @@ -413,7 +414,7 @@ async fn make_prebinned_f32( rt.table_prefix(), query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -436,7 +437,7 @@ async fn make_bin_write_index_read( rt.table_prefix(), query_opts ); - info_prepare!("{ks} {rt} {cql}"); + log_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -458,24 +459,29 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) } #[derive(Debug)] -pub struct StmtsEvents { +pub struct StmtsEventsCacheBypass { st: StmtsEventsRt, mt: StmtsEventsRt, lt: StmtsEventsRt, + bypass_cache: bool, } -impl StmtsEvents { +impl StmtsEventsCacheBypass { pub async fn new(ks: [&str; 3], bypass_cache: bool, scy: &Session) -> Result { let query_opts = if bypass_cache { "bypass cache" } else { "" }; - let ret = StmtsEvents { + let ret = StmtsEventsCacheBypass { st: make_rt(ks[0], &RetentionTime::Short, query_opts, scy).await?, mt: make_rt(ks[1], &RetentionTime::Medium, query_opts, scy).await?, lt: make_rt(ks[2], &RetentionTime::Long, query_opts, scy).await?, + bypass_cache, }; Ok(ret) } pub fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { + if self.bypass_cache == false { + trace_scy6!("StmtsEventsCacheBypass false"); + } match rt { RetentionTime::Short => &self.st, RetentionTime::Medium => &self.mt, @@ -483,3 +489,28 @@ impl StmtsEvents { } } } + +#[derive(Debug)] +pub struct StmtsEvents { + cache_use: StmtsEventsCacheBypass, + cache_bypass: StmtsEventsCacheBypass, +} + +impl StmtsEvents { + pub async fn new(ks: [&str; 3], scy: &Session) -> Result { + let ret = StmtsEvents { + cache_use: StmtsEventsCacheBypass::new(ks, false, scy).await?, + cache_bypass: StmtsEventsCacheBypass::new(ks, true, scy).await?, + }; + Ok(ret) + } + + pub fn cache_bypass(&self, cache_bypass: bool) -> &StmtsEventsCacheBypass { + if cache_bypass { + &self.cache_bypass + } else { + trace_scy6!("cache_bypass false"); + &self.cache_use + } + } +} diff --git a/crates/scyllaconn/src/lib.rs b/crates/scyllaconn/src/lib.rs index f0c7d7c..4fc1185 100644 --- a/crates/scyllaconn/src/lib.rs +++ b/crates/scyllaconn/src/lib.rs @@ -1,6 +1,7 @@ pub mod accounting; pub mod bincache; pub mod binned2; +pub mod binned3; pub mod binwriteindex; pub mod conn; pub mod errconv; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index b6c568a..6bcf696 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -16,6 +16,7 @@ use items_2::binning::container_bins::ContainerBins; use netpod::DtMs; use netpod::ScyllaConfig; use netpod::TsMs; +use netpod::UseScylla6Workarounds; use netpod::log; use netpod::ttl::RetentionTime; use std::collections::VecDeque; @@ -56,6 +57,48 @@ impl From> for Error { type ScySessTy = scylla::client::session::Session; +#[derive(Debug)] +struct FindTsMsp { + rt: RetentionTime, + series: SeriesId, + range: ScyllaSeriesRange, + bck: bool, + use_scylla6_workarounds: UseScylla6Workarounds, + tx: Sender, Error>>, +} + +impl FindTsMsp { + async fn execute(self, stmts: &StmtsEvents, scy: &ScySessTy) { + // TODO avoid the extra clone + let tx = self.tx.clone(); + match self.execute_inner(stmts, scy).await { + Ok(()) => {} + Err(e) => { + if tx.send(Err(e)).await.is_err() { + // TODO count for stats + } + } + } + } + + async fn execute_inner(self, stmts: &StmtsEvents, scy: &ScySessTy) -> Result<(), Error> { + let res = crate::events2::msp::find_ts_msp( + &self.rt, + self.series.id(), + self.range, + self.bck, + self.use_scylla6_workarounds, + &stmts, + &scy, + ) + .await; + if self.tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + Ok(()) + } +} + #[derive(Debug)] struct ReadPrebinnedF32 { rt: RetentionTime, @@ -63,17 +106,19 @@ struct ReadPrebinnedF32 { bin_len: DtMs, msp: u64, offs: core::ops::Range, + use_scylla6_workarounds: UseScylla6Workarounds, tx: Sender, streams::timebin::cached::reader::Error>>, } #[derive(Debug)] struct BinWriteIndexRead { - rt1: RetentionTime, + rt: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, msp: MspU32, lsp_min: u32, lsp_max: u32, + use_scylla6_workarounds: UseScylla6Workarounds, tx: Sender, Error>>, } @@ -101,7 +146,14 @@ impl BinWriteIndexRead { ); log::info!("execute {:?}", params); let res = scy - .execute_iter(stmts.rt(&self.rt1).bin_write_index_read().clone(), params) + .execute_iter( + stmts + .cache_bypass(*self.use_scylla6_workarounds) + .rt(&self.rt) + .bin_write_index_read() + .clone(), + params, + ) .await?; let mut it = res.rows_stream::<(i32, i32)>()?; let mut all = VecDeque::new(); @@ -141,14 +193,7 @@ impl fmt::Debug for ExecuteV1 { #[derive(Debug)] enum Job { - FindTsMsp( - RetentionTime, - // series-id - u64, - ScyllaSeriesRange, - bool, - Sender, Error>>, - ), + FindTsMsp(FindTsMsp), AccountingReadTs( RetentionTime, TsMs, @@ -178,12 +223,21 @@ impl ScyllaQueue { pub async fn find_ts_msp( &self, rt: RetentionTime, - series: u64, + series: SeriesId, range: ScyllaSeriesRange, bck: bool, + use_scylla6_workarounds: UseScylla6Workarounds, ) -> Result, Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::FindTsMsp(rt, series, range, bck, tx); + let job = FindTsMsp { + rt, + series, + range, + bck, + use_scylla6_workarounds, + tx, + }; + let job = Job::FindTsMsp(job); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; Ok(res) @@ -237,6 +291,7 @@ impl ScyllaQueue { bin_len: DtMs, msp: u64, offs: core::ops::Range, + use_scylla6_workarounds: UseScylla6Workarounds, ) -> Result, streams::timebin::cached::reader::Error> { let (tx, rx) = async_channel::bounded(1); let job = Job::ReadPrebinnedF32(ReadPrebinnedF32 { @@ -245,6 +300,7 @@ impl ScyllaQueue { bin_len, msp, offs, + use_scylla6_workarounds, tx, }); self.tx @@ -260,21 +316,23 @@ impl ScyllaQueue { pub async fn bin_write_index_read( &self, - rt1: RetentionTime, + rt: RetentionTime, series: SeriesId, pbp: PrebinnedPartitioning, msp: MspU32, lsp_min: u32, lsp_max: u32, + use_scylla6_workarounds: UseScylla6Workarounds, ) -> Result, Error> { let (tx, rx) = async_channel::bounded(1); let job = BinWriteIndexRead { - rt1, + rt, series, pbp, msp, lsp_min, lsp_max, + use_scylla6_workarounds, tx, }; let job = Job::BinWriteIndexRead(job); @@ -356,25 +414,13 @@ impl ScyllaWorker { self.scyconf_lt.keyspace.as_str(), ]; debug!("scylla worker prepare start"); - let stmts = StmtsEvents::new( - kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, - self.scyconf_st.bypass_cache, - &scy, - ) - .await?; + let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?; let stmts = Arc::new(stmts); - // let stmts_cache = StmtsCache::new(kss[0], &scy).await?; - // let stmts_cache = Arc::new(stmts_cache); debug!("scylla worker prepare done"); self.rx .map(|job| async { match job { - Job::FindTsMsp(rt, series, range, bck, tx) => { - let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; - if tx.send(res.map_err(Into::into)).await.is_err() { - // TODO count for stats - } - } + Job::FindTsMsp(job) => job.execute(&stmts, &scy).await, Job::AccountingReadTs(rt, ts, tx) => { let ks = match &rt { RetentionTime::Short => &self.scyconf_st.keyspace, @@ -396,12 +442,14 @@ impl ScyllaWorker { } } Job::ReadPrebinnedF32(job) => { + // TODO remove I guess? let res = super::bincache::worker_read( job.rt, job.series, job.bin_len, job.msp, job.offs, + job.use_scylla6_workarounds, &stmts, &scy, )