Refactor framed stream
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use crate::errconv::ErrConv;
|
||||
use err::Error;
|
||||
use futures_util::Future;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::events2::events::EventReadOpts;
|
||||
use crate::events2::prepare::StmtsEvents;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
@@ -32,6 +33,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tracing::Instrument;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ScyllaReadEvents")]
|
||||
@@ -45,6 +47,7 @@ pub enum Error {
|
||||
RangeEndOverflow,
|
||||
InvalidFuture,
|
||||
TestError(String),
|
||||
Logic,
|
||||
}
|
||||
|
||||
impl From<crate::worker::Error> for Error {
|
||||
@@ -63,6 +66,19 @@ pub(super) trait ValTy: Sized + 'static {
|
||||
fn default() -> Self;
|
||||
fn is_valueblob() -> bool;
|
||||
fn st_name() -> &'static str;
|
||||
fn read_next_values(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>;
|
||||
fn convert_rows(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, Self)>,
|
||||
) -> Result<Self::Container, Error>;
|
||||
}
|
||||
|
||||
macro_rules! impl_scaty_scalar {
|
||||
@@ -71,24 +87,49 @@ macro_rules! impl_scaty_scalar {
|
||||
type ScaTy = $st;
|
||||
type ScyTy = $st_scy;
|
||||
type Container = EventsDim0<Self::ScaTy>;
|
||||
|
||||
fn from_scyty(inp: Self::ScyTy) -> Self {
|
||||
inp as Self
|
||||
}
|
||||
|
||||
fn from_valueblob(_inp: Vec<u8>) -> Self {
|
||||
<Self as ValTy>::default()
|
||||
}
|
||||
|
||||
fn table_name() -> &'static str {
|
||||
concat!("scalar_", $table_name)
|
||||
}
|
||||
|
||||
fn default() -> Self {
|
||||
<Self as std::default::Default>::default()
|
||||
}
|
||||
|
||||
fn is_valueblob() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn st_name() -> &'static str {
|
||||
$st_name
|
||||
}
|
||||
|
||||
fn read_next_values(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
Box::pin(read_next_values_2::<Self>(opts, scy, stmts))
|
||||
}
|
||||
|
||||
fn convert_rows(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, Self)>,
|
||||
) -> Result<Self::Container, Error> {
|
||||
convert_rows_0::<Self>(rows, range, ts_msp, with_values, bck, last_before)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -99,9 +140,11 @@ macro_rules! impl_scaty_array {
|
||||
type ScaTy = $st;
|
||||
type ScyTy = $st_scy;
|
||||
type Container = EventsDim1<Self::ScaTy>;
|
||||
|
||||
fn from_scyty(inp: Self::ScyTy) -> Self {
|
||||
inp.into_iter().map(|x| x as Self::ScaTy).collect()
|
||||
}
|
||||
|
||||
fn from_valueblob(inp: Vec<u8>) -> Self {
|
||||
if inp.len() < 32 {
|
||||
<Self as ValTy>::default()
|
||||
@@ -118,18 +161,41 @@ macro_rules! impl_scaty_array {
|
||||
c
|
||||
}
|
||||
}
|
||||
|
||||
fn table_name() -> &'static str {
|
||||
concat!("array_", $table_name)
|
||||
}
|
||||
|
||||
fn default() -> Self {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn is_valueblob() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn st_name() -> &'static str {
|
||||
$st_name
|
||||
}
|
||||
|
||||
fn read_next_values(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
Box::pin(read_next_values_2::<Self>(opts, scy, stmts))
|
||||
}
|
||||
|
||||
fn convert_rows(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, Self)>,
|
||||
) -> Result<Self::Container, Error> {
|
||||
convert_rows_0::<Self>(rows, range, ts_msp, with_values, bck, last_before)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -164,6 +230,26 @@ impl ValTy for EnumVariant {
|
||||
fn st_name() -> &'static str {
|
||||
"enum"
|
||||
}
|
||||
|
||||
fn read_next_values(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
let fut = read_next_values_2::<Self>(opts, scy, stmts);
|
||||
Box::pin(fut)
|
||||
}
|
||||
|
||||
fn convert_rows(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, Self)>,
|
||||
) -> Result<Self::Container, Error> {
|
||||
convert_rows_enum(rows, range, ts_msp, with_values, bck, last_before)
|
||||
}
|
||||
}
|
||||
|
||||
impl ValTy for Vec<String> {
|
||||
@@ -196,6 +282,26 @@ impl ValTy for Vec<String> {
|
||||
fn st_name() -> &'static str {
|
||||
"string"
|
||||
}
|
||||
|
||||
fn read_next_values(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
let fut = read_next_values_2::<Self>(opts, scy, stmts);
|
||||
Box::pin(fut)
|
||||
}
|
||||
|
||||
fn convert_rows(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, Self)>,
|
||||
) -> Result<Self::Container, Error> {
|
||||
convert_rows_0::<Self>(rows, range, ts_msp, with_values, bck, last_before)
|
||||
}
|
||||
}
|
||||
|
||||
impl_scaty_scalar!(u8, i8, "u8", "u8");
|
||||
@@ -230,7 +336,7 @@ pub(super) struct ReadNextValuesOpts {
|
||||
ts_msp: TsMs,
|
||||
range: ScyllaSeriesRange,
|
||||
fwd: bool,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
scyqueue: ScyllaQueue,
|
||||
}
|
||||
|
||||
@@ -241,7 +347,7 @@ impl ReadNextValuesOpts {
|
||||
ts_msp: TsMs,
|
||||
range: ScyllaSeriesRange,
|
||||
fwd: bool,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -250,7 +356,7 @@ impl ReadNextValuesOpts {
|
||||
ts_msp,
|
||||
range,
|
||||
fwd,
|
||||
with_values,
|
||||
readopts,
|
||||
scyqueue,
|
||||
}
|
||||
}
|
||||
@@ -262,10 +368,18 @@ where
|
||||
{
|
||||
// TODO could take scyqeue out of opts struct.
|
||||
let scyqueue = opts.scyqueue.clone();
|
||||
debug!("bbbbbbbbbbbbbbbbbbbbbbbbbbbb");
|
||||
let futgen = Box::new(|scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
|
||||
let fut = async {
|
||||
read_next_values_2::<ST>(opts, scy, stmts)
|
||||
let level = taskrun::query_log_level();
|
||||
let futgen = Box::new(move |scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
|
||||
let fut = async move {
|
||||
let logspan = if level == Level::DEBUG {
|
||||
tracing::span!(Level::INFO, "log_span_debug")
|
||||
} else if level == Level::TRACE {
|
||||
tracing::span!(Level::INFO, "log_span_trace")
|
||||
} else {
|
||||
tracing::Span::none()
|
||||
};
|
||||
ST::read_next_values(opts, scy, stmts)
|
||||
.instrument(logspan)
|
||||
.await
|
||||
.map_err(crate::worker::Error::from)
|
||||
};
|
||||
@@ -283,17 +397,12 @@ async fn read_next_values_2<ST>(
|
||||
where
|
||||
ST: ValTy,
|
||||
{
|
||||
debug!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa {opts:?}");
|
||||
trace!(
|
||||
"read_next_values_2 {} {} st_name {}",
|
||||
opts.series,
|
||||
opts.ts_msp,
|
||||
ST::st_name()
|
||||
);
|
||||
trace!("read_next_values_2 {:?} st_name {}", opts, ST::st_name());
|
||||
let series = opts.series;
|
||||
let ts_msp = opts.ts_msp;
|
||||
let range = opts.range;
|
||||
let table_name = ST::table_name();
|
||||
let with_values = opts.readopts.with_values;
|
||||
if range.end() > TsNano::from_ns(i64::MAX as u64) {
|
||||
return Err(Error::RangeEndOverflow);
|
||||
}
|
||||
@@ -317,7 +426,7 @@ where
|
||||
);
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
.lsp(!opts.fwd, with_values)
|
||||
.shape(ST::is_valueblob())
|
||||
.st(ST::st_name())?;
|
||||
let params = (
|
||||
@@ -333,7 +442,7 @@ where
|
||||
rows.push(x?);
|
||||
}
|
||||
let mut last_before = None;
|
||||
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?;
|
||||
let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?;
|
||||
ret
|
||||
} else {
|
||||
let ts_lsp_max = if ts_msp.ns() < range.beg() {
|
||||
@@ -344,7 +453,7 @@ where
|
||||
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
.lsp(!opts.fwd, with_values)
|
||||
.shape(ST::is_valueblob())
|
||||
.st(ST::st_name())?;
|
||||
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
|
||||
@@ -355,7 +464,7 @@ where
|
||||
rows.push(x?);
|
||||
}
|
||||
let mut _last_before = None;
|
||||
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?;
|
||||
let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?;
|
||||
if ret.len() > 1 {
|
||||
error!("multiple events in backwards search {}", ret.len());
|
||||
}
|
||||
@@ -366,478 +475,116 @@ where
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn convert_rows<ST: ValTy>(
|
||||
fn convert_rows_0<ST: ValTy>(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, u64, ST)>,
|
||||
last_before: &mut Option<(TsNano, ST)>,
|
||||
) -> Result<<ST as ValTy>::Container, Error> {
|
||||
let mut ret = <ST as ValTy>::Container::empty();
|
||||
for row in rows {
|
||||
let (ts, pulse, value) = if with_values {
|
||||
let (ts, value) = if with_values {
|
||||
if ST::is_valueblob() {
|
||||
let row: (i64, i64, Vec<u8>) = row.into_typed()?;
|
||||
trace!("read a value blob len {}", row.2.len());
|
||||
let row: (i64, Vec<u8>) = row.into_typed()?;
|
||||
// trace!("read a value blob len {}", row.1.len());
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let pulse = row.1 as u64;
|
||||
let value = ValTy::from_valueblob(row.2);
|
||||
(ts, pulse, value)
|
||||
let value = ValTy::from_valueblob(row.1);
|
||||
(ts, value)
|
||||
} else {
|
||||
let row: (i64, i64, ST::ScyTy) = row.into_typed()?;
|
||||
let row: (i64, ST::ScyTy) = row.into_typed()?;
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let pulse = row.1 as u64;
|
||||
let value = ValTy::from_scyty(row.2);
|
||||
(ts, pulse, value)
|
||||
let value = ValTy::from_scyty(row.1);
|
||||
(ts, value)
|
||||
}
|
||||
} else {
|
||||
let row: (i64, i64) = row.into_typed()?;
|
||||
let row: (i64,) = row.into_typed()?;
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let pulse = row.1 as u64;
|
||||
let value = ValTy::default();
|
||||
(ts, pulse, value)
|
||||
(ts, value)
|
||||
};
|
||||
if bck {
|
||||
if ts >= range.beg() {
|
||||
// TODO count as logic error
|
||||
error!("ts >= range.beg");
|
||||
} else if ts < range.beg() {
|
||||
ret.push(ts.ns(), pulse, value);
|
||||
ret.push(ts.ns(), 0, value);
|
||||
} else {
|
||||
*last_before = Some((ts, pulse, value));
|
||||
*last_before = Some((ts, value));
|
||||
}
|
||||
} else {
|
||||
if ts >= range.end() {
|
||||
// TODO count as logic error
|
||||
error!("ts >= range.end");
|
||||
} else if ts >= range.beg() {
|
||||
ret.push(ts.ns(), pulse, value);
|
||||
ret.push(ts.ns(), 0, value);
|
||||
} else {
|
||||
if last_before.is_none() {
|
||||
warn!("encounter event before range in forward read {ts}");
|
||||
}
|
||||
*last_before = Some((ts, pulse, value));
|
||||
*last_before = Some((ts, value));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub(super) struct ReadValues {
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
fn convert_rows_enum(
|
||||
rows: Vec<Row>,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msps: VecDeque<TsMs>,
|
||||
fwd: bool,
|
||||
ts_msp: TsMs,
|
||||
with_values: bool,
|
||||
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
|
||||
fut_done: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
}
|
||||
|
||||
impl ReadValues {
|
||||
pub(super) fn new(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
ts_msps: VecDeque<TsMs>,
|
||||
fwd: bool,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
let mut ret = Self {
|
||||
rt,
|
||||
series,
|
||||
scalar_type,
|
||||
shape,
|
||||
range,
|
||||
ts_msps,
|
||||
fwd,
|
||||
with_values,
|
||||
fut: Box::pin(futures_util::future::ready(Err(Error::InvalidFuture))),
|
||||
fut_done: false,
|
||||
scyqueue,
|
||||
};
|
||||
ret.next();
|
||||
ret
|
||||
}
|
||||
|
||||
fn next(&mut self) -> bool {
|
||||
if let Some(ts_msp) = self.ts_msps.pop_front() {
|
||||
self.fut = self.make_fut(ts_msp);
|
||||
self.fut_done = false;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn make_fut(&mut self, ts_msp: TsMs) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
let opts = ReadNextValuesOpts {
|
||||
rt: self.rt.clone(),
|
||||
series: self.series.clone(),
|
||||
ts_msp,
|
||||
range: self.range.clone(),
|
||||
fwd: self.fwd,
|
||||
with_values: self.with_values,
|
||||
scyqueue: self.scyqueue.clone(),
|
||||
};
|
||||
let scalar_type = self.scalar_type.clone();
|
||||
let shape = self.shape.clone();
|
||||
let fut = async move {
|
||||
match &shape {
|
||||
Shape::Scalar => match &scalar_type {
|
||||
ScalarType::U8 => read_next_values::<u8>(opts).await,
|
||||
ScalarType::U16 => read_next_values::<u16>(opts).await,
|
||||
ScalarType::U32 => read_next_values::<u32>(opts).await,
|
||||
ScalarType::U64 => read_next_values::<u64>(opts).await,
|
||||
ScalarType::I8 => read_next_values::<i8>(opts).await,
|
||||
ScalarType::I16 => read_next_values::<i16>(opts).await,
|
||||
ScalarType::I32 => read_next_values::<i32>(opts).await,
|
||||
ScalarType::I64 => read_next_values::<i64>(opts).await,
|
||||
ScalarType::F32 => read_next_values::<f32>(opts).await,
|
||||
ScalarType::F64 => read_next_values::<f64>(opts).await,
|
||||
ScalarType::BOOL => read_next_values::<bool>(opts).await,
|
||||
ScalarType::STRING => read_next_values::<String>(opts).await,
|
||||
ScalarType::Enum => read_next_values::<String>(opts).await,
|
||||
ScalarType::ChannelStatus => {
|
||||
warn!("read scalar channel status not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
},
|
||||
Shape::Wave(_) => match &scalar_type {
|
||||
ScalarType::U8 => read_next_values::<Vec<u8>>(opts).await,
|
||||
ScalarType::U16 => read_next_values::<Vec<u16>>(opts).await,
|
||||
ScalarType::U32 => read_next_values::<Vec<u32>>(opts).await,
|
||||
ScalarType::U64 => read_next_values::<Vec<u64>>(opts).await,
|
||||
ScalarType::I8 => read_next_values::<Vec<i8>>(opts).await,
|
||||
ScalarType::I16 => read_next_values::<Vec<i16>>(opts).await,
|
||||
ScalarType::I32 => read_next_values::<Vec<i32>>(opts).await,
|
||||
ScalarType::I64 => read_next_values::<Vec<i64>>(opts).await,
|
||||
ScalarType::F32 => read_next_values::<Vec<f32>>(opts).await,
|
||||
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
|
||||
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
|
||||
ScalarType::STRING => {
|
||||
warn!("read array string not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
ScalarType::Enum => read_next_values::<Vec<String>>(opts).await,
|
||||
ScalarType::ChannelStatus => {
|
||||
warn!("read array channel status not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
error!("TODO ReadValues add more types");
|
||||
err::todoval()
|
||||
bck: bool,
|
||||
last_before: &mut Option<(TsNano, EnumVariant)>,
|
||||
) -> Result<<EnumVariant as ValTy>::Container, Error> {
|
||||
let mut ret = <EnumVariant as ValTy>::Container::empty();
|
||||
for row in rows {
|
||||
let (ts, value) = if with_values {
|
||||
if EnumVariant::is_valueblob() {
|
||||
if true {
|
||||
return Err(Error::Logic);
|
||||
}
|
||||
}
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
|
||||
enum FrState {
|
||||
New,
|
||||
FindMsp(Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>),
|
||||
ReadBack1(ReadValues),
|
||||
ReadBack2(ReadValues),
|
||||
ReadValues(ReadValues),
|
||||
DataDone,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct EventsStreamScylla {
|
||||
rt: RetentionTime,
|
||||
state: FrState,
|
||||
series: u64,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
do_one_before_range: bool,
|
||||
ts_msp_bck: VecDeque<TsMs>,
|
||||
ts_msp_fwd: VecDeque<TsMs>,
|
||||
scyqueue: ScyllaQueue,
|
||||
do_test_stream_error: bool,
|
||||
found_one_after: bool,
|
||||
with_values: bool,
|
||||
outqueue: VecDeque<Box<dyn Events>>,
|
||||
ts_seen_max: u64,
|
||||
}
|
||||
|
||||
impl EventsStreamScylla {
|
||||
pub fn _new(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
do_one_before_range: bool,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
do_test_stream_error: bool,
|
||||
) -> Self {
|
||||
debug!("EventsStreamScylla::new");
|
||||
Self {
|
||||
rt,
|
||||
state: FrState::New,
|
||||
series,
|
||||
scalar_type,
|
||||
shape,
|
||||
range,
|
||||
do_one_before_range,
|
||||
ts_msp_bck: VecDeque::new(),
|
||||
ts_msp_fwd: VecDeque::new(),
|
||||
scyqueue,
|
||||
do_test_stream_error,
|
||||
found_one_after: false,
|
||||
with_values,
|
||||
outqueue: VecDeque::new(),
|
||||
ts_seen_max: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn ts_msps_found(&mut self, msps1: VecDeque<TsMs>, msps2: VecDeque<TsMs>) {
|
||||
trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}");
|
||||
self.ts_msp_bck = msps1;
|
||||
self.ts_msp_fwd = msps2;
|
||||
for x in self.ts_msp_bck.iter().rev() {
|
||||
let x = x.clone();
|
||||
if x.ns() >= self.range.end() {
|
||||
info!("FOUND one-after because of MSP");
|
||||
self.found_one_after = true;
|
||||
}
|
||||
self.ts_msp_fwd.push_front(x);
|
||||
}
|
||||
trace!("ts_msp_bck {:?}", self.ts_msp_bck);
|
||||
trace!("ts_msp_fwd {:?}", self.ts_msp_fwd);
|
||||
if let Some(msp) = self.ts_msp_bck.pop_back() {
|
||||
trace!("start ReadBack1 msp {}", msp);
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
[msp].into(),
|
||||
false,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadBack1(st);
|
||||
} else if self.ts_msp_fwd.len() > 0 {
|
||||
trace!("begin immediately with forward read");
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
|
||||
true,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadValues(st);
|
||||
} else {
|
||||
self.state = FrState::DataDone;
|
||||
}
|
||||
}
|
||||
|
||||
fn back_1_done(&mut self, item: Box<dyn Events>) {
|
||||
trace!("back_1_done item len {}", item.len());
|
||||
if item.len() > 0 {
|
||||
self.outqueue.push_back(item);
|
||||
if self.ts_msp_fwd.len() > 0 {
|
||||
trace!("start forward read after back1");
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
|
||||
true,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadValues(st);
|
||||
let row: (i64, Vec<u8>) = row.into_typed()?;
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let value = ValTy::from_valueblob(row.1);
|
||||
(ts, value)
|
||||
} else {
|
||||
self.state = FrState::DataDone;
|
||||
let row: (i64, i16, String) = row.into_typed()?;
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let val = row.1 as u16;
|
||||
let valstr = row.2;
|
||||
let value = EnumVariant::new(val, valstr.into());
|
||||
(ts, value)
|
||||
}
|
||||
} else {
|
||||
if let Some(msp) = self.ts_msp_bck.pop_back() {
|
||||
trace!("start ReadBack2 msp {}", msp);
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
[msp].into(),
|
||||
false,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadBack2(st);
|
||||
} else if self.ts_msp_fwd.len() > 0 {
|
||||
trace!("no 2nd back MSP, go for forward read");
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
|
||||
true,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadValues(st);
|
||||
let row: (i64,) = row.into_typed()?;
|
||||
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
|
||||
let value = ValTy::default();
|
||||
(ts, value)
|
||||
};
|
||||
if bck {
|
||||
if ts >= range.beg() {
|
||||
// TODO count as logic error
|
||||
error!("ts >= range.beg");
|
||||
} else if ts < range.beg() {
|
||||
ret.push(ts.ns(), 0, value);
|
||||
} else {
|
||||
trace!("no 2nd back msp, but also nothing to go forward");
|
||||
self.state = FrState::DataDone;
|
||||
*last_before = Some((ts, value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn back_2_done(&mut self, item: Box<dyn Events>) {
|
||||
trace!("back_2_done item len {}", item.len());
|
||||
if item.len() > 0 {
|
||||
self.outqueue.push_back(item);
|
||||
}
|
||||
if self.ts_msp_fwd.len() > 0 {
|
||||
trace!("start forward read after back2");
|
||||
let st = ReadValues::new(
|
||||
self.rt.clone(),
|
||||
self.series,
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
|
||||
true,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
self.state = FrState::ReadValues(st);
|
||||
} else {
|
||||
trace!("nothing to forward read after back 2");
|
||||
self.state = FrState::DataDone;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EventsStreamScylla {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.do_test_stream_error {
|
||||
let e = Error::TestError("test-message".into());
|
||||
return Ready(Some(Err(e)));
|
||||
}
|
||||
loop {
|
||||
if let Some(item) = self.outqueue.pop_front() {
|
||||
item.verify();
|
||||
if let Some(item_min) = item.ts_min() {
|
||||
if item_min < self.ts_seen_max {
|
||||
debug!("ordering error A {} {}", item_min, self.ts_seen_max);
|
||||
}
|
||||
if ts >= range.end() {
|
||||
// TODO count as logic error
|
||||
error!("ts >= range.end");
|
||||
} else if ts >= range.beg() {
|
||||
ret.push(ts.ns(), 0, value);
|
||||
} else {
|
||||
if last_before.is_none() {
|
||||
warn!("encounter event before range in forward read {ts}");
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
if item_max < self.ts_seen_max {
|
||||
debug!("ordering error B {} {}", item_max, self.ts_seen_max);
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
}
|
||||
debug!("deliver item {}", item.output_info());
|
||||
break Ready(Some(Ok(ChannelEvents::Events(item))));
|
||||
*last_before = Some((ts, value));
|
||||
}
|
||||
break match self.state {
|
||||
FrState::New => {
|
||||
let series = self.series.clone();
|
||||
let range = self.range.clone();
|
||||
// TODO this no longer works, we miss the backwards part here
|
||||
// let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
|
||||
// let fut = Box::pin(fut);
|
||||
let fut = todo!();
|
||||
self.state = FrState::FindMsp(fut);
|
||||
continue;
|
||||
}
|
||||
FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(msps)) => {
|
||||
self.ts_msps_found(VecDeque::new(), msps);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("EventsStreamScylla FindMsp {e}");
|
||||
self.state = FrState::DataDone;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) {
|
||||
Ready(Ok(item)) => {
|
||||
st.fut_done = true;
|
||||
self.back_1_done(item);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("EventsStreamScylla ReadBack1 {e}");
|
||||
st.fut_done = true;
|
||||
self.state = FrState::DataDone;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) {
|
||||
Ready(Ok(item)) => {
|
||||
st.fut_done = true;
|
||||
self.back_2_done(item);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("EventsStreamScylla ReadBack2 {e}");
|
||||
st.fut_done = true;
|
||||
self.state = FrState::DataDone;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
|
||||
Ready(Ok(item)) => {
|
||||
st.fut_done = true;
|
||||
if !st.next() {
|
||||
trace!("ReadValues exhausted");
|
||||
self.state = FrState::DataDone;
|
||||
}
|
||||
if item.len() > 0 {
|
||||
self.outqueue.push_back(item);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
error!("EventsStreamScylla ReadValues {e}");
|
||||
st.fut_done = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
FrState::DataDone => {
|
||||
if self.found_one_after {
|
||||
// TODO emit RangeComplete
|
||||
}
|
||||
self.state = FrState::Done;
|
||||
continue;
|
||||
}
|
||||
FrState::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -41,6 +41,21 @@ macro_rules! warn_item {
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EventReadOpts {
|
||||
pub with_values: bool,
|
||||
pub enum_as_strings: bool,
|
||||
}
|
||||
|
||||
impl EventReadOpts {
|
||||
pub fn new(with_values: bool, enum_as_strings: bool) -> Self {
|
||||
Self {
|
||||
with_values,
|
||||
enum_as_strings,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ScyllaEvents")]
|
||||
pub enum Error {
|
||||
@@ -86,7 +101,7 @@ pub struct EventsStreamRt {
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
state: State,
|
||||
scyqueue: ScyllaQueue,
|
||||
msp_inp: MspStreamRt,
|
||||
@@ -101,10 +116,10 @@ impl EventsStreamRt {
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}");
|
||||
debug!("EventsStreamRt::new {series:?} {range:?} {rt:?} {readopts:?}");
|
||||
let msp_inp =
|
||||
crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone());
|
||||
Self {
|
||||
@@ -113,7 +128,7 @@ impl EventsStreamRt {
|
||||
scalar_type,
|
||||
shape,
|
||||
range,
|
||||
with_values,
|
||||
readopts,
|
||||
state: State::Begin,
|
||||
scyqueue,
|
||||
msp_inp,
|
||||
@@ -140,7 +155,7 @@ impl EventsStreamRt {
|
||||
ts_msp,
|
||||
self.range.clone(),
|
||||
fwd,
|
||||
self.with_values,
|
||||
self.readopts.clone(),
|
||||
scyqueue,
|
||||
);
|
||||
let scalar_type = self.scalar_type.clone();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::events::EventReadOpts;
|
||||
use super::events::EventsStreamRt;
|
||||
use super::firstbefore::FirstBeforeAndInside;
|
||||
use crate::events2::firstbefore;
|
||||
@@ -30,6 +31,8 @@ pub enum Error {
|
||||
Input(#[from] crate::events2::firstbefore::Error),
|
||||
Events(#[from] crate::events2::events::Error),
|
||||
Logic,
|
||||
OrderMin,
|
||||
OrderMax,
|
||||
}
|
||||
|
||||
enum Resolvable<F>
|
||||
@@ -103,7 +106,7 @@ pub struct MergeRts {
|
||||
range: ScyllaSeriesRange,
|
||||
range_mt: ScyllaSeriesRange,
|
||||
range_lt: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
scyqueue: ScyllaQueue,
|
||||
inp_st: Option<Box<TI>>,
|
||||
inp_mt: Option<Box<TI>>,
|
||||
@@ -123,7 +126,7 @@ impl MergeRts {
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
readopts: EventReadOpts,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -133,7 +136,7 @@ impl MergeRts {
|
||||
range_mt: range.clone(),
|
||||
range_lt: range.clone(),
|
||||
range,
|
||||
with_values,
|
||||
readopts,
|
||||
scyqueue,
|
||||
inp_st: None,
|
||||
inp_mt: None,
|
||||
@@ -161,7 +164,7 @@ impl MergeRts {
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.readopts.clone(),
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
@@ -182,7 +185,7 @@ impl MergeRts {
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.readopts.clone(),
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
@@ -202,7 +205,7 @@ impl MergeRts {
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.readopts.clone(),
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
@@ -309,7 +312,7 @@ impl Stream for MergeRts {
|
||||
"\n\n--------------------------\n", item_min, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
break Ready(Some(Err(Error::OrderMin)));
|
||||
}
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
@@ -319,7 +322,7 @@ impl Stream for MergeRts {
|
||||
"\n\n--------------------------\n", item_max, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
break Ready(Some(Err(Error::OrderMax)));
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ pub struct StmtsLspShape {
|
||||
f64: PreparedStatement,
|
||||
bool: PreparedStatement,
|
||||
string: PreparedStatement,
|
||||
enumvals: PreparedStatement,
|
||||
}
|
||||
|
||||
impl StmtsLspShape {
|
||||
@@ -48,6 +49,7 @@ impl StmtsLspShape {
|
||||
"f64" => &self.f64,
|
||||
"bool" => &self.bool,
|
||||
"string" => &self.string,
|
||||
"enum" => &self.enumvals,
|
||||
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
|
||||
};
|
||||
Ok(ret)
|
||||
@@ -182,6 +184,15 @@ async fn make_lsp_shape(
|
||||
f64: maker("f64").await?,
|
||||
bool: maker("bool").await?,
|
||||
string: maker("string").await?,
|
||||
enumvals: if shapepre == "scalar" {
|
||||
make_lsp(ks, rt, shapepre, "enum", "ts_lsp, value, valuestr", bck, scy).await?
|
||||
} else {
|
||||
// exists only for scalar, therefore produce some dummy here
|
||||
let table_name = "ts_msp";
|
||||
let cql = format!("select ts_msp from {}.{}{} limit 1", ks, rt.table_prefix(), table_name);
|
||||
let qu = scy.prepare(cql).await?;
|
||||
qu
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -204,10 +215,10 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEve
|
||||
let ret = StmtsEventsRt {
|
||||
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
|
||||
ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?,
|
||||
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?,
|
||||
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?,
|
||||
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?,
|
||||
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?,
|
||||
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, scy).await?,
|
||||
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, scy).await?,
|
||||
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, scy).await?,
|
||||
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ pub enum Error {
|
||||
ChannelRecv,
|
||||
Join,
|
||||
Toplist(#[from] crate::accounting::toplist::Error),
|
||||
MissingKeyspaceConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -153,9 +154,12 @@ impl ScyllaWorker {
|
||||
self.scyconf_mt.keyspace.as_str(),
|
||||
self.scyconf_lt.keyspace.as_str(),
|
||||
];
|
||||
let stmts = StmtsEvents::new(kss.try_into().unwrap(), &scy).await?;
|
||||
info!("scylla worker PREPARE START");
|
||||
let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?;
|
||||
let stmts = Arc::new(stmts);
|
||||
info!("scylla worker PREPARE DONE");
|
||||
loop {
|
||||
info!("scylla worker WAIT FOR JOB");
|
||||
let x = self.rx.recv().await;
|
||||
let job = match x {
|
||||
Ok(x) => x,
|
||||
@@ -166,12 +170,14 @@ impl ScyllaWorker {
|
||||
};
|
||||
match job {
|
||||
Job::FindTsMsp(rt, series, range, bck, tx) => {
|
||||
info!("scylla worker Job::FindTsMsp");
|
||||
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
|
||||
if tx.send(res.map_err(Into::into)).await.is_err() {
|
||||
// TODO count for stats
|
||||
}
|
||||
}
|
||||
Job::ReadNextValues(job) => {
|
||||
info!("scylla worker Job::ReadNextValues");
|
||||
let fut = (job.futgen)(scy.clone(), stmts.clone());
|
||||
let res = fut.await;
|
||||
if job.tx.send(res.map_err(Into::into)).await.is_err() {
|
||||
@@ -179,6 +185,7 @@ impl ScyllaWorker {
|
||||
}
|
||||
}
|
||||
Job::AccountingReadTs(rt, ts, tx) => {
|
||||
info!("scylla worker Job::AccountingReadTs");
|
||||
let ks = match &rt {
|
||||
RetentionTime::Short => &self.scyconf_st.keyspace,
|
||||
RetentionTime::Medium => &self.scyconf_mt.keyspace,
|
||||
@@ -191,5 +198,6 @@ impl ScyllaWorker {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("scylla worker ended");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user