Prepare scylla queries using less macro usage

This commit is contained in:
Dominik Werder
2024-06-13 09:42:19 +02:00
parent 03e8ac7a70
commit e3669e4335
6 changed files with 319 additions and 122 deletions
+2
View File
@@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem; use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents; use items_2::channelevents::ChannelEvents;
use netpod::log::*; use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ChConf; use netpod::ChConf;
use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuery;
use scyllaconn::worker::ScyllaQueue; use scyllaconn::worker::ScyllaQueue;
@@ -28,6 +29,7 @@ pub async fn scylla_channel_event_stream(
let with_values = evq.need_value_data(); let with_values = evq.need_value_data();
debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n");
let stream = scyllaconn::events::EventsStreamScylla::new( let stream = scyllaconn::events::EventsStreamScylla::new(
RetentionTime::Short,
series, series,
evq.range().into(), evq.range().into(),
do_one_before_range, do_one_before_range,
+260 -103
View File
@@ -15,6 +15,7 @@ use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0; use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1; use items_2::eventsdim1::EventsDim1;
use netpod::log::*; use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano; use netpod::DtNano;
use netpod::ScalarType; use netpod::ScalarType;
use netpod::Shape; use netpod::Shape;
@@ -24,7 +25,6 @@ use scylla::frame::response::result::Row;
use scylla::prepared_statement::PreparedStatement; use scylla::prepared_statement::PreparedStatement;
use scylla::Session; use scylla::Session;
use scylla::Session as ScySession; use scylla::Session as ScySession;
use std::collections::BTreeMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
@@ -33,71 +33,212 @@ use std::task::Context;
use std::task::Poll; use std::task::Poll;
#[derive(Debug)] #[derive(Debug)]
pub struct StmtsEventsRt { pub struct StmtsLspShape {
ts_msp_bck: PreparedStatement, u8: PreparedStatement,
ts_msp_fwd: PreparedStatement, u16: PreparedStatement,
read_value_queries: BTreeMap<String, PreparedStatement>, u32: PreparedStatement,
u64: PreparedStatement,
i8: PreparedStatement,
i16: PreparedStatement,
i32: PreparedStatement,
i64: PreparedStatement,
f32: PreparedStatement,
f64: PreparedStatement,
bool: PreparedStatement,
string: PreparedStatement,
} }
impl StmtsEventsRt { impl StmtsLspShape {
pub(super) async fn new(rtpre: &str, scy: &Session) -> Result<Self, Error> { fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
let cql = format!( let ret = match stname {
"select ts_msp from {}{} where series = ? and ts_msp < ? order by ts_msp desc limit 2", "u8" => &self.u8,
rtpre, "ts_msp" _ => return Err(Error::with_msg_no_trace(format!("no query for stname {stname}"))),
);
let ts_msp_bck = scy.prepare(cql).await.err_conv()?;
let cql = format!(
"select ts_msp from {}{} where series = ? and ts_msp >= ? and ts_msp < ?",
rtpre, "ts_msp"
);
let ts_msp_fwd = scy.prepare(cql).await.err_conv()?;
let mut read_value_queries = BTreeMap::new();
for sct in [
"u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
] {
let combinations = [
("timestamps", "scalar", "ts_lsp, pulse"),
("timestamps", "array", "ts_lsp, pulse"),
("values", "scalar", "ts_lsp, pulse, value"),
("valueblobs", "array", "ts_lsp, pulse, valueblob"),
];
for com in combinations {
let query_name = format!("{}_{}_{}_fwd", com.1, sct, com.0);
let cql = format!(
concat!(
"select {} from {}events_{}_{}",
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
),
com.2, rtpre, com.1, sct,
);
let qu = scy.prepare(cql).await.err_conv()?;
read_value_queries.insert(query_name, qu);
let query_name = format!("{}_{}_{}_bck", com.1, sct, com.0);
let cql = format!(
concat!(
"select {} from {}events_{}_{}",
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
),
com.2, rtpre, com.1, sct,
);
let qu = scy.prepare(cql).await.err_conv()?;
read_value_queries.insert(query_name, qu);
}
}
let ret = Self {
ts_msp_bck,
ts_msp_fwd,
read_value_queries,
}; };
Ok(ret) Ok(ret)
} }
} }
#[derive(Debug)]
pub struct StmtsLspDir {
scalar: StmtsLspShape,
array: StmtsLspShape,
}
impl StmtsLspDir {
fn shape(&self, array: bool) -> &StmtsLspShape {
if array {
&self.array
} else {
&self.scalar
}
}
}
#[derive(Debug)]
pub struct StmtsEventsRt {
ts_msp_fwd: PreparedStatement,
ts_msp_bck: PreparedStatement,
lsp_fwd_val: StmtsLspDir,
lsp_bck_val: StmtsLspDir,
lsp_fwd_ts: StmtsLspDir,
lsp_bck_ts: StmtsLspDir,
}
impl StmtsEventsRt {
fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
if bck {
if val {
&self.lsp_bck_val
} else {
&self.lsp_bck_ts
}
} else {
if val {
&self.lsp_fwd_val
} else {
&self.lsp_fwd_ts
}
}
}
}
#[derive(Debug)]
pub struct StmtsEvents {
st: StmtsEventsRt,
mt: StmtsEventsRt,
lt: StmtsEventsRt,
}
async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
let table_name = "ts_msp";
let select_cond = if bck {
"ts_msp < ? order by ts_msp desc limit 2"
} else {
"ts_msp >= ? and ts_msp < ?"
};
let cql = format!(
"select ts_msp from {}.{}{} where series = ? and {}",
ks,
rt.table_prefix(),
table_name,
select_cond
);
let qu = scy.prepare(cql).await.err_conv()?;
Ok(qu)
}
async fn make_lsp(
ks: &str,
rt: &RetentionTime,
shapepre: &str,
stname: &str,
values: &str,
bck: bool,
scy: &Session,
) -> Result<PreparedStatement, Error> {
let select_cond = if bck {
"ts_lsp < ? order by ts_lsp desc limit 1"
} else {
"ts_lsp >= ? and ts_lsp < ?"
};
let cql = format!(
concat!(
"select {} from {}.{}events_{}_{}",
" where series = ? and ts_msp = ? and {}"
),
values,
ks,
rt.table_prefix(),
shapepre,
stname,
select_cond
);
let qu = scy.prepare(cql).await.err_conv()?;
Ok(qu)
}
async fn make_lsp_shape(
ks: &str,
rt: &RetentionTime,
shapepre: &str,
values: &str,
bck: bool,
scy: &Session,
) -> Result<StmtsLspShape, Error> {
let values = if shapepre.contains("array") {
values.replace("value", "valueblob")
} else {
values.into()
};
let values = &values;
let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy);
let ret = StmtsLspShape {
u8: maker("u8").await?,
u16: maker("u16").await?,
u32: maker("u32").await?,
u64: maker("u64").await?,
i8: maker("i8").await?,
i16: maker("i16").await?,
i32: maker("i32").await?,
i64: maker("i64").await?,
f32: maker("f32").await?,
f64: maker("f64").await?,
bool: maker("bool").await?,
string: maker("string").await?,
};
Ok(ret)
}
async fn make_lsp_dir(
ks: &str,
rt: &RetentionTime,
values: &str,
bck: bool,
scy: &Session,
) -> Result<StmtsLspDir, Error> {
let ret = StmtsLspDir {
scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?,
array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?,
};
Ok(ret)
}
async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
let ret = StmtsEventsRt {
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?,
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?,
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?,
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?,
};
Ok(ret)
}
impl StmtsEvents {
pub(super) async fn new(ks: [&str; 3], scy: &Session) -> Result<Self, Error> {
let ret = StmtsEvents {
st: make_rt(ks[0], &RetentionTime::Short, scy).await?,
mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?,
lt: make_rt(ks[2], &RetentionTime::Long, scy).await?,
};
Ok(ret)
}
fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt {
match rt {
RetentionTime::Short => &self.st,
RetentionTime::Medium => &self.mt,
RetentionTime::Long => &&self.lt,
}
}
}
pub(super) async fn find_ts_msp_worker( pub(super) async fn find_ts_msp_worker(
rt: &RetentionTime,
series: u64, series: u64,
range: ScyllaSeriesRange, range: ScyllaSeriesRange,
stmts: &StmtsEventsRt, stmts: &StmtsEvents,
scy: &ScySession, scy: &ScySession,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> { ) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
trace!("find_ts_msp series {:?} {:?}", series, range); trace!("find_ts_msp series {:?} {:?}", series, range);
@@ -106,7 +247,7 @@ pub(super) async fn find_ts_msp_worker(
let params = (series as i64, range.beg().ms() as i64); let params = (series as i64, range.beg().ms() as i64);
trace!("find_ts_msp query 1 params {:?}", params); trace!("find_ts_msp query 1 params {:?}", params);
let mut res = scy let mut res = scy
.execute_iter(stmts.ts_msp_bck.clone(), params) .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params)
.await .await
.err_conv()? .err_conv()?
.into_typed::<(i64,)>(); .into_typed::<(i64,)>();
@@ -119,7 +260,7 @@ pub(super) async fn find_ts_msp_worker(
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
trace!("find_ts_msp query 2 params {:?}", params); trace!("find_ts_msp query 2 params {:?}", params);
let mut res = scy let mut res = scy
.execute_iter(stmts.ts_msp_fwd.clone(), params) .execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params)
.await .await
.err_conv()? .err_conv()?
.into_typed::<(i64,)>(); .into_typed::<(i64,)>();
@@ -168,7 +309,7 @@ macro_rules! impl_scaty_scalar {
<Self as ValTy>::default() <Self as ValTy>::default()
} }
fn table_name() -> &'static str { fn table_name() -> &'static str {
$table_name concat!("scalar_", $table_name)
} }
fn default() -> Self { fn default() -> Self {
<Self as std::default::Default>::default() <Self as std::default::Default>::default()
@@ -209,7 +350,7 @@ macro_rules! impl_scaty_array {
} }
} }
fn table_name() -> &'static str { fn table_name() -> &'static str {
$table_name concat!("array_", $table_name)
} }
fn default() -> Self { fn default() -> Self {
Vec::new() Vec::new()
@@ -234,11 +375,12 @@ impl ValTy for Vec<String> {
} }
fn from_valueblob(inp: Vec<u8>) -> Self { fn from_valueblob(inp: Vec<u8>) -> Self {
todo!() warn!("ValTy::from_valueblob for Vec<String>");
Vec::new()
} }
fn table_name() -> &'static str { fn table_name() -> &'static str {
"st_events_array_enum" "array_string"
} }
fn default() -> Self { fn default() -> Self {
@@ -250,36 +392,37 @@ impl ValTy for Vec<String> {
} }
fn st_name() -> &'static str { fn st_name() -> &'static str {
"enum" "string"
} }
} }
impl_scaty_scalar!(u8, i8, "u8", "st_events_scalar_u8"); impl_scaty_scalar!(u8, i8, "u8", "u8");
impl_scaty_scalar!(u16, i16, "u16", "st_events_scalar_u16"); impl_scaty_scalar!(u16, i16, "u16", "u16");
impl_scaty_scalar!(u32, i32, "u32", "st_events_scalar_u32"); impl_scaty_scalar!(u32, i32, "u32", "u32");
impl_scaty_scalar!(u64, i64, "u64", "st_events_scalar_u64"); impl_scaty_scalar!(u64, i64, "u64", "u64");
impl_scaty_scalar!(i8, i8, "i8", "st_events_scalar_i8"); impl_scaty_scalar!(i8, i8, "i8", "i8");
impl_scaty_scalar!(i16, i16, "i16", "st_events_scalar_i16"); impl_scaty_scalar!(i16, i16, "i16", "i16");
impl_scaty_scalar!(i32, i32, "i32", "st_events_scalar_i32"); impl_scaty_scalar!(i32, i32, "i32", "i32");
impl_scaty_scalar!(i64, i64, "i64", "st_events_scalar_i64"); impl_scaty_scalar!(i64, i64, "i64", "i64");
impl_scaty_scalar!(f32, f32, "f32", "st_events_scalar_f32"); impl_scaty_scalar!(f32, f32, "f32", "f32");
impl_scaty_scalar!(f64, f64, "f64", "st_events_scalar_f64"); impl_scaty_scalar!(f64, f64, "f64", "f64");
impl_scaty_scalar!(bool, bool, "bool", "st_events_scalar_bool"); impl_scaty_scalar!(bool, bool, "bool", "bool");
impl_scaty_scalar!(String, String, "string", "st_events_scalar_string"); impl_scaty_scalar!(String, String, "string", "string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "u8", "st_events_array_u8"); impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "u8", "u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "u16", "st_events_array_u16"); impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "u16", "u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "u32", "st_events_array_u32"); impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "u32", "u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "u64", "st_events_array_u64"); impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "u64", "u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "i8", "st_events_array_i8"); impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "i8", "i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "i16", "st_events_array_i16"); impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "i16", "i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "i32", "st_events_array_i32"); impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "i32", "i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "i64", "st_events_array_i64"); impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "i64", "i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "st_events_array_f32"); impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "st_events_array_f64"); impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "st_events_array_bool"); impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
struct ReadNextValuesOpts { struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64, series: u64,
ts_msp: TsMs, ts_msp: TsMs,
range: ScyllaSeriesRange, range: ScyllaSeriesRange,
@@ -294,23 +437,23 @@ where
{ {
// TODO could take scyqeue out of opts struct. // TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone(); let scyqueue = opts.scyqueue.clone();
let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEventsRt>| { let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEvents>| {
let fut = read_next_values_worker::<ST>(opts, scy, stmts); let fut = read_next_values_2::<ST>(opts, scy, stmts);
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>> Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
}); });
let res = scyqueue.read_next_values(futgen).await?; let res = scyqueue.read_next_values(futgen).await?;
Ok(res) Ok(res)
} }
async fn read_next_values_worker<ST>( async fn read_next_values_2<ST>(
opts: ReadNextValuesOpts, opts: ReadNextValuesOpts,
scy: Arc<ScySession>, scy: Arc<ScySession>,
stmts: Arc<StmtsEventsRt>, stmts: Arc<StmtsEvents>,
) -> Result<Box<dyn Events>, Error> ) -> Result<Box<dyn Events>, Error>
where where
ST: ValTy, ST: ValTy,
{ {
trace!("read_next_values_worker {} {}", opts.series, opts.ts_msp); trace!("read_next_values_2 {} {}", opts.series, opts.ts_msp);
let series = opts.series; let series = opts.series;
let ts_msp = opts.ts_msp; let ts_msp = opts.ts_msp;
let range = opts.range; let range = opts.range;
@@ -350,11 +493,11 @@ where
format!("scalar_{}_timestamps_{}", ST::st_name(), dir) format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
} }
}; };
let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { let qu = stmts
let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); .rt(&opts.rt)
error!("{e}"); .lsp(!opts.fwd, opts.with_values)
e .shape(ST::is_valueblob())
})?; .st(ST::st_name())?;
let params = ( let params = (
series as i64, series as i64,
ts_msp.ms() as i64, ts_msp.ms() as i64,
@@ -391,11 +534,11 @@ where
format!("scalar_{}_timestamps_{}", ST::st_name(), dir) format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
} }
}; };
let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { let qu = stmts
let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); .rt(&opts.rt)
error!("{e}"); .lsp(!opts.fwd, opts.with_values)
e .shape(ST::is_valueblob())
})?; .st(ST::st_name())?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace!("BCK event search params {:?}", params); trace!("BCK event search params {:?}", params);
let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?;
@@ -474,6 +617,7 @@ fn convert_rows<ST: ValTy>(
} }
struct ReadValues { struct ReadValues {
rt: RetentionTime,
series: u64, series: u64,
scalar_type: ScalarType, scalar_type: ScalarType,
shape: Shape, shape: Shape,
@@ -488,6 +632,7 @@ struct ReadValues {
impl ReadValues { impl ReadValues {
fn new( fn new(
rt: RetentionTime,
series: u64, series: u64,
scalar_type: ScalarType, scalar_type: ScalarType,
shape: Shape, shape: Shape,
@@ -498,6 +643,7 @@ impl ReadValues {
scyqueue: ScyllaQueue, scyqueue: ScyllaQueue,
) -> Self { ) -> Self {
let mut ret = Self { let mut ret = Self {
rt,
series, series,
scalar_type, scalar_type,
shape, shape,
@@ -527,6 +673,7 @@ impl ReadValues {
fn make_fut(&mut self, ts_msp: TsMs) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> { fn make_fut(&mut self, ts_msp: TsMs) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let opts = ReadNextValuesOpts { let opts = ReadNextValuesOpts {
rt: self.rt.clone(),
series: self.series.clone(), series: self.series.clone(),
ts_msp, ts_msp,
range: self.range.clone(), range: self.range.clone(),
@@ -600,6 +747,7 @@ enum FrState {
} }
pub struct EventsStreamScylla { pub struct EventsStreamScylla {
rt: RetentionTime,
state: FrState, state: FrState,
series: u64, series: u64,
scalar_type: ScalarType, scalar_type: ScalarType,
@@ -618,6 +766,7 @@ pub struct EventsStreamScylla {
impl EventsStreamScylla { impl EventsStreamScylla {
pub fn new( pub fn new(
rt: RetentionTime,
series: u64, series: u64,
range: ScyllaSeriesRange, range: ScyllaSeriesRange,
do_one_before_range: bool, do_one_before_range: bool,
@@ -629,6 +778,7 @@ impl EventsStreamScylla {
) -> Self { ) -> Self {
debug!("EventsStreamScylla::new"); debug!("EventsStreamScylla::new");
Self { Self {
rt,
state: FrState::New, state: FrState::New,
series, series,
scalar_type, scalar_type,
@@ -663,6 +813,7 @@ impl EventsStreamScylla {
if let Some(msp) = self.ts_msp_bck.pop_back() { if let Some(msp) = self.ts_msp_bck.pop_back() {
trace!("start ReadBack1 msp {}", msp); trace!("start ReadBack1 msp {}", msp);
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -676,6 +827,7 @@ impl EventsStreamScylla {
} else if self.ts_msp_fwd.len() > 0 { } else if self.ts_msp_fwd.len() > 0 {
trace!("begin immediately with forward read"); trace!("begin immediately with forward read");
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -698,6 +850,7 @@ impl EventsStreamScylla {
if self.ts_msp_fwd.len() > 0 { if self.ts_msp_fwd.len() > 0 {
trace!("start forward read after back1"); trace!("start forward read after back1");
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -715,6 +868,7 @@ impl EventsStreamScylla {
if let Some(msp) = self.ts_msp_bck.pop_back() { if let Some(msp) = self.ts_msp_bck.pop_back() {
trace!("start ReadBack2 msp {}", msp); trace!("start ReadBack2 msp {}", msp);
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -728,6 +882,7 @@ impl EventsStreamScylla {
} else if self.ts_msp_fwd.len() > 0 { } else if self.ts_msp_fwd.len() > 0 {
trace!("no 2nd back MSP, go for forward read"); trace!("no 2nd back MSP, go for forward read");
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -753,6 +908,7 @@ impl EventsStreamScylla {
if self.ts_msp_fwd.len() > 0 { if self.ts_msp_fwd.len() > 0 {
trace!("start forward read after back2"); trace!("start forward read after back2");
let st = ReadValues::new( let st = ReadValues::new(
self.rt.clone(),
self.series, self.series,
self.scalar_type.clone(), self.scalar_type.clone(),
self.shape.clone(), self.shape.clone(),
@@ -771,11 +927,12 @@ impl EventsStreamScylla {
} }
async fn find_ts_msp_via_queue( async fn find_ts_msp_via_queue(
rt: RetentionTime,
series: u64, series: u64,
range: ScyllaSeriesRange, range: ScyllaSeriesRange,
scyqueue: ScyllaQueue, scyqueue: ScyllaQueue,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error> { ) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error> {
scyqueue.find_ts_msp(series, range).await scyqueue.find_ts_msp(rt, series, range).await
} }
impl Stream for EventsStreamScylla { impl Stream for EventsStreamScylla {
@@ -810,7 +967,7 @@ impl Stream for EventsStreamScylla {
FrState::New => { FrState::New => {
let series = self.series.clone(); let series = self.series.clone();
let range = self.range.clone(); let range = self.range.clone();
let fut = find_ts_msp_via_queue(series, range, self.scyqueue.clone()); let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, self.scyqueue.clone());
let fut = Box::pin(fut); let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut); self.state = FrState::FindMsp(fut);
continue; continue;
View File
+1
View File
@@ -3,6 +3,7 @@ pub mod bincache;
pub mod conn; pub mod conn;
pub mod errconv; pub mod errconv;
pub mod events; pub mod events;
pub mod events2;
pub mod range; pub mod range;
pub mod status; pub mod status;
pub mod worker; pub mod worker;
+17 -10
View File
@@ -1,5 +1,5 @@
use crate::conn::create_scy_session_no_ks; use crate::conn::create_scy_session_no_ks;
use crate::events::StmtsEventsRt; use crate::events::StmtsEvents;
use crate::range::ScyllaSeriesRange; use crate::range::ScyllaSeriesRange;
use async_channel::Receiver; use async_channel::Receiver;
use async_channel::Sender; use async_channel::Sender;
@@ -8,6 +8,7 @@ use err::ThisError;
use futures_util::Future; use futures_util::Future;
use items_0::Events; use items_0::Events;
use netpod::log::*; use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ScyllaConfig; use netpod::ScyllaConfig;
use netpod::TsMs; use netpod::TsMs;
use scylla::Session; use scylla::Session;
@@ -33,6 +34,7 @@ impl err::ToErr for Error {
#[derive(Debug)] #[derive(Debug)]
enum Job { enum Job {
FindTsMsp( FindTsMsp(
RetentionTime,
// series-id // series-id
u64, u64,
ScyllaSeriesRange, ScyllaSeriesRange,
@@ -45,7 +47,7 @@ struct ReadNextValues {
futgen: Box< futgen: Box<
dyn FnOnce( dyn FnOnce(
Arc<Session>, Arc<Session>,
Arc<StmtsEventsRt>, Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>> ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
+ Send, + Send,
>, >,
@@ -67,11 +69,12 @@ pub struct ScyllaQueue {
impl ScyllaQueue { impl ScyllaQueue {
pub async fn find_ts_msp( pub async fn find_ts_msp(
&self, &self,
rt: RetentionTime,
series: u64, series: u64,
range: ScyllaSeriesRange, range: ScyllaSeriesRange,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> { ) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
let (tx, rx) = async_channel::bounded(1); let (tx, rx) = async_channel::bounded(1);
let job = Job::FindTsMsp(series, range, tx); let job = Job::FindTsMsp(rt, series, range, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res) Ok(res)
@@ -81,7 +84,7 @@ impl ScyllaQueue {
where where
F: FnOnce( F: FnOnce(
Arc<Session>, Arc<Session>,
Arc<StmtsEventsRt>, Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>> ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
+ Send + Send
+ 'static, + 'static,
@@ -125,9 +128,13 @@ impl ScyllaWorker {
pub async fn work(self) -> Result<(), Error> { pub async fn work(self) -> Result<(), Error> {
let scy = create_scy_session_no_ks(&self.scyconf_st).await?; let scy = create_scy_session_no_ks(&self.scyconf_st).await?;
let scy = Arc::new(scy); let scy = Arc::new(scy);
let rtpre = format!("{}.st_", self.scyconf_st.keyspace); let kss = [
let stmts_st = StmtsEventsRt::new(&rtpre, &scy).await?; self.scyconf_st.keyspace.as_str(),
let stmts_st = Arc::new(stmts_st); self.scyconf_mt.keyspace.as_str(),
self.scyconf_lt.keyspace.as_str(),
];
let stmts = StmtsEvents::new(kss.try_into().unwrap(), &scy).await?;
let stmts = Arc::new(stmts);
loop { loop {
let x = self.rx.recv().await; let x = self.rx.recv().await;
let job = match x { let job = match x {
@@ -138,14 +145,14 @@ impl ScyllaWorker {
} }
}; };
match job { match job {
Job::FindTsMsp(series, range, tx) => { Job::FindTsMsp(rt, series, range, tx) => {
let res = crate::events::find_ts_msp_worker(series, range, &stmts_st, &scy).await; let res = crate::events::find_ts_msp_worker(&rt, series, range, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() { if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats // TODO count for stats
} }
} }
Job::ReadNextValues(job) => { Job::ReadNextValues(job) => {
let fut = (job.futgen)(scy.clone(), stmts_st.clone()); let fut = (job.futgen)(scy.clone(), stmts.clone());
let res = fut.await; let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() { if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats // TODO count for stats
+39 -9
View File
@@ -106,6 +106,7 @@ where
} }
} }
#[allow(unused)]
struct LogFilterLayer<S, L> struct LogFilterLayer<S, L>
where where
L: tracing_subscriber::Layer<S>, L: tracing_subscriber::Layer<S>,
@@ -157,7 +158,6 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.init(); .init();
console_subscriber::init(); console_subscriber::init();
} else { } else {
// #[cfg(DISABLED)]
// Logging setup // Logging setup
let filter = tracing_subscriber::EnvFilter::builder() let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .with_default_directive(tracing::metadata::LevelFilter::INFO.into())
@@ -168,14 +168,43 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env() .from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
// let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
// // if *meta.level() <= tracing::Level::TRACE {
// if ["scyllaconn"].contains(&meta.target()) { if ["httpret", "scyllaconn"].contains(&meta.target()) {
// true let mut sr = ctx.lookup_current();
// } else { let mut allow = false;
// true while let Some(g) = sr {
// } if g.name() == "log_span_trace" {
// }); allow = true;
break;
} else {
sr = g.parent();
}
}
allow
} else {
false
}
} else if *meta.level() <= tracing::Level::DEBUG {
if ["httpret", "scyllaconn", "items_0", "items_2", "streams"].contains(&meta.target()) {
let mut sr = ctx.lookup_current();
let mut allow = false;
while let Some(g) = sr {
if g.name() == "log_span_trace" || g.name() == "log_span_debug" {
allow = true;
break;
} else {
sr = g.parent();
}
}
allow
} else {
false
}
} else {
true
}
});
let fmt_layer = tracing_subscriber::fmt::Layer::new() let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_writer(io::stderr) .with_writer(io::stderr)
.with_timer(timer) .with_timer(timer)
@@ -183,6 +212,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.with_ansi(false) .with_ansi(false)
.with_thread_names(true) .with_thread_names(true)
.event_format(formatter::FormatTxt) .event_format(formatter::FormatTxt)
.with_filter(filter_3)
.with_filter(filter_2) .with_filter(filter_2)
.with_filter(filter) .with_filter(filter)
// .and_then(LogFilterLayer::new("lay1".into())) // .and_then(LogFilterLayer::new("lay1".into()))