Track down scylla inconsistencies

This commit is contained in:
Dominik Werder
2025-06-27 17:34:13 +02:00
parent 72cff94b21
commit 287b906ef9
14 changed files with 399 additions and 168 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqretrieve"
version = "0.5.5-aa.14"
version = "0.5.5-aa.15"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -191,6 +191,14 @@ impl From<Vec<u8>> for ToJsonBody {
}
}
// Wrap an already-owned `String` as the response body; the string's
// bytes are taken over directly, no re-serialization happens here.
impl From<String> for ToJsonBody {
    fn from(value: String) -> Self {
        let body = Vec::from(value);
        Self { body }
    }
}
impl<S: Serialize> From<&S> for ToJsonBody {
fn from(value: &S) -> Self {
Self {

View File

@@ -4,6 +4,7 @@ pub mod binned;
pub mod binned_v2;
pub mod binwriteindex;
pub mod databuffer_tools;
pub mod datasearch;
pub mod docs;
pub mod eventdata;
pub mod events;

View File

@@ -0,0 +1,148 @@
use crate::bodystream::response;
use crate::requests::accepts_json_or_all;
use crate::ReqCtx;
use crate::ServiceSharedResources;
use dbconn::worker::PgQueue;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log;
use netpod::req_uri_to_url;
use netpod::ttl::RetentionTime;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::UriError;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
autoerr::create_error_v1!(
name(Error, "DataSearch"),
enum variants {
NoScylla,
Uri(#[from] UriError),
Json(#[from] serde_json::Error),
Http(#[from] http::Error),
ScyllaWorker(#[from] scyllaconn::worker::Error),
HttpBody(#[from] httpclient::BodyError),
ScyllaType(#[from] scyllaconn::scylla::errors::TypeCheckError),
ScyllaNextRow(#[from] scyllaconn::scylla::errors::NextRowError),
},
);
/// Column-oriented accounting result: index `i` across all five vectors
/// describes one channel (parallel arrays, kept in lockstep by `push`).
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountedIngested {
    // Channel names, one entry per accounted channel.
    names: Vec<String>,
    // Event counts, parallel to `names`.
    counts: Vec<u64>,
    // Ingested byte totals, parallel to `names`.
    bytes: Vec<u64>,
    // Scalar type of each channel, parallel to `names`.
    scalar_types: Vec<ScalarType>,
    // Shape of each channel, parallel to `names`.
    shapes: Vec<Shape>,
}
impl AccountedIngested {
    /// Start with no recorded channels.
    fn new() -> Self {
        Self {
            names: vec![],
            counts: vec![],
            bytes: vec![],
            scalar_types: vec![],
            shapes: vec![],
        }
    }

    /// Record one channel's accounting row. Appends to every column so
    /// the parallel vectors stay the same length.
    fn push(&mut self, name: String, counts: u64, bytes: u64, scalar_type: ScalarType, shape: Shape) {
        self.shapes.push(shape);
        self.scalar_types.push(scalar_type);
        self.bytes.push(bytes);
        self.counts.push(counts);
        self.names.push(name);
    }

    /// Keep only the first `len` entries of every column.
    #[allow(unused)]
    fn truncate(&mut self, len: usize) {
        self.shapes.truncate(len);
        self.scalar_types.truncate(len);
        self.bytes.truncate(len);
        self.counts.truncate(len);
        self.names.truncate(len);
    }
}
/// Ad-hoc diagnostic endpoint (`/api/4/private/search/data`) used while
/// tracking down scylla inconsistencies.
pub struct DataSearch {}

impl DataSearch {
    /// Route matcher: claims the request when the path is under the
    /// private data-search prefix.
    pub fn handler(req: &Requ) -> Option<Self> {
        if req.uri().path().starts_with("/api/4/private/search/data") {
            Some(Self {})
        } else {
            None
        }
    }

    /// Entry point. Only `GET` with a JSON-compatible `Accept` header is
    /// served; other methods get 405, other accept types 400. Errors from
    /// the inner handler are reported as a JSON string with status 500.
    pub async fn handle(
        &self,
        req: Requ,
        ctx: &ReqCtx,
        shared_res: &ServiceSharedResources,
        ncc: &NodeConfigCached,
    ) -> Result<StreamResponse, crate::err::Error> {
        if req.method() != Method::GET {
            return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
        }
        if !accepts_json_or_all(req.headers()) {
            return Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?);
        }
        match self.handle_get(req, ctx, shared_res, ncc).await {
            Ok(x) => Ok(x),
            Err(e) => {
                // Serialize the error message itself as a JSON string body.
                let s = serde_json::to_string(&e.to_string())?;
                Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(s))?)
            }
        }
    }

    /// Queries the ts_msp partition keys of one series and returns them
    /// as JSON. Requires the scylla queue to be configured.
    async fn handle_get(
        &self,
        req: Requ,
        _ctx: &ReqCtx,
        shared_res: &ServiceSharedResources,
        _ncc: &NodeConfigCached,
    ) -> Result<StreamResponse, Error> {
        // Parse the URL mostly for validation; query parameters are not
        // yet interpreted by this diagnostic handler.
        let url = req_uri_to_url(req.uri())?;
        let _params: BTreeMap<_, _> = url.query_pairs().collect();
        if let Some(scyqu) = &shared_res.scyqueue {
            // NOTE(review): series id is hard-coded for the ongoing
            // investigation — parameterize before any general use.
            let cql = "select ts_msp from sls_st.st_ts_msp where series = 6293882751490541488";
            let st = scyqu.prepare(cql.into()).await?;
            let qp = scyqu.execute(st).await?;
            let mut it = qp.rows_stream::<(i64,)>()?;
            let mut msps_in_table = Vec::new();
            while let Some(row) = it.try_next().await? {
                msps_in_table.push(row.0);
            }
            let body = serde_json::json!({
                "msps_in_table": msps_in_table,
            });
            let body = ToJsonBody::from(&body).into_body();
            Ok(response(StatusCode::OK).body(body)?)
        } else {
            Err(Error::NoScylla)
        }
    }
}

View File

@@ -1,3 +1,4 @@
use crate::requests::accepts_json_or_all;
use crate::response;
use crate::ServiceSharedResources;
use dbconn::create_connection;
@@ -166,12 +167,7 @@ impl ChannelConfigHandler {
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
if accepts_json_or_all(req.headers()) {
match self.channel_config(req, pgqueue, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {

View File

@@ -478,6 +478,8 @@ async fn http_service_inner(
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = api1::RequestStatusHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
} else if let Some(h) = api4::datasearch::DataSearch::handler(&req) {
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = api4::docs::DocsHandler::handler(&req) {
Ok(h.handle(req, ctx).await?)
} else {

View File

@@ -1,8 +1,8 @@
use crate::log::*;
use futures_util::TryStreamExt;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::EMIT_ACCOUNTING_SNAP;
use netpod::TsMs;
use netpod::ttl::RetentionTime;
use scylla::client::session::Session as ScySession;
use scylla::statement::prepared::PreparedStatement;
@@ -107,12 +107,13 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession)
concat!(
"select series, count, bytes",
" from {}.{}account_00",
" where part = ? and ts = ?"
" where part = ? and ts = ?",
" bypass cache"
),
ks,
rt.table_prefix()
);
let qu = prep(&cql, scy).await?;
let qu = scy.prepare(cql).await?;
let ts_sec = ts.ms() as i64 / 1000;
let mut ret = UsageData::new(ts);
for part in 0..255_u32 {
@@ -132,7 +133,3 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession)
ret.verify()?;
Ok(ret)
}
async fn prep(cql: &str, scy: &ScySession) -> Result<PreparedStatement, Error> {
Ok(scy.prepare(cql).await?)
}

View File

@@ -166,7 +166,8 @@ impl Stream for AccountingStreamScylla {
}
break match &mut self.state {
FrState::New => {
let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?");
let cql =
concat!("select series, count, bytes from account_00 where part = ? and ts = ? bypass cache");
let fut = prep(cql, self.scy.clone());
let fut: PrepFut = Box::pin(fut);
self.state = FrState::Prepare(fut);

View File

@@ -1,14 +1,12 @@
use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::log::*;
use crate::worker::ScyllaQueue;
use daqbuf_series::msp::PrebinnedPartitioning;
use futures_util::TryStreamExt;
use items_0::merge::MergeableTy;
use items_2::binning::container_bins::ContainerBins;
use netpod::ttl::RetentionTime;
use netpod::DtMs;
use netpod::TsNano;
use netpod::ttl::RetentionTime;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
@@ -93,41 +91,6 @@ impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider {
}
}
/// (Disabled code path) Write prebinned f32 bins into the scylla cache.
///
/// The guard below always returns `TodoImpl` — retrieval should probably
/// not write a cache at all — so everything after it is unreachable and
/// kept only for reference while that design question is open.
pub async fn worker_write(
    series: u64,
    bins: ContainerBins<f32, f32>,
    stmts_cache: &StmtsCache,
    scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
    if true {
        error!("TODO retrieval should probably not write a cache at all");
        return Err(streams::timebin::cached::reader::Error::TodoImpl);
    }
    // --- unreachable below this point ---
    for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), _fnl) in bins.zip_iter() {
        // Bin length in ms, derived from the bin edge timestamps (ns).
        let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
        // let div = streams::timebin::cached::reader::part_len(bin_len).ns();
        // Placeholder divisor — NOT a valid partition length; must be
        // restored to `part_len(bin_len).ns()` before re-enabling.
        let div = 42424242424242;
        let msp = ts1.ns() / div;
        let off = (ts1.ns() - msp * div) / bin_len.ns();
        // Bind parameters match st_write_f32's column order.
        let params = (
            series as i64,
            bin_len.ms() as i32,
            msp as i64,
            off as i32,
            cnt as i64,
            min,
            max,
            avg,
            lst,
        );
        // trace!("cache write {:?}", params);
        scy.execute_unpaged(stmts_cache.st_write_f32(), params)
            .await
            .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
    }
    Ok(())
}
pub async fn worker_read(
rt: RetentionTime,
series: u64,

View File

@@ -1,5 +1,5 @@
use netpod::ScyllaConfig;
use netpod::log::*;
use netpod::log;
use scylla::client::execution_profile::ExecutionProfileBuilder;
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
@@ -24,13 +24,13 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>,
}
pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result<Session, Error> {
info!("creating scylla connection");
log::info!("creating scylla connection");
let scy = SessionBuilder::new()
.pool_size(scylla::client::PoolSize::PerHost(NonZero::new(4).unwrap()))
.known_nodes(&scyconf.hosts)
.default_execution_profile_handle(
ExecutionProfileBuilder::default()
.consistency(Consistency::Quorum)
.consistency(Consistency::All)
.build()
.into_handle(),
)

View File

@@ -54,9 +54,13 @@ macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!(
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_every_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! warn_item { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) }
macro_rules! warn_item { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ) }
macro_rules! log_fetch_result {
($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } };
}
#[derive(Debug, Clone)]
pub struct EventReadOpts {
@@ -908,6 +912,7 @@ where
);
trace_fetch!("FWD event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
log_fetch_result!("read_next_values_2 {params:?}");
let res = scy.execute_iter(qu.clone(), params).await?;
{
let mut ret = <ST as ValTy>::Container::empty();
@@ -926,6 +931,7 @@ where
let mut it = res.rows_stream::<<ST as ValTy>::ScyRowTy>()?;
while let Some(row) = it.try_next().await? {
let (ts, value) = <ST as ValTy>::scy_row_to_ts_val(ts_msp, row);
log_fetch_result!("read_next_values_2 {params:?} {ts}");
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);

View File

@@ -6,19 +6,23 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::TryStreamExt;
use netpod::log;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ) }
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) }
macro_rules! trace_msp { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_msp { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ) }
macro_rules! log_fetch_result {
($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } };
}
autoerr::create_error_v1!(
name(Error, "EventsMsp"),
@@ -28,6 +32,7 @@ autoerr::create_error_v1!(
ScyllaRow(#[from] scylla::errors::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
ScyllaPagerExecution(#[from] scylla::errors::PagerExecutionError),
TooManyRows,
},
);
@@ -60,11 +65,7 @@ where
}
fn is_taken(&self) -> bool {
if let Self::Taken = self {
true
} else {
false
}
if let Self::Taken = self { true } else { false }
}
}
@@ -313,7 +314,7 @@ pub async fn find_ts_msp(
bck
);
if bck {
find_ts_msp_bck(rt, series, range, stmts, scy).await
find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await
} else {
find_ts_msp_fwd(rt, series, range, stmts, scy).await
}
@@ -326,15 +327,18 @@ async fn find_ts_msp_fwd(
stmts: &StmtsEvents,
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
let selfname = "find_ts_msp_fwd";
let mut ret = VecDeque::new();
// TODO time range truncation can be handled better
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
.await?
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
let ts = TsMs::from_ms_u64(row.0 as u64);
log_fetch_result!("{selfname} {params:?} {ts}");
ret.push_back(ts);
}
Ok(ret)
@@ -347,15 +351,59 @@ async fn find_ts_msp_bck(
stmts: &StmtsEvents,
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
let selfname = "find_ts_msp_bck";
let mut ret = VecDeque::new();
let params = (series as i64, range.beg().ms() as i64);
log_fetch_result!("{selfname} {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params)
.await?
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
let ts = TsMs::from_ms_u64(row.0 as u64);
log_fetch_result!("{selfname} {params:?} {ts}");
ret.push_front(ts);
}
Ok(ret)
}
/// Workaround for backward ts_msp lookup because scylla's `order by desc`
/// is broken at the moment: scan the series' ts_msp values in FORWARD
/// order over the full time axis, then keep only the trailing two entries
/// (what a correct `desc limit 2` query would have returned).
async fn find_ts_msp_bck_workaround(
    rt: &RetentionTime,
    series: u64,
    // NOTE(review): the requested range is intentionally ignored — the
    // workaround always scans the full time axis for the series.
    _range: ScyllaSeriesRange,
    stmts: &StmtsEvents,
    scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
    let selfname = "find_ts_msp_bck_workaround";
    let mut ret = VecDeque::new();
    // The forward statement binds (series, ts_beg, ts_end): cover everything.
    let params = (series as i64, 0_i64, i64::MAX);
    log_fetch_result!("{selfname} {:?}", params);
    let mut res = scy
        .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
        .await?
        .rows_stream::<(i64,)>()?;
    while let Some(row) = res.try_next().await? {
        let ts = TsMs::from_ms_u64(row.0 as u64);
        log_fetch_result!("{selfname} {params:?} {ts}");
        ret.push_back(ts);
        // Hard cap so a degenerate series can not exhaust memory.
        if ret.len() > 1024 * 1024 {
            return Err(Error::TooManyRows);
        }
    }
    if ret.len() > 1024 * 2 {
        log::info!("quite many ts_msp values in reverse lookup len {}", ret.len());
    }
    if ret.len() > 1024 * 64 {
        log::warn!("quite many ts_msp values in reverse lookup len {}", ret.len());
    }
    // Keep only the last two entries, preserving order; split_off avoids
    // the double-reverse round trip through an intermediate Vec.
    if ret.len() > 2 {
        let keep_from = ret.len() - 2;
        ret = ret.split_off(keep_from);
    }
    Ok(ret)
}

View File

@@ -2,6 +2,10 @@ use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
use scylla::statement::prepared::PreparedStatement;
// Log every CQL statement at prepare time so the effective query text is
// visible in the service log.
macro_rules! info_prepare {
    ($($arg:tt)*) => { log::info!("prepare cql {}", format_args!($($arg)*)); };
}
autoerr::create_error_v1!(
name(Error, "ScyllaPrepare"),
enum variants {
@@ -62,11 +66,7 @@ pub struct StmtsLspDir {
impl StmtsLspDir {
pub fn shape(&self, array: bool) -> &StmtsLspShape {
if array {
&self.array
} else {
&self.scalar
}
if array { &self.array } else { &self.scalar }
}
}
@@ -93,17 +93,9 @@ impl StmtsEventsRt {
pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
if bck {
if val {
&self.lsp_bck_val
} else {
&self.lsp_bck_ts
}
if val { &self.lsp_bck_val } else { &self.lsp_bck_ts }
} else {
if val {
&self.lsp_fwd_val
} else {
&self.lsp_fwd_ts
}
if val { &self.lsp_fwd_val } else { &self.lsp_fwd_ts }
}
}
@@ -116,7 +108,13 @@ impl StmtsEventsRt {
}
}
async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
async fn make_msp_dir(
ks: &str,
rt: &RetentionTime,
bck: bool,
query_opts: &str,
scy: &Session,
) -> Result<PreparedStatement, Error> {
let table_name = "ts_msp";
let select_cond = if bck {
"ts_msp < ? order by ts_msp desc limit 2"
@@ -124,12 +122,14 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) ->
"ts_msp >= ? and ts_msp < ? limit 20"
};
let cql = format!(
"select ts_msp from {}.{}{} where series = ? and {}",
"select ts_msp from {}.{}{} where series = ? and {} {}",
ks,
rt.table_prefix(),
table_name,
select_cond
select_cond,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -141,6 +141,7 @@ async fn make_lsp(
stname: &str,
values: &str,
bck: bool,
query_opts: &str,
scy: &Session,
) -> Result<PreparedStatement, Error> {
let select_cond = if bck {
@@ -151,15 +152,17 @@ async fn make_lsp(
let cql = format!(
concat!(
"select {} from {}.{}events_{}_{}",
" where series = ? and ts_msp = ? and {}"
" where series = ? and ts_msp = ? and {} {}"
),
values,
ks,
rt.table_prefix(),
shapepre,
stname,
select_cond
select_cond,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -170,6 +173,7 @@ async fn make_lsp_shape(
shapepre: &str,
values: &str,
bck: bool,
query_opts: &str,
scy: &Session,
) -> Result<StmtsLspShape, Error> {
let values = if shapepre.contains("array") {
@@ -178,7 +182,7 @@ async fn make_lsp_shape(
values.into()
};
let values = &values;
let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy);
let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, query_opts, scy);
let ret = StmtsLspShape {
u8: maker("u8").await?,
u16: maker("u16").await?,
@@ -193,11 +197,28 @@ async fn make_lsp_shape(
bool: maker("bool").await?,
string: maker("string").await?,
enumvals: if shapepre == "scalar" {
make_lsp(ks, rt, shapepre, "enum", "ts_lsp, value, valuestr", bck, scy).await?
make_lsp(
ks,
rt,
shapepre,
"enum",
"ts_lsp, value, valuestr",
bck,
query_opts,
scy,
)
.await?
} else {
// exists only for scalar, therefore produce some dummy here
let table_name = "ts_msp";
let cql = format!("select ts_msp from {}.{}{} limit 1", ks, rt.table_prefix(), table_name);
let cql = format!(
"select ts_msp from {}.{}{} limit 1 {}",
ks,
rt.table_prefix(),
table_name,
query_opts
);
info_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
qu
},
@@ -210,54 +231,71 @@ async fn make_lsp_dir(
rt: &RetentionTime,
values: &str,
bck: bool,
query_opts: &str,
scy: &Session,
) -> Result<StmtsLspDir, Error> {
let ret = StmtsLspDir {
scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?,
array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?,
scalar: make_lsp_shape(ks, rt, "scalar", values, bck, query_opts, scy).await?,
array: make_lsp_shape(ks, rt, "array", values, bck, query_opts, scy).await?,
};
Ok(ret)
}
async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<PreparedStatement, Error> {
async fn make_prebinned_f32(
ks: &str,
rt: &RetentionTime,
query_opts: &str,
scy: &Session,
) -> Result<PreparedStatement, Error> {
let cql = format!(
concat!(
"select off, cnt, min, max, avg, lst from {}.{}binned_scalar_f32_v02",
" where series = ? and binlen = ? and msp = ?",
" and off >= ? and off < ?"
" and off >= ? and off < ?",
" {}"
),
ks,
rt.table_prefix()
rt.table_prefix(),
query_opts
);
info_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
async fn make_bin_write_index_read(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<PreparedStatement, Error> {
async fn make_bin_write_index_read(
ks: &str,
rt: &RetentionTime,
query_opts: &str,
scy: &Session,
) -> Result<PreparedStatement, Error> {
let cql = format!(
concat!(
"select lsp, binlen",
" from {}.{}bin_write_index_v04",
" where series = ? and pbp = ? and msp = ?",
" and lsp >= ? and lsp < ?",
" {}"
),
ks,
rt.table_prefix()
rt.table_prefix(),
query_opts
);
info_prepare!("{ks} {rt} {cql}");
let qu = scy.prepare(cql).await?;
Ok(qu)
}
async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) -> Result<StmtsEventsRt, Error> {
let ret = StmtsEventsRt {
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?,
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, scy).await?,
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, scy).await?,
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, scy).await?,
prebinned_f32: make_prebinned_f32(ks, rt, scy).await?,
bin_write_index_read: make_bin_write_index_read(ks, rt, scy).await?,
ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?,
ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?,
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?,
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?,
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, query_opts, scy).await?,
prebinned_f32: make_prebinned_f32(ks, rt, query_opts, scy).await?,
bin_write_index_read: make_bin_write_index_read(ks, rt, query_opts, scy).await?,
};
Ok(ret)
}
@@ -270,11 +308,12 @@ pub struct StmtsEvents {
}
impl StmtsEvents {
pub async fn new(ks: [&str; 3], scy: &Session) -> Result<Self, Error> {
pub async fn new(ks: [&str; 3], bypass_cache: bool, scy: &Session) -> Result<Self, Error> {
let query_opts = if bypass_cache { "bypass cache" } else { "" };
let ret = StmtsEvents {
st: make_rt(ks[0], &RetentionTime::Short, scy).await?,
mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?,
lt: make_rt(ks[2], &RetentionTime::Long, scy).await?,
st: make_rt(ks[0], &RetentionTime::Short, query_opts, scy).await?,
mt: make_rt(ks[1], &RetentionTime::Medium, query_opts, scy).await?,
lt: make_rt(ks[2], &RetentionTime::Long, query_opts, scy).await?,
};
Ok(ret)
}
@@ -287,53 +326,3 @@ impl StmtsEvents {
}
}
}
/// Prepared statements for the binned scalar-f32 cache table.
/// Both statements are prepared against the short-retention table prefix.
#[derive(Debug)]
pub struct StmtsCache {
    // Insert one bin row (series, bin_len_ms, ts_msp, off, aggregates).
    st_write_f32: PreparedStatement,
    // Read bin rows for a (series, bin_len_ms, ts_msp) within an off window.
    st_read_f32: PreparedStatement,
}
impl StmtsCache {
    /// Prepare both cache statements in keyspace `ks`.
    pub async fn new(ks: &str, scy: &Session) -> Result<Self, Error> {
        // Only the short-retention prefix is used for this cache.
        let rt = RetentionTime::Short;
        let st_write_f32 = scy
            .prepare(format!(
                concat!(
                    "insert into {}.{}binned_scalar_f32",
                    " (series, bin_len_ms, ts_msp, off, count, min, max, avg, lst)",
                    " values (?, ?, ?, ?, ?, ?, ?, ?, ?)"
                ),
                ks,
                rt.table_prefix()
            ))
            .await?;
        let st_read_f32 = scy
            .prepare(format!(
                concat!(
                    "select off, count, min, max, avg, lst",
                    " from {}.{}binned_scalar_f32",
                    " where series = ?",
                    " and bin_len_ms = ?",
                    " and ts_msp = ?",
                    " and off >= ? and off < ?"
                ),
                ks,
                rt.table_prefix()
            ))
            .await?;
        let ret = Self {
            st_write_f32,
            st_read_f32,
        };
        Ok(ret)
    }
    // Accessor for the insert statement.
    pub fn st_write_f32(&self) -> &PreparedStatement {
        &self.st_write_f32
    }
    // Accessor for the select statement.
    pub fn st_read_f32(&self) -> &PreparedStatement {
        &self.st_read_f32
    }
}

View File

@@ -5,27 +5,27 @@ use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use async_channel::Receiver;
use async_channel::Sender;
use daqbuf_series::SeriesId;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
use daqbuf_series::SeriesId;
use futures_util::Future;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::timebin::BinningggContainerEventsDyn;
use items_2::binning::container_bins::ContainerBins;
use netpod::log;
use netpod::ttl::RetentionTime;
use netpod::DtMs;
use netpod::ScyllaConfig;
use netpod::TsMs;
use netpod::log;
use netpod::ttl::RetentionTime;
use scylla::client::session::Session;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
const CONCURRENT_QUERIES_PER_WORKER: usize = 80;
const SCYLLA_WORKER_QUEUE_LEN: usize = 200;
@@ -43,6 +43,7 @@ autoerr::create_error_v1!(
Toplist(#[from] crate::accounting::toplist::Error),
MissingKeyspaceConfig,
CacheWriteF32(#[from] streams::timebin::cached::reader::Error),
ScyllaPrepare(#[from] scylla::errors::PrepareError),
ScyllaType(#[from] scylla::deserialize::TypeCheckError),
ScyllaNextRow(#[from] scylla::errors::NextRowError),
ScyllaPagerExecution(#[from] scylla::errors::PagerExecutionError),
@@ -118,6 +119,28 @@ impl BinWriteIndexRead {
}
}
/// Worker job: prepare a CQL statement on the worker's session and send
/// the result back over `tx`.
#[derive(Debug)]
struct PrepareV1 {
    cql: String,
    tx: Sender<Result<scylla::statement::prepared::PreparedStatement, Error>>,
}
/// Worker job: run a prepared statement via the pager interface and send
/// the resulting `QueryPager` back over `tx`.
struct ExecuteV1 {
    st: scylla::statement::prepared::PreparedStatement,
    params: Box<dyn scylla::serialize::row::SerializeRow + Send>,
    tx: Sender<Result<scylla::client::pager::QueryPager, Error>>,
}
// Manual Debug: `params` is an opaque trait object, so it and `tx` are
// rendered as placeholders.
impl fmt::Debug for ExecuteV1 {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("ExecuteV1")
            .field("st", &self.st)
            .field("params", &"...")
            .field("tx", &"...")
            .finish()
    }
}
#[derive(Debug)]
enum Job {
FindTsMsp(
@@ -141,6 +164,8 @@ enum Job {
),
ReadPrebinnedF32(ReadPrebinnedF32),
BinWriteIndexRead(BinWriteIndexRead),
PrepareV1(PrepareV1),
ExecuteV1(ExecuteV1),
}
struct ReadNextValues {
@@ -297,6 +322,38 @@ impl ScyllaQueue {
.map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
Ok(res)
}
/// Prepare a CQL statement on the scylla worker task: the request is
/// shipped as a `Job::PrepareV1` and the result awaited over a
/// bounded(1) reply channel.
pub async fn prepare(&self, cql: String) -> Result<scylla::statement::prepared::PreparedStatement, Error> {
    let (tx, rx) = async_channel::bounded(1);
    let job = Job::PrepareV1(PrepareV1 { cql, tx });
    // Channel failures are mapped through the cached-reader error type,
    // which converts into this `Error` via `?`.
    self.tx
        .send(job)
        .await
        .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?;
    // Outer `?`: reply channel closed; inner `?`: worker-side error.
    let res = rx
        .recv()
        .await
        .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
    Ok(res)
}
/// Execute an already-prepared statement on the scylla worker and return
/// the query pager for row streaming.
pub async fn execute(
    &self,
    st: scylla::statement::prepared::PreparedStatement,
) -> Result<scylla::client::pager::QueryPager, Error> {
    let (tx, rx) = async_channel::bounded(1);
    // No bind parameters supported yet: always sends the empty tuple.
    let params = Box::new(());
    let job = Job::ExecuteV1(ExecuteV1 { st, params, tx });
    // Channel failures are mapped through the cached-reader error type,
    // which converts into this `Error` via `?`.
    self.tx
        .send(job)
        .await
        .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?;
    // Outer `?`: reply channel closed; inner `?`: worker-side error.
    let res = rx
        .recv()
        .await
        .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
    Ok(res)
}
}
#[derive(Debug)]
@@ -333,7 +390,12 @@ impl ScyllaWorker {
self.scyconf_lt.keyspace.as_str(),
];
debug!("scylla worker prepare start");
let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?;
let stmts = StmtsEvents::new(
kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?,
self.scyconf_st.bypass_cache,
&scy,
)
.await?;
let stmts = Arc::new(stmts);
// let stmts_cache = StmtsCache::new(kss[0], &scy).await?;
// let stmts_cache = Arc::new(stmts_cache);
@@ -390,6 +452,16 @@ impl ScyllaWorker {
}
}
Job::BinWriteIndexRead(job) => job.execute(&stmts, &scy).await,
Job::PrepareV1(job) => {
let res = scy.prepare(job.cql).await.map_err(|e| e.into());
// TODO log?
let _ = job.tx.send(res).await;
}
Job::ExecuteV1(job) => {
let res = scy.execute_iter(job.st, job.params).await.map_err(|e| e.into());
// TODO log?
let _ = job.tx.send(res).await;
}
}
})
.buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)