diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index c0d4aea..a66dbf8 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -65,7 +65,9 @@ pub async fn scylla_channel_event_stream( .map(move |item| match item { Ok(k) => match k { ChannelEvents::Events(mut k) => { - if let SeriesKind::ChannelStatus = chconf.kind() { + if true { + Ok(ChannelEvents::Events(k)) + } else if let SeriesKind::ChannelStatus = chconf.kind() { type C1 = ContainerEvents; type C2 = ContainerEvents; if let Some(j) = k.as_any_mut().downcast_mut::() { diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index b41c578..e4327a5 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -42,11 +42,11 @@ use std::time::Duration; use std::time::Instant; use taskrun::tracing; -macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ) } +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ) } -macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ) } +macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ) } -macro_rules! trace_init { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } @@ -260,6 +260,7 @@ pub enum ReadEventKind { ScyllaReadRowDone(u32), ReadNextValuesFutureDone, EventsStreamRtSees(u32), + ReadEventsLspAllDone, } #[derive(Debug)] @@ -298,7 +299,7 @@ impl fmt::Display for ReadJobTrace { #[derive(Debug)] pub(super) struct ReadNextValuesOpts { rt: RetentionTime, - series: u64, + series: SeriesId, ts_msp: TsMs, range: ScyllaSeriesRange, fwd: bool, @@ -306,6 +307,20 @@ pub(super) struct ReadNextValuesOpts { val_ty_dyn: Box, } +impl Clone for ReadNextValuesOpts { + fn clone(&self) -> Self { + Self { + rt: self.rt.clone(), + series: self.series.clone(), + ts_msp: self.ts_msp.clone(), + range: self.range.clone(), + fwd: self.fwd.clone(), + readopts: self.readopts.clone(), + val_ty_dyn: self.val_ty_dyn.clone_boxed(), + } + } +} + #[derive(Clone)] struct MakeFutInfo { scyqueue: ScyllaQueue, @@ -417,7 +432,6 @@ impl EventsStreamRt { ts_msp: TsMs, bck: bool, mfi: MakeFutInfo, - jobtrace: ReadJobTrace, ) -> Pin, ReadJobTrace), Error>> + Send>> { let scyqueue = mfi.scyqueue.clone(); let rt = mfi.rt.clone(); @@ -462,9 +476,8 @@ impl EventsStreamRt { fn setup_bck_read(&mut self) { if let Some(ts) = self.msp_buf_bck.pop_back() { trace_fetch!("setup_bck_read {}", ts.fmt()); - let jobtrace = ReadJobTrace::new(); let mfi = MakeFutInfo::new(self); - let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace); + let fut = Self::make_read_events_fut(ts, true, mfi); self.state = State::ReadingBck(ReadingBck { reading_state: ReadingState::FetchEvents(FetchEvents::from_fut(fut)), }); @@ -494,9 +507,8 @@ impl EventsStreamRt { while qu.has_space() { if let Some(ts) = msp_buf.pop_front() { trace_fetch!("{selfname} {} FILL A SLOT", ts.fmt()); - let jobtrace = ReadJobTrace::new(); let mfi = st.make_fut_info.clone(); - let fut = Self::make_read_events_fut(ts, false, mfi, jobtrace); + let fut = Self::make_read_events_fut(ts, false, mfi); qu.push(fut); } else { break; @@ -880,7 +892,7 @@ async fn read_next_values_3_fwd( qu }; let params = ( - series as i64, + series.to_i64(), ts_msp.ms() as i64, ts_lsp_min.ns() as i64, ts_lsp_max.ns() as i64, @@ -899,6 +911,33 @@ async fn read_next_values_3_fwd( Ok((ret,)) } +async fn read_lsp_all( + opts: ReadNextValuesOpts, + stmts: Arc, + scy: Arc, + jobtrace: &mut ReadJobTrace, +) -> Result<(Vec,), Error> { + let selfname = "read_lsp_all"; + let mut qu = stmts + .rt(&opts.rt) + .lsp_all() + .shape(opts.val_ty_dyn.is_valueblob()) + .st(opts.val_ty_dyn.st_name())? + .clone(); + qu.set_page_size(1024 * 4); + let params = (opts.series.to_i64(), opts.ts_msp.ms() as i64); + trace_fetch!("{selfname} event search params {:?}", params); + jobtrace.add_event_now(ReadEventKind::CallExecuteIter); + let res = scy.execute_iter(qu.clone(), params).await?; + let mut ret = Vec::new(); + let mut it = res.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { + let lsp = DtNano::from_ns(row.0 as u64); + ret.push(lsp); + } + Ok((ret,)) +} + async fn read_next_values_3_bck( opts: ReadNextValuesOpts, stmts: Arc, @@ -906,49 +945,39 @@ async fn read_next_values_3_bck( jobtrace: &mut ReadJobTrace, ) -> Result<(Box,), Error> { let selfname = "read_next_values_3_bck"; - let ret = opts.val_ty_dyn.empty_container_for_test(); - if true { + if false { + let ret = opts.val_ty_dyn.empty_container_for_test(); return Ok((ret,)); } + let (mut lsp_all,) = read_lsp_all(opts.clone(), stmts.clone(), scy.clone(), jobtrace).await?; + { + let n = lsp_all.len(); + if n > 1024 * 200 { + log::info!("{n} lsp in msp {msp}", msp = opts.ts_msp) + // TODO metrics + } + } + jobtrace.add_event_now(ReadEventKind::ReadEventsLspAllDone); + let lsp = if let Some(x) = lsp_all.pop() { + x + } else { + let ret = opts.val_ty_dyn.empty_container_for_test(); + return Ok((ret,)); + }; let val_ty_dyn = &opts.val_ty_dyn; trace_fetch!("{selfname} {:?} st_name {}", opts, val_ty_dyn.st_name()); let series = opts.series; let ts_msp = opts.ts_msp; - let range = opts.range; let table_name = val_ty_dyn.table_name(); let with_values = opts.readopts.with_values(); - if range.end() > TsNano::from_ns(i64::MAX as u64) { - return Err(Error::RangeEndOverflow); - } - let ts_lsp_max = if ts_msp.ns() < range.beg() { - range.beg().delta(ts_msp.ns()) - } else { - DtNano::from_ns(0) - }; - trace_fetch!( - "{selfname} ts_msp {} ts_lsp_max {} {}", - ts_msp.fmt(), - ts_lsp_max, - table_name - ); + trace_fetch!("{selfname} ts_msp {} lsp {} {}", ts_msp.fmt(), lsp, table_name); let qu = stmts .rt(&opts.rt) - .lsp(!opts.fwd, with_values) + .lsp(false, with_values) // TODO .shape(val_ty_dyn.is_valueblob()) .st(val_ty_dyn.st_name())?; - let qu = if false { - let mut qu = qu.clone(); - if qu.is_token_aware() == false { - return Err(Error::NotTokenAware); - } - qu.set_page_size(10000); - // qu.disable_paging(); - qu - } else { - qu.clone() - }; - let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); + let params = (series.to_i64(), ts_msp.ms() as i64, lsp.to_i64(), lsp.to_i64() + 1); trace_fetch!("{selfname} event search params {:?}", params); jobtrace.add_event_now(ReadEventKind::CallExecuteIter); let res = scy.execute_iter(qu.clone(), params).await?; @@ -985,7 +1014,7 @@ pub async fn read_events_v02( let val_ty_dyn = new_val_ty_dyn_from_shape_scalar_type(params.shape.clone(), params.scalar_type.clone()); let opts = ReadNextValuesOpts { rt: params.rt.clone(), - series: params.series.id(), + series: params.series, ts_msp: params.ts_msp, range: params.range, fwd: params.fwd, @@ -1008,6 +1037,7 @@ trait ValTyDyn: fmt::Debug + Send { // jobtrace: &mut ReadJobTrace, ) -> Pin, Error>> + Send>>; fn empty_container_for_test(&self) -> Box; + fn clone_boxed(&self) -> Box; } async fn read_into_container_branch_00( @@ -1065,7 +1095,7 @@ where Ok(ret) } -#[derive(Debug)] +#[derive(Debug, Clone)] struct ValTyDynTesting where ST: ValTy, @@ -1173,6 +1203,11 @@ where fn empty_container_for_test(&self) -> Box { Box::new(::empty()) } + + fn clone_boxed(&self) -> Box { + let ret = Self { _t1: PhantomData }; + Box::new(ret) + } } trait ValTy: fmt::Debug + Send + Sized + 'static { diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 8fe8b1e..fdcf0a2 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -19,6 +19,57 @@ autoerr::create_error_v1!( }, ); +#[derive(Debug)] +pub struct StmtsLspAllShape { + u8: PreparedStatement, + u16: PreparedStatement, + u32: PreparedStatement, + u64: PreparedStatement, + i8: PreparedStatement, + i16: PreparedStatement, + i32: PreparedStatement, + i64: PreparedStatement, + f32: PreparedStatement, + f64: PreparedStatement, + bool: PreparedStatement, + string: PreparedStatement, + enumvals: PreparedStatement, +} + +impl StmtsLspAllShape { + pub fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { + let ret = match stname { + "u8" => &self.u8, + "u16" => &self.u16, + "u32" => &self.u32, + "u64" => &self.u64, + "i8" => &self.i8, + "i16" => &self.i16, + "i32" => &self.i32, + "i64" => &self.i64, + "f32" => &self.f32, + "f64" => &self.f64, + "bool" => &self.bool, + "string" => &self.string, + "enum" => &self.enumvals, + _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), + }; + Ok(ret) + } +} + +#[derive(Debug)] +pub struct StmtsLspAll { + scalar: StmtsLspAllShape, + array: StmtsLspAllShape, +} + +impl StmtsLspAll { + pub fn shape(&self, array: bool) -> &StmtsLspAllShape { + if array { &self.array } else { &self.scalar } + } +} + #[derive(Debug)] pub struct StmtsLspShape { u8: PreparedStatement, @@ -75,6 +126,7 @@ pub struct StmtsEventsRt { ts_msp_fwd: PreparedStatement, ts_msp_bck: PreparedStatement, ts_msp_bck_workaround: PreparedStatement, + lsp_all: StmtsLspAll, lsp_fwd_val: StmtsLspDir, lsp_bck_val: StmtsLspDir, lsp_fwd_ts: StmtsLspDir, @@ -96,6 +148,10 @@ impl StmtsEventsRt { &self.ts_msp_bck_workaround } + pub fn lsp_all(&self) -> &StmtsLspAll { + &self.lsp_all + } + pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { if bck { if val { &self.lsp_bck_val } else { &self.lsp_bck_ts } @@ -160,6 +216,79 @@ async fn make_msp_fwd_for_bck_workaround( Ok(qu) } +async fn make_lsp_all_shape_st( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + stname: &str, + query_opts: &str, + scy: &Session, +) -> Result { + let cql = format!( + concat!( + "select ts_lsp from {}.{}events_{}_{}", + " where series = ? and ts_msp = ? {}" + ), + ks, + rt.table_prefix(), + shapepre, + stname, + query_opts + ); + info_prepare!("{ks} {rt} {cql}"); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + +async fn make_lsp_all_shape( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + query_opts: &str, + scy: &Session, +) -> Result { + let maker = |stname| make_lsp_all_shape_st(ks, rt, shapepre, stname, query_opts, scy); + let ret = StmtsLspAllShape { + u8: maker("u8").await?, + u16: maker("u16").await?, + u32: maker("u32").await?, + u64: maker("u64").await?, + i8: maker("i8").await?, + i16: maker("i16").await?, + i32: maker("i32").await?, + i64: maker("i64").await?, + f32: maker("f32").await?, + f64: maker("f64").await?, + bool: maker("bool").await?, + string: maker("string").await?, + enumvals: if shapepre == "scalar" { + make_lsp_all_shape_st(ks, rt, shapepre, "enum", query_opts, scy).await? + } else { + // exists only for scalar, therefore produce some dummy here + let table_name = "ts_msp"; + let cql = format!( + "select ts_msp from {}.{}{} limit 1 {}", + ks, + rt.table_prefix(), + table_name, + query_opts + ); + info_prepare!("{ks} {rt} {cql}"); + let qu = scy.prepare(cql).await?; + qu + }, + }; + Ok(ret) +} + +async fn make_lsp_all(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) -> Result { + let ret = StmtsLspAll { + scalar: make_lsp_all_shape(ks, rt, "scalar", query_opts, scy).await?, + array: make_lsp_all_shape(ks, rt, "array", query_opts, scy).await?, + }; + Ok(ret) +} + async fn make_lsp( ks: &str, rt: &RetentionTime, @@ -317,6 +446,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, query_opts: &str, scy: &Session) ts_msp_fwd: make_msp_dir(ks, rt, false, query_opts, scy).await?, ts_msp_bck: make_msp_dir(ks, rt, true, query_opts, scy).await?, ts_msp_bck_workaround: make_msp_fwd_for_bck_workaround(ks, rt, query_opts, scy).await?, + lsp_all: make_lsp_all(ks, rt, query_opts, scy).await?, lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, value", false, query_opts, scy).await?, lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, query_opts, scy).await?, lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, query_opts, scy).await?, diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 3dd4bbe..b6c568a 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -9,7 +9,6 @@ use async_channel::Sender; use daqbuf_series::SeriesId; use daqbuf_series::msp::MspU32; use daqbuf_series::msp::PrebinnedPartitioning; -use futures_util::Future; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_0::timebin::BinningggContainerEventsDyn; @@ -19,10 +18,8 @@ use netpod::ScyllaConfig; use netpod::TsMs; use netpod::log; use netpod::ttl::RetentionTime; -use scylla::client::session::Session; use std::collections::VecDeque; use std::fmt; -use std::pin::Pin; use std::sync::Arc; macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } @@ -152,7 +149,6 @@ enum Job { bool, Sender, Error>>, ), - ReadNextValues(ReadNextValues), AccountingReadTs( RetentionTime, TsMs, @@ -173,26 +169,6 @@ enum Job { ), } -struct ReadNextValues { - futgen: Box< - dyn FnOnce( - Arc, - Arc, - ReadJobTrace, - ) -> Pin< - Box, ReadJobTrace), Error>> + Send>, - > + Send, - >, - tx: Sender, ReadJobTrace), Error>>, - jobtrace: ReadJobTrace, -} - -impl fmt::Debug for ReadNextValues { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "ReadNextValues {{ .. }}") - } -} - #[derive(Debug, Clone)] pub struct ScyllaQueue { tx: Sender, @@ -213,33 +189,6 @@ impl ScyllaQueue { Ok(res) } - // TODO remove - pub async fn read_next_values( - &self, - futgen: F, - jobtrace: ReadJobTrace, - ) -> Result<(Box, ReadJobTrace), Error> - where - F: FnOnce( - Arc, - Arc, - ReadJobTrace, - ) -> Pin< - Box, ReadJobTrace), Error>> + Send>, - > + Send - + 'static, - { - let (tx, rx) = async_channel::bounded(1); - let job = Job::ReadNextValues(ReadNextValues { - futgen: Box::new(futgen), - tx, - jobtrace, - }); - self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; - let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; - Ok(res) - } - pub async fn read_events_v02( &self, params: ReadEventsJobParams, @@ -426,13 +375,6 @@ impl ScyllaWorker { // TODO count for stats } } - Job::ReadNextValues(job) => { - let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace); - let res = fut.await; - if job.tx.send(res.map_err(Into::into)).await.is_err() { - // TODO count for stats - } - } Job::AccountingReadTs(rt, ts, tx) => { let ks = match &rt { RetentionTime::Short => &self.scyconf_st.keyspace,