Fix workaround for missing cql DESC query

This commit is contained in:
Dominik Werder
2025-06-30 18:58:24 +02:00
parent 287b906ef9
commit 2a8c1a91d3
5 changed files with 452 additions and 337 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqretrieve"
version = "0.5.5-aa.15"
version = "0.5.5-aa.18"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -29,9 +29,11 @@ use netpod::TsMsVecFmt;
use netpod::TsNano;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::pager::QueryPager;
use scylla::client::session::Session;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
@@ -46,13 +48,13 @@ macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_msp_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
@@ -301,34 +303,7 @@ pub(super) struct ReadNextValuesOpts {
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
}
impl ReadNextValuesOpts {
pub(super) fn new(
rt: RetentionTime,
series: SeriesId,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
) -> Self {
Self {
rt,
series: series.id(),
ts_msp,
range,
fwd,
readopts,
scyqueue,
}
}
}
struct ReadNextValuesParams {
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
val_ty_dyn: Box<dyn ValTyDyn>,
}
#[derive(Clone)]
@@ -444,66 +419,23 @@ impl EventsStreamRt {
mfi: MakeFutInfo,
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue);
let scalar_type = mfi.ch_conf.scalar_type().clone();
let shape = mfi.ch_conf.shape().clone();
trace_fetch!(
"make_read_events_fut bck {} msp {:?} {} {:?} {:?}",
bck,
let scyqueue = mfi.scyqueue.clone();
let rt = mfi.rt.clone();
let series = mfi.series.clone();
let range = mfi.range.clone();
let readopts = mfi.readopts.clone();
let ch_conf = mfi.ch_conf.clone();
let params = ReadEventsJobParams {
series,
rt,
scalar_type: ch_conf.scalar_type().clone(),
shape: ch_conf.shape().clone(),
ts_msp,
ts_msp.fmt(),
shape,
scalar_type
);
let fut = async move {
if false {
taskrun::tokio::time::sleep(Duration::from_millis(10)).await;
}
let params = ReadNextValuesParams { opts, jobtrace };
let ret = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => read_next_values::<u8>(params).await,
ScalarType::U16 => read_next_values::<u16>(params).await,
ScalarType::U32 => read_next_values::<u32>(params).await,
ScalarType::U64 => read_next_values::<u64>(params).await,
ScalarType::I8 => read_next_values::<i8>(params).await,
ScalarType::I16 => read_next_values::<i16>(params).await,
ScalarType::I32 => read_next_values::<i32>(params).await,
ScalarType::I64 => read_next_values::<i64>(params).await,
ScalarType::F32 => read_next_values::<f32>(params).await,
ScalarType::F64 => read_next_values::<f64>(params).await,
ScalarType::BOOL => read_next_values::<bool>(params).await,
ScalarType::STRING => read_next_values::<String>(params).await,
ScalarType::Enum => read_next_values::<EnumVariant>(params).await,
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => read_next_values::<Vec<u8>>(params).await,
ScalarType::U16 => read_next_values::<Vec<u16>>(params).await,
ScalarType::U32 => read_next_values::<Vec<u32>>(params).await,
ScalarType::U64 => read_next_values::<Vec<u64>>(params).await,
ScalarType::I8 => read_next_values::<Vec<i8>>(params).await,
ScalarType::I16 => read_next_values::<Vec<i16>>(params).await,
ScalarType::I32 => read_next_values::<Vec<i32>>(params).await,
ScalarType::I64 => read_next_values::<Vec<i64>>(params).await,
ScalarType::F32 => read_next_values::<Vec<f32>>(params).await,
ScalarType::F64 => read_next_values::<Vec<f64>>(params).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(params).await,
ScalarType::STRING => {
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
ScalarType::Enum => {
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
},
_ => {
error!("TODO ReadValues add more types");
err::todoval()
}
};
ret.map_err(Error::from)
range,
fwd: !bck,
readopts,
};
let fut = async move { scyqueue.read_events_v02(params).await.map_err(From::from) };
Box::pin(fut)
}
@@ -689,7 +621,8 @@ impl Stream for EventsStreamRt {
});
}
} else {
panic!("absolutely nothing to read");
log::error!("logic error no msp to read should be handled before");
self.transition_to_fwd_read();
}
continue;
}
@@ -854,25 +787,56 @@ fn phantomval<T>() -> T {
panic!()
}
async fn read_next_values_2<ST>(
/// Parameters for a single "read events" job handed to the scylla worker
/// queue (`Job::ReadEvents02`, executed by `read_events_v02`).
/// Carries everything needed to pick the prepared statement, type-dispatch
/// the row decoding and bound the per-partition scan.
#[derive(Debug)]
pub struct ReadEventsJobParams {
    // Series to read events for.
    series: SeriesId,
    // Retention-time class; selects the statement set via `stmts.rt(..)`.
    rt: RetentionTime,
    // Scalar type of the stored values; drives dynamic value-type dispatch.
    scalar_type: ScalarType,
    // Shape (Scalar/Wave/Image) of the stored values.
    shape: Shape,
    // Millisecond partition key (ts_msp) of the partition to read.
    ts_msp: TsMs,
    // Time range bounding the ts_lsp scan within the partition.
    range: ScyllaSeriesRange,
    // Read direction: true = forward (set from `!bck` at the call site).
    fwd: bool,
    // Read options, e.g. whether the value column is fetched at all.
    readopts: EventReadOpts,
}
// TODO
/// Scratch/reference snippet showing how a per-job tracing span could be
/// chosen from the current log level and attached to a future via
/// `tracing::Instrument`. Nothing here runs: the closure and inner futures
/// are built but never invoked/awaited, and the function ends in `todo!()`.
/// NOTE(review): presumably kept as a template until this pattern is wired
/// into the read path — confirm, then integrate or remove.
async fn __use_log_and_instrument_code() {
    let level = taskrun::query_log_level();
    let futgen = move || {
        let fut = async move {
            // Named span only for elevated log levels, otherwise a no-op span.
            let logspan = if level == log::Level::DEBUG {
                tracing::span!(log::Level::INFO, "log_span_debug")
            } else if level == log::Level::TRACE {
                tracing::span!(log::Level::INFO, "log_span_trace")
            } else {
                tracing::Span::none()
            };
            let fut = async { 0u8 };
            let fut = tracing::Instrument::instrument(fut, logspan);
        };
    };
    todo!()
}
async fn read_next_values_3_fwd(
opts: ReadNextValuesOpts,
mut jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name());
scy: Arc<Session>,
jobtrace: &mut ReadJobTrace,
) -> Result<(Box<dyn BinningggContainerEventsDyn>,), Error> {
let selfname = "read_next_values_3_fwd";
let val_ty_dyn = &opts.val_ty_dyn;
trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let table_name = ST::table_name();
let table_name = val_ty_dyn.table_name();
let with_values = opts.readopts.with_values();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::RangeEndOverflow);
}
let ret = if opts.fwd {
{
// Original
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
@@ -883,213 +847,344 @@ where
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_min,
ts_lsp_max,
table_name
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let qu = {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
};
let params = (
series as i64,
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
);
trace_fetch!("FWD event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
log_fetch_result!("read_next_values_2 {params:?}");
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
log_fetch_result!("read_next_values_2 {params:?} {ts}");
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
}
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"BCK ts_msp {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_max,
table_name
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace_fetch!("BCK event search params {:?}", params);
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
DtNano::from_ns(0)
};
let ts_lsp_max = if range.end() > ts_msp.ns() {
range.end().delta(ts_msp.ns())
} else {
DtNano::from_ns(0)
};
trace_fetch!(
"{selfname} ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_min,
ts_lsp_max,
table_name
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
.shape(val_ty_dyn.is_valueblob())
.st(val_ty_dyn.st_name())?;
let qu = {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
};
let params = (
series as i64,
ts_msp.ms() as i64,
ts_lsp_min.ns() as i64,
ts_lsp_max.ns() as i64,
);
trace_fetch!("{selfname} event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let res = scy.execute_iter(qu.clone(), params).await?;
let ret = opts.val_ty_dyn.read_into_container(res, ts_msp, with_values).await?;
let byte_est = ret.byte_estimate();
trace_fetch!(
"read ts_msp {} len {} byte_est {}",
"{selfname} read ts_msp {} len {} byte_est {}",
ts_msp.fmt(),
ret.len(),
byte_est
);
let ret = Box::new(ret);
Ok((ret,))
}
/// Backward (descending-time) variant of the single-partition event read.
///
/// NOTE(review): currently stubbed out — the `if true` early return hands
/// back an empty container, making the rest of the body dead code.
/// Presumably tied to the missing CQL DESC query workaround (see commit
/// subject); confirm before relying on backward reads.
async fn read_next_values_3_bck(
    opts: ReadNextValuesOpts,
    stmts: Arc<StmtsEvents>,
    scy: Arc<Session>,
    jobtrace: &mut ReadJobTrace,
) -> Result<(Box<dyn BinningggContainerEventsDyn>,), Error> {
    let selfname = "read_next_values_3_bck";
    // Early-out with an empty typed container (see NOTE above).
    let ret = opts.val_ty_dyn.empty_container_for_test();
    if true {
        return Ok((ret,));
    }
    let val_ty_dyn = &opts.val_ty_dyn;
    trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name());
    let series = opts.series;
    let ts_msp = opts.ts_msp;
    let range = opts.range;
    let table_name = val_ty_dyn.table_name();
    let with_values = opts.readopts.with_values();
    // Guard: range end must fit into i64 for the CQL bind parameters.
    if range.end() > TsNano::from_ns(i64::MAX as u64) {
        return Err(Error::RangeEndOverflow);
    }
    // Upper ts_lsp bound: offset of range.beg() within this partition,
    // or 0 when the partition starts at/after the range begin.
    let ts_lsp_max = if ts_msp.ns() < range.beg() {
        range.beg().delta(ts_msp.ns())
    } else {
        DtNano::from_ns(0)
    };
    trace_fetch!(
        "{selfname} ts_msp {} ts_lsp_max {} {}",
        ts_msp.fmt(),
        ts_lsp_max,
        table_name
    );
    let qu = stmts
        .rt(&opts.rt)
        .lsp(!opts.fwd, with_values)
        // TODO
        .shape(val_ty_dyn.is_valueblob())
        .st(val_ty_dyn.st_name())?;
    // Token-awareness check and page-size tweak are disabled (dead branch);
    // the plain clone is used instead.
    let qu = if false {
        let mut qu = qu.clone();
        if qu.is_token_aware() == false {
            return Err(Error::NotTokenAware);
        }
        qu.set_page_size(10000);
        // qu.disable_paging();
        qu
    } else {
        qu.clone()
    };
    let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
    trace_fetch!("{selfname} event search params {:?}", params);
    jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
    let res = scy.execute_iter(qu.clone(), params).await?;
    // Decode the row stream into a typed container via the dynamic adapter.
    let ret = opts.val_ty_dyn.read_into_container(res, ts_msp, with_values).await?;
    let byte_est = ret.byte_estimate();
    trace_fetch!(
        "{selfname} read ts_msp {} len {} byte_est {}",
        ts_msp.fmt(),
        ret.len(),
        byte_est
    );
    Ok((ret,))
}
/// Dispatch a single-partition event read to the direction-specific
/// implementation and return the decoded container together with the
/// job trace collected along the way.
async fn read_next_values_3(
    opts: ReadNextValuesOpts,
    scy: Arc<Session>,
    stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error> {
    let mut trace = ReadJobTrace::new();
    // Forward and backward readers share the same signature; pick one.
    let res = if opts.fwd {
        read_next_values_3_fwd(opts, stmts, scy, &mut trace).await
    } else {
        read_next_values_3_bck(opts, stmts, scy, &mut trace).await
    }?;
    Ok((res.0, trace))
}
async fn read_next_values<ST>(
params: ReadNextValuesParams,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
/// Entry point used by the scylla worker (`Job::ReadEvents02`): read and
/// decode one partition worth of events according to `params`.
///
/// Builds the type-erased value adapter from the runtime shape/scalar-type
/// pair, assembles the internal read options, then delegates to
/// `read_next_values_3`.
pub async fn read_events_v02(
    params: ReadEventsJobParams,
    scy: Arc<scylla::client::session::Session>,
    stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error> {
    // Runtime dispatch: pick the concrete ValTy impl for (shape, scalar_type).
    let val_ty_dyn = new_val_ty_dyn_from_shape_scalar_type(params.shape.clone(), params.scalar_type.clone());
    let opts = ReadNextValuesOpts {
        rt: params.rt.clone(),
        series: params.series.id(),
        ts_msp: params.ts_msp,
        range: params.range,
        fwd: params.fwd,
        readopts: params.readopts,
        val_ty_dyn,
    };
    read_next_values_3(opts, scy, stmts).await
}
/// Object-safe, type-erased view over [`ValTy`]: exposes the per-type
/// table/statement metadata and the row decoding without generics, so the
/// read path can dispatch on runtime `Shape`/`ScalarType` values.
trait ValTyDyn: fmt::Debug + Send {
    /// Name of the events table backing this value type.
    fn table_name(&self) -> &str;
    /// Scalar-type name used to select the prepared statement.
    fn st_name(&self) -> &str;
    /// Whether values are stored as an opaque blob column.
    fn is_valueblob(&self) -> bool;
    /// Drain `scyres` and decode the rows into a typed events container.
    /// `with_values` selects between (ts, value) and ts-only row layouts.
    // Fixed typo in declaration-side parameter name: was `with_vaues`
    // (trait param names are declaration-only, so impls are unaffected).
    fn read_into_container(
        &self,
        scyres: QueryPager,
        ts_msp: TsMs,
        with_values: bool,
        // TODO
        // jobtrace: &mut ReadJobTrace,
    ) -> Pin<Box<dyn Future<Output = Result<Box<dyn BinningggContainerEventsDyn>, Error>> + Send>>;
    /// Empty container of the concrete element type (used by the bck stub).
    fn empty_container_for_test(&self) -> Box<dyn BinningggContainerEventsDyn>;
}
async fn read_into_container_branch_00<ST>(
scyres: QueryPager,
ts_msp: TsMs,
with_values: bool,
// TODO
// jobtrace: &mut ReadJobTrace,
) -> Result<Box<dyn BinningggContainerEventsDyn>, Error>
where
ST: ValTy,
{
let opts = params.opts;
let jobtrace = params.jobtrace;
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let level = taskrun::query_log_level();
let futgen = move |scy: Arc<Session>, stmts: Arc<StmtsEvents>, mut jobtrace: ReadJobTrace| {
// TODO avoid this
// opts.jobtrace = jobtrace;
let fut = async move {
// let jobtrace = &mut opts.jobtrace;
let logspan = if level == log::Level::DEBUG {
tracing::span!(log::Level::INFO, "log_span_debug")
} else if level == log::Level::TRACE {
tracing::span!(log::Level::INFO, "log_span_trace")
} else {
tracing::Span::none()
};
jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues);
let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts);
let fut = tracing::Instrument::instrument(fut, logspan);
match fut.await.map_err(crate::worker::Error::from) {
Ok((ret, mut jobtrace)) => {
jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone);
Ok((ret, jobtrace))
}
Err(e) => Err(e),
let selfname = "read_into_container_branch_00";
let mut jobtrace = ReadJobTrace::new();
// TODO must branch already here depending on what input columns we expect
let mut ret = <ST as ValTy>::Container::empty();
let ret = if with_values {
if ST::is_valueblob() {
let mut it = scyres.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = ST::from_valueblob(row.1);
ret.push(ts, value);
}
};
Box::pin(fut)
as Pin<
Box<
dyn Future<
Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), crate::worker::Error>,
> + Send,
>,
>
ret
} else {
let mut i = 0;
let mut it = scyres.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
log_fetch_result!("{selfname} {ts}");
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = scyres.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
};
let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?;
Ok((res, jobtrace))
let ret = Box::new(ret);
Ok(ret)
}
trait ValTy: Sized + 'static {
/// Zero-sized bridge type: implements [`ValTyDyn`] by delegating every
/// method to the static [`ValTy`] impl of `ST`.
#[derive(Debug)]
struct ValTyDynTesting<ST>
where
    ST: ValTy,
{
    // Marker only; carries no runtime state.
    _t1: PhantomData<ST>,
}
impl<ST> ValTyDynTesting<ST>
where
ST: ValTy,
{
fn new() -> Self {
Self { _t1: PhantomData }
}
fn boxed() -> Box<dyn ValTyDyn> {
Box::new(Self::new())
}
}
/// Build the type-erased [`ValTyDyn`] adapter matching a runtime
/// (`Shape`, `ScalarType`) pair.
///
/// Unsupported combinations (waveform string/enum, image shapes) log and
/// fall through to `err::todoval()` — a placeholder rather than a proper
/// error return; NOTE(review): presumably panics or yields a dummy, confirm
/// its semantics before exposing these combinations to callers.
fn new_val_ty_dyn_from_shape_scalar_type(shape: Shape, scalar_type: ScalarType) -> Box<dyn ValTyDyn> {
    match shape {
        Shape::Scalar => {
            use ScalarType::*;
            // One adapter per primitive scalar element type.
            match scalar_type {
                U8 => ValTyDynTesting::<u8>::boxed(),
                U16 => ValTyDynTesting::<u16>::boxed(),
                U32 => ValTyDynTesting::<u32>::boxed(),
                U64 => ValTyDynTesting::<u64>::boxed(),
                I8 => ValTyDynTesting::<i8>::boxed(),
                I16 => ValTyDynTesting::<i16>::boxed(),
                I32 => ValTyDynTesting::<i32>::boxed(),
                I64 => ValTyDynTesting::<i64>::boxed(),
                F32 => ValTyDynTesting::<f32>::boxed(),
                F64 => ValTyDynTesting::<f64>::boxed(),
                BOOL => ValTyDynTesting::<bool>::boxed(),
                STRING => ValTyDynTesting::<String>::boxed(),
                Enum => ValTyDynTesting::<EnumVariant>::boxed(),
            }
        }
        Shape::Wave(_) => {
            use ScalarType::*;
            // Waveform series decode to Vec<element> per event.
            match scalar_type {
                U8 => ValTyDynTesting::<Vec<u8>>::boxed(),
                U16 => ValTyDynTesting::<Vec<u16>>::boxed(),
                U32 => ValTyDynTesting::<Vec<u32>>::boxed(),
                U64 => ValTyDynTesting::<Vec<u64>>::boxed(),
                I8 => ValTyDynTesting::<Vec<i8>>::boxed(),
                I16 => ValTyDynTesting::<Vec<i16>>::boxed(),
                I32 => ValTyDynTesting::<Vec<i32>>::boxed(),
                I64 => ValTyDynTesting::<Vec<i64>>::boxed(),
                F32 => ValTyDynTesting::<Vec<f32>>::boxed(),
                F64 => ValTyDynTesting::<Vec<f64>>::boxed(),
                BOOL => ValTyDynTesting::<Vec<bool>>::boxed(),
                STRING => {
                    // Waveform-of-string reads not implemented yet.
                    warn!("read not yet supported {:?} {:?}", shape, scalar_type);
                    err::todoval()
                }
                Enum => {
                    // Waveform-of-enum reads not implemented yet.
                    warn!("read not yet supported {:?} {:?}", shape, scalar_type);
                    err::todoval()
                }
            }
        }
        Shape::Image(_, _) => {
            // Image-shaped series are not supported by this read path yet.
            error!("TODO ReadValues add more types");
            err::todoval()
        }
    }
}
/// Delegate every [`ValTyDyn`] method to the static [`ValTy`] impl of `ST`.
impl<ST> ValTyDyn for ValTyDynTesting<ST>
where
    ST: ValTy,
{
    fn table_name(&self) -> &str {
        ST::table_name()
    }
    fn st_name(&self) -> &str {
        ST::st_name()
    }
    fn is_valueblob(&self) -> bool {
        ST::is_valueblob()
    }
    fn read_into_container(
        &self,
        scyres: QueryPager,
        ts_msp: TsMs,
        with_values: bool,
        // jobtrace: &mut ReadJobTrace,
    ) -> Pin<Box<dyn Future<Output = Result<Box<dyn BinningggContainerEventsDyn>, Error>> + Send>> {
        // TODO
        // Box the generic decode future so it can be returned object-safely.
        Box::pin(read_into_container_branch_00::<ST>(
            scyres,
            ts_msp,
            with_values,
            // TODO
            // jobtrace,
        ))
    }
    fn empty_container_for_test(&self) -> Box<dyn BinningggContainerEventsDyn> {
        // Fresh, empty container of the concrete element type.
        Box::new(<ST::Container as Empty>::empty())
    }
}
trait ValTy: fmt::Debug + Send + Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: for<'a, 'b> scylla::deserialize::value::DeserializeValue<'a, 'b>;
type ScyRowTy: for<'a, 'b> scylla::deserialize::row::DeserializeRow<'a, 'b>;
type ScyTy: for<'a, 'b> scylla::deserialize::value::DeserializeValue<'a, 'b> + Send;
type ScyRowTy: for<'a, 'b> scylla::deserialize::row::DeserializeRow<'a, 'b> + Send;
type Container: BinningggContainerEventsDyn + Empty + Appendable<Self>;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
fn default() -> Self;
fn is_valueblob() -> bool;
fn st_name() -> &'static str;
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>>;
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self);
}
@@ -1121,15 +1216,6 @@ macro_rules! impl_scaty_scalar {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 as Self::ScaTy)
@@ -1179,18 +1265,9 @@ macro_rules! impl_scaty_array {
$st_name
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1 .into_iter().map(|x| x as _).collect())
(ts, inp.1.into_iter().map(|x| x as _).collect())
}
}
};
@@ -1222,15 +1299,6 @@ impl ValTy for EnumVariant {
"enum"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, EnumVariant::new(inp.1 as u16, inp.2))
@@ -1263,16 +1331,6 @@ impl ValTy for Vec<String> {
"string"
}
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, jobtrace, scy, stmts);
Box::pin(fut)
}
fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) {
let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64);
(ts, inp.1)

