Report scylla 6 workarounds option in some places

This commit is contained in:
Dominik Werder
2025-07-29 15:45:53 +02:00
parent 778264eb30
commit 9ad2806075
8 changed files with 95 additions and 28 deletions

View File

@@ -21,7 +21,7 @@ use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::log;
use netpod::req_uri_to_url;
use netpod::timeunits::SEC;
use netpod::ChannelTypeConfigGen;
@@ -48,6 +48,11 @@ use streams::timebin::CacheReadProvider;
use tracing::Instrument;
use tracing::Span;
macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "Api4Binned"),
enum variants {
@@ -145,8 +150,8 @@ async fn binned(
} else {
tracing::Span::none()
};
let span1 = span!(
Level::INFO,
let span1 = log::span!(
log::Level::INFO,
"httpret::binned_cbor_framed",
reqid,
beg = query.range().beg_u64() / SEC,
@@ -203,7 +208,7 @@ fn make_read_provider(
} else if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| ScyllaEventReadProvider::new(qu))
.map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone()))
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -273,6 +278,12 @@ async fn binned_json_framed(
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("binned_json_framed");
let use_scylla6_workarounds = res2
.query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
log_query!("binned_json_framed {:?} {:?}", res2.query, use_scylla6_workarounds);
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
.await?
@@ -280,7 +291,7 @@ async fn binned_json_framed(
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
res2.query.use_scylla6_workarounds().into(),
use_scylla6_workarounds.clone(),
res2.scyqueue,
open_bytes,
ctx,
@@ -311,6 +322,12 @@ async fn binned_cbor_framed(
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("binned_cbor_framed");
let use_scylla6_workarounds = res2
.query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
log_query!("binned_cbor_framed {:?} {:?}", res2.query, use_scylla6_workarounds);
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
.await?
@@ -318,7 +335,7 @@ async fn binned_cbor_framed(
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
res2.query.use_scylla6_workarounds().into(),
use_scylla6_workarounds,
res2.scyqueue,
open_bytes,
ctx,
@@ -369,7 +386,10 @@ impl<'a> HandleRes2<'a> {
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds()),
scyqueue.clone(),
open_bytes,
ctx,

View File

@@ -62,6 +62,7 @@ macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "Api4BinnedV2"),
@@ -221,7 +222,7 @@ fn make_read_provider(
} else if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| ScyllaEventReadProvider::new(qu))
.map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone()))
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -253,13 +254,18 @@ fn to_debug<T: std::fmt::Debug>(x: T) -> String {
async fn binned_json_framed(
res2: HandleRes2<'_>,
ctx: &ReqCtx,
_ncc: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
use futures_util::Stream;
info!("binned_json_framed V2 prebinned");
let series = SeriesId::new(res2.ch_conf.series().unwrap());
let range = res2.query.range().to_time().unwrap();
let scyqueue = res2.scyqueue.as_ref().unwrap();
let use_scylla6_workarounds = res2
.query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
let stream = if res2.url.as_str().contains("testpart=read_all_coarse") {
// let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone());
// let stream = stream.map_ok(to_debug).map_err(Error::from);
@@ -276,7 +282,7 @@ async fn binned_json_framed(
let stream = scyllaconn::binned2::frombinned::FromBinned::new(
series,
binrange.clone(),
res2.query.use_scylla6_workarounds().into(),
use_scylla6_workarounds.clone(),
scyqueue,
res2.cache_read_provider,
);
@@ -382,13 +388,18 @@ impl<'a> HandleRes2<'a> {
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Self, Error> {
let use_scylla6_workarounds = query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
log_query!("HandleRes2::new {:?} {:?}", query, use_scylla6_workarounds);
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
open_bytes,
ctx,

View File

@@ -213,7 +213,7 @@ fn make_read_provider(
} else if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| ScyllaEventReadProvider::new(qu))
.map(|qu| ScyllaEventReadProvider::new(qu, use_scylla6_workarounds.clone()))
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {
@@ -241,10 +241,15 @@ fn make_read_provider(
async fn binned_json_single(
res2: HandleRes2<'_>,
ctx: &ReqCtx,
_ncc: &NodeConfigCached,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
// TODO unify with binned_json_framed
debug!("binned_json_single");
let use_scylla6_workarounds = res2
.query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
let rt1 = res2.query.retention_time();
let pbp = res2.query.prebinned_partitioning();
// let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
@@ -256,7 +261,7 @@ async fn binned_json_single(
SeriesId::new(res2.ch_conf.series().unwrap()),
pbp.clone(),
res2.query.range().to_time().unwrap(),
res2.query.use_scylla6_workarounds().into(),
use_scylla6_workarounds,
res2.scyqueue.clone().unwrap(),
);
while let Some(x) = stream.next().await {
@@ -292,6 +297,10 @@ impl<'a> HandleRes2<'a> {
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Self, Error> {
let use_scylla6_workarounds = query
.use_scylla6_workarounds()
.map(From::from)
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
let q2 = BinnedQuery::new(query.channel().clone(), query.range().clone(), 100);
let ch_conf = ch_conf_from_binned(&q2, ctx, pgqueue, ncc)
.await?
@@ -299,7 +308,7 @@ impl<'a> HandleRes2<'a> {
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) = make_read_provider(
ch_conf.name(),
query.use_scylla6_workarounds().into(),
use_scylla6_workarounds,
scyqueue.clone(),
open_bytes,
ctx,

View File

@@ -31,6 +31,7 @@ use netpod::ReqCtx;
use netpod::ScalarType;
use netpod::SfDbChannel;
use netpod::Shape;
use netpod::UseScylla6Workarounds;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
use nodenet::configquorum::find_config_basics_quorum;
@@ -687,6 +688,11 @@ impl ScyllaSeriesTsMsp {
q: &ScyllaSeriesTsMspQuery,
shared_res: &ServiceSharedResources,
) -> Result<ScyllaSeriesTsMspResponse, Error> {
// TODO also use the cluster config default
let use_scylla6_workarounds = q
.use_scylla6_workarounds
.map(From::from)
.unwrap_or(UseScylla6Workarounds::production_default());
let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() {
x
} else {
@@ -704,7 +710,7 @@ impl ScyllaSeriesTsMsp {
RetentionTime::Short,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
use chrono::TimeZone;
@@ -720,7 +726,7 @@ impl ScyllaSeriesTsMsp {
RetentionTime::Medium,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
while let Some(x) = msp_stream.next().await {
@@ -735,7 +741,7 @@ impl ScyllaSeriesTsMsp {
RetentionTime::Long,
sid,
(&q.range).into(),
q.use_scylla6_workarounds.into(),
use_scylla6_workarounds.clone(),
scyqueue.clone(),
);
while let Some(x) = msp_stream.next().await {

View File

@@ -17,7 +17,7 @@ use items_2::frame::decode_frame;
use items_2::frame::make_term_frame;
use items_2::inmem::InMemoryFrame;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::log;
use netpod::NodeConfigCached;
use netpod::ReqCtxArc;
use query::api4::events::EventsSubQuery;
@@ -36,6 +36,11 @@ use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
macro_rules! trace { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
#[cfg(test)]
mod test;
@@ -94,6 +99,10 @@ async fn make_channel_events_stream_data(
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
// ) -> Result<impl Stream<Item = Sitemty<ChannelEvents>>, Error> {
let use_scylla6_workarounds = subq
.use_scylla6_workarounds()
.unwrap_or(ncc.node_config.cluster.use_scylla6_workarounds());
log_query!("make_channel_events_stream_data {:?}", use_scylla6_workarounds);
if subq.backend() == TEST_BACKEND {
let node_count = ncc.node_config.cluster.nodes.len() as u64;
let node_ix = ncc.ix as u64;
@@ -101,7 +110,7 @@ async fn make_channel_events_stream_data(
Ok(ret)
} else if let Some(scyqueue) = scyqueue {
let cfg = subq.ch_conf().to_scylla()?;
let ret = scylla_channel_event_stream(subq, cfg, scyqueue).await?;
let ret = scylla_channel_event_stream(subq, cfg, scyqueue, use_scylla6_workarounds).await?;
Ok(ret)
} else if let Some(_) = &ncc.node.channel_archiver {
let e = Error::NotAvailable;
@@ -177,7 +186,7 @@ async fn events_conn_handler_with_reqid(
}
}
{
let item = LogItem::level_msg(Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo));
let item = LogItem::level_msg(log::Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo));
let item: Sitemty<ChannelEvents> = Ok(StreamItem::Log(item));
let buf = match item.make_frame_dyn() {
Ok(k) => k,
@@ -211,7 +220,7 @@ where
let mut frames = Vec::new();
while let Some(k) = h
.next()
.instrument(span!(Level::INFO, "events_conn_handler/query-input"))
.instrument(log::span!(log::Level::INFO, "events_conn_handler/query-input"))
.await
{
match k {
@@ -323,7 +332,7 @@ async fn events_conn_handler(
let inp = TcpReadAsBytes::new(netin);
let inp = inp.map_err(sitem_err2_from_string);
let inp = Box::new(inp);
let span1 = span!(Level::INFO, "events_conn_handler");
let span1 = log::span!(log::Level::INFO, "events_conn_handler");
let r = events_conn_handler_inner(inp, netout, addr, scyqueue, &ncc)
.instrument(span1)
.await;

View File

@@ -11,7 +11,9 @@ use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;
use netpod::NodeConfigCached;
use netpod::SeriesKind;
use netpod::UseScylla6Workarounds;
use query::api4::events::EventsSubQuery;
use scyllaconn::events2::events::EventReadOpts;
use scyllaconn::events2::mergert;
@@ -34,6 +36,7 @@ pub async fn scylla_channel_event_stream(
evq: EventsSubQuery,
chconf: ChConf,
scyqueue: &ScyllaQueue,
use_scylla6_workarounds: UseScylla6Workarounds,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
trace!("scylla_channel_event_stream {evq:?}");
// TODO depends in general on the query
@@ -43,7 +46,7 @@ pub async fn scylla_channel_event_stream(
evq.need_one_before_range(),
evq.need_value_data(),
evq.settings().scylla_read_queue_len(),
evq.use_scylla6_workarounds().into(),
use_scylla6_workarounds,
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
trace!("========= SOLO {rt:?} =====================");
@@ -184,11 +187,15 @@ impl Stream for ScyllaEventsReadStream {
pub struct ScyllaEventReadProvider {
scyqueue: ScyllaQueue,
use_scylla6_workarounds: UseScylla6Workarounds,
}
impl ScyllaEventReadProvider {
pub fn new(scyqueue: ScyllaQueue) -> Self {
Self { scyqueue }
pub fn new(scyqueue: ScyllaQueue, use_scylla6_workarounds: UseScylla6Workarounds) -> Self {
Self {
scyqueue,
use_scylla6_workarounds,
}
}
}
@@ -197,7 +204,10 @@ impl EventsReadProvider for ScyllaEventReadProvider {
let scyqueue = self.scyqueue.clone();
match evq.ch_conf().clone() {
netpod::ChannelTypeConfigGen::Scylla(ch_conf) => {
let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue).await };
let use_scylla6_workarounds = self.use_scylla6_workarounds.clone();
let fut1 = async move {
crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue, use_scylla6_workarounds).await
};
let stream = ScyllaEventsReadStream {
fut1: Some(Box::pin(fut1)),
stream: None,

View File

@@ -67,6 +67,8 @@ macro_rules! log_fetch_result {
($($arg:tt)*) => { if false { log::trace!("fetch {}", format_args!($($arg)*)); } };
}
macro_rules! log_query { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) }
#[derive(Debug, Clone)]
pub struct EventReadOpts {
with_values: bool,

View File

@@ -4,7 +4,7 @@ use scylla::statement::prepared::PreparedStatement;
macro_rules! log_prepare { ($($arg:tt)*) => { log::debug!("prepare cql {}", format_args!($($arg)*)); }; }
macro_rules! trace_scy6 { ($($arg:tt)*) => { log::info!("{}", format_args!($($arg)*)); }; }
macro_rules! trace_scy6 { ($($arg:tt)*) => { if false { log::trace!("{}", format_args!($($arg)*)); } }; }
autoerr::create_error_v1!(
name(Error, "ScyllaPrepare"),