Work with 2nd scylla outlet

Dominik Werder
2025-07-23 23:58:42 +02:00
parent 6721edcaef
commit a545c1a5b7
9 changed files with 287 additions and 76 deletions

View File

@@ -5,6 +5,7 @@ use log::*;
use netfetch::conf::CaIngestOpts;
use netfetch::conf::parse_config;
use netpod::Database;
+ use netpod::ttl::RetentionTime;
use scywr::config::ScyllaIngestConfig;
use taskrun::TracingMode;
@@ -160,25 +161,33 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
}
async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> {
todo!("scylla_schema_check config");
- let scy_confs = opts.scylla_insert_set_conf(0);
+ let mut confs = Vec::new();
+ confs.push(opts.scylla_insert_set_conf_main());
+ if let Some(x) = opts.scylla_insert_set_conf_2nd() {
+ confs.push(x);
+ }
let opstr = if do_change { "change" } else { "check" };
info!("start scylla schema {}", opstr);
info!("{:?}", opts.scylla_config_st());
info!("{:?}", opts.scylla_config_mt());
info!("{:?}", opts.scylla_config_lt());
info!("{:?}", opts.scylla_config_st_rf1());
scywr::schema::migrate_scylla_data_schema_all_rt(
[
&opts.scylla_config_st(),
&opts.scylla_config_mt(),
&opts.scylla_config_lt(),
&opts.scylla_config_st_rf1(),
],
do_change,
)
.await
.map_err(Error::from_string)?;
for conf in confs {
info!("{:?}", conf.st_rf3());
info!("{:?}", conf.mt_rf3());
info!("{:?}", conf.lt_rf3());
info!("{:?}", conf.st_rf1());
// TODO attach RetentionTime to the config struct
let rts = [
RetentionTime::Short,
RetentionTime::Medium,
RetentionTime::Long,
RetentionTime::Short,
];
scywr::schema::migrate_scylla_data_schema_all_rt(
rts,
[&conf.st_rf3(), &conf.mt_rf3(), &conf.lt_rf3(), &conf.st_rf1()],
do_change,
)
.await
.map_err(Error::from_string)?;
}
info!("stop scylla schema {}", opstr);
Ok(())
}

View File