View File

@@ -20,9 +20,7 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($(
macro_rules! trace_msp { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ) }
macro_rules! log_fetch_result {
($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } };
}
macro_rules! log_fetch_result { ($($arg:tt)*) => { if true { log::trace!("fetch {}", format_args!($($arg)*)); } }; }
autoerr::create_error_v1!(
name(Error, "EventsMsp"),
@@ -378,32 +376,39 @@ async fn find_ts_msp_bck_workaround(
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
let selfname = "find_ts_msp_bck_workaround";
let mut ret = VecDeque::new();
let mut ret = Vec::new();
// let params = (series as i64, 0 as i64, range.beg().ms() as i64);
let params = (series as i64, 0 as i64, i64::MAX);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
.execute_iter(stmts.rt(rt).ts_msp_bck_workaround().clone(), params)
.await?
.rows_stream::<(i64,)>()?;
let mut c = 0;
while let Some(row) = res.try_next().await? {
c += 1;
let ts = TsMs::from_ms_u64(row.0 as u64);
log_fetch_result!("{selfname} {params:?} {ts}");
ret.push_back(ts);
if ret.len() > 1024 * 1024 {
return Err(Error::TooManyRows);
if ts >= range.beg().to_ts_ms() {
log_fetch_result!("{selfname} {params:?} {ts} DISCARD AFTER RANGE");
} else {
if ret.len() > 1024 * 1024 {
return Err(Error::TooManyRows);
} else {
log_fetch_result!("{selfname} {params:?} {ts} USE");
ret.push(ts);
}
}
}
log_fetch_result!("{selfname} {params:?} considered msp {c}");
if ret.len() > 1024 * 2 {
log::info!("quite many ts_msp values in reverse lookup len {}", ret.len());
}
if ret.len() > 1024 * 64 {
log::warn!("quite many ts_msp values in reverse lookup len {}", ret.len());
}
let ret = if ret.len() > 2 {
let tmp: Vec<_> = ret.into_iter().rev().take(2).collect();
tmp.into_iter().rev().collect()
} else {
ret
};
let m = ret.len().max(2) - 2;
log_fetch_result!("{selfname} {params:?} {ret:?} {m}");
let ret = ret.into_iter().skip(m).collect();
log_fetch_result!("{selfname} {params:?} {ret:?}");
Ok(ret)
}

View File

@@ -74,6 +74,7 @@ impl StmtsLspDir {
pub struct StmtsEventsRt {
ts_msp_fwd: PreparedStatement,
ts_msp_bck: PreparedStatement,
ts_msp_bck_workaround: PreparedStatement,
lsp_fwd_val: StmtsLspDir,
lsp_bck_val: StmtsLspDir,
lsp_fwd_ts: StmtsLspDir,
@@ -91,6 +92,10 @@ impl StmtsEventsRt {
&self.ts_msp_bck
}
/// Prepared ascending ts_msp scan used as the stand-in for the missing
/// descending (DESC) CQL query; built by `make_msp_fwd_for_bck_workaround`.
pub fn ts_msp_bck_workaround(&self) -> &PreparedStatement {
    &self.ts_msp_bck_workaround
}
pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
if bck {
if val { &self.lsp_bck_val } else { &self.lsp_bck_ts }
@@ -134,6 +139,27 @@ async fn make_msp_dir(
Ok(qu)
}
/// Prepare the forward ts_msp scan that substitutes for the missing
/// descending (DESC) query: an ascending `ts_msp >= ? and ts_msp < ?`
/// range select on the retention-time-prefixed `ts_msp` table.
async fn make_msp_fwd_for_bck_workaround(
    ks: &str,
    rt: &RetentionTime,
    query_opts: &str,
    scy: &Session,
) -> Result<PreparedStatement, Error> {
    // Table is the retention-time prefix glued onto "ts_msp".
    let cql = format!(
        "select ts_msp from {ks}.{}ts_msp where series = ? and ts_msp >= ? and ts_msp < ? {query_opts}",
        rt.table_prefix(),
    );
    info_prepare!("{ks} {rt} {cql}");
    Ok(scy.prepare(cql).await?)
}
async fn make_lsp(
ks: &str,
rt: &RetentionTime,
@@ -290,6 +316,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session)
let ret = StmtsEventsRt {
ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?,
ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?,
ts_msp_bck_workaround: make_msp_fwd_for_bck_workaround(ks, rt, query_opts, scy).await?,
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?,
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?,

View File

@@ -1,5 +1,6 @@
use crate::binwriteindex::BinWriteIndexEntry;
use crate::conn::create_scy_session_no_ks;
use crate::events2::events::ReadEventsJobParams;
use crate::events2::events::ReadJobTrace;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
@@ -166,6 +167,10 @@ enum Job {
BinWriteIndexRead(BinWriteIndexRead),
PrepareV1(PrepareV1),
ExecuteV1(ExecuteV1),
ReadEvents02(
ReadEventsJobParams,
Sender<Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>>,
),
}
struct ReadNextValues {
@@ -208,6 +213,7 @@ impl ScyllaQueue {
Ok(res)
}
// TODO remove
pub async fn read_next_values<F>(
&self,
futgen: F,
@@ -234,6 +240,17 @@ impl ScyllaQueue {
Ok(res)
}
/// Enqueue a `ReadEvents02` job on the worker queue and await its result.
///
/// Sends the params plus a bounded(1) reply channel; the worker runs
/// `crate::events2::events::read_events_v02` and replies with the decoded
/// container and job trace. Errors: `ChannelSend` if the queue is gone,
/// `ChannelRecv` if the worker dropped the reply sender, otherwise the
/// job's own error (unwrapped by the inner `?`).
pub async fn read_events_v02(
    &self,
    params: ReadEventsJobParams,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error> {
    let (tx, rx) = async_channel::bounded(1);
    let job = Job::ReadEvents02(params, tx);
    self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
    // Double `?`: first unwraps the channel recv, second the job result.
    let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
    Ok(res)
}
pub async fn accounting_read_ts(
&self,
rt: RetentionTime,
@@ -462,6 +479,14 @@ impl ScyllaWorker {
// TODO log?
let _ = job.tx.send(res).await;
}
Job::ReadEvents02(params, tx) => {
let res = crate::events2::events::read_events_v02(params, scy.clone(), stmts.clone())
.await
.map_err(From::from);
if tx.send(res).await.is_err() {
// TODO count for stats
}
}
}
})
.buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)