diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 42b51e4..40289f4 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqretrieve" -version = "0.5.5-aa.14" +version = "0.5.5-aa.15" authors = ["Dominik Werder "] edition = "2024" diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 128e464..b3a80f2 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -191,6 +191,14 @@ impl From> for ToJsonBody { } } +impl From for ToJsonBody { + fn from(value: String) -> Self { + Self { + body: value.into_bytes(), + } + } +} + impl From<&S> for ToJsonBody { fn from(value: &S) -> Self { Self { diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index be21a19..aaed32f 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -4,6 +4,7 @@ pub mod binned; pub mod binned_v2; pub mod binwriteindex; pub mod databuffer_tools; +pub mod datasearch; pub mod docs; pub mod eventdata; pub mod events; diff --git a/crates/httpret/src/api4/datasearch.rs b/crates/httpret/src/api4/datasearch.rs new file mode 100644 index 0000000..219e348 --- /dev/null +++ b/crates/httpret/src/api4/datasearch.rs @@ -0,0 +1,148 @@ +use crate::bodystream::response; +use crate::requests::accepts_json_or_all; +use crate::ReqCtx; +use crate::ServiceSharedResources; +use dbconn::worker::PgQueue; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use http::Method; +use http::StatusCode; +use httpclient::body_empty; +use httpclient::body_string; +use httpclient::IntoBody; +use httpclient::Requ; +use httpclient::StreamResponse; +use httpclient::ToJsonBody; +use netpod::log; +use netpod::req_uri_to_url; +use netpod::ttl::RetentionTime; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsMs; +use netpod::UriError; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; + +autoerr::create_error_v1!( + name(Error, "DataSearch"), + enum variants { + NoScylla, + Uri(#[from] UriError), + Json(#[from] serde_json::Error), + Http(#[from] http::Error), + ScyllaWorker(#[from] scyllaconn::worker::Error), + HttpBody(#[from] httpclient::BodyError), + ScyllaType(#[from] scyllaconn::scylla::errors::TypeCheckError), + ScyllaNextRow(#[from] scyllaconn::scylla::errors::NextRowError), + }, +); + +#[derive(Debug, Serialize, Deserialize)] +pub struct AccountedIngested { + names: Vec, + counts: Vec, + bytes: Vec, + scalar_types: Vec, + shapes: Vec, +} + +impl AccountedIngested { + fn new() -> Self { + Self { + names: Vec::new(), + counts: Vec::new(), + bytes: Vec::new(), + scalar_types: Vec::new(), + shapes: Vec::new(), + } + } + + fn push(&mut self, name: String, counts: u64, bytes: u64, scalar_type: ScalarType, shape: Shape) { + self.names.push(name); + self.counts.push(counts); + self.bytes.push(bytes); + self.scalar_types.push(scalar_type); + self.shapes.push(shape); + } + + #[allow(unused)] + fn truncate(&mut self, len: usize) { + self.names.truncate(len); + self.counts.truncate(len); + self.bytes.truncate(len); + self.scalar_types.truncate(len); + self.shapes.truncate(len); + } +} + +pub struct DataSearch {} + +impl DataSearch { + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with("/api/4/private/search/data") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, + ) -> Result { + if req.method() == Method::GET { + if accepts_json_or_all(req.headers()) { + match self.handle_get(req, ctx, shared_res, ncc).await { + Ok(x) => Ok(x), + Err(e) => { + let s = serde_json::to_string(&e.to_string())?; + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(s))?) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) + } + } + + async fn handle_get( + &self, + req: Requ, + ctx: &ReqCtx, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, + ) -> Result { + let url = req_uri_to_url(req.uri())?; + let params: BTreeMap<_, _> = url.query_pairs().collect(); + if let Some(scyqu) = &shared_res.scyqueue { + let cql = "select ts_msp from sls_st.st_ts_msp where series = 6293882751490541488"; + let st = scyqu.prepare(cql.into()).await?; + let qp = scyqu.execute(st).await?; + let mut it = qp.rows_stream::<(i64,)>()?; + let mut msps_in_table = Vec::new(); + while let Some(row) = it.try_next().await? { + msps_in_table.push(row.0); + } + let body = serde_json::json!({ + "msps_in_table": msps_in_table, + }); + let body = ToJsonBody::from(&body).into_body(); + Ok(response(StatusCode::OK).body(body)?) + } else { + Err(Error::NoScylla) + } + // let ret = serde_json::json!({ + // "key": "value", + // }); + // let body = ToJsonBody::from(&ret).into_body(); + // Ok(response(StatusCode::OK).body(body)?) + } +} diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index baf0ef9..0fd01d7 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,3 +1,4 @@ +use crate::requests::accepts_json_or_all; use crate::response; use crate::ServiceSharedResources; use dbconn::create_connection; @@ -166,12 +167,7 @@ impl ChannelConfigHandler { node_config: &NodeConfigCached, ) -> Result { if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + if accepts_json_or_all(req.headers()) { match self.channel_config(req, pgqueue, &node_config).await { Ok(k) => Ok(k), Err(e) => { diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 54f2860..932033d 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -478,6 +478,8 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api1::RequestStatusHandler::handler(&req) { Ok(h.handle(req, ctx, &node_config).await?) + } else if let Some(h) = api4::datasearch::DataSearch::handler(&req) { + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = api4::docs::DocsHandler::handler(&req) { Ok(h.handle(req, ctx).await?) } else { diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs index d06896d..9f6586c 100644 --- a/crates/scyllaconn/src/accounting/toplist.rs +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -1,8 +1,8 @@ use crate::log::*; use futures_util::TryStreamExt; -use netpod::ttl::RetentionTime; -use netpod::TsMs; use netpod::EMIT_ACCOUNTING_SNAP; +use netpod::TsMs; +use netpod::ttl::RetentionTime; use scylla::client::session::Session as ScySession; use scylla::statement::prepared::PreparedStatement; @@ -107,12 +107,13 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) concat!( "select series, count, bytes", " from {}.{}account_00", - " where part = ? and ts = ?" + " where part = ? and ts = ?", + " bypass cache" ), ks, rt.table_prefix() ); - let qu = prep(&cql, scy).await?; + let qu = scy.prepare(cql).await?; let ts_sec = ts.ms() as i64 / 1000; let mut ret = UsageData::new(ts); for part in 0..255_u32 { @@ -132,7 +133,3 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) ret.verify()?; Ok(ret) } - -async fn prep(cql: &str, scy: &ScySession) -> Result { - Ok(scy.prepare(cql).await?) -} diff --git a/crates/scyllaconn/src/accounting/totals.rs b/crates/scyllaconn/src/accounting/totals.rs index 8b99abe..93fe50f 100644 --- a/crates/scyllaconn/src/accounting/totals.rs +++ b/crates/scyllaconn/src/accounting/totals.rs @@ -166,7 +166,8 @@ impl Stream for AccountingStreamScylla { } break match &mut self.state { FrState::New => { - let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + let cql = + concat!("select series, count, bytes from account_00 where part = ? and ts = ? bypass cache"); let fut = prep(cql, self.scy.clone()); let fut: PrepFut = Box::pin(fut); self.state = FrState::Prepare(fut); diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 2e6b2f6..ef580f2 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,14 +1,12 @@ -use crate::events2::prepare::StmtsCache; use crate::events2::prepare::StmtsEvents; -use crate::log::*; use crate::worker::ScyllaQueue; use daqbuf_series::msp::PrebinnedPartitioning; use futures_util::TryStreamExt; use items_0::merge::MergeableTy; use items_2::binning::container_bins::ContainerBins; -use netpod::ttl::RetentionTime; use netpod::DtMs; use netpod::TsNano; +use netpod::ttl::RetentionTime; use std::ops::Range; use streams::timebin::cached::reader::BinsReadRes; @@ -93,41 +91,6 @@ impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider { } } -pub async fn worker_write( - series: u64, - bins: ContainerBins, - stmts_cache: &StmtsCache, - scy: &ScySession, -) -> Result<(), streams::timebin::cached::reader::Error> { - if true { - error!("TODO retrieval should probably not write a cache at all"); - return Err(streams::timebin::cached::reader::Error::TodoImpl); - } - for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), _fnl) in bins.zip_iter() { - let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000); - // let div = streams::timebin::cached::reader::part_len(bin_len).ns(); - let div = 42424242424242; - let msp = ts1.ns() / div; - let off = (ts1.ns() - msp * div) / bin_len.ns(); - let params = ( - series as i64, - bin_len.ms() as i32, - msp as i64, - off as i32, - cnt as i64, - min, - max, - avg, - lst, - ); - // trace!("cache write {:?}", params); - scy.execute_unpaged(stmts_cache.st_write_f32(), params) - .await - .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; - } - Ok(()) -} - pub async fn worker_read( rt: RetentionTime, series: u64, diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index 16e21ab..f0c0102 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -1,5 +1,5 @@ use netpod::ScyllaConfig; -use netpod::log::*; +use netpod::log; use scylla::client::execution_profile::ExecutionProfileBuilder; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; @@ -24,13 +24,13 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, } pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result { - info!("creating scylla connection"); + log::info!("creating scylla connection"); let scy = SessionBuilder::new() .pool_size(scylla::client::PoolSize::PerHost(NonZero::new(4).unwrap())) .known_nodes(&scyconf.hosts) .default_execution_profile_handle( ExecutionProfileBuilder::default() - .consistency(Consistency::Quorum) + .consistency(Consistency::All) .build() .into_handle(), ) diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 672a6bd..d99787c 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -54,9 +54,13 @@ macro_rules! trace_redo_fwd_read { ($($arg:expr),*) => ( if false { log::trace!( macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_every_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } -macro_rules! warn_item { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) } +macro_rules! warn_item { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ) } + +macro_rules! log_fetch_result { + ($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } }; +} #[derive(Debug, Clone)] pub struct EventReadOpts { @@ -908,6 +912,7 @@ where ); trace_fetch!("FWD event search params {:?}", params); jobtrace.add_event_now(ReadEventKind::CallExecuteIter); + log_fetch_result!("read_next_values_2 {params:?}"); let res = scy.execute_iter(qu.clone(), params).await?; { let mut ret = ::Container::empty(); @@ -926,6 +931,7 @@ where let mut it = res.rows_stream::<::ScyRowTy>()?; while let Some(row) = it.try_next().await? { let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + log_fetch_result!("read_next_values_2 {params:?} {ts}"); // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); // let value = ::from_scyty(row.1); ret.push(ts, value); diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 54d4430..1b2ab88 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -6,19 +6,23 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::TryStreamExt; -use netpod::log; -use netpod::ttl::RetentionTime; use netpod::TsMs; use netpod::TsMsVecFmt; +use netpod::log; +use netpod::ttl::RetentionTime; use scylla::client::session::Session; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ) } +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) } -macro_rules! trace_msp { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } +macro_rules! trace_msp { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ) } + +macro_rules! log_fetch_result { + ($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } }; +} autoerr::create_error_v1!( name(Error, "EventsMsp"), @@ -28,6 +32,7 @@ autoerr::create_error_v1!( ScyllaRow(#[from] scylla::errors::NextRowError), ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), ScyllaPagerExecution(#[from] scylla::errors::PagerExecutionError), + TooManyRows, }, ); @@ -60,11 +65,7 @@ where } fn is_taken(&self) -> bool { - if let Self::Taken = self { - true - } else { - false - } + if let Self::Taken = self { true } else { false } } } @@ -313,7 +314,7 @@ pub async fn find_ts_msp( bck ); if bck { - find_ts_msp_bck(rt, series, range, stmts, scy).await + find_ts_msp_bck_workaround(rt, series, range, stmts, scy).await } else { find_ts_msp_fwd(rt, series, range, stmts, scy).await } @@ -326,15 +327,18 @@ async fn find_ts_msp_fwd( stmts: &StmtsEvents, scy: &Session, ) -> Result, Error> { + let selfname = "find_ts_msp_fwd"; let mut ret = VecDeque::new(); // TODO time range truncation can be handled better let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); + log_fetch_result!("{selfname} {:?}", params); let mut res = scy .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) .await? .rows_stream::<(i64,)>()?; while let Some(row) = res.try_next().await? { let ts = TsMs::from_ms_u64(row.0 as u64); + log_fetch_result!("{selfname} {params:?} {ts}"); ret.push_back(ts); } Ok(ret) @@ -347,15 +351,59 @@ async fn find_ts_msp_bck( stmts: &StmtsEvents, scy: &Session, ) -> Result, Error> { + let selfname = "find_ts_msp_bck"; let mut ret = VecDeque::new(); let params = (series as i64, range.beg().ms() as i64); + log_fetch_result!("{selfname} {:?}", params); let mut res = scy .execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params) .await? .rows_stream::<(i64,)>()?; while let Some(row) = res.try_next().await? { let ts = TsMs::from_ms_u64(row.0 as u64); + log_fetch_result!("{selfname} {params:?} {ts}"); ret.push_front(ts); } Ok(ret) } + +/* +Workaround because scylla's order by desc is broken at the moment. +*/ +async fn find_ts_msp_bck_workaround( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + let selfname = "find_ts_msp_bck_workaround"; + let mut ret = VecDeque::new(); + let params = (series as i64, 0 as i64, i64::MAX); + log_fetch_result!("{selfname} {:?}", params); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) + .await? + .rows_stream::<(i64,)>()?; + while let Some(row) = res.try_next().await? { + let ts = TsMs::from_ms_u64(row.0 as u64); + log_fetch_result!("{selfname} {params:?} {ts}"); + ret.push_back(ts); + if ret.len() > 1024 * 1024 { + return Err(Error::TooManyRows); + } + } + if ret.len() > 1024 * 2 { + log::info!("quite many ts_msp values in reverse lookup len {}", ret.len()); + } + if ret.len() > 1024 * 64 { + log::warn!("quite many ts_msp values in reverse lookup len {}", ret.len()); + } + let ret = if ret.len() > 2 { + let tmp: Vec<_> = ret.into_iter().rev().take(2).collect(); + tmp.into_iter().rev().collect() + } else { + ret + }; + Ok(ret) +} diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 087713b..43c9747 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -2,6 +2,10 @@ use netpod::ttl::RetentionTime; use scylla::client::session::Session; use scylla::statement::prepared::PreparedStatement; +macro_rules! info_prepare { + ($($arg:tt)*) => { log::info!("prepare cql {}", format_args!($($arg)*)); }; +} + autoerr::create_error_v1!( name(Error, "ScyllaPrepare"), enum variants { @@ -62,11 +66,7 @@ pub struct StmtsLspDir { impl StmtsLspDir { pub fn shape(&self, array: bool) -> &StmtsLspShape { - if array { - &self.array - } else { - &self.scalar - } + if array { &self.array } else { &self.scalar } } } @@ -93,17 +93,9 @@ impl StmtsEventsRt { pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { if bck { - if val { - &self.lsp_bck_val - } else { - &self.lsp_bck_ts - } + if val { &self.lsp_bck_val } else { &self.lsp_bck_ts } } else { - if val { - &self.lsp_fwd_val - } else { - &self.lsp_fwd_ts - } + if val { &self.lsp_fwd_val } else { &self.lsp_fwd_ts } } } @@ -116,7 +108,13 @@ impl StmtsEventsRt { } } -async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { +async fn make_msp_dir( + ks: &str, + rt: &RetentionTime, + bck: bool, + query_opts: &str, + scy: &Session, +) -> Result { let table_name = "ts_msp"; let select_cond = if bck { "ts_msp < ? order by ts_msp desc limit 2" @@ -124,12 +122,14 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> "ts_msp >= ? and ts_msp < ? limit 20" }; let cql = format!( - "select ts_msp from {}.{}{} where series = ? and {}", + "select ts_msp from {}.{}{} where series = ? and {} {}", ks, rt.table_prefix(), table_name, - select_cond + select_cond, + query_opts ); + info_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -141,6 +141,7 @@ async fn make_lsp( stname: &str, values: &str, bck: bool, + query_opts: &str, scy: &Session, ) -> Result { let select_cond = if bck { @@ -151,15 +152,17 @@ async fn make_lsp( let cql = format!( concat!( "select {} from {}.{}events_{}_{}", - " where series = ? and ts_msp = ? and {}" + " where series = ? and ts_msp = ? and {} {}" ), values, ks, rt.table_prefix(), shapepre, stname, - select_cond + select_cond, + query_opts ); + info_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } @@ -170,6 +173,7 @@ async fn make_lsp_shape( shapepre: &str, values: &str, bck: bool, + query_opts: &str, scy: &Session, ) -> Result { let values = if shapepre.contains("array") { @@ -178,7 +182,7 @@ async fn make_lsp_shape( values.into() }; let values = &values; - let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); + let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, query_opts, scy); let ret = StmtsLspShape { u8: maker("u8").await?, u16: maker("u16").await?, @@ -193,11 +197,28 @@ async fn make_lsp_shape( bool: maker("bool").await?, string: maker("string").await?, enumvals: if shapepre == "scalar" { - make_lsp(ks, rt, shapepre, "enum", "ts_lsp, value, valuestr", bck, scy).await? + make_lsp( + ks, + rt, + shapepre, + "enum", + "ts_lsp, value, valuestr", + bck, + query_opts, + scy, + ) + .await? } else { // exists only for scalar, therefore produce some dummy here let table_name = "ts_msp"; - let cql = format!("select ts_msp from {}.{}{} limit 1", ks, rt.table_prefix(), table_name); + let cql = format!( + "select ts_msp from {}.{}{} limit 1 {}", + ks, + rt.table_prefix(), + table_name, + query_opts + ); + info_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; qu }, @@ -210,54 +231,71 @@ async fn make_lsp_dir( rt: &RetentionTime, values: &str, bck: bool, + query_opts: &str, scy: &Session, ) -> Result { let ret = StmtsLspDir { - scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, - array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, + scalar: make_lsp_shape(ks, rt, "scalar", values, bck, query_opts, scy).await?, + array: make_lsp_shape(ks, rt, "array", values, bck, query_opts, scy).await?, }; Ok(ret) } -async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { +async fn make_prebinned_f32( + ks: &str, + rt: &RetentionTime, + query_opts: &str, + scy: &Session, +) -> Result { let cql = format!( concat!( "select off, cnt, min, max, avg, lst from {}.{}binned_scalar_f32_v02", " where series = ? and binlen = ? and msp = ?", - " and off >= ? and off < ?" + " and off >= ? and off < ?", + " {}" ), ks, - rt.table_prefix() + rt.table_prefix(), + query_opts ); + info_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } -async fn make_bin_write_index_read(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { +async fn make_bin_write_index_read( + ks: &str, + rt: &RetentionTime, + query_opts: &str, + scy: &Session, +) -> Result { let cql = format!( concat!( "select lsp, binlen", " from {}.{}bin_write_index_v04", " where series = ? and pbp = ? and msp = ?", " and lsp >= ? and lsp < ?", + " {}" ), ks, - rt.table_prefix() + rt.table_prefix(), + query_opts ); + info_prepare!("{ks} {rt} {cql}"); let qu = scy.prepare(cql).await?; Ok(qu) } -async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { +async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) -> Result { let ret = StmtsEventsRt { - ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, - ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, - lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, scy).await?, - lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, scy).await?, - lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, scy).await?, - lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, scy).await?, - prebinned_f32: make_prebinned_f32(ks, rt, scy).await?, - bin_write_index_read: make_bin_write_index_read(ks, rt, scy).await?, + ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?, + ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?, + lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?, + lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?, + lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?, + lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, query_opts, scy).await?, + prebinned_f32: make_prebinned_f32(ks, rt, query_opts, scy).await?, + bin_write_index_read: make_bin_write_index_read(ks, rt, query_opts, scy).await?, }; Ok(ret) } @@ -270,11 +308,12 @@ pub struct StmtsEvents { } impl StmtsEvents { - pub async fn new(ks: [&str; 3], scy: &Session) -> Result { + pub async fn new(ks: [&str; 3], bypass_cache: bool, scy: &Session) -> Result { + let query_opts = if bypass_cache { "bypass cache" } else { "" }; let ret = StmtsEvents { - st: make_rt(ks[0], &RetentionTime::Short, scy).await?, - mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, - lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, + st: make_rt(ks[0], &RetentionTime::Short, query_opts, scy).await?, + mt: make_rt(ks[1], &RetentionTime::Medium, query_opts, scy).await?, + lt: make_rt(ks[2], &RetentionTime::Long, query_opts, scy).await?, }; Ok(ret) } @@ -287,53 +326,3 @@ impl StmtsEvents { } } } - -#[derive(Debug)] -pub struct StmtsCache { - st_write_f32: PreparedStatement, - st_read_f32: PreparedStatement, -} - -impl StmtsCache { - pub async fn new(ks: &str, scy: &Session) -> Result { - let rt = RetentionTime::Short; - let st_write_f32 = scy - .prepare(format!( - concat!( - "insert into {}.{}binned_scalar_f32", - " (series, bin_len_ms, ts_msp, off, count, min, max, avg, lst)", - " values (?, ?, ?, ?, ?, ?, ?, ?, ?)" - ), - ks, - rt.table_prefix() - )) - .await?; - let st_read_f32 = scy - .prepare(format!( - concat!( - "select off, count, min, max, avg, lst", - " from {}.{}binned_scalar_f32", - " where series = ?", - " and bin_len_ms = ?", - " and ts_msp = ?", - " and off >= ? and off < ?" - ), - ks, - rt.table_prefix() - )) - .await?; - let ret = Self { - st_write_f32, - st_read_f32, - }; - Ok(ret) - } - - pub fn st_write_f32(&self) -> &PreparedStatement { - &self.st_write_f32 - } - - pub fn st_read_f32(&self) -> &PreparedStatement { - &self.st_read_f32 - } -} diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 62baaee..f708aad 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -5,27 +5,27 @@ use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; +use daqbuf_series::SeriesId; use daqbuf_series::msp::MspU32; use daqbuf_series::msp::PrebinnedPartitioning; -use daqbuf_series::SeriesId; use futures_util::Future; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_0::timebin::BinningggContainerEventsDyn; use items_2::binning::container_bins::ContainerBins; -use netpod::log; -use netpod::ttl::RetentionTime; use netpod::DtMs; use netpod::ScyllaConfig; use netpod::TsMs; +use netpod::log; +use netpod::ttl::RetentionTime; use scylla::client::session::Session; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; use std::sync::Arc; -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } const CONCURRENT_QUERIES_PER_WORKER: usize = 80; const SCYLLA_WORKER_QUEUE_LEN: usize = 200; @@ -43,6 +43,7 @@ autoerr::create_error_v1!( Toplist(#[from] crate::accounting::toplist::Error), MissingKeyspaceConfig, CacheWriteF32(#[from] streams::timebin::cached::reader::Error), + ScyllaPrepare(#[from] scylla::errors::PrepareError), ScyllaType(#[from] scylla::deserialize::TypeCheckError), ScyllaNextRow(#[from] scylla::errors::NextRowError), ScyllaPagerExecution(#[from] scylla::errors::PagerExecutionError), @@ -118,6 +119,28 @@ impl BinWriteIndexRead { } } +#[derive(Debug)] +struct PrepareV1 { + cql: String, + tx: Sender>, +} + +struct ExecuteV1 { + st: scylla::statement::prepared::PreparedStatement, + params: Box, + tx: Sender>, +} + +impl fmt::Debug for ExecuteV1 { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ExecuteV1") + .field("st", &self.st) + .field("params", &"...") + .field("tx", &"...") + .finish() + } +} + #[derive(Debug)] enum Job { FindTsMsp( @@ -141,6 +164,8 @@ enum Job { ), ReadPrebinnedF32(ReadPrebinnedF32), BinWriteIndexRead(BinWriteIndexRead), + PrepareV1(PrepareV1), + ExecuteV1(ExecuteV1), } struct ReadNextValues { @@ -297,6 +322,38 @@ impl ScyllaQueue { .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; Ok(res) } + + pub async fn prepare(&self, cql: String) -> Result { + let (tx, rx) = async_channel::bounded(1); + let job = Job::PrepareV1(PrepareV1 { cql, tx }); + self.tx + .send(job) + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?; + let res = rx + .recv() + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; + Ok(res) + } + + pub async fn execute( + &self, + st: scylla::statement::prepared::PreparedStatement, + ) -> Result { + let (tx, rx) = async_channel::bounded(1); + let params = Box::new(()); + let job = Job::ExecuteV1(ExecuteV1 { st, params, tx }); + self.tx + .send(job) + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?; + let res = rx + .recv() + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; + Ok(res) + } } #[derive(Debug)] @@ -333,7 +390,12 @@ impl ScyllaWorker { self.scyconf_lt.keyspace.as_str(), ]; debug!("scylla worker prepare start"); - let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?; + let stmts = StmtsEvents::new( + kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, + self.scyconf_st.bypass_cache, + &scy, + ) + .await?; let stmts = Arc::new(stmts); // let stmts_cache = StmtsCache::new(kss[0], &scy).await?; // let stmts_cache = Arc::new(stmts_cache); @@ -390,6 +452,16 @@ impl ScyllaWorker { } } Job::BinWriteIndexRead(job) => job.execute(&stmts, &scy).await, + Job::PrepareV1(job) => { + let res = scy.prepare(job.cql).await.map_err(|e| e.into()); + // TODO log? + let _ = job.tx.send(res).await; + } + Job::ExecuteV1(job) => { + let res = scy.execute_iter(job.st, job.params).await.map_err(|e| e.into()); + // TODO log? + let _ = job.tx.send(res).await; + } } }) .buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)