Workaround for handling of the one-before-range event

This commit is contained in:
Dominik Werder
2025-07-04 15:10:03 +02:00
parent 2a8c1a91d3
commit 9ac95a7866
4 changed files with 210 additions and 101 deletions

View File

@@ -65,7 +65,9 @@ pub async fn scylla_channel_event_stream(
.map(move |item| match item {
Ok(k) => match k {
ChannelEvents::Events(mut k) => {
if let SeriesKind::ChannelStatus = chconf.kind() {
if true {
Ok(ChannelEvents::Events(k))
} else if let SeriesKind::ChannelStatus = chconf.kind() {
type C1 = ContainerEvents<u64>;
type C2 = ContainerEvents<String>;
if let Some(j) = k.as_any_mut().downcast_mut::<C1>() {

View File

@@ -42,11 +42,11 @@ use std::time::Duration;
use std::time::Instant;
use taskrun::tracing;
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ) }
macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ) }
macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) }
macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
@@ -260,6 +260,7 @@ pub enum ReadEventKind {
ScyllaReadRowDone(u32),
ReadNextValuesFutureDone,
EventsStreamRtSees(u32),
ReadEventsLspAllDone,
}
#[derive(Debug)]
@@ -298,7 +299,7 @@ impl fmt::Display for ReadJobTrace {
#[derive(Debug)]
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64,
series: SeriesId,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
@@ -306,6 +307,20 @@ pub(super) struct ReadNextValuesOpts {
val_ty_dyn: Box<dyn ValTyDyn>,
}
// Manual `Clone` impl: required because `val_ty_dyn` holds a trait object
// (`Box<dyn ValTyDyn>`), which `#[derive(Clone)]` cannot handle; it is
// duplicated through the object-safe `clone_boxed` method instead.
impl Clone for ReadNextValuesOpts {
    fn clone(&self) -> Self {
        Self {
            rt: self.rt.clone(),
            series: self.series.clone(),
            ts_msp: self.ts_msp.clone(),
            range: self.range.clone(),
            // `bool` is `Copy`; `.clone()` on it is noise (clippy: clone_on_copy).
            fwd: self.fwd,
            readopts: self.readopts.clone(),
            val_ty_dyn: self.val_ty_dyn.clone_boxed(),
        }
    }
}
#[derive(Clone)]
struct MakeFutInfo {
scyqueue: ScyllaQueue,
@@ -417,7 +432,6 @@ impl EventsStreamRt {
ts_msp: TsMs,
bck: bool,
mfi: MakeFutInfo,
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let scyqueue = mfi.scyqueue.clone();
let rt = mfi.rt.clone();
@@ -462,9 +476,8 @@ impl EventsStreamRt {
fn setup_bck_read(&mut self) {
if let Some(ts) = self.msp_buf_bck.pop_back() {
trace_fetch!("setup_bck_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let mfi = MakeFutInfo::new(self);
let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace);
let fut = Self::make_read_events_fut(ts, true, mfi);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchEvents(FetchEvents::from_fut(fut)),
});
@@ -494,9 +507,8 @@ impl EventsStreamRt {
while qu.has_space() {
if let Some(ts) = msp_buf.pop_front() {
trace_fetch!("{selfname} {} FILL A SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let mfi = st.make_fut_info.clone();
let fut = Self::make_read_events_fut(ts, false, mfi, jobtrace);
let fut = Self::make_read_events_fut(ts, false, mfi);
qu.push(fut);
} else {
break;
@@ -880,7 +892,7 @@ async fn read_next_values_3_fwd(
qu
};
let params = (
series as i64,
series.to_i64(),
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
@@ -899,6 +911,33 @@ async fn read_next_values_3_fwd(
Ok((ret,))
}
/// Read all `ts_lsp` values stored under the `ts_msp` partition given in `opts`.
///
/// Streams the partition (paged) and collects every lsp into a `Vec` in
/// storage order; the caller decides which entry it needs (e.g. the last
/// one for the one-before-range workaround).
///
/// Errors: statement lookup (`st`), query execution, and row decoding all
/// propagate via `?`.
async fn read_lsp_all(
    opts: ReadNextValuesOpts,
    stmts: Arc<StmtsEvents>,
    scy: Arc<Session>,
    jobtrace: &mut ReadJobTrace,
) -> Result<(Vec<DtNano>,), Error> {
    let selfname = "read_lsp_all";
    let mut qu = stmts
        .rt(&opts.rt)
        .lsp_all()
        .shape(opts.val_ty_dyn.is_valueblob())
        .st(opts.val_ty_dyn.st_name())?
        .clone();
    // Larger page size: a partition can hold many lsp entries and only the
    // timestamp column is transferred.
    qu.set_page_size(1024 * 4);
    let params = (opts.series.to_i64(), opts.ts_msp.ms() as i64);
    trace_fetch!("{selfname} event search params {:?}", params);
    jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
    // `qu` is already our private clone and is not used afterwards; pass it
    // by value instead of cloning a second time.
    let res = scy.execute_iter(qu, params).await?;
    let mut ret = Vec::new();
    let mut it = res.rows_stream::<(i64,)>()?;
    while let Some(row) = it.try_next().await? {
        // Stored as i64 in scylla; assumes ts_lsp is a non-negative ns
        // offset — TODO confirm against the schema (a negative value would
        // wrap in this cast).
        let lsp = DtNano::from_ns(row.0 as u64);
        ret.push(lsp);
    }
    Ok((ret,))
}
async fn read_next_values_3_bck(
opts: ReadNextValuesOpts,
stmts: Arc<StmtsEvents>,
@@ -906,49 +945,39 @@ async fn read_next_values_3_bck(
jobtrace: &mut ReadJobTrace,
) -> Result<(Box<dyn BinningggContainerEventsDyn>,), Error> {
let selfname = "read_next_values_3_bck";
let ret = opts.val_ty_dyn.empty_container_for_test();
if true {
if false {
let ret = opts.val_ty_dyn.empty_container_for_test();
return Ok((ret,));
}
let (mut lsp_all,) = read_lsp_all(opts.clone(), stmts.clone(), scy.clone(), jobtrace).await?;
{
let n = lsp_all.len();
if n > 1024 * 200 {
log::info!("{n} lsp in msp {msp}", msp = opts.ts_msp)
// TODO metrics
}
}
jobtrace.add_event_now(ReadEventKind::ReadEventsLspAllDone);
let lsp = if let Some(x) = lsp_all.pop() {
x
} else {
let ret = opts.val_ty_dyn.empty_container_for_test();
return Ok((ret,));
};
let val_ty_dyn = &opts.val_ty_dyn;
trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let table_name = val_ty_dyn.table_name();
let with_values = opts.readopts.with_values();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::RangeEndOverflow);
}
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"{selfname} ts_msp {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_max,
table_name
);
trace_fetch!("{selfname} ts_msp {} lsp {} {}", ts_msp.fmt(), lsp, table_name);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.lsp(false, with_values)
// TODO
.shape(val_ty_dyn.is_valueblob())
.st(val_ty_dyn.st_name())?;
let qu = if false {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
} else {
qu.clone()
};
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
let params = (series.to_i64(), ts_msp.ms() as i64, lsp.to_i64(), lsp.to_i64() + 1);
trace_fetch!("{selfname} event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let res = scy.execute_iter(qu.clone(), params).await?;
@@ -985,7 +1014,7 @@ pub async fn read_events_v02(
let val_ty_dyn = new_val_ty_dyn_from_shape_scalar_type(params.shape.clone(), params.scalar_type.clone());
let opts = ReadNextValuesOpts {
rt: params.rt.clone(),
series: params.series.id(),
series: params.series,
ts_msp: params.ts_msp,
range: params.range,
fwd: params.fwd,
@@ -1008,6 +1037,7 @@ trait ValTyDyn: fmt::Debug + Send {
// jobtrace: &mut ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn BinningggContainerEventsDyn>, Error>> + Send>>;
fn empty_container_for_test(&self) -> Box<dyn BinningggContainerEventsDyn>;
fn clone_boxed(&self) -> Box<dyn ValTyDyn>;
}
async fn read_into_container_branch_00<ST>(
@@ -1065,7 +1095,7 @@ where
Ok(ret)
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct ValTyDynTesting<ST>
where
ST: ValTy,
@@ -1173,6 +1203,11 @@ where
fn empty_container_for_test(&self) -> Box<dyn BinningggContainerEventsDyn> {
Box::new(<ST::Container as Empty>::empty())
}
fn clone_boxed(&self) -> Box<dyn ValTyDyn> {
let ret = Self { _t1: PhantomData };
Box::new(ret)
}
}
trait ValTy: fmt::Debug + Send + Sized + 'static {

View File

@@ -19,6 +19,57 @@ autoerr::create_error_v1!(
},
);
/// Prepared "lsp_all" statements (select all `ts_lsp` of one msp partition),
/// one per scalar type of the stored value. Selected via `st(stname)` with
/// the scalar-type name as key.
#[derive(Debug)]
pub struct StmtsLspAllShape {
    u8: PreparedStatement,
    u16: PreparedStatement,
    u32: PreparedStatement,
    u64: PreparedStatement,
    i8: PreparedStatement,
    i16: PreparedStatement,
    i32: PreparedStatement,
    i64: PreparedStatement,
    f32: PreparedStatement,
    f64: PreparedStatement,
    bool: PreparedStatement,
    string: PreparedStatement,
    // Keyed by stname "enum"; field is named `enumvals` because `enum` is a
    // reserved keyword in Rust.
    enumvals: PreparedStatement,
}
impl StmtsLspAllShape {
pub fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
let ret = match stname {
"u8" => &self.u8,
"u16" => &self.u16,
"u32" => &self.u32,
"u64" => &self.u64,
"i8" => &self.i8,
"i16" => &self.i16,
"i32" => &self.i32,
"i64" => &self.i64,
"f32" => &self.f32,
"f64" => &self.f64,
"bool" => &self.bool,
"string" => &self.string,
"enum" => &self.enumvals,
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
};
Ok(ret)
}
}
/// "lsp_all" prepared statements grouped by value shape
/// (scalar vs array tables).
#[derive(Debug)]
pub struct StmtsLspAll {
    scalar: StmtsLspAllShape,
    array: StmtsLspAllShape,
}
impl StmtsLspAll {
    /// Select the statement set for the value shape:
    /// `true` → array tables, `false` → scalar tables.
    pub fn shape(&self, array: bool) -> &StmtsLspAllShape {
        match array {
            true => &self.array,
            false => &self.scalar,
        }
    }
}
#[derive(Debug)]
pub struct StmtsLspShape {
u8: PreparedStatement,
@@ -75,6 +126,7 @@ pub struct StmtsEventsRt {
ts_msp_fwd: PreparedStatement,
ts_msp_bck: PreparedStatement,
ts_msp_bck_workaround: PreparedStatement,
lsp_all: StmtsLspAll,
lsp_fwd_val: StmtsLspDir,
lsp_bck_val: StmtsLspDir,
lsp_fwd_ts: StmtsLspDir,
@@ -96,6 +148,10 @@ impl StmtsEventsRt {
&self.ts_msp_bck_workaround
}
pub fn lsp_all(&self) -> &StmtsLspAll {
&self.lsp_all
}
pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
if bck {
if val { &self.lsp_bck_val } else { &self.lsp_bck_ts }
@@ -160,6 +216,79 @@ async fn make_msp_fwd_for_bck_workaround(
Ok(qu)
}
/// Prepare the "select all ts_lsp of one msp partition" statement for a
/// single table (keyspace / retention time / shape / scalar type).
///
/// `query_opts` is appended verbatim to the CQL (e.g. extra query options).
async fn make_lsp_all_shape_st(
    ks: &str,
    rt: &RetentionTime,
    shapepre: &str,
    stname: &str,
    query_opts: &str,
    scy: &Session,
) -> Result<PreparedStatement, Error> {
    let prefix = rt.table_prefix();
    let cql = format!(
        "select ts_lsp from {ks}.{prefix}events_{shapepre}_{stname} where series = ? and ts_msp = ? {query_opts}"
    );
    info_prepare!("{ks} {rt} {cql}");
    let stmt = scy.prepare(cql).await?;
    Ok(stmt)
}
/// Prepare the per-scalar-type "lsp_all" statements for one shape
/// (`shapepre` is "scalar" or "array"). Statements are prepared
/// sequentially, one per field.
async fn make_lsp_all_shape(
    ks: &str,
    rt: &RetentionTime,
    shapepre: &str,
    query_opts: &str,
    scy: &Session,
) -> Result<StmtsLspAllShape, Error> {
    // Helper closure so each field initializer below stays a one-liner.
    let maker = |stname| make_lsp_all_shape_st(ks, rt, shapepre, stname, query_opts, scy);
    let ret = StmtsLspAllShape {
        u8: maker("u8").await?,
        u16: maker("u16").await?,
        u32: maker("u32").await?,
        u64: maker("u64").await?,
        i8: maker("i8").await?,
        i16: maker("i16").await?,
        i32: maker("i32").await?,
        i64: maker("i64").await?,
        f32: maker("f32").await?,
        f64: maker("f64").await?,
        bool: maker("bool").await?,
        string: maker("string").await?,
        enumvals: if shapepre == "scalar" {
            make_lsp_all_shape_st(ks, rt, shapepre, "enum", query_opts, scy).await?
        } else {
            // exists only for scalar, therefore produce some dummy here
            // NOTE(review): harmless placeholder query against the ts_msp
            // table; presumably never executed for the array shape — confirm.
            let table_name = "ts_msp";
            let cql = format!(
                "select ts_msp from {}.{}{} limit 1 {}",
                ks,
                rt.table_prefix(),
                table_name,
                query_opts
            );
            info_prepare!("{ks} {rt} {cql}");
            let qu = scy.prepare(cql).await?;
            qu
        },
    };
    Ok(ret)
}
/// Build the complete "lsp_all" statement set: one `StmtsLspAllShape`
/// for the scalar tables and one for the array tables.
async fn make_lsp_all(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) -> Result<StmtsLspAll, Error> {
    let scalar = make_lsp_all_shape(ks, rt, "scalar", query_opts, scy).await?;
    let array = make_lsp_all_shape(ks, rt, "array", query_opts, scy).await?;
    Ok(StmtsLspAll { scalar, array })
}
async fn make_lsp(
ks: &str,
rt: &RetentionTime,
@@ -317,6 +446,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session)
ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?,
ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?,
ts_msp_bck_workaround: make_msp_fwd_for_bck_workaround(ks, rt, query_opts, scy).await?,
lsp_all: make_lsp_all(ks, rt, query_opts, scy).await?,
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?,
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?,

View File

@@ -9,7 +9,6 @@ use async_channel::Sender;
use daqbuf_series::SeriesId;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
use futures_util::Future;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::timebin::BinningggContainerEventsDyn;
@@ -19,10 +18,8 @@ use netpod::ScyllaConfig;
use netpod::TsMs;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
@@ -152,7 +149,6 @@ enum Job {
bool,
Sender<Result<VecDeque<TsMs>, Error>>,
),
ReadNextValues(ReadNextValues),
AccountingReadTs(
RetentionTime,
TsMs,
@@ -173,26 +169,6 @@ enum Job {
),
}
/// One-shot "read next values" job carried on the scylla worker queue.
struct ReadNextValues {
    // Factory for the read future: the worker supplies the shared session,
    // the prepared statements, and this job's trace, then awaits the
    // returned boxed future. Boxed because the concrete closure type is
    // unnameable at this declaration site.
    futgen: Box<
        dyn FnOnce(
                Arc<Session>,
                Arc<StmtsEvents>,
                ReadJobTrace,
            ) -> Pin<
                Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>,
            > + Send,
    >,
    // One-slot channel on which the worker sends the single result back to
    // the requester.
    tx: Sender<Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>>,
    // Trace of read events for this job; handed to `futgen` by the worker.
    jobtrace: ReadJobTrace,
}
// Hand-written Debug: `futgen` is an unnameable boxed closure, so the
// derive is unavailable; print an opaque placeholder instead of fields.
impl fmt::Debug for ReadNextValues {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("ReadNextValues { .. }")
    }
}
#[derive(Debug, Clone)]
pub struct ScyllaQueue {
tx: Sender<Job>,
@@ -213,33 +189,6 @@ impl ScyllaQueue {
Ok(res)
}
// TODO remove
pub async fn read_next_values<F>(
&self,
futgen: F,
jobtrace: ReadJobTrace,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
F: FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
ReadJobTrace,
) -> Pin<
Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>,
> + Send
+ 'static,
{
let (tx, rx) = async_channel::bounded(1);
let job = Job::ReadNextValues(ReadNextValues {
futgen: Box::new(futgen),
tx,
jobtrace,
});
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
}
pub async fn read_events_v02(
&self,
params: ReadEventsJobParams,
@@ -426,13 +375,6 @@ impl ScyllaWorker {
// TODO count for stats
}
}
Job::ReadNextValues(job) => {
let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace);
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::AccountingReadTs(rt, ts, tx) => {
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,