From 9ad28060752485be485ca801b3780799621dc7d4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 29 Jul 2025 15:45:53 +0200 Subject: [PATCH] Report scylla 6 workarounds option in some places --- crates/httpret/src/api4/binned.rs | 34 +++++++++++++++++++----- crates/httpret/src/api4/binned_v2.rs | 19 ++++++++++--- crates/httpret/src/api4/binwriteindex.rs | 17 +++++++++--- crates/httpret/src/channelconfig.rs | 12 ++++++--- crates/nodenet/src/conn.rs | 19 +++++++++---- crates/nodenet/src/scylla.rs | 18 ++++++++++--- crates/scyllaconn/src/events2/events.rs | 2 ++ crates/scyllaconn/src/events2/prepare.rs | 2 +- 8 files changed, 95 insertions(+), 28 deletions(-) diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 514c702..5d02567 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -21,7 +21,7 @@ use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; -use netpod::log::*; +use netpod::log; use netpod::req_uri_to_url; use netpod::timeunits::SEC; use netpod::ChannelTypeConfigGen; @@ -48,6 +48,11 @@ use streams::timebin::CacheReadProvider; use tracing::Instrument; use tracing::Span; +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } +macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } +macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } + autoerr::create_error_v1!( name(Error, "Api4Binned"), enum variants { @@ -145,8 +150,8 @@ async fn binned( } else { tracing::Span::none() }; - let span1 = span!( - Level::INFO, + let span1 = log::span!( + log::Level::INFO, "httpret::binned_cbor_framed", reqid, beg = query.range().beg_u64() / SEC, @@ -203,7 +208,7 @@ fn make_read_provider( } else if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone())) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -273,6 +278,12 @@ async fn binned_json_framed( ncc: &NodeConfigCached, ) -> Result { debug!("binned_json_framed"); + let use_scylla6_workarounds = res2 + .query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); + log_query!("binned_json_framed {:?} {:?}", res2.query, use_scylla6_workarounds); // TODO handle None case better and return 404 let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc) .await? @@ -280,7 +291,7 @@ async fn binned_json_framed( let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider( ch_conf.name(), - res2.query.use_scylla6_workarounds().into(), + use_scylla6_workarounds.clone(), res2.scyqueue, open_bytes, ctx, @@ -311,6 +322,12 @@ async fn binned_cbor_framed( ncc: &NodeConfigCached, ) -> Result { debug!("binned_cbor_framed"); + let use_scylla6_workarounds = res2 + .query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); + log_query!("binned_cbor_framed {:?} {:?}", res2.query, use_scylla6_workarounds); // TODO handle None case better and return 404 let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc) .await? @@ -318,7 +335,7 @@ async fn binned_cbor_framed( let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider( ch_conf.name(), - res2.query.use_scylla6_workarounds().into(), + use_scylla6_workarounds, res2.scyqueue, open_bytes, ctx, @@ -369,7 +386,10 @@ impl<'a> HandleRes2<'a> { let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider( ch_conf.name(), - query.use_scylla6_workarounds().into(), + query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()), scyqueue.clone(), open_bytes, ctx, diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index 420bf9a..ec121f5 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -62,6 +62,7 @@ macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } +macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "Api4BinnedV2"), @@ -221,7 +222,7 @@ fn make_read_provider( } else if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone())) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -253,13 +254,18 @@ fn to_debug(x: T) -> String { async fn binned_json_framed( res2: HandleRes2<'_>, ctx: &ReqCtx, - _ncc: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result { use futures_util::Stream; info!("binned_json_framed V2 prebinned"); let series = SeriesId::new(res2.ch_conf.series().unwrap()); let range = res2.query.range().to_time().unwrap(); let scyqueue = res2.scyqueue.as_ref().unwrap(); + let use_scylla6_workarounds = res2 + .query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); let stream = if res2.url.as_str().contains("testpart=read_all_coarse") { // let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone()); // let stream = stream.map_ok(to_debug).map_err(Error::from); @@ -276,7 +282,7 @@ async fn binned_json_framed( let stream = scyllaconn::binned2::frombinned::FromBinned::new( series, binrange.clone(), - res2.query.use_scylla6_workarounds().into(), + use_scylla6_workarounds.clone(), scyqueue, res2.cache_read_provider, ); @@ -382,13 +388,18 @@ impl<'a> HandleRes2<'a> { scyqueue: Option, ncc: &NodeConfigCached, ) -> Result { + let use_scylla6_workarounds = query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); + log_query!("HandleRes2::new {:?} {:?}", query, use_scylla6_workarounds); let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) .await? .ok_or_else(|| Error::ChannelNotFound)?; let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider( ch_conf.name(), - query.use_scylla6_workarounds().into(), + use_scylla6_workarounds.clone(), scyqueue.clone(), open_bytes, ctx, diff --git a/crates/httpret/src/api4/binwriteindex.rs b/crates/httpret/src/api4/binwriteindex.rs index 67aef5e..63ac254 100644 --- a/crates/httpret/src/api4/binwriteindex.rs +++ b/crates/httpret/src/api4/binwriteindex.rs @@ -213,7 +213,7 @@ fn make_read_provider( } else if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone())) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { @@ -241,10 +241,15 @@ fn make_read_provider( async fn binned_json_single( res2: HandleRes2<'_>, ctx: &ReqCtx, - _ncc: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result { // TODO unify with binned_json_framed debug!("binned_json_single"); + let use_scylla6_workarounds = res2 + .query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); let rt1 = res2.query.retention_time(); let pbp = res2.query.prebinned_partitioning(); // let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; @@ -256,7 +261,7 @@ async fn binned_json_single( SeriesId::new(res2.ch_conf.series().unwrap()), pbp.clone(), res2.query.range().to_time().unwrap(), - res2.query.use_scylla6_workarounds().into(), + use_scylla6_workarounds, res2.scyqueue.clone().unwrap(), ); while let Some(x) = stream.next().await { @@ -292,6 +297,10 @@ impl<'a> HandleRes2<'a> { scyqueue: Option, ncc: &NodeConfigCached, ) -> Result { + let use_scylla6_workarounds = query + .use_scylla6_workarounds() + .map(From::from) + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); let q2 = BinnedQuery::new(query.channel().clone(), query.range().clone(), 100); let ch_conf = ch_conf_from_binned(&q2, ctx, pgqueue, ncc) .await? @@ -299,7 +308,7 @@ impl<'a> HandleRes2<'a> { let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider( ch_conf.name(), - query.use_scylla6_workarounds().into(), + use_scylla6_workarounds, scyqueue.clone(), open_bytes, ctx, diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index cbc2a54..993b0f0 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -31,6 +31,7 @@ use netpod::ReqCtx; use netpod::ScalarType; use netpod::SfDbChannel; use netpod::Shape; +use netpod::UseScylla6Workarounds; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use nodenet::configquorum::find_config_basics_quorum; @@ -687,6 +688,11 @@ impl ScyllaSeriesTsMsp { q: &ScyllaSeriesTsMspQuery, shared_res: &ServiceSharedResources, ) -> Result { + // TODO also use the cluster config default + let use_scylla6_workarounds = q + .use_scylla6_workarounds + .map(From::from) + .unwrap_or(UseScylla6Workarounds::production_default()); let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() { x } else { @@ -704,7 +710,7 @@ impl ScyllaSeriesTsMsp { RetentionTime::Short, sid, (&q.range).into(), - q.use_scylla6_workarounds.into(), + use_scylla6_workarounds.clone(), scyqueue.clone(), ); use chrono::TimeZone; @@ -720,7 +726,7 @@ impl ScyllaSeriesTsMsp { RetentionTime::Medium, sid, (&q.range).into(), - q.use_scylla6_workarounds.into(), + use_scylla6_workarounds.clone(), scyqueue.clone(), ); while let Some(x) = msp_stream.next().await { @@ -735,7 +741,7 @@ impl ScyllaSeriesTsMsp { RetentionTime::Long, sid, (&q.range).into(), - q.use_scylla6_workarounds.into(), + use_scylla6_workarounds.clone(), scyqueue.clone(), ); while let Some(x) = msp_stream.next().await { diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 8eb1fb5..a422044 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -17,7 +17,7 @@ use items_2::frame::decode_frame; use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; -use netpod::log::*; +use netpod::log; use netpod::NodeConfigCached; use netpod::ReqCtxArc; use query::api4::events::EventsSubQuery; @@ -36,6 +36,11 @@ use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } +macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } +macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } + #[cfg(test)] mod test; @@ -94,6 +99,10 @@ async fn make_channel_events_stream_data( ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { // ) -> Result>, Error> { + let use_scylla6_workarounds = subq + .use_scylla6_workarounds() + .unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()); + log_query!("make_channel_events_stream_data {:?}", use_scylla6_workarounds); if subq.backend() == TEST_BACKEND { let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; @@ -101,7 +110,7 @@ async fn make_channel_events_stream_data( Ok(ret) } else if let Some(scyqueue) = scyqueue { let cfg = subq.ch_conf().to_scylla()?; - let ret = scylla_channel_event_stream(subq, cfg, scyqueue).await?; + let ret = scylla_channel_event_stream(subq, cfg, scyqueue, use_scylla6_workarounds).await?; Ok(ret) } else if let Some(_) = &ncc.node.channel_archiver { let e = Error::NotAvailable; @@ -177,7 +186,7 @@ async fn events_conn_handler_with_reqid( } } { - let item = LogItem::level_msg(Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo)); + let item = LogItem::level_msg(log::Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo)); let item: Sitemty = Ok(StreamItem::Log(item)); let buf = match item.make_frame_dyn() { Ok(k) => k, @@ -211,7 +220,7 @@ where let mut frames = Vec::new(); while let Some(k) = h .next() - .instrument(span!(Level::INFO, "events_conn_handler/query-input")) + .instrument(log::span!(log::Level::INFO, "events_conn_handler/query-input")) .await { match k { @@ -323,7 +332,7 @@ async fn events_conn_handler( let inp = TcpReadAsBytes::new(netin); let inp = inp.map_err(sitem_err2_from_string); let inp = Box::new(inp); - let span1 = span!(Level::INFO, "events_conn_handler"); + let span1 = log::span!(log::Level::INFO, "events_conn_handler"); let r = events_conn_handler_inner(inp, netout, addr, scyqueue, &ncc) .instrument(span1) .await; diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index d066e5d..e2a2e72 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -11,7 +11,9 @@ use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; +use netpod::NodeConfigCached; use netpod::SeriesKind; +use netpod::UseScylla6Workarounds; use query::api4::events::EventsSubQuery; use scyllaconn::events2::events::EventReadOpts; use scyllaconn::events2::mergert; @@ -34,6 +36,7 @@ pub async fn scylla_channel_event_stream( evq: EventsSubQuery, chconf: ChConf, scyqueue: &ScyllaQueue, + use_scylla6_workarounds: UseScylla6Workarounds, ) -> Result> + Send>>, Error> { trace!("scylla_channel_event_stream {evq:?}"); // TODO depends in general on the query @@ -43,7 +46,7 @@ pub async fn scylla_channel_event_stream( evq.need_one_before_range(), evq.need_value_data(), evq.settings().scylla_read_queue_len(), - evq.use_scylla6_workarounds().into(), + use_scylla6_workarounds, ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { trace!("========= SOLO {rt:?} ====================="); @@ -184,11 +187,15 @@ impl Stream for ScyllaEventsReadStream { pub struct ScyllaEventReadProvider { scyqueue: ScyllaQueue, + use_scylla6_workarounds: UseScylla6Workarounds, } impl ScyllaEventReadProvider { - pub fn new(scyqueue: ScyllaQueue) -> Self { - Self { scyqueue } + pub fn new(scyqueue: ScyllaQueue, use_scylla6_workarounds: UseScylla6Workarounds) -> Self { + Self { + scyqueue, + use_scylla6_workarounds, + } } } @@ -197,7 +204,10 @@ impl EventsReadProvider for ScyllaEventReadProvider { let scyqueue = self.scyqueue.clone(); match evq.ch_conf().clone() { netpod::ChannelTypeConfigGen::Scylla(ch_conf) => { - let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue).await }; + let use_scylla6_workarounds = self.use_scylla6_workarounds.clone(); + let fut1 = async move { + crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue, use_scylla6_workarounds).await + }; let stream = ScyllaEventsReadStream { fut1: Some(Box::pin(fut1)), stream: None, diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 9d8f6ed..c81f199 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -67,6 +67,8 @@ macro_rules! log_fetch_result { ($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } }; } +macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) } + #[derive(Debug, Clone)] pub struct EventReadOpts { with_values: bool, diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 0e2fa02..9a61177 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -4,7 +4,7 @@ use scylla::statement::prepared::PreparedStatement; macro_rules! log_prepare { ($($arg:tt)*) => { log::debug!("prepare cql {}", format_args!($($arg)*)); }; } -macro_rules! trace_scy6 { ($($arg:tt)*) => { log::info!("{}", format_args!($($arg)*)); }; } +macro_rules! trace_scy6 { ($($arg:tt)*) => { if false { log::trace!("{}", format_args!($($arg)*)); } }; } autoerr::create_error_v1!( name(Error, "ScyllaPrepare"),