Scylla schema checks via one entry fn

This commit is contained in:
Dominik Werder
2025-02-27 12:01:14 +01:00
parent 12778fe121
commit e3074411dc
4 changed files with 130 additions and 98 deletions

View File

@@ -2,8 +2,8 @@ use clap::Parser;
use daqingest::opts::DaqIngestOpts;
use err::Error;
use log::*;
use netfetch::conf::parse_config;
use netfetch::conf::CaIngestOpts;
use netfetch::conf::parse_config;
use netpod::Database;
use scywr::config::ScyllaIngestConfig;
use taskrun::TracingMode;
@@ -33,7 +33,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
}
async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
let buildmark = "+0009";
let buildmark = "+0010";
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
@@ -97,16 +97,20 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
}
}
}
SubCmd::ScyllaSchemaCheck(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(k.config.into()).await?;
scylla_schema_check(opts, false).await?;
}
SubCmd::ScyllaSchemaChange(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(k.config.into()).await?;
scylla_schema_check(opts, true).await?;
}
SubCmd::Scylla(k) => match k.sub {
daqingest::opts::ScyllaSubcmd::Schema(j) => match j.sub {
daqingest::opts::ScyllaSchemaSubcmd::Check(h) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(h.config.into()).await?;
scylla_schema_check(opts, false).await?;
}
daqingest::opts::ScyllaSchemaSubcmd::Change(h) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (opts, _) = parse_config(h.config.into()).await?;
scylla_schema_check(opts, true).await?;
}
},
},
SubCmd::ChannelAccess(k) => match k {
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
@@ -161,16 +165,16 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> {
let opstr = if do_change { "change" } else { "check" };
info!("start scylla schema {}", opstr);
use netpod::ttl::RetentionTime;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, do_change)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, do_change)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, do_change)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema_all_rt(
[
opts.scylla_config_st(),
opts.scylla_config_mt(),
opts.scylla_config_lt(),
],
do_change,
)
.await
.map_err(Error::from_string)?;
info!("stop scylla schema {}", opstr);
Ok(())
}

View File

@@ -17,20 +17,20 @@ use netfetch::daemon_common::DaemonEvent;
use netfetch::metrics::RoutesResources;
use netfetch::metrics::StatsSet;
use netfetch::throttletrace::ThrottleTrace;
use netpod::ttl::RetentionTime;
use netpod::Database;
use netpod::ttl::RetentionTime;
use scywr::config::ScyllaIngestConfig;
use scywr::insertqueues::InsertQueuesRx;
use scywr::insertqueues::InsertQueuesTx;
use scywr::insertworker::InsertWorkerOpts;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use stats::rand_xoshiro::rand_core::RngCore;
use std::sync::Arc;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
@@ -802,15 +802,16 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
warn!("scylla_disable config flag enabled");
} else {
info!("start scylla schema check");
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, false)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, false)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, false)
.await
.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema_all_rt(
[
opts.scylla_config_st(),
opts.scylla_config_mt(),
opts.scylla_config_lt(),
],
false,
)
.await
.map_err(Error::from_string)?;
info!("stop scylla schema check");
}
info!("database check done");

View File

