From e3074411dc71f69619c1ba03b717c7edee2c9b2d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 27 Feb 2025 12:01:14 +0100 Subject: [PATCH] Scylla schema checks via one entry fn --- daqingest/src/bin/daqingest.rs | 48 +++++++------ daqingest/src/daemon.rs | 25 +++---- daqingest/src/opts.rs | 36 +++++++++- scywr/src/schema.rs | 119 ++++++++++++++++----------------- 4 files changed, 130 insertions(+), 98 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 9e6174d..7307ee7 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -2,8 +2,8 @@ use clap::Parser; use daqingest::opts::DaqIngestOpts; use err::Error; use log::*; -use netfetch::conf::parse_config; use netfetch::conf::CaIngestOpts; +use netfetch::conf::parse_config; use netpod::Database; use scywr::config::ScyllaIngestConfig; use taskrun::TracingMode; @@ -33,7 +33,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { - let buildmark = "+0009"; + let buildmark = "+0010"; use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { @@ -97,16 +97,20 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } } } - SubCmd::ScyllaSchemaCheck(k) => { - info!("daqingest version {} {}", clap::crate_version!(), buildmark); - let (opts, _) = parse_config(k.config.into()).await?; - scylla_schema_check(opts, false).await?; - } - SubCmd::ScyllaSchemaChange(k) => { - info!("daqingest version {} {}", clap::crate_version!(), buildmark); - let (opts, _) = parse_config(k.config.into()).await?; - scylla_schema_check(opts, true).await?; - } + SubCmd::Scylla(k) => match k.sub { + daqingest::opts::ScyllaSubcmd::Schema(j) => match j.sub { + daqingest::opts::ScyllaSchemaSubcmd::Check(h) => { + info!("daqingest version {} {}", clap::crate_version!(), buildmark); + let (opts, _) = parse_config(h.config.into()).await?; + scylla_schema_check(opts, false).await?; + } + daqingest::opts::ScyllaSchemaSubcmd::Change(h) => { + info!("daqingest version {} {}", clap::crate_version!(), buildmark); + let (opts, _) = parse_config(h.config.into()).await?; + scylla_schema_check(opts, true).await?; + } + }, + }, SubCmd::ChannelAccess(k) => match k { ChannelAccess::CaIngest(k) => { info!("daqingest version {} {}", clap::crate_version!(), buildmark); @@ -161,16 +165,16 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> { let opstr = if do_change { "change" } else { "check" }; info!("start scylla schema {}", opstr); - use netpod::ttl::RetentionTime; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, do_change) - .await - .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, do_change) - .await - .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, do_change) - .await - .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema_all_rt( + [ + opts.scylla_config_st(), + opts.scylla_config_mt(), + opts.scylla_config_lt(), + ], + do_change, + ) + .await + .map_err(Error::from_string)?; info!("stop scylla schema {}", opstr); Ok(()) } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 6e87ae7..130acd0 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -17,20 +17,20 @@ use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::RoutesResources; use netfetch::metrics::StatsSet; use netfetch::throttletrace::ThrottleTrace; -use netpod::ttl::RetentionTime; use netpod::Database; +use netpod::ttl::RetentionTime; use scywr::config::ScyllaIngestConfig; use scywr::insertqueues::InsertQueuesRx; use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; -use stats::rand_xoshiro::rand_core::RngCore; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; +use stats::rand_xoshiro::rand_core::RngCore; +use std::sync::Arc; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -802,15 +802,16 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> warn!("scylla_disable config flag enabled"); } else { info!("start scylla schema check"); - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_st(), RetentionTime::Short, false) - .await - .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_mt(), RetentionTime::Medium, false) - .await - .map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config_lt(), RetentionTime::Long, false) - .await - .map_err(Error::from_string)?; + scywr::schema::migrate_scylla_data_schema_all_rt( + [ + opts.scylla_config_st(), + opts.scylla_config_mt(), + opts.scylla_config_lt(), + ], + false, + ) + .await + .map_err(Error::from_string)?; info!("stop scylla schema check"); } info!("database check done"); diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 0d87679..9a084d2 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -23,8 +23,7 @@ pub enum SubCmd { ListPulses, FetchEvents(FetchEvents), Db(Db), - ScyllaSchemaCheck(CaConfig), - ScyllaSchemaChange(CaConfig), + Scylla(Scylla), #[command(subcommand)] ChannelAccess(ChannelAccess), #[cfg(feature = "bsread")] @@ -117,6 +116,39 @@ pub struct Db { pub sub: DbSub, } +#[derive(Debug, clap::Parser)] +pub struct Scylla { + #[command(subcommand)] + pub sub: ScyllaSubcmd, +} + +#[derive(Debug, clap::Parser)] +pub enum ScyllaSubcmd { + Schema(ScyllaSchema), +} + +#[derive(Debug, clap::Parser)] +pub struct ScyllaSchema { + #[command(subcommand)] + pub sub: ScyllaSchemaSubcmd, +} + +#[derive(Debug, clap::Parser)] +pub enum ScyllaSchemaSubcmd { + Check(ScyllaSchemaCheck), + Change(ScyllaSchemaChange), +} + +#[derive(Debug, clap::Parser)] +pub struct ScyllaSchemaCheck { + pub config: String, +} + +#[derive(Debug, clap::Parser)] +pub struct ScyllaSchemaChange { + pub config: String, +} + #[derive(Debug, clap::Parser)] pub struct ScyllaDb { #[arg(long)] diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index c57e161..d3e47f4 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -36,28 +36,12 @@ impl From for Error { } struct Changeset { - do_change: bool, todo: Vec, - cql_done: Vec, } impl Changeset { fn new() -> Self { - Self { - do_change: false, - todo: Vec::new(), - cql_done: Vec::new(), - } - } - - fn with_do_change(self, do_change: bool) -> Self { - let mut x = self; - x.do_change = do_change; - x - } - - fn do_change(&self) -> bool { - self.do_change + Self { todo: Vec::new() } } fn add_todo(&mut self, cql: String) { @@ -535,21 +519,35 @@ async fn check_event_tables( Ok(()) } -pub async fn migrate_scylla_data_schema( +async fn migrate_scylla_data_schema( scyconf: &ScyllaIngestConfig, rett: RetentionTime, - do_change: bool, + chs: &mut Changeset, ) -> Result<(), Error> { - let mut chsv = Changeset::new().with_do_change(do_change); - let chs = &mut chsv; let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; let durable = true; if !has_keyspace(scyconf.keyspace(), scy).await? { - if chs.do_change() { - // TODO - let replication = 3; + // TODO + let replication = 3; + let cql = format!( + concat!( + "create keyspace {}", + " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", + " and durable_writes = {};" + ), + scyconf.keyspace(), + replication, + durable + ); + info!("scylla create keyspace {cql}"); + chs.add_todo(cql); + } + + if let Some(ks) = scyconf.keyspace_rf1() { + if !has_keyspace(ks, scy).await? { + let replication = 1; let cql = format!( concat!( "create keyspace {}", @@ -561,35 +559,7 @@ pub async fn migrate_scylla_data_schema( durable ); info!("scylla create keyspace {cql}"); - scy.query_iter(cql, ()).await?; - info!("keyspace created"); - } else { - error!("missing keyspace {:?}", scyconf.keyspace()); - return Err(Error::BadSchema); - } - } - - if let Some(ks) = scyconf.keyspace_rf1() { - if !has_keyspace(ks, scy).await? { - if chs.do_change() { - let replication = 1; - let cql = format!( - concat!( - "create keyspace {}", - " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", - " and durable_writes = {};" - ), - scyconf.keyspace(), - replication, - durable - ); - info!("scylla create keyspace {cql}"); - scy.query_iter(cql, ()).await?; - info!("keyspace created"); - } else { - error!("missing keyspace {:?}", scyconf.keyspace_rf1()); - return Err(Error::BadSchema); - } + chs.add_todo(cql); } } @@ -740,19 +710,44 @@ pub async fn migrate_scylla_data_schema( ); tab.setup(chs, scy).await?; } + Ok(()) +} - if chs.has_to_do() { - if do_change { - for cql in chs.todo.iter() { - scy.query_unpaged(cql.as_str(), ()).await?; +pub async fn migrate_scylla_data_schema_all_rt( + scyconfs: [&ScyllaIngestConfig; 3], + do_change: bool, +) -> Result<(), Error> { + let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new()]; + let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; + for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) { + migrate_scylla_data_schema(scyconf, rt, chs).await?; + } + let todo = chsa.iter().any(|x| x.has_to_do()); + if do_change { + if todo { + for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) { + if chs.has_to_do() { + let scy2 = create_session_no_ks(scyconf).await?; + let scy = &scy2; + for cql in chs.todo.iter() { + scy.query_unpaged(cql.as_str(), ()).await?; + } + } } - let fut = migrate_scylla_data_schema(scyconf, rett, false); - Box::pin(fut).await + let fut = migrate_scylla_data_schema_all_rt(scyconfs, false); + Box::pin(fut).await?; + Ok(()) } else { - chs.log_statements(); - Err(Error::BadSchema) + Ok(()) } } else { - Ok(()) + if todo { + for chs in chsa.iter_mut() { + chs.log_statements(); + } + Err(Error::BadSchema) + } else { + Ok(()) + } } }