diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 40289f4..7cb6d4b 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqretrieve" -version = "0.5.5-aa.15" +version = "0.5.5-aa.18" authors = ["Dominik Werder "] edition = "2024" diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index d99787c..b41c578 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -29,9 +29,11 @@ use netpod::TsMsVecFmt; use netpod::TsNano; use netpod::log; use netpod::ttl::RetentionTime; +use scylla::client::pager::QueryPager; use scylla::client::session::Session; use std::collections::VecDeque; use std::fmt; +use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -46,13 +48,13 @@ macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) } macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } -macro_rules! trace_msp_fetch { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } -macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } -macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } @@ -301,34 +303,7 @@ pub(super) struct ReadNextValuesOpts { range: ScyllaSeriesRange, fwd: bool, readopts: EventReadOpts, - scyqueue: ScyllaQueue, -} - -impl ReadNextValuesOpts { - pub(super) fn new( - rt: RetentionTime, - series: SeriesId, - ts_msp: TsMs, - range: ScyllaSeriesRange, - fwd: bool, - readopts: EventReadOpts, - scyqueue: ScyllaQueue, - ) -> Self { - Self { - rt, - series: series.id(), - ts_msp, - range, - fwd, - readopts, - scyqueue, - } - } -} - -struct ReadNextValuesParams { - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, + val_ty_dyn: Box, } #[derive(Clone)] @@ -444,66 +419,23 @@ impl EventsStreamRt { mfi: MakeFutInfo, jobtrace: ReadJobTrace, ) -> Pin, ReadJobTrace), Error>> + Send>> { - let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue); - let scalar_type = mfi.ch_conf.scalar_type().clone(); - let shape = mfi.ch_conf.shape().clone(); - trace_fetch!( - "make_read_events_fut bck {} msp {:?} {} {:?} {:?}", - bck, + let scyqueue = mfi.scyqueue.clone(); + let rt = mfi.rt.clone(); + let series = mfi.series.clone(); + let range = mfi.range.clone(); + let readopts = mfi.readopts.clone(); + let ch_conf = mfi.ch_conf.clone(); + let params = ReadEventsJobParams { + series, + rt, + scalar_type: ch_conf.scalar_type().clone(), + shape: ch_conf.shape().clone(), ts_msp, - ts_msp.fmt(), - shape, - scalar_type - ); - let fut = async move { - if false { - taskrun::tokio::time::sleep(Duration::from_millis(10)).await; - } - let params = ReadNextValuesParams { opts, jobtrace }; - let ret = match &shape { - Shape::Scalar => match &scalar_type { - ScalarType::U8 => read_next_values::(params).await, - ScalarType::U16 => read_next_values::(params).await, - ScalarType::U32 => read_next_values::(params).await, - ScalarType::U64 => read_next_values::(params).await, - ScalarType::I8 => read_next_values::(params).await, - ScalarType::I16 => read_next_values::(params).await, - ScalarType::I32 => read_next_values::(params).await, - ScalarType::I64 => read_next_values::(params).await, - ScalarType::F32 => read_next_values::(params).await, - ScalarType::F64 => read_next_values::(params).await, - ScalarType::BOOL => read_next_values::(params).await, - ScalarType::STRING => read_next_values::(params).await, - ScalarType::Enum => read_next_values::(params).await, - }, - Shape::Wave(_) => match &scalar_type { - ScalarType::U8 => read_next_values::>(params).await, - ScalarType::U16 => read_next_values::>(params).await, - ScalarType::U32 => read_next_values::>(params).await, - ScalarType::U64 => read_next_values::>(params).await, - ScalarType::I8 => read_next_values::>(params).await, - ScalarType::I16 => read_next_values::>(params).await, - ScalarType::I32 => read_next_values::>(params).await, - ScalarType::I64 => read_next_values::>(params).await, - ScalarType::F32 => read_next_values::>(params).await, - ScalarType::F64 => read_next_values::>(params).await, - ScalarType::BOOL => read_next_values::>(params).await, - ScalarType::STRING => { - warn!("read not yet supported {:?} {:?}", shape, scalar_type); - err::todoval() - } - ScalarType::Enum => { - warn!("read not yet supported {:?} {:?}", shape, scalar_type); - err::todoval() - } - }, - _ => { - error!("TODO ReadValues add more types"); - err::todoval() - } - }; - ret.map_err(Error::from) + range, + fwd: !bck, + readopts, }; + let fut = async move { scyqueue.read_events_v02(params).await.map_err(From::from) }; Box::pin(fut) } @@ -689,7 +621,8 @@ impl Stream for EventsStreamRt { }); } } else { - panic!("absolutely nothing to read"); + log::error!("logic error no msp to read should be handled before"); + self.transition_to_fwd_read(); } continue; } @@ -854,25 +787,56 @@ fn phantomval() -> T { panic!() } -async fn read_next_values_2( +#[derive(Debug)] +pub struct ReadEventsJobParams { + series: SeriesId, + rt: RetentionTime, + scalar_type: ScalarType, + shape: Shape, + ts_msp: TsMs, + range: ScyllaSeriesRange, + fwd: bool, + readopts: EventReadOpts, +} + +// TODO +async fn __use_log_and_instrument_code() { + let level = taskrun::query_log_level(); + let futgen = move || { + let fut = async move { + let logspan = if level == log::Level::DEBUG { + tracing::span!(log::Level::INFO, "log_span_debug") + } else if level == log::Level::TRACE { + tracing::span!(log::Level::INFO, "log_span_trace") + } else { + tracing::Span::none() + }; + let fut = async { 0u8 }; + let fut = tracing::Instrument::instrument(fut, logspan); + }; + }; + todo!() +} + +async fn read_next_values_3_fwd( opts: ReadNextValuesOpts, - mut jobtrace: ReadJobTrace, - scy: Arc, stmts: Arc, -) -> Result<(Box, ReadJobTrace), Error> -where - ST: ValTy, -{ - trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); + scy: Arc, + jobtrace: &mut ReadJobTrace, +) -> Result<(Box,), Error> { + let selfname = "read_next_values_3_fwd"; + let val_ty_dyn = &opts.val_ty_dyn; + trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name()); let series = opts.series; let ts_msp = opts.ts_msp; let range = opts.range; - let table_name = ST::table_name(); + let table_name = val_ty_dyn.table_name(); let with_values = opts.readopts.with_values(); if range.end() > TsNano::from_ns(i64::MAX as u64) { return Err(Error::RangeEndOverflow); } - let ret = if opts.fwd { + { + // Original let ts_lsp_min = if range.beg() > ts_msp.ns() { range.beg().delta(ts_msp.ns()) } else { @@ -883,213 +847,344 @@ where } else { DtNano::from_ns(0) }; - trace_fetch!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", - ts_msp.fmt(), - ts_lsp_min, - ts_lsp_max, - table_name - ); - let qu = stmts - .rt(&opts.rt) - .lsp(!opts.fwd, with_values) - .shape(ST::is_valueblob()) - .st(ST::st_name())?; - let qu = { - let mut qu = qu.clone(); - if qu.is_token_aware() == false { - return Err(Error::NotTokenAware); - } - qu.set_page_size(10000); - // qu.disable_paging(); - qu - }; - let params = ( - series as i64, - ts_msp.ms() as i64, - ts_lsp_min.ns() as i64, - ts_lsp_max.ns() as i64, - ); - trace_fetch!("FWD event search params {:?}", params); - jobtrace.add_event_now(ReadEventKind::CallExecuteIter); - log_fetch_result!("read_next_values_2 {params:?}"); - let res = scy.execute_iter(qu.clone(), params).await?; - { - let mut ret = ::Container::empty(); - // TODO must branch already here depending on what input columns we expect - if with_values { - if ::is_valueblob() { - let mut it = res.rows_stream::<(i64, Vec)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_valueblob(row.1); - ret.push(ts, value); - } - ret - } else { - let mut i = 0; - let mut it = res.rows_stream::<::ScyRowTy>()?; - while let Some(row) = it.try_next().await? { - let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); - log_fetch_result!("read_next_values_2 {params:?} {ts}"); - // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - // let value = ::from_scyty(row.1); - ret.push(ts, value); - i += 1; - if i % 2000 == 0 { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); - } - } - { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); - } - ret - } - } else { - let mut it = res.rows_stream::<(i64,)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::default(); - ret.push(ts, value); - } - ret - } - } + } + let ts_lsp_min = if range.beg() > ts_msp.ns() { + range.beg().delta(ts_msp.ns()) } else { - let ts_lsp_max = if ts_msp.ns() < range.beg() { - range.beg().delta(ts_msp.ns()) - } else { - DtNano::from_ns(0) - }; - trace_fetch!( - "BCK ts_msp {} ts_lsp_max {} {}", - ts_msp.fmt(), - ts_lsp_max, - table_name - ); - let qu = stmts - .rt(&opts.rt) - .lsp(!opts.fwd, with_values) - .shape(ST::is_valueblob()) - .st(ST::st_name())?; - let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); - trace_fetch!("BCK event search params {:?}", params); - let res = scy.execute_iter(qu.clone(), params).await?; - { - let mut ret = ::Container::empty(); - // TODO must branch already here depending on what input columns we expect - if with_values { - if ::is_valueblob() { - let mut it = res.rows_stream::<(i64, Vec)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_valueblob(row.1); - ret.push(ts, value); - } - ret - } else { - let mut i = 0; - let mut it = res.rows_stream::<::ScyRowTy>()?; - while let Some(row) = it.try_next().await? { - let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); - // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - // let value = ::from_scyty(row.1); - ret.push(ts, value); - i += 1; - if i % 2000 == 0 { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); - } - } - { - jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); - } - ret - } - } else { - let mut it = res.rows_stream::<(i64,)>()?; - while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::default(); - ret.push(ts, value); - } - ret - } - } + DtNano::from_ns(0) }; + let ts_lsp_max = if range.end() > ts_msp.ns() { + range.end().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + trace_fetch!( + "{selfname} ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", + ts_msp.fmt(), + ts_lsp_min, + ts_lsp_max, + table_name + ); + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, with_values) + .shape(val_ty_dyn.is_valueblob()) + .st(val_ty_dyn.st_name())?; + let qu = { + let mut qu = qu.clone(); + if qu.is_token_aware() == false { + return Err(Error::NotTokenAware); + } + qu.set_page_size(10000); + // qu.disable_paging(); + qu + }; + let params = ( + series as i64, + ts_msp.ms() as i64, + ts_lsp_min.ns() as i64, + ts_lsp_max.ns() as i64, + ); + trace_fetch!("{selfname} event search params {:?}", params); + jobtrace.add_event_now(ReadEventKind::CallExecuteIter); + let res = scy.execute_iter(qu.clone(), params).await?; + let ret = opts.val_ty_dyn.read_into_container(res, ts_msp, with_values).await?; let byte_est = ret.byte_estimate(); trace_fetch!( - "read ts_msp {} len {} byte_est {}", + "{selfname} read ts_msp {} len {} byte_est {}", ts_msp.fmt(), ret.len(), byte_est ); - let ret = Box::new(ret); + Ok((ret,)) +} + +async fn read_next_values_3_bck( + opts: ReadNextValuesOpts, + stmts: Arc, + scy: Arc, + jobtrace: &mut ReadJobTrace, +) -> Result<(Box,), Error> { + let selfname = "read_next_values_3_bck"; + let ret = opts.val_ty_dyn.empty_container_for_test(); + if true { + return Ok((ret,)); + } + let val_ty_dyn = &opts.val_ty_dyn; + trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name()); + let series = opts.series; + let ts_msp = opts.ts_msp; + let range = opts.range; + let table_name = val_ty_dyn.table_name(); + let with_values = opts.readopts.with_values(); + if range.end() > TsNano::from_ns(i64::MAX as u64) { + return Err(Error::RangeEndOverflow); + } + let ts_lsp_max = if ts_msp.ns() < range.beg() { + range.beg().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + trace_fetch!( + "{selfname} ts_msp {} ts_lsp_max {} {}", + ts_msp.fmt(), + ts_lsp_max, + table_name + ); + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, with_values) + // TODO + .shape(val_ty_dyn.is_valueblob()) + .st(val_ty_dyn.st_name())?; + let qu = if false { + let mut qu = qu.clone(); + if qu.is_token_aware() == false { + return Err(Error::NotTokenAware); + } + qu.set_page_size(10000); + // qu.disable_paging(); + qu + } else { + qu.clone() + }; + let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); + trace_fetch!("{selfname} event search params {:?}", params); + jobtrace.add_event_now(ReadEventKind::CallExecuteIter); + let res = scy.execute_iter(qu.clone(), params).await?; + let ret = opts.val_ty_dyn.read_into_container(res, ts_msp, with_values).await?; + let byte_est = ret.byte_estimate(); + trace_fetch!( + "{selfname} read ts_msp {} len {} byte_est {}", + ts_msp.fmt(), + ret.len(), + byte_est + ); + Ok((ret,)) +} + +async fn read_next_values_3( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, +) -> Result<(Box, ReadJobTrace), Error> { + let mut jobtrace = ReadJobTrace::new(); + let (ret,) = if opts.fwd { + read_next_values_3_fwd(opts, stmts, scy, &mut jobtrace).await? + } else { + read_next_values_3_bck(opts, stmts, scy, &mut jobtrace).await? + }; Ok((ret, jobtrace)) } -async fn read_next_values( - params: ReadNextValuesParams, -) -> Result<(Box, ReadJobTrace), Error> +pub async fn read_events_v02( + params: ReadEventsJobParams, + scy: Arc, + stmts: Arc, +) -> Result<(Box, ReadJobTrace), Error> { + let val_ty_dyn = new_val_ty_dyn_from_shape_scalar_type(params.shape.clone(), params.scalar_type.clone()); + let opts = ReadNextValuesOpts { + rt: params.rt.clone(), + series: params.series.id(), + ts_msp: params.ts_msp, + range: params.range, + fwd: params.fwd, + readopts: params.readopts, + val_ty_dyn, + }; + read_next_values_3(opts, scy, stmts).await +} + +trait ValTyDyn: fmt::Debug + Send { + fn table_name(&self) -> &str; + fn st_name(&self) -> &str; + fn is_valueblob(&self) -> bool; + fn read_into_container( + &self, + scyres: QueryPager, + ts_msp: TsMs, + with_vaues: bool, + // TODO + // jobtrace: &mut ReadJobTrace, + ) -> Pin, Error>> + Send>>; + fn empty_container_for_test(&self) -> Box; +} + +async fn read_into_container_branch_00( + scyres: QueryPager, + ts_msp: TsMs, + with_values: bool, + // TODO + // jobtrace: &mut ReadJobTrace, +) -> Result, Error> where ST: ValTy, { - let opts = params.opts; - let jobtrace = params.jobtrace; - // TODO could take scyqeue out of opts struct. - let scyqueue = opts.scyqueue.clone(); - let level = taskrun::query_log_level(); - let futgen = move |scy: Arc, stmts: Arc, mut jobtrace: ReadJobTrace| { - // TODO avoid this - // opts.jobtrace = jobtrace; - let fut = async move { - // let jobtrace = &mut opts.jobtrace; - let logspan = if level == log::Level::DEBUG { - tracing::span!(log::Level::INFO, "log_span_debug") - } else if level == log::Level::TRACE { - tracing::span!(log::Level::INFO, "log_span_trace") - } else { - tracing::Span::none() - }; - jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues); - let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts); - let fut = tracing::Instrument::instrument(fut, logspan); - match fut.await.map_err(crate::worker::Error::from) { - Ok((ret, mut jobtrace)) => { - jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone); - Ok((ret, jobtrace)) - } - Err(e) => Err(e), + let selfname = "read_into_container_branch_00"; + let mut jobtrace = ReadJobTrace::new(); + // TODO must branch already here depending on what input columns we expect + let mut ret = ::Container::empty(); + let ret = if with_values { + if ST::is_valueblob() { + let mut it = scyres.rows_stream::<(i64, Vec)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ST::from_valueblob(row.1); + ret.push(ts, value); } - }; - Box::pin(fut) - as Pin< - Box< - dyn Future< - Output = Result<(Box, ReadJobTrace), crate::worker::Error>, - > + Send, - >, - > + ret + } else { + let mut i = 0; + let mut it = scyres.rows_stream::<::ScyRowTy>()?; + while let Some(row) = it.try_next().await? { + let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + log_fetch_result!("{selfname} {ts}"); + // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + // let value = ::from_scyty(row.1); + ret.push(ts, value); + i += 1; + if i % 2000 == 0 { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); + } + } + { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); + } + ret + } + } else { + let mut it = scyres.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::default(); + ret.push(ts, value); + } + ret }; - let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?; - Ok((res, jobtrace)) + let ret = Box::new(ret); + Ok(ret) } -trait ValTy: Sized + 'static { +#[derive(Debug)] +struct ValTyDynTesting +where + ST: ValTy, +{ + _t1: PhantomData, +} + +impl ValTyDynTesting +where + ST: ValTy, +{ + fn new() -> Self { + Self { _t1: PhantomData } + } + + fn boxed() -> Box { + Box::new(Self::new()) + } +} + +fn new_val_ty_dyn_from_shape_scalar_type(shape: Shape, scalar_type: ScalarType) -> Box { + match shape { + Shape::Scalar => { + use ScalarType::*; + match scalar_type { + U8 => ValTyDynTesting::::boxed(), + U16 => ValTyDynTesting::::boxed(), + U32 => ValTyDynTesting::::boxed(), + U64 => ValTyDynTesting::::boxed(), + I8 => ValTyDynTesting::::boxed(), + I16 => ValTyDynTesting::::boxed(), + I32 => ValTyDynTesting::::boxed(), + I64 => ValTyDynTesting::::boxed(), + F32 => ValTyDynTesting::::boxed(), + F64 => ValTyDynTesting::::boxed(), + BOOL => ValTyDynTesting::::boxed(), + STRING => ValTyDynTesting::::boxed(), + Enum => ValTyDynTesting::::boxed(), + } + } + Shape::Wave(_) => { + use ScalarType::*; + match scalar_type { + U8 => ValTyDynTesting::>::boxed(), + U16 => ValTyDynTesting::>::boxed(), + U32 => ValTyDynTesting::>::boxed(), + U64 => ValTyDynTesting::>::boxed(), + I8 => ValTyDynTesting::>::boxed(), + I16 => ValTyDynTesting::>::boxed(), + I32 => ValTyDynTesting::>::boxed(), + I64 => ValTyDynTesting::>::boxed(), + F32 => ValTyDynTesting::>::boxed(), + F64 => ValTyDynTesting::>::boxed(), + BOOL => ValTyDynTesting::>::boxed(), + STRING => { + warn!("read not yet supported {:?} {:?}", shape, scalar_type); + err::todoval() + } + Enum => { + warn!("read not yet supported {:?} {:?}", shape, scalar_type); + err::todoval() + } + } + } + Shape::Image(_, _) => { + error!("TODO ReadValues add more types"); + err::todoval() + } + } +} + +impl ValTyDyn for ValTyDynTesting +where + ST: ValTy, +{ + fn table_name(&self) -> &str { + ST::table_name() + } + + fn st_name(&self) -> &str { + ST::st_name() + } + + fn is_valueblob(&self) -> bool { + ST::is_valueblob() + } + + fn read_into_container( + &self, + scyres: QueryPager, + ts_msp: TsMs, + with_values: bool, + // jobtrace: &mut ReadJobTrace, + ) -> Pin, Error>> + Send>> { + // TODO + Box::pin(read_into_container_branch_00::( + scyres, + ts_msp, + with_values, + // TODO + // jobtrace, + )) + } + + fn empty_container_for_test(&self) -> Box { + Box::new(::empty()) + } +} + +trait ValTy: fmt::Debug + Send + Sized + 'static { type ScaTy: ScalarOps + std::default::Default; - type ScyTy: for<'a, 'b> scylla::deserialize::value::DeserializeValue<'a, 'b>; - type ScyRowTy: for<'a, 'b> scylla::deserialize::row::DeserializeRow<'a, 'b>; + type ScyTy: for<'a, 'b> scylla::deserialize::value::DeserializeValue<'a, 'b> + Send; + type ScyRowTy: for<'a, 'b> scylla::deserialize::row::DeserializeRow<'a, 'b> + Send; type Container: BinningggContainerEventsDyn + Empty + Appendable; fn from_valueblob(inp: Vec) -> Self; fn table_name() -> &'static str; fn default() -> Self; fn is_valueblob() -> bool; fn st_name() -> &'static str; - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>>; fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self); } @@ -1121,15 +1216,6 @@ macro_rules! impl_scaty_scalar { $st_name } - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); (ts, inp.1 as Self::ScaTy) @@ -1179,18 +1265,9 @@ macro_rules! impl_scaty_array { $st_name } - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); - (ts, inp.1 .into_iter().map(|x| x as _).collect()) + (ts, inp.1.into_iter().map(|x| x as _).collect()) } } }; @@ -1222,15 +1299,6 @@ impl ValTy for EnumVariant { "enum" } - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) - } - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); (ts, EnumVariant::new(inp.1 as u16, inp.2)) @@ -1263,16 +1331,6 @@ impl ValTy for Vec { "string" } - fn read_next_values_trait( - opts: ReadNextValuesOpts, - jobtrace: ReadJobTrace, - scy: Arc, - stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { - let fut = read_next_values_2::(opts, jobtrace, scy, stmts); - Box::pin(fut) - } - fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); (ts, inp.1) diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 1b2ab88..62b6c8f 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -20,9 +20,7 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($( macro_rules! trace_msp { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ) } -macro_rules! log_fetch_result { - ($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } }; -} +macro_rules! log_fetch_result { ($($arg:tt)*) => { if true { log::trace!("fetch {}", format_args!($($arg)*)); } }; } autoerr::create_error_v1!( name(Error, "EventsMsp"), @@ -378,32 +376,39 @@ async fn find_ts_msp_bck_workaround( scy: &Session, ) -> Result, Error> { let selfname = "find_ts_msp_bck_workaround"; - let mut ret = VecDeque::new(); + let mut ret = Vec::new(); + // let params = (series as i64, 0 as i64, range.beg().ms() as i64); let params = (series as i64, 0 as i64, i64::MAX); log_fetch_result!("{selfname} {:?}", params); let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) + .execute_iter(stmts.rt(rt).ts_msp_bck_workaround().clone(), params) .await? .rows_stream::<(i64,)>()?; + let mut c = 0; while let Some(row) = res.try_next().await? { + c += 1; let ts = TsMs::from_ms_u64(row.0 as u64); - log_fetch_result!("{selfname} {params:?} {ts}"); - ret.push_back(ts); - if ret.len() > 1024 * 1024 { - return Err(Error::TooManyRows); + if ts >= range.beg().to_ts_ms() { + log_fetch_result!("{selfname} {params:?} {ts} DISCARD AFTER RANGE"); + } else { + if ret.len() > 1024 * 1024 { + return Err(Error::TooManyRows); + } else { + log_fetch_result!("{selfname} {params:?} {ts} USE"); + ret.push(ts); + } } } + log_fetch_result!("{selfname} {params:?} considered msp {c}"); if ret.len() > 1024 * 2 { log::info!("quite many ts_msp values in reverse lookup len {}", ret.len()); } if ret.len() > 1024 * 64 { log::warn!("quite many ts_msp values in reverse lookup len {}", ret.len()); } - let ret = if ret.len() > 2 { - let tmp: Vec<_> = ret.into_iter().rev().take(2).collect(); - tmp.into_iter().rev().collect() - } else { - ret - }; + let m = ret.len().max(2) - 2; + log_fetch_result!("{selfname} {params:?} {ret:?} {m}"); + let ret = ret.into_iter().skip(m).collect(); + log_fetch_result!("{selfname} {params:?} {ret:?}"); Ok(ret) } diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 43c9747..8fe8b1e 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -74,6 +74,7 @@ impl StmtsLspDir { pub struct StmtsEventsRt { ts_msp_fwd: PreparedStatement, ts_msp_bck: PreparedStatement, + ts_msp_bck_workaround: PreparedStatement, lsp_fwd_val: StmtsLspDir, lsp_bck_val: StmtsLspDir, lsp_fwd_ts: StmtsLspDir, @@ -91,6 +92,10 @@ impl StmtsEventsRt { &self.ts_msp_bck } + pub fn ts_msp_bck_workaround(&self) -> &PreparedStatement { + &self.ts_msp_bck_workaround + } + pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { if bck { if val { &self.lsp_bck_val } else { &self.lsp_bck_ts } @@ -134,6 +139,27 @@ async fn make_msp_dir( Ok(qu) } +async fn make_msp_fwd_for_bck_workaround( + ks: &str, + rt: &RetentionTime, + query_opts: &str, + scy: &Session, +) -> Result { + let table_name = "ts_msp"; + let select_cond = "ts_msp >= ? and ts_msp < ?"; + let cql = format!( + "select ts_msp from {}.{}{} where series = ? and {} {}", + ks, + rt.table_prefix(), + table_name, + select_cond, + query_opts + ); + info_prepare!("{ks} {rt} {cql}"); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + async fn make_lsp( ks: &str, rt: &RetentionTime, @@ -290,6 +316,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) let ret = StmtsEventsRt { ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?, ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?, + ts_msp_bck_workaround: make_msp_fwd_for_bck_workaround(ks, rt, query_opts, scy).await?, lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?, lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?, lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?, diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index f708aad..3dd4bbe 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,5 +1,6 @@ use crate::binwriteindex::BinWriteIndexEntry; use crate::conn::create_scy_session_no_ks; +use crate::events2::events::ReadEventsJobParams; use crate::events2::events::ReadJobTrace; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; @@ -166,6 +167,10 @@ enum Job { BinWriteIndexRead(BinWriteIndexRead), PrepareV1(PrepareV1), ExecuteV1(ExecuteV1), + ReadEvents02( + ReadEventsJobParams, + Sender, ReadJobTrace), Error>>, + ), } struct ReadNextValues { @@ -208,6 +213,7 @@ impl ScyllaQueue { Ok(res) } + // TODO remove pub async fn read_next_values( &self, futgen: F, @@ -234,6 +240,17 @@ impl ScyllaQueue { Ok(res) } + pub async fn read_events_v02( + &self, + params: ReadEventsJobParams, + ) -> Result<(Box, ReadJobTrace), Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ReadEvents02(params, tx); + self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; + let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; + Ok(res) + } + pub async fn accounting_read_ts( &self, rt: RetentionTime, @@ -462,6 +479,14 @@ impl ScyllaWorker { // TODO log? let _ = job.tx.send(res).await; } + Job::ReadEvents02(params, tx) => { + let res = crate::events2::events::read_events_v02(params, scy.clone(), stmts.clone()) + .await + .map_err(From::from); + if tx.send(res).await.is_err() { + // TODO count for stats + } + } } }) .buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)