Choose workaround at query time

Dominik Werder
2025-07-23 23:57:29 +02:00
parent cb382d2370
commit 778264eb30
23 changed files with 622 additions and 103 deletions
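The change threads a per-request flag from the URL query string down to the Scylla statement selection: use_scylla6_workarounds is parsed as an optional u32, converted into UseScylla6Workarounds, and eventually passed to StmtsEvents::cache_bypass(..) to choose between the normal and the BYPASS CACHE prepared statements. Below is a minimal, self-contained sketch of that pattern only; the UseScylla6Workarounds stand-in, its From<Option<u32>> conversion, and the CQL strings are simplified assumptions, not the crate's actual definitions.

    // Sketch of the query-time workaround selection introduced by this commit.
    // All types here are simplified stand-ins for the netpod/scyllaconn ones.
    use std::collections::HashMap;

    /// Stand-in for netpod::UseScylla6Workarounds (assumed to wrap a bool).
    #[derive(Clone, Copy, Debug)]
    struct UseScylla6Workarounds(bool);

    impl From<Option<u32>> for UseScylla6Workarounds {
        fn from(v: Option<u32>) -> Self {
            // Assumption: absent or 0 means "no workaround", non-zero enables it.
            Self(v.map(|x| x != 0).unwrap_or(false))
        }
    }

    /// Stand-in for the two statement sets kept by StmtsEvents in this diff.
    struct Stmts {
        cache_use: &'static str,
        cache_bypass: &'static str,
    }

    impl Stmts {
        /// Mirrors StmtsEvents::cache_bypass: pick the statement set per query.
        fn cache_bypass(&self, bypass: bool) -> &'static str {
            if bypass { self.cache_bypass } else { self.cache_use }
        }
    }

    fn main() {
        // Simulated URL query pairs, e.g. ...&use_scylla6_workarounds=1
        let pairs: HashMap<&str, &str> = HashMap::from([("use_scylla6_workarounds", "1")]);
        // Same parse shape as the FromUrl impl in the diff: Option<u32> via parse().ok().
        let flag: Option<u32> = pairs.get("use_scylla6_workarounds").and_then(|x| x.parse().ok());
        let wk: UseScylla6Workarounds = flag.into();
        let stmts = Stmts {
            cache_use: "SELECT ts_msp FROM ts_msp WHERE series = ?",
            cache_bypass: "SELECT ts_msp FROM ts_msp WHERE series = ? BYPASS CACHE",
        };
        // The flag chosen at query time decides which prepared set is used.
        println!("bypass cache: {} -> {}", wk.0, stmts.cache_bypass(wk.0));
    }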

View File

@@ -1,6 +1,6 @@
[package]
name = "daqretrieve"
version = "0.5.5"
version = "0.5.6-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -28,6 +28,7 @@ use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::UseScylla6Workarounds;
use netpod::APP_CBOR_FRAMED;
use netpod::APP_JSON;
use netpod::APP_JSON_FRAMED;
@@ -190,6 +191,7 @@ async fn binned_instrumented(
fn make_read_provider(
chname: &str,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: Option<ScyllaQueue>,
open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
ctx: &ReqCtx,
@@ -214,7 +216,7 @@ fn make_read_provider(
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu))
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu))
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -276,8 +278,14 @@ async fn binned_json_framed(
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
res2.query.use_scylla6_workarounds().into(),
res2.scyqueue,
open_bytes,
ctx,
ncc,
);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream = streams::timebinnedjson::timebinned_json_framed(
res2.query,
@@ -308,8 +316,14 @@ async fn binned_cbor_framed(
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
res2.query.use_scylla6_workarounds().into(),
res2.scyqueue,
open_bytes,
ctx,
ncc,
);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream = streams::timebinnedjson::timebinned_cbor_framed(
res2.query,
@@ -353,8 +367,14 @@ impl<'a> HandleRes2<'a> {
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
scyqueue.clone(),
open_bytes,
ctx,
ncc,
);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let ret = Self {
logspan,

View File

@@ -32,6 +32,7 @@ use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::UseScylla6Workarounds;
use netpod::APP_CBOR_FRAMED;
use netpod::APP_JSON;
use netpod::APP_JSON_FRAMED;
@@ -208,6 +209,7 @@ async fn binned_instrumented(
fn make_read_provider(
chname: &str,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: Option<ScyllaQueue>,
open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
ctx: &ReqCtx,
@@ -232,7 +234,7 @@ fn make_read_provider(
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu))
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu))
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -274,6 +276,7 @@ async fn binned_json_framed(
let stream = scyllaconn::binned2::frombinned::FromBinned::new(
series,
binrange.clone(),
res2.query.use_scylla6_workarounds().into(),
scyqueue,
res2.cache_read_provider,
);
@@ -383,8 +386,14 @@ impl<'a> HandleRes2<'a> {
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
scyqueue.clone(),
open_bytes,
ctx,
ncc,
);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let ret = Self {
logspan,

View File

@@ -31,6 +31,7 @@ use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::UseScylla6Workarounds;
use netpod::APP_CBOR_FRAMED;
use netpod::APP_JSON;
use netpod::APP_JSON_FRAMED;
@@ -53,10 +54,10 @@ use tracing::Instrument;
use tracing::Span;
use url::Url;
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); }
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "Api4BinWriteIndex"),
@@ -200,6 +201,7 @@ async fn binned_instrumented(
fn make_read_provider(
chname: &str,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: Option<ScyllaQueue>,
open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
ctx: &ReqCtx,
@@ -224,7 +226,7 @@ fn make_read_provider(
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu))
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(use_scylla6_workarounds, qu))
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -254,6 +256,7 @@ async fn binned_json_single(
SeriesId::new(res2.ch_conf.series().unwrap()),
pbp.clone(),
res2.query.range().to_time().unwrap(),
res2.query.use_scylla6_workarounds().into(),
res2.scyqueue.clone().unwrap(),
);
while let Some(x) = stream.next().await {
@@ -294,8 +297,14 @@ impl<'a> HandleRes2<'a> {
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
scyqueue.clone(),
open_bytes,
ctx,
ncc,
);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let ret = Self {
logspan,

View File

@@ -605,6 +605,8 @@ impl IocForChannel {
pub struct ScyllaSeriesTsMspQuery {
channel: SfDbChannel,
range: SeriesRange,
#[serde(default, skip_serializing_if = "Option::is_none")]
use_scylla6_workarounds: Option<u32>,
}
impl FromUrl for ScyllaSeriesTsMspQuery {
@@ -624,7 +626,11 @@ impl FromUrl for ScyllaSeriesTsMspQuery {
} else {
return Err(Error::MissingTimerange);
};
Ok(Self { channel, range })
Ok(Self {
channel,
range,
use_scylla6_workarounds: pairs.get("use_scylla6_workarounds").and_then(|x| x.parse().ok()),
})
}
}
@@ -693,10 +699,14 @@ impl ScyllaSeriesTsMsp {
use scyllaconn::SeriesId;
let sid = SeriesId::new(chconf.series());
let scyqueue = shared_res.scyqueue.clone().unwrap();
let mut st_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new(
RetentionTime::Short,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
scyqueue.clone(),
);
use chrono::TimeZone;
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
@@ -706,8 +716,13 @@ impl ScyllaSeriesTsMsp {
}
let mut mt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new(
RetentionTime::Medium,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
scyqueue.clone(),
);
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
@@ -716,8 +731,13 @@ impl ScyllaSeriesTsMsp {
}
let mut lt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
let mut msp_stream = scyllaconn::events2::msp::MspStreamRt::new(
RetentionTime::Long,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
scyqueue.clone(),
);
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();

View File

@@ -43,6 +43,7 @@ pub async fn scylla_channel_event_stream(
evq.need_one_before_range(),
evq.need_value_data(),
evq.settings().scylla_read_queue_len(),
evq.use_scylla6_workarounds().into(),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
trace!("========= SOLO {rt:?} =====================");

View File

@@ -6,6 +6,7 @@ use items_0::merge::MergeableTy;
use items_2::binning::container_bins::ContainerBins;
use netpod::DtMs;
use netpod::TsNano;
use netpod::UseScylla6Workarounds;
use netpod::ttl::RetentionTime;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
@@ -17,13 +18,14 @@ async fn scylla_read_prebinned_f32(
bin_len: DtMs,
msp: u64,
offs: Range<u32>,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> BinsReadRes {
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
let mut res = Vec::new();
for rt in rts {
let x = scyqueue
.read_prebinned_f32(rt, series, bin_len, msp, offs.clone())
.read_prebinned_f32(rt, series, bin_len, msp, offs.clone(), use_scylla6_workarounds.clone())
.await?;
res.push(x);
}
@@ -68,12 +70,16 @@ async fn scylla_read_prebinned_f32(
}
pub struct ScyllaPrebinnedReadProvider {
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
}
impl ScyllaPrebinnedReadProvider {
pub fn new(scyqueue: ScyllaQueue) -> Self {
Self { scyqueue }
pub fn new(use_scylla6_workarounds: UseScylla6Workarounds, scyqueue: ScyllaQueue) -> Self {
Self {
use_scylla6_workarounds,
scyqueue,
}
}
}
@@ -86,17 +92,26 @@ impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider {
offs: Range<u32>,
) -> streams::timebin::cached::reader::CacheReading {
// let fut = async { todo!("TODO impl scylla cache read") };
let fut = scylla_read_prebinned_f32(series, bin_len, msp, offs, self.scyqueue.clone());
let fut = scylla_read_prebinned_f32(
series,
bin_len,
msp,
offs,
self.use_scylla6_workarounds.clone(),
self.scyqueue.clone(),
);
streams::timebin::cached::reader::CacheReading::new(Box::pin(fut))
}
}
// TODO remove?
pub async fn worker_read(
rt: RetentionTime,
series: u64,
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
use_scylla6_workarounds: UseScylla6Workarounds,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
@@ -110,7 +125,14 @@ pub async fn worker_read(
offs.end as i32,
);
let res = scy
.execute_iter(stmts.rt(&rt).prebinned_f32().clone(), params)
.execute_iter(
stmts
.cache_bypass(*use_scylla6_workarounds)
.rt(&rt)
.prebinned_f32()
.clone(),
params,
)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut it = res

View File

@@ -3,6 +3,7 @@ use daqbuf_series::SeriesId;
use daqbuf_series::msp::PrebinnedPartitioning;
use netpod::BinnedRange;
use netpod::TsNano;
use netpod::UseScylla6Workarounds;
use netpod::ttl::RetentionTime;
/*
@@ -16,6 +17,7 @@ pub struct BinnedRtBinlenStream {
rt: RetentionTime,
pbp: PrebinnedPartitioning,
range: BinnedRange<TsNano>,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
}
@@ -25,6 +27,7 @@ impl BinnedRtBinlenStream {
rt: RetentionTime,
pbp: PrebinnedPartitioning,
range: BinnedRange<TsNano>,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
Self {
@@ -32,6 +35,7 @@ impl BinnedRtBinlenStream {
rt,
pbp,
range,
use_scylla6_workarounds,
scyqueue,
}
}
@@ -42,6 +46,14 @@ impl BinnedRtBinlenStream {
let msp = todo!();
let binlen = todo!();
let lsps = todo!();
super::binnedrtmsplsps::BinnedRtMspLsps::new(series, rt, msp, binlen, lsps, self.scyqueue.clone());
super::binnedrtmsplsps::BinnedRtMspLsps::new(
series,
rt,
msp,
binlen,
lsps,
self.use_scylla6_workarounds.clone(),
self.scyqueue.clone(),
);
}
}

View File

@@ -12,13 +12,11 @@ use daqbuf_series::msp::LspU32;
use daqbuf_series::msp::MspU32;
use items_2::binning::container_bins::ContainerBins;
use netpod::DtMs;
use netpod::UseScylla6Workarounds;
use netpod::ttl::RetentionTime;
use std::fmt;
use std::pin::Pin;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
autoerr::create_error_v1!(
name(Error, "BinnedRtMsp"),
enum variants {
@@ -43,6 +41,7 @@ pub struct BinnedRtMspLsps {
msp: MspU32,
lsps: (LspU32, LspU32),
binlen: DtMs,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
fut: Option<FutW>,
}
@@ -54,6 +53,7 @@ impl BinnedRtMspLsps {
msp: MspU32,
binlen: DtMs,
lsps: (LspU32, LspU32),
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
Self {
@@ -62,6 +62,7 @@ impl BinnedRtMspLsps {
msp,
lsps,
binlen,
use_scylla6_workarounds,
scyqueue,
fut: None,
}
@@ -75,7 +76,7 @@ impl BinnedRtMspLsps {
let offs = self.lsps.0.to_u32()..self.lsps.1.to_u32();
// SAFETY we only use scyqueue while we self are alive.
let scyqueue = unsafe { &*(&self.scyqueue as *const ScyllaQueue) };
let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs);
let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs, self.use_scylla6_workarounds.clone());
let fut = Box::pin(fut);
Some(fut)
}

View File

@@ -19,6 +19,7 @@ use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsMs;
use netpod::TsNano;
use netpod::UseScylla6Workarounds;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use serde::Serialize;
@@ -74,11 +75,17 @@ impl FromBinned {
pub fn new(
series: SeriesId,
binrange: BinnedRange<TsNano>,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: &ScyllaQueue,
cache_read_provider: Arc<dyn CacheReadProvider>,
) -> Self {
let state_a = StateA::ReadAllCoarse(
ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()),
ReadAllCoarse::new(
series,
binrange.to_nano_range(),
use_scylla6_workarounds,
scyqueue.clone(),
),
def(),
);
Self {

View File

@@ -12,10 +12,8 @@ pub struct MspLspItem {
#[derive(Debug)]
pub struct MspLspIter {
#[allow(unused)]
range: NanoRange,
pbp: PrebinnedPartitioning,
#[allow(unused)]
mins: (u32, u32),
maxs: (u32, u32),
curs: (u32, u32),
@@ -37,6 +35,10 @@ impl MspLspIter {
}
}
pub fn range(&self) -> NanoRange {
self.range.clone()
}
pub fn mins(&self) -> (MspU32, LspU32) {
(MspU32(self.mins.0), LspU32(self.mins.1))
}

View File

@@ -0,0 +1,3 @@
pub mod index_entry;
pub mod layeredatgrid;
pub mod layeredtop;

View File

@@ -0,0 +1,22 @@
use daqbuf_series::msp::LspU32;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
use netpod::DtMs;
use std::collections::VecDeque;
#[derive(Debug)]
pub struct IndexEntry {
pub pbp: PrebinnedPartitioning,
pub msp: MspU32,
pub lsp: LspU32,
pub binlen: DtMs,
}
pub fn check_good_order(v: &VecDeque<IndexEntry>) -> bool {
for (a, b) in v.iter().skip(1).zip(v.iter()) {
if a.binlen < b.binlen {
return false;
}
}
true
}

View File

@@ -0,0 +1,46 @@
use crate::binned3::index_entry::IndexEntry;
use crate::worker::ScyllaQueue;
use daqbuf_series::SeriesId;
use futures_util::Stream;
use items_0::streamitem::Sitemty3;
use netpod::BinnedRange;
use netpod::TsNano;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
autoerr::create_error_v1!(
name(Error, "BinReadLayeredAtGrid"),
enum variants {
IndexEntriesOrderBad,
},
);
fn def<T: Default>() -> T {
Default::default()
}
pub struct BinReadLayeredAtGrid {}
impl BinReadLayeredAtGrid {
pub fn new(
series: SeriesId,
binrange: BinnedRange<TsNano>,
index_entries: VecDeque<IndexEntry>,
scyqueue: &ScyllaQueue,
) -> Result<Self, Error> {
if !crate::binned3::index_entry::check_good_order(&index_entries) {
return Err(Error::IndexEntriesOrderBad);
}
todo!()
}
}
impl Stream for BinReadLayeredAtGrid {
type Item = Sitemty3<(), Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
}
}

View File

@@ -0,0 +1,177 @@
use crate::binned2::msplspiter::MspLspIter;
use crate::binned3::index_entry::IndexEntry;
use crate::worker::ScyllaQueue;
use daqbuf_series::SeriesId;
use daqbuf_series::msp::LspU32;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
use futures_util::Stream;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty3;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
use netpod::UseScylla6Workarounds;
use netpod::ttl::RetentionTime;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
autoerr::create_error_v1!(
name(Error, "BinReadLayeredTop"),
enum variants {
StateModFn,
Worker(#[from] crate::worker::Error),
},
);
fn def<T: Default>() -> T {
Default::default()
}
type FetchingIndexFutRes = Result<VecDeque<IndexEntry>, Error>;
async fn fetch_index_entries(
series: SeriesId,
msp: MspU32,
lsp: LspU32,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: &ScyllaQueue,
) -> FetchingIndexFutRes {
let rt = RetentionTime::Long;
let pbp = PrebinnedPartitioning::Day1;
match scyqueue
.bin_write_index_read(
rt,
series,
pbp.clone(),
msp,
lsp.to_u32(),
1 + lsp.to_u32(),
use_scylla6_workarounds,
)
.await
{
Ok(x) => {
let mut a = VecDeque::new();
for e in x {
let y = super::index_entry::IndexEntry {
pbp: pbp.clone(),
msp,
lsp,
binlen: DtMs::from_ms_u64(e.binlen as _),
};
a.push_back(y);
}
Ok(a)
}
Err(e) => Err(e.into()),
}
}
struct FetchingIndex {
fut: Pin<Box<dyn Future<Output = FetchingIndexFutRes> + Send>>,
}
enum StateModFn {
State(Box<dyn FnOnce(&mut State) -> () + Send>),
}
struct Common {
series: SeriesId,
msplspiter: MspLspIter,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
logoutbuf: VecDeque<LogItem>,
}
enum State {
StartNextDay1,
FetchingIndex(FetchingIndex),
DataDone,
}
pub struct BinReadLayeredTop {
state: State,
common: Common,
}
impl BinReadLayeredTop {
pub fn new(
series: SeriesId,
binrange: BinnedRange<TsNano>,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Result<Self, Error> {
// TODO
// Compute (PBP) the list of MSP and LSP-ranges to query for Day1 index entries.
let msplspiter = MspLspIter::new_covering(binrange.full_range(), PrebinnedPartitioning::Day1);
// Ask scyqueue to fetch the Day1 index entries for all rt for the given series and range.
let ret = Self {
state: State::StartNextDay1,
common: Common {
series,
msplspiter,
use_scylla6_workarounds,
scyqueue,
logoutbuf: VecDeque::new(),
},
};
Ok(ret)
}
fn start_next_day1(common: &mut Common) -> StateModFn {
match common.msplspiter.next() {
Some(x) => {
let series = common.series.clone();
// SAFETY future must not outlive self.
let scyqueue = unsafe { netpod::extltref(&common.scyqueue) };
// self.state = State::FetchingIndex(FetchingIndex {
// fut: Box::pin(fetch_index_entries(common.series.clone(), x.0, x.1, scyqueue)),
// });
let st = State::FetchingIndex(FetchingIndex {
fut: Box::pin(fetch_index_entries(
common.series.clone(),
x.0,
x.1,
common.use_scylla6_workarounds.clone(),
scyqueue,
)),
});
let y = move |state: &mut State| {
*state = st;
};
let x = StateModFn::State(Box::new(y));
x
}
None => {
// self.state = State::DataDone;
todo!()
}
}
}
}
impl Stream for BinReadLayeredTop {
type Item = Sitemty3<(), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break match &mut self.state {
State::StartNextDay1 => {
//
match Self::start_next_day1(&mut self.common) {
StateModFn::State(f) => {
f(&mut self.state);
continue;
}
_ => Ready(Some(Err(Error::StateModFn))),
}
}
_ => todo!(),
};
}
}
}

View File

@@ -14,6 +14,7 @@ use items_0::streamitem::StreamItem;
use items_0::streamitem::sitem3_data;
use log::log_item_emit as lg;
use netpod::DtMs;
use netpod::UseScylla6Workarounds;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use std::collections::VecDeque;
@@ -65,6 +66,7 @@ pub struct BinWriteIndexRtStream {
lsp_min: u32,
msp_end: u32,
lsp_end: u32,
use_scylla6_workarounds: UseScylla6Workarounds,
fut1: Option<Fut1>,
logbuf: VecDeque<LogItem>,
}
@@ -79,6 +81,7 @@ impl BinWriteIndexRtStream {
series: SeriesId,
pbp: PrebinnedPartitioning,
range: NanoRange,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
lg::info!("============================ log item emitted from binwriteindex.rs");
@@ -104,6 +107,7 @@ impl BinWriteIndexRtStream {
lsp_min: lsp_beg,
msp_end,
lsp_end,
use_scylla6_workarounds,
fut1: None,
logbuf: Default::default(),
}
@@ -117,10 +121,11 @@ impl BinWriteIndexRtStream {
msp: u32,
lsp_min: u32,
lsp_max: u32,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Result<(u32, u32, u32, VecDeque<BinWriteIndexEntry>), crate::worker::Error> {
debug!("make_next_query_fut msp {} lsp {} {}", msp, lsp_min, lsp_max);
let res = scyqueue
.bin_write_index_read(rt1, series, pbp, MspU32(msp), lsp_min, lsp_max)
.bin_write_index_read(rt1, series, pbp, MspU32(msp), lsp_min, lsp_max, use_scylla6_workarounds)
.await?;
Ok((msp, lsp_min, lsp_max, res))
}
@@ -150,6 +155,7 @@ impl BinWriteIndexRtStream {
msp,
lsp_min,
lsp_max,
self.use_scylla6_workarounds.clone(),
);
Some(Fut1(Box::pin(fut)))
} else {

View File

@@ -1,10 +1,11 @@
use super::BinWriteIndexRtStream;
use crate::worker::ScyllaQueue;
use daqbuf_series::msp::PrebinnedPartitioning;
use daqbuf_series::SeriesId;
use daqbuf_series::msp::PrebinnedPartitioning;
use futures_util::Future;
use futures_util::Stream;
use futures_util::StreamExt;
use netpod::UseScylla6Workarounds;
use netpod::log;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
@@ -14,7 +15,7 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "BinWriteIndexStream"),
@@ -43,6 +44,7 @@ enum InpSt {
#[derive(Debug)]
pub struct BinWriteIndexStream {
rtss: VecDeque<InpSt>,
use_scylla6_workarounds: UseScylla6Workarounds,
}
impl BinWriteIndexStream {
@@ -50,7 +52,12 @@ impl BinWriteIndexStream {
std::any::type_name::<Self>()
}
pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self {
pub fn new(
series: SeriesId,
range: NanoRange,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
debug!("{}::new", Self::type_name());
let mut rtss = VecDeque::new();
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
@@ -60,11 +67,15 @@ impl BinWriteIndexStream {
series.clone(),
PrebinnedPartitioning::Day1,
range.clone(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
rtss.push_back(InpSt::Polling(s));
}
BinWriteIndexStream { rtss }
BinWriteIndexStream {
rtss,
use_scylla6_workarounds,
}
}
fn abort(&mut self) {

View File

@@ -10,7 +10,7 @@ use futures_util::TryStreamExt;
use items_0::streamitem::Sitemty3;
use items_0::streamitem::sitem3_data;
use netpod::DtMs;
use netpod::log;
use netpod::UseScylla6Workarounds;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use std::collections::VecDeque;
@@ -18,8 +18,8 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "BinIndexReadAllCoarse"),
@@ -32,6 +32,7 @@ autoerr::create_error_v1!(
async fn read_all_coarse(
series: SeriesId,
range: NanoRange,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: &ScyllaQueue,
) -> Result<VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>, Error> {
let rts = {
@@ -41,7 +42,14 @@ async fn read_all_coarse(
let mut ret = VecDeque::new();
for rt in rts {
let pbp = PrebinnedPartitioning::Day1;
let mut stream = BinWriteIndexRtStream::new(rt.clone(), series, pbp, range.clone(), scyqueue.clone());
let mut stream = BinWriteIndexRtStream::new(
rt.clone(),
series,
pbp,
range.clone(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
while let Some(x) = stream.try_next().await? {
match x.into_data() {
Ok(x) => {
@@ -77,11 +85,16 @@ pub struct ReadAllCoarse {
}
impl ReadAllCoarse {
pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self {
pub fn new(
series: SeriesId,
range: NanoRange,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
let scyqueue = Box::new(scyqueue);
let fut = {
let scyqueue = unsafe { &*(scyqueue.as_ref() as *const ScyllaQueue) };
read_all_coarse(series, range, scyqueue)
read_all_coarse(series, range, use_scylla6_workarounds, scyqueue)
};
Self {
scyqueue,

View File

@@ -27,6 +27,7 @@ use netpod::Shape;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use netpod::UseScylla6Workarounds;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::pager::QueryPager;
@@ -46,6 +47,8 @@ macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ) }
macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ) }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) }
macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
@@ -69,14 +72,21 @@ pub struct EventReadOpts {
with_values: bool,
one_before: bool,
qucap: u32,
use_scylla6_workarounds: UseScylla6Workarounds,
}
impl EventReadOpts {
pub fn new(one_before: bool, with_values: bool, qucap: Option<u32>) -> Self {
pub fn new(
one_before: bool,
with_values: bool,
qucap: Option<u32>,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Self {
Self {
one_before,
with_values,
qucap: qucap.unwrap_or(6),
use_scylla6_workarounds,
}
}
@@ -379,7 +389,13 @@ impl EventsStreamRt {
) -> Self {
trace_init!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}");
let series = SeriesId::new(ch_conf.series());
let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone());
let msp_inp = crate::events2::msp::MspStreamRt::new(
rt.clone(),
series,
range.clone(),
readopts.use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
Self {
qucap: readopts.qucap as usize,
rt,
@@ -878,6 +894,7 @@ async fn read_next_values_3_fwd(
table_name
);
let qu = stmts
.cache_bypass(*opts.readopts.use_scylla6_workarounds)
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(val_ty_dyn.is_valueblob())
@@ -926,6 +943,7 @@ async fn read_lsp_all(
opts.range.beg().delta(opts.ts_msp.ns())
};
let mut qu = stmts
.cache_bypass(*opts.readopts.use_scylla6_workarounds)
.rt(&opts.rt)
.lsp_all()
.shape(opts.val_ty_dyn.is_valueblob())
@@ -964,6 +982,9 @@ async fn read_next_values_3_bck(
if n > 1024 * 200 {
log::info!("{n} lsp in msp {msp}", msp = opts.ts_msp)
// TODO metrics
} else if n > 1024 * 20 {
log::debug!("{n} lsp in msp {msp}", msp = opts.ts_msp)
// TODO metrics
}
}
jobtrace.add_event_now(ReadEventKind::ReadEventsLspAllDone);
@@ -974,6 +995,9 @@ async fn read_next_values_3_bck(
return Ok((ret,));
};
let val_ty_dyn = &opts.val_ty_dyn;
if *opts.readopts.use_scylla6_workarounds == false {
info!("{selfname} NO WORKAROUND");
}
trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
@@ -981,6 +1005,7 @@ async fn read_next_values_3_bck(
let with_values = opts.readopts.with_values();
trace_fetch!("{selfname} ts_msp {} lsp {} {}", ts_msp.fmt(), lsp, table_name);
let qu = stmts
.cache_bypass(*opts.readopts.use_scylla6_workarounds)
.rt(&opts.rt)
.lsp(false, with_values)
// TODO

View File

@@ -8,6 +8,7 @@ use futures_util::Stream;
use futures_util::TryStreamExt;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::UseScylla6Workarounds;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
@@ -92,23 +93,40 @@ pub struct MspStreamRt {
out: VecDeque<TsMs>,
scyqueue: ScyllaQueue,
do_trace_detail: bool,
use_scylla6_workarounds: UseScylla6Workarounds,
}
impl MspStreamRt {
pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self {
pub fn new(
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
use_scylla6_workarounds: UseScylla6Workarounds,
scyqueue: ScyllaQueue,
) -> Self {
let fut_bck = {
let scyqueue = scyqueue.clone();
let rt = rt.clone();
let series = series.clone();
let range = range.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, true).await }
let use_scylla6_workarounds = use_scylla6_workarounds.clone();
async move {
scyqueue
.find_ts_msp(rt, series, range, true, use_scylla6_workarounds)
.await
}
};
let fut_fwd = {
let scyqueue = scyqueue.clone();
let rt = rt.clone();
let series = series.clone();
let range = range.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
let use_scylla6_workarounds = use_scylla6_workarounds.clone();
async move {
scyqueue
.find_ts_msp(rt, series, range, false, use_scylla6_workarounds)
.await
}
};
let do_trace_detail = daqbuf_series::dbg::dbg_series(series.clone());
trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE");
@@ -124,6 +142,7 @@ impl MspStreamRt {
out: VecDeque::new(),
scyqueue,
do_trace_detail,
use_scylla6_workarounds,
}
}
@@ -144,7 +163,12 @@ impl MspStreamRt {
let scyqueue = self.scyqueue.clone();
let rt = self.rt.clone();
let series = self.series.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
let use_scylla6_workarounds = self.use_scylla6_workarounds.clone();
async move {
scyqueue
.find_ts_msp(rt, series, range, false, use_scylla6_workarounds)
.await
}
};
Resolvable::Future(Box::pin(fut_fwd))
}
@@ -301,6 +325,7 @@ pub async fn find_ts_msp(
series: u64,
range: ScyllaSeriesRange,
bck: bool,
use_scylla6_workarounds: UseScylla6Workarounds,
stmts: &StmtsEvents,
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
@@ -312,9 +337,13 @@ pub async fn find_ts_msp(
bck
);
if bck {
find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await
if *use_scylla6_workarounds {
find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await
} else {
find_ts_msp_bck(rt, series, range, stmts, scy).await
}
} else {
find_ts_msp_fwd(rt, series, range, stmts, scy).await
find_ts_msp_fwd(rt, series, range, use_scylla6_workarounds, stmts, scy).await
}
}
@@ -322,6 +351,7 @@ async fn find_ts_msp_fwd(
rt: &RetentionTime,
series: u64,
range: ScyllaSeriesRange,
use_scylla6_workarounds: UseScylla6Workarounds,
stmts: &StmtsEvents,
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
@@ -331,7 +361,10 @@ async fn find_ts_msp_fwd(
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
.execute_iter(
stmts.cache_bypass(*use_scylla6_workarounds).rt(rt).ts_msp_fwd().clone(),
params,
)
.await?
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
@@ -354,7 +387,7 @@ async fn find_ts_msp_bck(
let params = (series as i64, range.beg().ms() as i64);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params)
.execute_iter(stmts.cache_bypass(false).rt(rt).ts_msp_bck().clone(), params)
.await?
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
@@ -379,7 +412,7 @@ async fn find_ts_msp_bck_workaround(
let params = (series as i64, 0 as i64, i64::MAX);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck_workaround().clone(), params)
.execute_iter(stmts.cache_bypass(true).rt(rt).ts_msp_bck_workaround().clone(), params)
.await?
.rows_stream::<(i64,)>()?;
let mut c = 0;

View File

@@ -2,9 +2,9 @@ use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
use scylla::statement::prepared::PreparedStatement;
macro_rules! info_prepare {
($($arg:tt)*) => { log::info!("prepare cql {}", format_args!($($arg)*)); };
}
macro_rules! log_prepare { ($($arg:tt)*) => { log::debug!("prepare cql {}", format_args!($($arg)*)); }; }
macro_rules! trace_scy6 { ($($arg:tt)*) => { log::info!("{}", format_args!($($arg)*)); }; }
autoerr::create_error_v1!(
name(Error, "ScyllaPrepare"),
@@ -141,6 +141,7 @@ impl StmtsEventsRt {
}
pub fn ts_msp_bck(&self) -> &PreparedStatement {
trace_scy6!("StmtsEventsRt ORDER DESC");
&self.ts_msp_bck
}
@@ -190,7 +191,7 @@ async fn make_msp_dir(
select_cond,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -211,7 +212,7 @@ async fn make_msp_fwd_for_bck_workaround(
select_cond,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -235,7 +236,7 @@ async fn make_lsp_all_shape_st(
stname,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -273,7 +274,7 @@ async fn make_lsp_all_shape(
table_name,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
qu
},
@@ -317,7 +318,7 @@ async fn make_lsp(
select_cond,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -373,7 +374,7 @@ async fn make_lsp_shape(
table_name,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
qu
},
@@ -413,7 +414,7 @@ async fn make_prebinned_f32(
rt.table_prefix(),
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -436,7 +437,7 @@ async fn make_bin_write_index_read(
rt.table_prefix(),
query_opts
);
info_prepare!("{ks} {rt} {cql}");
log_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -458,24 +459,29 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session)
}
#[derive(Debug)]
pub struct StmtsEvents {
pub struct StmtsEventsCacheBypass {
st: StmtsEventsRt,
mt: StmtsEventsRt,
lt: StmtsEventsRt,
bypass_cache: bool,
}
impl StmtsEvents {
impl StmtsEventsCacheBypass {
pub async fn new(ks: [&str; 3], bypass_cache: bool, scy: &Session) -> Result<Self, Error> {
let query_opts = if bypass_cache { "bypass cache" } else { "" };
let ret = StmtsEvents {
let ret = StmtsEventsCacheBypass {
st: make_rt(ks[0], &RetentionTime::Short, query_opts, scy).await?,
mt: make_rt(ks[1], &RetentionTime::Medium, query_opts, scy).await?,
lt: make_rt(ks[2], &RetentionTime::Long, query_opts, scy).await?,
bypass_cache,
};
Ok(ret)
}
pub fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt {
if self.bypass_cache == false {
trace_scy6!("StmtsEventsCacheBypass false");
}
match rt {
RetentionTime::Short => &self.st,
RetentionTime::Medium => &self.mt,
@@ -483,3 +489,28 @@ impl StmtsEvents {
}
}
}
#[derive(Debug)]
pub struct StmtsEvents {
cache_use: StmtsEventsCacheBypass,
cache_bypass: StmtsEventsCacheBypass,
}
impl StmtsEvents {
pub async fn new(ks: [&str; 3], scy: &Session) -> Result<Self, Error> {
let ret = StmtsEvents {
cache_use: StmtsEventsCacheBypass::new(ks, false, scy).await?,
cache_bypass: StmtsEventsCacheBypass::new(ks, true, scy).await?,
};
Ok(ret)
}
pub fn cache_bypass(&self, cache_bypass: bool) -> &StmtsEventsCacheBypass {
if cache_bypass {
&self.cache_bypass
} else {
trace_scy6!("cache_bypass false");
&self.cache_use
}
}
}
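Design note (an inference from this hunk, not stated in the commit message): both statement variants, with and without "bypass cache" in the query options, are prepared once at worker startup, and cache_bypass(bool) merely selects between the two already-prepared sets. A per-query use_scylla6_workarounds flag therefore costs no extra statement preparation at request time; call sites in this commit then read, for example, stmts.cache_bypass(*use_scylla6_workarounds).rt(&rt).ts_msp_fwd().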

View File

@@ -1,6 +1,7 @@
pub mod accounting;
pub mod bincache;
pub mod binned2;
pub mod binned3;
pub mod binwriteindex;
pub mod conn;
pub mod errconv;

View File

@@ -16,6 +16,7 @@ use items_2::binning::container_bins::ContainerBins;
use netpod::DtMs;
use netpod::ScyllaConfig;
use netpod::TsMs;
use netpod::UseScylla6Workarounds;
use netpod::log;
use netpod::ttl::RetentionTime;
use std::collections::VecDeque;
@@ -56,6 +57,48 @@ impl<T> From<async_channel::SendError<T>> for Error {
type ScySessTy = scylla::client::session::Session;
#[derive(Debug)]
struct FindTsMsp {
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
bck: bool,
use_scylla6_workarounds: UseScylla6Workarounds,
tx: Sender<Result<VecDeque<TsMs>, Error>>,
}
impl FindTsMsp {
async fn execute(self, stmts: &StmtsEvents, scy: &ScySessTy) {
// TODO avoid the extra clone
let tx = self.tx.clone();
match self.execute_inner(stmts, scy).await {
Ok(()) => {}
Err(e) => {
if tx.send(Err(e)).await.is_err() {
// TODO count for stats
}
}
}
}
async fn execute_inner(self, stmts: &StmtsEvents, scy: &ScySessTy) -> Result<(), Error> {
let res = crate::events2::msp::find_ts_msp(
&self.rt,
self.series.id(),
self.range,
self.bck,
self.use_scylla6_workarounds,
&stmts,
&scy,
)
.await;
if self.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
Ok(())
}
}
#[derive(Debug)]
struct ReadPrebinnedF32 {
rt: RetentionTime,
@@ -63,17 +106,19 @@ struct ReadPrebinnedF32 {
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
use_scylla6_workarounds: UseScylla6Workarounds,
tx: Sender<Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error>>,
}
#[derive(Debug)]
struct BinWriteIndexRead {
rt1: RetentionTime,
rt: RetentionTime,
series: SeriesId,
pbp: PrebinnedPartitioning,
msp: MspU32,
lsp_min: u32,
lsp_max: u32,
use_scylla6_workarounds: UseScylla6Workarounds,
tx: Sender<Result<VecDeque<BinWriteIndexEntry>, Error>>,
}
@@ -101,7 +146,14 @@ impl BinWriteIndexRead {
);
log::info!("execute {:?}", params);
let res = scy
.execute_iter(stmts.rt(&self.rt1).bin_write_index_read().clone(), params)
.execute_iter(
stmts
.cache_bypass(*self.use_scylla6_workarounds)
.rt(&self.rt)
.bin_write_index_read()
.clone(),
params,
)
.await?;
let mut it = res.rows_stream::<(i32, i32)>()?;
let mut all = VecDeque::new();
@@ -141,14 +193,7 @@ impl fmt::Debug for ExecuteV1 {
#[derive(Debug)]
enum Job {
FindTsMsp(
RetentionTime,
// series-id
u64,
ScyllaSeriesRange,
bool,
Sender<Result<VecDeque<TsMs>, Error>>,
),
FindTsMsp(FindTsMsp),
AccountingReadTs(
RetentionTime,
TsMs,
@@ -178,12 +223,21 @@ impl ScyllaQueue {
pub async fn find_ts_msp(
&self,
rt: RetentionTime,
series: u64,
series: SeriesId,
range: ScyllaSeriesRange,
bck: bool,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Result<VecDeque<TsMs>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::FindTsMsp(rt, series, range, bck, tx);
let job = FindTsMsp {
rt,
series,
range,
bck,
use_scylla6_workarounds,
tx,
};
let job = Job::FindTsMsp(job);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
@@ -237,6 +291,7 @@ impl ScyllaQueue {
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ReadPrebinnedF32(ReadPrebinnedF32 {
@@ -245,6 +300,7 @@ impl ScyllaQueue {
bin_len,
msp,
offs,
use_scylla6_workarounds,
tx,
});
self.tx
@@ -260,21 +316,23 @@ impl ScyllaQueue {
pub async fn bin_write_index_read(
&self,
rt1: RetentionTime,
rt: RetentionTime,
series: SeriesId,
pbp: PrebinnedPartitioning,
msp: MspU32,
lsp_min: u32,
lsp_max: u32,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Result<VecDeque<BinWriteIndexEntry>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = BinWriteIndexRead {
rt1,
rt,
series,
pbp,
msp,
lsp_min,
lsp_max,
use_scylla6_workarounds,
tx,
};
let job = Job::BinWriteIndexRead(job);
@@ -356,25 +414,13 @@ impl ScyllaWorker {
self.scyconf_lt.keyspace.as_str(),
];
debug!("scylla worker prepare start");
let stmts = StmtsEvents::new(
kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?,
self.scyconf_st.bypass_cache,
&scy,
)
.await?;
let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?;
let stmts = Arc::new(stmts);
// let stmts_cache = StmtsCache::new(kss[0], &scy).await?;
// let stmts_cache = Arc::new(stmts_cache);
debug!("scylla worker prepare done");
self.rx
.map(|job| async {
match job {
Job::FindTsMsp(rt, series, range, bck, tx) => {
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::FindTsMsp(job) => job.execute(&stmts, &scy).await,
Job::AccountingReadTs(rt, ts, tx) => {
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,
@@ -396,12 +442,14 @@ impl ScyllaWorker {
}
}
Job::ReadPrebinnedF32(job) => {
// TODO remove I guess?
let res = super::bincache::worker_read(
job.rt,
job.series,
job.bin_len,
job.msp,
job.offs,
job.use_scylla6_workarounds,
&stmts,
&scy,
)