@@ -259,8 +259,8 @@ impl Daemon {
.map_err(Error::from_string)?;
insert_workers_jhs.extend(jh);
} else {
- let scyset1 = ingest_opts.scylla_insert_set_conf(0).unwrap();
- if let Some(scyset2) = ingest_opts.scylla_insert_set_conf(1) {
+ let scyset1 = ingest_opts.scylla_insert_set_conf_main();
+ if let Some(scyset2) = ingest_opts.scylla_insert_set_conf_2nd() {
let (iqrx1, iqrx2) = iqrx.clone_2();
let jhs = spawn_scylla_insert_workers(
&scyset1,
@@ -911,7 +911,14 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
warn!("scylla_disable config flag enabled");
} else {
info!("start scylla schema check");
+ let rts = [
+ RetentionTime::Short,
+ RetentionTime::Medium,
+ RetentionTime::Long,
+ RetentionTime::Short,
+ ];
scywr::schema::migrate_scylla_data_schema_all_rt(
+ rts,
[
&opts.scylla_config_st(),
&opts.scylla_config_mt(),

View File

@@ -82,7 +82,7 @@ use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8);
- const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 180);
+ const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 60 * 10);
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10);
@@ -90,7 +90,7 @@ const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis
const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200);
const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const DO_RATE_CHECK: bool = false;
- const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60);
+ const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 5);
const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1);
macro_rules! trace3 { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
@@ -779,6 +779,7 @@ impl WriterStatus {
&mut self,
item: ChannelStatusItem,
deque: &mut VecDeque<QueryItem>,
+ mett: &mut stats::mett::CaConnMetrics,
) -> Result<(), Error> {
let tsev = TsNano::from_system_time(SystemTime::now());
let (ts, val) = item.to_ts_val();
@@ -789,6 +790,7 @@ impl WriterStatus {
tsev,
deque,
)?;
+ mett.emit_channel_status_item().inc();
Ok(())
}
}
@@ -1064,7 +1066,7 @@ impl<'a> EventAddIngestRefobj<'a> {
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
if wrst
- .emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs))
+ .emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs), mett)
.is_err()
{
mett.logic_error().inc();
@@ -1527,10 +1529,6 @@ impl CaConn {
&mut iqdqs.lt_rf3_qu
}
- fn channel_status_pong_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque<QueryItem> {
- &mut iqdqs.st_rf3_qu
- }
pub fn conn_command_tx(&self) -> Sender<ConnCommand> {
self.conn_command_tx.as_ref().get_ref().clone()
}
@@ -1835,8 +1833,11 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
};
- conf.wrst
- .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
+ conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
}
if let Some((ivl,)) = conf_poll_conf {
let ivl = Duration::from_millis(ivl);
@@ -1972,7 +1973,7 @@ impl CaConn {
status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove),
};
let qu = Self::channel_status_qu(&mut self.iqdqs);
- if conf.wrst.emit_channel_status_item(item, qu).is_err() {
+ if conf.wrst.emit_channel_status_item(item, qu, &mut self.mett).is_err() {
self.mett.logic_error().inc();
}
// TODO shutdown the internal writer structures.
@@ -2089,7 +2090,11 @@ impl CaConn {
cssid: conf.state.cssid(),
status: ChannelStatus::Closed(channel_reason.clone()),
};
- if conf.wrst.emit_channel_status_item(item, status_qu).is_err() {
+ if conf
+ .wrst
+ .emit_channel_status_item(item, status_qu, &mut self.mett)
+ .is_err()
+ {
self.mett.logic_error().inc();
}
}
@@ -2177,6 +2182,7 @@ impl CaConn {
tsnow: Instant,
tscaproto: Instant,
) -> Result<(), Error> {
+ self.mett.fn_handle_event_add_res().inc();
let subid = Subid(ev.subid);
// TODO handle subid-not-found which can also be peer error:
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
@@ -2400,6 +2406,7 @@ impl CaConn {
tsnow: Instant,
tscaproto: Instant,
) -> Result<(), Error> {
+ self.mett.fn_handle_read_notify_res().inc();
// trace!("handle_read_notify_res {ev:?}");
// TODO can not rely on the SID in the response.
let sid_ev = Sid(ev.sid);
@@ -2494,6 +2501,7 @@ impl CaConn {
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
)?;
}
// NOTE we do not update the last value in this ev handler.
@@ -2516,6 +2524,7 @@ impl CaConn {
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
)?;
}
}
@@ -2535,6 +2544,7 @@ impl CaConn {
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
)?;
}
}
@@ -2754,8 +2764,11 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadStart,
};
- conf.wrst
- .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
+ conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
}
}
}
@@ -2777,8 +2790,11 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadTimeout,
};
- conf.wrst
- .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
+ conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
}
if false {
// Here we try to close the channel at hand.
@@ -2855,8 +2871,11 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::PollingReadTimeout,
};
- conf.wrst
- .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
+ conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
}
}
}
@@ -2894,6 +2913,25 @@ impl CaConn {
value: CaConnEventValue::EchoTimeout,
};
self.ca_conn_event_out_queue.push_back(item);
+ for (_cid, ch_conf) in &mut self.channels {
+ match &mut ch_conf.state {
+ ChannelState::Writable(st2) => {
+ let item = ChannelStatusItem {
+ ts: self.tmp_ts_poll,
+ cssid: st2.channel.cssid.clone(),
+ status: ChannelStatus::PongTimeout,
+ };
+ ch_conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
+ }
+ _ => {
+ // TODO count metrics
+ }
+ }
+ }
self.trigger_shutdown(ShutdownReason::IocTimeout);
}
} else {
@@ -2904,6 +2942,25 @@ impl CaConn {
self.ioc_ping_last = Some(self.tmp_ts_poll);
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
+ for (_cid, ch_conf) in &mut self.channels {
+ match &mut ch_conf.state {
+ ChannelState::Writable(st2) => {
+ let item = ChannelStatusItem {
+ ts: self.tmp_ts_poll,
+ cssid: st2.channel.cssid.clone(),
+ status: ChannelStatus::Ping,
+ };
+ ch_conf.wrst.emit_channel_status_item(
+ item,
+ Self::channel_status_qu(&mut self.iqdqs),
+ &mut self.mett,
+ )?;
+ }
+ _ => {
+ // TODO count metrics
+ }
+ }
+ }
} else {
self.mett.ping_no_proto().inc();
info!("can not ping {} no proto", self.remote_addr_dbg);
@@ -3619,9 +3676,9 @@ impl CaConn {
self.ts_channel_status_pong_last = self.poll_tsnow;
Self::channel_status_qu(&mut self.iqdqs)
} else {
- Self::channel_status_pong_qu(&mut self.iqdqs)
+ Self::channel_status_qu(&mut self.iqdqs)
};
- if ch.wrst.emit_channel_status_item(item, qu).is_err() {
+ if ch.wrst.emit_channel_status_item(item, qu, &mut self.mett).is_err() {
self.mett.logic_error().inc();
}
}

View File

@@ -32,6 +32,7 @@ pub struct CaIngestOpts {
scylla_mt: ScyllaRtConf,
scylla_lt: ScyllaRtConf,
scylla_st_rf1: ScyllaRtConf,
+ scylla_2nd: Option<ScyllaConfigBlockV2>,
array_truncate: Option<u64>,
insert_worker_count: Option<usize>,
insert_worker_concurrency: Option<usize>,
@@ -70,13 +71,23 @@ impl CaIngestOpts {
&self.postgresql
}
- pub fn scylla_insert_set_conf(&self, n: usize) -> Option<ScyllaInsertsetConf> {
- if n == 0 {
+ pub fn scylla_insert_set_conf_main(&self) -> ScyllaInsertsetConf {
+ let ret = ScyllaInsertsetConf {
+ st_rf1: self.scylla_config_st_rf1(),
+ st_rf3: self.scylla_config_st(),
+ mt_rf3: self.scylla_config_mt(),
+ lt_rf3: self.scylla_config_lt(),
+ };
+ ret
+ }
+ pub fn scylla_insert_set_conf_2nd(&self) -> Option<ScyllaInsertsetConf> {
+ if let Some(cc) = self.scylla_2nd.as_ref() {
let ret = ScyllaInsertsetConf {
- st_rf1: self.scylla_config_st_rf1(),
- st_rf3: self.scylla_config_st(),
- mt_rf3: self.scylla_config_mt(),
- lt_rf3: self.scylla_config_lt(),
+ st_rf1: cc.scylla_config_st_rf1(),
+ st_rf3: cc.scylla_config_st(),
+ mt_rf3: cc.scylla_config_mt(),
+ lt_rf3: cc.scylla_config_lt(),
};
Some(ret)
} else {
@@ -193,6 +204,52 @@ impl CaIngestOpts {
}
}
+ #[derive(Clone, Debug, Deserialize)]
+ pub struct ScyllaConfigBlockV2 {
+ scylla: ScyllaConfigBlockV2MainHosts,
+ scylla_st: ScyllaRtConf,
+ scylla_mt: ScyllaRtConf,
+ scylla_lt: ScyllaRtConf,
+ scylla_st_rf1: ScyllaRtConf,
+ }
+ impl ScyllaConfigBlockV2 {
+ pub fn scylla_config_st(&self) -> ScyllaIngestConfig {
+ let c = &self.scylla_st;
+ let hosts = self.fill_hosts_if_empty(c);
+ ScyllaIngestConfig::new(hosts, c.keyspace.clone())
+ }
+ pub fn scylla_config_mt(&self) -> ScyllaIngestConfig {
+ let c = &self.scylla_mt;
+ let hosts = self.fill_hosts_if_empty(c);
+ ScyllaIngestConfig::new(hosts, c.keyspace.clone())
+ }
+ pub fn scylla_config_lt(&self) -> ScyllaIngestConfig {
+ let c = &self.scylla_lt;
+ let hosts = self.fill_hosts_if_empty(c);
+ ScyllaIngestConfig::new(hosts, c.keyspace.clone())
+ }
+ pub fn scylla_config_st_rf1(&self) -> ScyllaIngestConfig {
+ let c = &self.scylla_st_rf1;
+ let hosts = self.fill_hosts_if_empty(c);
+ ScyllaIngestConfig::new(hosts, c.keyspace.clone())
+ }
+ fn fill_hosts_if_empty(&self, c: &ScyllaRtConf) -> Vec<String> {
+ c.hosts
+ .as_ref()
+ .map_or_else(|| self.scylla.hosts.clone(), |x| x.clone())
+ }
+ }
+ #[derive(Clone, Debug, Deserialize)]
+ struct ScyllaConfigBlockV2MainHosts {
+ hosts: Vec<String>,
+ }
#[test]
fn parse_config_minimal() {
let conf = r###"
@@ -342,6 +399,11 @@ scylla_2nd:
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string()));
assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string()));
+ assert_eq!(conf.scylla_2nd.is_some(), true);
+ {
+ let scy = conf.scylla_2nd.as_ref().unwrap();
+ assert_eq!(scy.scylla.hosts.len(), 1);
+ }
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}

View File

@@ -1,3 +1,4 @@
+ use netpod::ttl::RetentionTime;
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
@@ -26,4 +27,8 @@ impl ScyllaIngestConfig {
pub fn hosts(&self) -> &Vec<String> {
&self.hosts
}
+ pub fn short_name(&self, rt: RetentionTime) -> String {
+ format!("Scyconf {{ {:?}, {:?}, {:?} }}", self.hosts.get(0), self.keyspace, rt)
+ }
}

View File

@@ -31,19 +31,19 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
- macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); }
+ macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); }
- macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ); }
+ macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ); }
- macro_rules! trace2 { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
+ macro_rules! trace2 { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
- macro_rules! trace_transform { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
+ macro_rules! trace_transform { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
- macro_rules! trace_inspect { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
+ macro_rules! trace_inspect { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
- macro_rules! trace_item_execute { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
+ macro_rules! trace_item_execute { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
- macro_rules! debug_setup { ($($arg:expr),*) => ( if false { log::debug!($($arg),*); } ); }
+ macro_rules! debug_setup { ($($arg:tt)*) => ( if false { log::debug!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "ScyllaInsertWorker"),
@@ -169,8 +169,9 @@ struct FutTrackDt<F> {
ts1: Instant,
ts2: Instant,
ts_net: Instant,
- poll1: bool,
+ npoll: u16,
fut: F,
+ jobkind: FutJobKind,
}
impl FutTrackDt<InsertFut> {
@@ -180,8 +181,9 @@ impl FutTrackDt<InsertFut> {
ts1: tsnow,
ts2: tsnow,
ts_net: job.ts_net,
- poll1: false,
+ npoll: 0,
fut: job.fut,
+ jobkind: job.jobkind,
}
}
}
@@ -190,16 +192,16 @@ impl<F> Future for FutTrackDt<F>
where
F: Future + Unpin,
{
- type Output = (Instant, Instant, Instant, F::Output);
+ type Output = (Instant, Instant, Instant, F::Output, FutJobKind);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
- if self.poll1 == false {
- self.poll1 = true;
+ if self.npoll == 0 {
self.ts2 = Instant::now();
}
+ self.npoll = self.npoll.saturating_add(1);
match self.as_mut().fut.poll_unpin(cx) {
- Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x)),
+ Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x, self.jobkind.clone())),
Pending => Pending,
}
}
@@ -236,9 +238,29 @@ async fn worker_streamed(
.buffer_unordered(concurrency);
let mut stream = Box::pin(stream);
debug_setup!("waiting for item");
- while let Some((ts_net, ts1, ts2, item)) = stream.next().await {
+ while let Some((ts_net, ts1, ts2, item, jobkind)) = stream.next().await {
trace_item_execute!("see item");
let tsnow = Instant::now();
+ match jobkind {
+ FutJobKind::SeriesData => {
+ mett.jobtrans().SeriesData().inc();
+ }
+ FutJobKind::SeriesMsp => {
+ mett.jobtrans().SeriesMsp().inc();
+ }
+ FutJobKind::TimeBinSimpleF32V02 => {
+ mett.jobtrans().TimeBinSimpleF32V02().inc();
+ }
+ FutJobKind::BinWriteIndexV04 => {
+ mett.jobtrans().BinWriteIndexV04().inc();
+ }
+ FutJobKind::Accounting => {
+ mett.jobtrans().Accounting().inc();
+ }
+ FutJobKind::AccountingRecv => {
+ mett.jobtrans().AccountingRecv().inc();
+ }
+ }
match item {
Ok(_) => {
mett.job_ok().inc();
@@ -282,9 +304,20 @@ async fn worker_streamed(
Ok(())
}
+ #[derive(Debug, Clone)]
+ enum FutJobKind {
+ SeriesData,
+ SeriesMsp,
+ TimeBinSimpleF32V02,
+ BinWriteIndexV04,
+ Accounting,
+ AccountingRecv,
+ }
struct FutJob {
fut: InsertFut,
ts_net: Instant,
+ jobkind: FutJobKind,
}
fn transform_to_db_futures<S>(
@@ -296,11 +329,7 @@ where
S: Stream<Item = VecDeque<QueryItem>>,
{
trace_transform!("transform_to_db_futures begin");
- // TODO possible without box?
- // let item_inp = Box::pin(item_inp);
item_inp.map(move |batch| {
- // TODO
- // stats.item_recv.inc();
trace_transform!("transform_to_db_futures have batch len {}", batch.len());
let tsnow = Instant::now();
let mut res = Vec::with_capacity(32);
@@ -400,6 +429,7 @@ fn prepare_msp_insert_futs(item: MspItem, data_store: &Arc<DataStore>) -> SmallV
let fut = FutJob {
fut,
ts_net: item.ts_net(),
+ jobkind: FutJobKind::SeriesMsp,
};
let futs = smallvec![fut];
futs
@@ -412,6 +442,7 @@ fn prepare_query_insert_futs(item: InsertItem, data_store: &Arc<DataStore>) -> S
let fut = FutJob {
fut,
ts_net: item_ts_net,
+ jobkind: FutJobKind::SeriesData,
};
let futs = smallvec![fut];
futs
@@ -439,7 +470,11 @@ fn prepare_timebin_v02_insert_futs(
data_store.qu_insert_binned_scalar_f32_v02.clone(),
params,
);
- let fut = FutJob { fut, ts_net: tsnow };
+ let fut = FutJob {
+ fut,
+ ts_net: tsnow,
+ jobkind: FutJobKind::TimeBinSimpleF32V02,
+ };
let futs = smallvec![fut];
// TODO match on the query result:
@@ -467,7 +502,11 @@ fn prepare_bin_write_index_v04_insert_futs(
data_store.qu_insert_bin_write_index_v04.clone(),
params,
);
- let fut = FutJob { fut, ts_net: tsnow };
+ let fut = FutJob {
+ fut,
+ ts_net: tsnow,
+ jobkind: FutJobKind::BinWriteIndexV04,
+ };
let futs = smallvec![fut];
// TODO match on the query result:
@@ -497,7 +536,11 @@ fn prepare_accounting_insert_futs(
item.bytes,
);
let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_00.clone(), params);
- let fut = FutJob { fut, ts_net: tsnow };
+ let fut = FutJob {
+ fut,
+ ts_net: tsnow,
+ jobkind: FutJobKind::Accounting,
+ };
let futs = smallvec![fut];
futs
}
@@ -515,7 +558,11 @@ fn prepare_accounting_recv_insert_futs(
item.bytes,
);
let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_recv_00.clone(), params);
- let fut = FutJob { fut, ts_net: tsnow };
+ let fut = FutJob {
+ fut,
+ ts_net: tsnow,
+ jobkind: FutJobKind::AccountingRecv,
+ };
let futs = smallvec![fut];
futs
}

View File

@@ -37,12 +37,20 @@ impl From<crate::session::Error> for Error {
}
struct Changeset {
+ name: String,
todo: Vec<String>,
}
impl Changeset {
fn new() -> Self {
- Self { todo: Vec::new() }
+ Self {
+ name: String::new(),
+ todo: Vec::new(),
+ }
}
+ fn set_name(&mut self, x: String) {
+ self.name = x;
+ }
fn add_todo(&mut self, cql: String) {
@@ -55,7 +63,7 @@ impl Changeset {
fn log_statements(&self) {
for q in &self.todo {
info!("would execute:\n{}\n", q);
info!("would execute:\n{}\n{}\n", self.name, q);
}
}
}
@@ -717,16 +725,11 @@ async fn migrate_scylla_data_schema(
}
pub async fn migrate_scylla_data_schema_all_rt(
+ rts: [RetentionTime; 4],
scyconfs: [&ScyllaIngestConfig; 4],
do_change: bool,
) -> Result<(), Error> {
let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new(), Changeset::new()];
- let rts = [
- RetentionTime::Short,
- RetentionTime::Medium,
- RetentionTime::Long,
- RetentionTime::Short,
- ];
let rfs = [3, 3, 3, 1];
for (((rt, scyconf), chs), rf) in rts
.clone()
@@ -735,12 +738,13 @@ pub async fn migrate_scylla_data_schema_all_rt(
.zip(chsa.iter_mut())
.zip(rfs.iter().map(|&x| x))
{
+ chs.set_name(scyconf.short_name(rt.clone()));
migrate_scylla_data_schema(scyconf, rt, rf, chs).await?;
}
let todo = chsa.iter().any(|x| x.has_to_do());
if do_change {
if todo {
- for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) {
+ for ((_rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) {
if chs.has_to_do() {
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
@@ -755,7 +759,7 @@ pub async fn migrate_scylla_data_schema_all_rt(
}
}
}
- let fut = migrate_scylla_data_schema_all_rt(scyconfs, false);
+ let fut = migrate_scylla_data_schema_all_rt(rts, scyconfs, false);
Box::pin(fut).await?;
Ok(())
} else {

View File

@@ -1,3 +1,15 @@
+ mod Metrics {
+ type StructName = ScyllaJobTransform;
+ enum counters {
+ SeriesData,
+ SeriesMsp,
+ TimeBinSimpleF32V02,
+ BinWriteIndexV04,
+ Accounting,
+ AccountingRecv,
+ }
+ }
mod Metrics {
type StructName = ScyllaInsertWorker;
enum counters {
@@ -10,6 +22,10 @@ mod Metrics {
job_dt2,
job_dt_net,
}
+ mod Compose {
+ type Input = ScyllaJobTransform;
+ type Name = jobtrans;
+ }
}
mod Metrics {
@@ -41,6 +57,8 @@ mod Metrics {
pong_timeout,
caget_timeout,
caget_issued,
+ fn_handle_event_add_res,
+ fn_handle_read_notify_res,
unknown_ioid,
monitor_stale_read_begin,
monitor_stale_read_timeout,
@@ -81,6 +99,7 @@ mod Metrics {
writer_ignore_monitor_not_min_quiet,
writer_ignore_poll_not_min_quiet,
writer_ignore_rate_cap,
+ emit_channel_status_item,
}
enum values {
channel_all_count,

View File

@@ -1,5 +1,6 @@
pub mod mett;
pub use mettrics;
pub use rand_xoshiro;
use std::time::Duration;