diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 5132ba0..a380991 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use netpod::log::*; +use netpod::ttl::RetentionTime; use netpod::ChConf; use query::api4::events::EventsSubQuery; use scyllaconn::worker::ScyllaQueue; @@ -28,6 +29,7 @@ pub async fn scylla_channel_event_stream( let with_values = evq.need_value_data(); debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); let stream = scyllaconn::events::EventsStreamScylla::new( + RetentionTime::Short, series, evq.range().into(), do_one_before_range, diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 3e0f7dd..f5dbf55 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -15,6 +15,7 @@ use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; use netpod::log::*; +use netpod::ttl::RetentionTime; use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; @@ -24,7 +25,6 @@ use scylla::frame::response::result::Row; use scylla::prepared_statement::PreparedStatement; use scylla::Session; use scylla::Session as ScySession; -use std::collections::BTreeMap; use std::collections::VecDeque; use std::mem; use std::pin::Pin; @@ -33,71 +33,212 @@ use std::task::Context; use std::task::Poll; #[derive(Debug)] -pub struct StmtsEventsRt { - ts_msp_bck: PreparedStatement, - ts_msp_fwd: PreparedStatement, - read_value_queries: BTreeMap, +pub struct StmtsLspShape { + u8: PreparedStatement, + u16: PreparedStatement, + u32: PreparedStatement, + u64: PreparedStatement, + i8: PreparedStatement, + i16: PreparedStatement, + i32: PreparedStatement, + i64: PreparedStatement, + f32: PreparedStatement, + f64: PreparedStatement, + bool: PreparedStatement, + string: PreparedStatement, } -impl StmtsEventsRt { - pub(super) async fn new(rtpre: &str, scy: &Session) -> Result { - let cql = format!( - "select ts_msp from {}{} where series = ? and ts_msp < ? order by ts_msp desc limit 2", - rtpre, "ts_msp" - ); - let ts_msp_bck = scy.prepare(cql).await.err_conv()?; - let cql = format!( - "select ts_msp from {}{} where series = ? and ts_msp >= ? and ts_msp < ?", - rtpre, "ts_msp" - ); - let ts_msp_fwd = scy.prepare(cql).await.err_conv()?; - let mut read_value_queries = BTreeMap::new(); - for sct in [ - "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", - ] { - let combinations = [ - ("timestamps", "scalar", "ts_lsp, pulse"), - ("timestamps", "array", "ts_lsp, pulse"), - ("values", "scalar", "ts_lsp, pulse, value"), - ("valueblobs", "array", "ts_lsp, pulse, valueblob"), - ]; - for com in combinations { - let query_name = format!("{}_{}_{}_fwd", com.1, sct, com.0); - let cql = format!( - concat!( - "select {} from {}events_{}_{}", - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ), - com.2, rtpre, com.1, sct, - ); - let qu = scy.prepare(cql).await.err_conv()?; - read_value_queries.insert(query_name, qu); - - let query_name = format!("{}_{}_{}_bck", com.1, sct, com.0); - let cql = format!( - concat!( - "select {} from {}events_{}_{}", - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ), - com.2, rtpre, com.1, sct, - ); - let qu = scy.prepare(cql).await.err_conv()?; - read_value_queries.insert(query_name, qu); - } - } - let ret = Self { - ts_msp_bck, - ts_msp_fwd, - read_value_queries, +impl StmtsLspShape { + fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { + let ret = match stname { + "u8" => &self.u8, + _ => return Err(Error::with_msg_no_trace(format!("no query for stname {stname}"))), }; Ok(ret) } } +#[derive(Debug)] +pub struct StmtsLspDir { + scalar: StmtsLspShape, + array: StmtsLspShape, +} + +impl StmtsLspDir { + fn shape(&self, array: bool) -> &StmtsLspShape { + if array { + &self.array + } else { + &self.scalar + } + } +} + +#[derive(Debug)] +pub struct StmtsEventsRt { + ts_msp_fwd: PreparedStatement, + ts_msp_bck: PreparedStatement, + lsp_fwd_val: StmtsLspDir, + lsp_bck_val: StmtsLspDir, + lsp_fwd_ts: StmtsLspDir, + lsp_bck_ts: StmtsLspDir, +} + +impl StmtsEventsRt { + fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { + if bck { + if val { + &self.lsp_bck_val + } else { + &self.lsp_bck_ts + } + } else { + if val { + &self.lsp_fwd_val + } else { + &self.lsp_fwd_ts + } + } + } +} + +#[derive(Debug)] +pub struct StmtsEvents { + st: StmtsEventsRt, + mt: StmtsEventsRt, + lt: StmtsEventsRt, +} + +async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { + let table_name = "ts_msp"; + let select_cond = if bck { + "ts_msp < ? order by ts_msp desc limit 2" + } else { + "ts_msp >= ? and ts_msp < ?" + }; + let cql = format!( + "select ts_msp from {}.{}{} where series = ? and {}", + ks, + rt.table_prefix(), + table_name, + select_cond + ); + let qu = scy.prepare(cql).await.err_conv()?; + Ok(qu) +} + +async fn make_lsp( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + stname: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let select_cond = if bck { + "ts_lsp < ? order by ts_lsp desc limit 1" + } else { + "ts_lsp >= ? and ts_lsp < ?" + }; + let cql = format!( + concat!( + "select {} from {}.{}events_{}_{}", + " where series = ? and ts_msp = ? and {}" + ), + values, + ks, + rt.table_prefix(), + shapepre, + stname, + select_cond + ); + let qu = scy.prepare(cql).await.err_conv()?; + Ok(qu) +} + +async fn make_lsp_shape( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let values = if shapepre.contains("array") { + values.replace("value", "valueblob") + } else { + values.into() + }; + let values = &values; + let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); + let ret = StmtsLspShape { + u8: maker("u8").await?, + u16: maker("u16").await?, + u32: maker("u32").await?, + u64: maker("u64").await?, + i8: maker("i8").await?, + i16: maker("i16").await?, + i32: maker("i32").await?, + i64: maker("i64").await?, + f32: maker("f32").await?, + f64: maker("f64").await?, + bool: maker("bool").await?, + string: maker("string").await?, + }; + Ok(ret) +} + +async fn make_lsp_dir( + ks: &str, + rt: &RetentionTime, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let ret = StmtsLspDir { + scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, + array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, + }; + Ok(ret) +} + +async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { + let ret = StmtsEventsRt { + ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, + ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, + lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?, + lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?, + lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?, + lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?, + }; + Ok(ret) +} + +impl StmtsEvents { + pub(super) async fn new(ks: [&str; 3], scy: &Session) -> Result { + let ret = StmtsEvents { + st: make_rt(ks[0], &RetentionTime::Short, scy).await?, + mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, + lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, + }; + Ok(ret) + } + + fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { + match rt { + RetentionTime::Short => &self.st, + RetentionTime::Medium => &self.mt, + RetentionTime::Long => &&self.lt, + } + } +} + pub(super) async fn find_ts_msp_worker( + rt: &RetentionTime, series: u64, range: ScyllaSeriesRange, - stmts: &StmtsEventsRt, + stmts: &StmtsEvents, scy: &ScySession, ) -> Result<(VecDeque, VecDeque), Error> { trace!("find_ts_msp series {:?} {:?}", series, range); @@ -106,7 +247,7 @@ pub(super) async fn find_ts_msp_worker( let params = (series as i64, range.beg().ms() as i64); trace!("find_ts_msp query 1 params {:?}", params); let mut res = scy - .execute_iter(stmts.ts_msp_bck.clone(), params) + .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params) .await .err_conv()? .into_typed::<(i64,)>(); @@ -119,7 +260,7 @@ pub(super) async fn find_ts_msp_worker( let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); trace!("find_ts_msp query 2 params {:?}", params); let mut res = scy - .execute_iter(stmts.ts_msp_fwd.clone(), params) + .execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params) .await .err_conv()? .into_typed::<(i64,)>(); @@ -168,7 +309,7 @@ macro_rules! impl_scaty_scalar { ::default() } fn table_name() -> &'static str { - $table_name + concat!("scalar_", $table_name) } fn default() -> Self { ::default() @@ -209,7 +350,7 @@ macro_rules! impl_scaty_array { } } fn table_name() -> &'static str { - $table_name + concat!("array_", $table_name) } fn default() -> Self { Vec::new() @@ -234,11 +375,12 @@ impl ValTy for Vec { } fn from_valueblob(inp: Vec) -> Self { - todo!() + warn!("ValTy::from_valueblob for Vec"); + Vec::new() } fn table_name() -> &'static str { - "st_events_array_enum" + "array_string" } fn default() -> Self { @@ -250,36 +392,37 @@ impl ValTy for Vec { } fn st_name() -> &'static str { - "enum" + "string" } } -impl_scaty_scalar!(u8, i8, "u8", "st_events_scalar_u8"); -impl_scaty_scalar!(u16, i16, "u16", "st_events_scalar_u16"); -impl_scaty_scalar!(u32, i32, "u32", "st_events_scalar_u32"); -impl_scaty_scalar!(u64, i64, "u64", "st_events_scalar_u64"); -impl_scaty_scalar!(i8, i8, "i8", "st_events_scalar_i8"); -impl_scaty_scalar!(i16, i16, "i16", "st_events_scalar_i16"); -impl_scaty_scalar!(i32, i32, "i32", "st_events_scalar_i32"); -impl_scaty_scalar!(i64, i64, "i64", "st_events_scalar_i64"); -impl_scaty_scalar!(f32, f32, "f32", "st_events_scalar_f32"); -impl_scaty_scalar!(f64, f64, "f64", "st_events_scalar_f64"); -impl_scaty_scalar!(bool, bool, "bool", "st_events_scalar_bool"); -impl_scaty_scalar!(String, String, "string", "st_events_scalar_string"); +impl_scaty_scalar!(u8, i8, "u8", "u8"); +impl_scaty_scalar!(u16, i16, "u16", "u16"); +impl_scaty_scalar!(u32, i32, "u32", "u32"); +impl_scaty_scalar!(u64, i64, "u64", "u64"); +impl_scaty_scalar!(i8, i8, "i8", "i8"); +impl_scaty_scalar!(i16, i16, "i16", "i16"); +impl_scaty_scalar!(i32, i32, "i32", "i32"); +impl_scaty_scalar!(i64, i64, "i64", "i64"); +impl_scaty_scalar!(f32, f32, "f32", "f32"); +impl_scaty_scalar!(f64, f64, "f64", "f64"); +impl_scaty_scalar!(bool, bool, "bool", "bool"); +impl_scaty_scalar!(String, String, "string", "string"); -impl_scaty_array!(Vec, u8, Vec, "u8", "st_events_array_u8"); -impl_scaty_array!(Vec, u16, Vec, "u16", "st_events_array_u16"); -impl_scaty_array!(Vec, u32, Vec, "u32", "st_events_array_u32"); -impl_scaty_array!(Vec, u64, Vec, "u64", "st_events_array_u64"); -impl_scaty_array!(Vec, i8, Vec, "i8", "st_events_array_i8"); -impl_scaty_array!(Vec, i16, Vec, "i16", "st_events_array_i16"); -impl_scaty_array!(Vec, i32, Vec, "i32", "st_events_array_i32"); -impl_scaty_array!(Vec, i64, Vec, "i64", "st_events_array_i64"); -impl_scaty_array!(Vec, f32, Vec, "f32", "st_events_array_f32"); -impl_scaty_array!(Vec, f64, Vec, "f64", "st_events_array_f64"); -impl_scaty_array!(Vec, bool, Vec, "bool", "st_events_array_bool"); +impl_scaty_array!(Vec, u8, Vec, "u8", "u8"); +impl_scaty_array!(Vec, u16, Vec, "u16", "u16"); +impl_scaty_array!(Vec, u32, Vec, "u32", "u32"); +impl_scaty_array!(Vec, u64, Vec, "u64", "u64"); +impl_scaty_array!(Vec, i8, Vec, "i8", "i8"); +impl_scaty_array!(Vec, i16, Vec, "i16", "i16"); +impl_scaty_array!(Vec, i32, Vec, "i32", "i32"); +impl_scaty_array!(Vec, i64, Vec, "i64", "i64"); +impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); +impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); +impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); struct ReadNextValuesOpts { + rt: RetentionTime, series: u64, ts_msp: TsMs, range: ScyllaSeriesRange, @@ -294,23 +437,23 @@ where { // TODO could take scyqeue out of opts struct. let scyqueue = opts.scyqueue.clone(); - let futgen = Box::new(|scy: Arc, stmts: Arc| { - let fut = read_next_values_worker::(opts, scy, stmts); + let futgen = Box::new(|scy: Arc, stmts: Arc| { + let fut = read_next_values_2::(opts, scy, stmts); Box::pin(fut) as Pin, err::Error>> + Send>> }); let res = scyqueue.read_next_values(futgen).await?; Ok(res) } -async fn read_next_values_worker( +async fn read_next_values_2( opts: ReadNextValuesOpts, scy: Arc, - stmts: Arc, + stmts: Arc, ) -> Result, Error> where ST: ValTy, { - trace!("read_next_values_worker {} {}", opts.series, opts.ts_msp); + trace!("read_next_values_2 {} {}", opts.series, opts.ts_msp); let series = opts.series; let ts_msp = opts.ts_msp; let range = opts.range; @@ -350,11 +493,11 @@ where format!("scalar_{}_timestamps_{}", ST::st_name(), dir) } }; - let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { - let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); - error!("{e}"); - e - })?; + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, opts.with_values) + .shape(ST::is_valueblob()) + .st(ST::st_name())?; let params = ( series as i64, ts_msp.ms() as i64, @@ -391,11 +534,11 @@ where format!("scalar_{}_timestamps_{}", ST::st_name(), dir) } }; - let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { - let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); - error!("{e}"); - e - })?; + let qu = stmts + .rt(&opts.rt) + .lsp(!opts.fwd, opts.with_values) + .shape(ST::is_valueblob()) + .st(ST::st_name())?; let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); trace!("BCK event search params {:?}", params); let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; @@ -474,6 +617,7 @@ fn convert_rows( } struct ReadValues { + rt: RetentionTime, series: u64, scalar_type: ScalarType, shape: Shape, @@ -488,6 +632,7 @@ struct ReadValues { impl ReadValues { fn new( + rt: RetentionTime, series: u64, scalar_type: ScalarType, shape: Shape, @@ -498,6 +643,7 @@ impl ReadValues { scyqueue: ScyllaQueue, ) -> Self { let mut ret = Self { + rt, series, scalar_type, shape, @@ -527,6 +673,7 @@ impl ReadValues { fn make_fut(&mut self, ts_msp: TsMs) -> Pin, Error>> + Send>> { let opts = ReadNextValuesOpts { + rt: self.rt.clone(), series: self.series.clone(), ts_msp, range: self.range.clone(), @@ -600,6 +747,7 @@ enum FrState { } pub struct EventsStreamScylla { + rt: RetentionTime, state: FrState, series: u64, scalar_type: ScalarType, @@ -618,6 +766,7 @@ pub struct EventsStreamScylla { impl EventsStreamScylla { pub fn new( + rt: RetentionTime, series: u64, range: ScyllaSeriesRange, do_one_before_range: bool, @@ -629,6 +778,7 @@ impl EventsStreamScylla { ) -> Self { debug!("EventsStreamScylla::new"); Self { + rt, state: FrState::New, series, scalar_type, @@ -663,6 +813,7 @@ impl EventsStreamScylla { if let Some(msp) = self.ts_msp_bck.pop_back() { trace!("start ReadBack1 msp {}", msp); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -676,6 +827,7 @@ impl EventsStreamScylla { } else if self.ts_msp_fwd.len() > 0 { trace!("begin immediately with forward read"); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -698,6 +850,7 @@ impl EventsStreamScylla { if self.ts_msp_fwd.len() > 0 { trace!("start forward read after back1"); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -715,6 +868,7 @@ impl EventsStreamScylla { if let Some(msp) = self.ts_msp_bck.pop_back() { trace!("start ReadBack2 msp {}", msp); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -728,6 +882,7 @@ impl EventsStreamScylla { } else if self.ts_msp_fwd.len() > 0 { trace!("no 2nd back MSP, go for forward read"); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -753,6 +908,7 @@ impl EventsStreamScylla { if self.ts_msp_fwd.len() > 0 { trace!("start forward read after back2"); let st = ReadValues::new( + self.rt.clone(), self.series, self.scalar_type.clone(), self.shape.clone(), @@ -771,11 +927,12 @@ impl EventsStreamScylla { } async fn find_ts_msp_via_queue( + rt: RetentionTime, series: u64, range: ScyllaSeriesRange, scyqueue: ScyllaQueue, ) -> Result<(VecDeque, VecDeque), crate::worker::Error> { - scyqueue.find_ts_msp(series, range).await + scyqueue.find_ts_msp(rt, series, range).await } impl Stream for EventsStreamScylla { @@ -810,7 +967,7 @@ impl Stream for EventsStreamScylla { FrState::New => { let series = self.series.clone(); let range = self.range.clone(); - let fut = find_ts_msp_via_queue(series, range, self.scyqueue.clone()); + let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, self.scyqueue.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); continue; diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index 478f676..50de65f 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -3,6 +3,7 @@ pub mod bincache; pub mod conn; pub mod errconv; pub mod events; +pub mod events2; pub mod range; pub mod status; pub mod worker; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index a934e80..2047fb3 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,5 +1,5 @@ use crate::conn::create_scy_session_no_ks; -use crate::events::StmtsEventsRt; +use crate::events::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; @@ -8,6 +8,7 @@ use err::ThisError; use futures_util::Future; use items_0::Events; use netpod::log::*; +use netpod::ttl::RetentionTime; use netpod::ScyllaConfig; use netpod::TsMs; use scylla::Session; @@ -33,6 +34,7 @@ impl err::ToErr for Error { #[derive(Debug)] enum Job { FindTsMsp( + RetentionTime, // series-id u64, ScyllaSeriesRange, @@ -45,7 +47,7 @@ struct ReadNextValues { futgen: Box< dyn FnOnce( Arc, - Arc, + Arc, ) -> Pin, err::Error>> + Send>> + Send, >, @@ -67,11 +69,12 @@ pub struct ScyllaQueue { impl ScyllaQueue { pub async fn find_ts_msp( &self, + rt: RetentionTime, series: u64, range: ScyllaSeriesRange, ) -> Result<(VecDeque, VecDeque), Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::FindTsMsp(series, range, tx); + let job = Job::FindTsMsp(rt, series, range, tx); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; Ok(res) @@ -81,7 +84,7 @@ impl ScyllaQueue { where F: FnOnce( Arc, - Arc, + Arc, ) -> Pin, err::Error>> + Send>> + Send + 'static, @@ -125,9 +128,13 @@ impl ScyllaWorker { pub async fn work(self) -> Result<(), Error> { let scy = create_scy_session_no_ks(&self.scyconf_st).await?; let scy = Arc::new(scy); - let rtpre = format!("{}.st_", self.scyconf_st.keyspace); - let stmts_st = StmtsEventsRt::new(&rtpre, &scy).await?; - let stmts_st = Arc::new(stmts_st); + let kss = [ + self.scyconf_st.keyspace.as_str(), + self.scyconf_mt.keyspace.as_str(), + self.scyconf_lt.keyspace.as_str(), + ]; + let stmts = StmtsEvents::new(kss.try_into().unwrap(), &scy).await?; + let stmts = Arc::new(stmts); loop { let x = self.rx.recv().await; let job = match x { @@ -138,14 +145,14 @@ impl ScyllaWorker { } }; match job { - Job::FindTsMsp(series, range, tx) => { - let res = crate::events::find_ts_msp_worker(series, range, &stmts_st, &scy).await; + Job::FindTsMsp(rt, series, range, tx) => { + let res = crate::events::find_ts_msp_worker(&rt, series, range, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } } Job::ReadNextValues(job) => { - let fut = (job.futgen)(scy.clone(), stmts_st.clone()); + let fut = (job.futgen)(scy.clone(), stmts.clone()); let res = fut.await; if job.tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 02f1509..846b9f7 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -106,6 +106,7 @@ where } } +#[allow(unused)] struct LogFilterLayer where L: tracing_subscriber::Layer, @@ -157,7 +158,6 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .init(); console_subscriber::init(); } else { - // #[cfg(DISABLED)] // Logging setup let filter = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) @@ -168,14 +168,43 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; - // let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { - // // - // if ["scyllaconn"].contains(&meta.target()) { - // true - // } else { - // true - // } - // }); + let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { + if *meta.level() <= tracing::Level::TRACE { + if ["httpret", "scyllaconn"].contains(&meta.target()) { + let mut sr = ctx.lookup_current(); + let mut allow = false; + while let Some(g) = sr { + if g.name() == "log_span_trace" { + allow = true; + break; + } else { + sr = g.parent(); + } + } + allow + } else { + false + } + } else if *meta.level() <= tracing::Level::DEBUG { + if ["httpret", "scyllaconn", "items_0", "items_2", "streams"].contains(&meta.target()) { + let mut sr = ctx.lookup_current(); + let mut allow = false; + while let Some(g) = sr { + if g.name() == "log_span_trace" || g.name() == "log_span_debug" { + allow = true; + break; + } else { + sr = g.parent(); + } + } + allow + } else { + false + } + } else { + true + } + }); let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_writer(io::stderr) .with_timer(timer) @@ -183,6 +212,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_ansi(false) .with_thread_names(true) .event_format(formatter::FormatTxt) + .with_filter(filter_3) .with_filter(filter_2) .with_filter(filter) // .and_then(LogFilterLayer::new("lay1".into()))