@@ -23,8 +23,7 @@ pub enum SubCmd {
ListPulses,
FetchEvents(FetchEvents),
Db(Db),
ScyllaSchemaCheck(CaConfig),
ScyllaSchemaChange(CaConfig),
Scylla(Scylla),
#[command(subcommand)]
ChannelAccess(ChannelAccess),
#[cfg(feature = "bsread")]
@@ -117,6 +116,39 @@ pub struct Db {
pub sub: DbSub,
}
#[derive(Debug, clap::Parser)]
pub struct Scylla {
#[command(subcommand)]
pub sub: ScyllaSubcmd,
}
#[derive(Debug, clap::Parser)]
pub enum ScyllaSubcmd {
Schema(ScyllaSchema),
}
#[derive(Debug, clap::Parser)]
pub struct ScyllaSchema {
#[command(subcommand)]
pub sub: ScyllaSchemaSubcmd,
}
#[derive(Debug, clap::Parser)]
pub enum ScyllaSchemaSubcmd {
Check(ScyllaSchemaCheck),
Change(ScyllaSchemaChange),
}
#[derive(Debug, clap::Parser)]
pub struct ScyllaSchemaCheck {
pub config: String,
}
#[derive(Debug, clap::Parser)]
pub struct ScyllaSchemaChange {
pub config: String,
}
#[derive(Debug, clap::Parser)]
pub struct ScyllaDb {
#[arg(long)]

View File

@@ -36,28 +36,12 @@ impl From<crate::session::Error> for Error {
}
struct Changeset {
do_change: bool,
todo: Vec<String>,
cql_done: Vec<String>,
}
impl Changeset {
fn new() -> Self {
Self {
do_change: false,
todo: Vec::new(),
cql_done: Vec::new(),
}
}
fn with_do_change(self, do_change: bool) -> Self {
let mut x = self;
x.do_change = do_change;
x
}
fn do_change(&self) -> bool {
self.do_change
Self { todo: Vec::new() }
}
fn add_todo(&mut self, cql: String) {
@@ -535,21 +519,35 @@ async fn check_event_tables(
Ok(())
}
pub async fn migrate_scylla_data_schema(
async fn migrate_scylla_data_schema(
scyconf: &ScyllaIngestConfig,
rett: RetentionTime,
do_change: bool,
chs: &mut Changeset,
) -> Result<(), Error> {
let mut chsv = Changeset::new().with_do_change(do_change);
let chs = &mut chsv;
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
let durable = true;
if !has_keyspace(scyconf.keyspace(), scy).await? {
if chs.do_change() {
// TODO
let replication = 3;
// TODO
let replication = 3;
let cql = format!(
concat!(
"create keyspace {}",
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
" and durable_writes = {};"
),
scyconf.keyspace(),
replication,
durable
);
info!("scylla create keyspace {cql}");
chs.add_todo(cql);
}
if let Some(ks) = scyconf.keyspace_rf1() {
if !has_keyspace(ks, scy).await? {
let replication = 1;
let cql = format!(
concat!(
"create keyspace {}",
@@ -561,35 +559,7 @@ pub async fn migrate_scylla_data_schema(
durable
);
info!("scylla create keyspace {cql}");
scy.query_iter(cql, ()).await?;
info!("keyspace created");
} else {
error!("missing keyspace {:?}", scyconf.keyspace());
return Err(Error::BadSchema);
}
}
if let Some(ks) = scyconf.keyspace_rf1() {
if !has_keyspace(ks, scy).await? {
if chs.do_change() {
let replication = 1;
let cql = format!(
concat!(
"create keyspace {}",
" with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}",
" and durable_writes = {};"
),
scyconf.keyspace(),
replication,
durable
);
info!("scylla create keyspace {cql}");
scy.query_iter(cql, ()).await?;
info!("keyspace created");
} else {
error!("missing keyspace {:?}", scyconf.keyspace_rf1());
return Err(Error::BadSchema);
}
chs.add_todo(cql);
}
}
@@ -740,19 +710,44 @@ pub async fn migrate_scylla_data_schema(
);
tab.setup(chs, scy).await?;
}
Ok(())
}
if chs.has_to_do() {
if do_change {
for cql in chs.todo.iter() {
scy.query_unpaged(cql.as_str(), ()).await?;
pub async fn migrate_scylla_data_schema_all_rt(
scyconfs: [&ScyllaIngestConfig; 3],
do_change: bool,
) -> Result<(), Error> {
let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new()];
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) {
migrate_scylla_data_schema(scyconf, rt, chs).await?;
}
let todo = chsa.iter().any(|x| x.has_to_do());
if do_change {
if todo {
for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) {
if chs.has_to_do() {
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
for cql in chs.todo.iter() {
scy.query_unpaged(cql.as_str(), ()).await?;
}
}
}
let fut = migrate_scylla_data_schema(scyconf, rett, false);
Box::pin(fut).await
let fut = migrate_scylla_data_schema_all_rt(scyconfs, false);
Box::pin(fut).await?;
Ok(())
} else {
chs.log_statements();
Err(Error::BadSchema)
Ok(())
}
} else {
Ok(())
if todo {
for chs in chsa.iter_mut() {
chs.log_statements();
}
Err(Error::BadSchema)
} else {
Ok(())
}
}
}