From af3b550a437bb0b0ac6274b04d42f1d51ad5d2d1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 6 May 2025 11:24:12 +0200 Subject: [PATCH] Add scylla default hosts config option --- daqingest/src/bin/daqingest.rs | 8 +- daqingest/src/daemon.rs | 8 +- netfetch/src/conf.rs | 147 +++++++++++++++++++++++++++++---- scywr/src/config.rs | 12 +-- 4 files changed, 145 insertions(+), 30 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 634ab8e..15c866c 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -164,10 +164,10 @@ async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), info!("start scylla schema {}", opstr); scywr::schema::migrate_scylla_data_schema_all_rt( [ - opts.scylla_config_st(), - opts.scylla_config_mt(), - opts.scylla_config_lt(), - opts.scylla_config_st_rf1(), + &opts.scylla_config_st(), + &opts.scylla_config_mt(), + &opts.scylla_config_lt(), + &opts.scylla_config_st_rf1(), ], do_change, ) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index c5992de..20b9e17 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -983,10 +983,10 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> info!("start scylla schema check"); scywr::schema::migrate_scylla_data_schema_all_rt( [ - opts.scylla_config_st(), - opts.scylla_config_mt(), - opts.scylla_config_lt(), - opts.scylla_config_st_rf1(), + &opts.scylla_config_st(), + &opts.scylla_config_mt(), + &opts.scylla_config_lt(), + &opts.scylla_config_st_rf1(), ], false, ) diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 922b41a..9903f0b 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -27,10 +27,11 @@ pub struct CaIngestOpts { #[serde(default, with = "humantime_serde")] timeout: Option, postgresql: Database, - scylla_st: ScyllaIngestConfig, - scylla_mt: ScyllaIngestConfig, - scylla_lt: ScyllaIngestConfig, - scylla_st_rf1: ScyllaIngestConfig, + scylla: Option, + scylla_st: ScyllaRtConf, + scylla_mt: ScyllaRtConf, + scylla_lt: ScyllaRtConf, + scylla_st_rf1: ScyllaRtConf, array_truncate: Option, insert_worker_count: Option, insert_worker_concurrency: Option, @@ -67,20 +68,44 @@ impl CaIngestOpts { &self.postgresql } - pub fn scylla_config_st(&self) -> &ScyllaIngestConfig { - &self.scylla_st + pub fn scylla_config_st(&self) -> ScyllaIngestConfig { + let d = &self.scylla; + let c = &self.scylla_st; + let hosts = c + .hosts + .as_ref() + .map_or(d.as_ref().map_or(Vec::new(), |x| x.hosts.clone()), |x| x.clone()); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) } - pub fn scylla_config_mt(&self) -> &ScyllaIngestConfig { - &self.scylla_mt + pub fn scylla_config_mt(&self) -> ScyllaIngestConfig { + let d = &self.scylla; + let c = &self.scylla_mt; + let hosts = c + .hosts + .as_ref() + .map_or(d.as_ref().map_or(Vec::new(), |x| x.hosts.clone()), |x| x.clone()); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) } - pub fn scylla_config_lt(&self) -> &ScyllaIngestConfig { - &self.scylla_lt + pub fn scylla_config_lt(&self) -> ScyllaIngestConfig { + let d = &self.scylla; + let c = &self.scylla_lt; + let hosts = c + .hosts + .as_ref() + .map_or(d.as_ref().map_or(Vec::new(), |x| x.hosts.clone()), |x| x.clone()); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) } - pub fn scylla_config_st_rf1(&self) -> &ScyllaIngestConfig { - &self.scylla_st_rf1 + pub fn scylla_config_st_rf1(&self) -> ScyllaIngestConfig { + let d = &self.scylla; + let c = &self.scylla_st_rf1; + let hosts = c + .hosts + .as_ref() + .map_or(d.as_ref().map_or(Vec::new(), |x| x.hosts.clone()), |x| x.clone()); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) } pub fn search(&self) -> &Vec { @@ -134,6 +159,18 @@ impl CaIngestOpts { pub fn scylla_ignore_writes(&self) -> bool { self.scylla_ignore_writes } + + pub fn is_valid(&self) -> bool { + let confs = [&self.scylla_st, &self.scylla_mt, &self.scylla_lt, &self.scylla_st_rf1]; + let has_default_hosts = self.scylla.is_some(); + for c in confs.iter() { + if c.hosts.is_none() && !has_default_hosts { + warn!("scylla config is missing hosts"); + return false; + } + } + return true; + } } #[test] @@ -153,20 +190,83 @@ postgresql: pass: PASS name: NAME scylla_st: + keyspace: ks_st hosts: - - sf-nube-11:19042 - - sf-nube-12:19042 - keyspace: ks1 + - node-st-1:19042 + - node-st-2:19042 +scylla_mt: + keyspace: ks_mt + hosts: + - node-mt-1:19042 + - node-mt-2:19042 +scylla_st_rf1: + keyspace: ks_st_rf1 + hosts: + - node-st-rf1-1:19042 + - node-st-rf1-2:19042 +scylla_lt: + keyspace: ks_lt + hosts: + - node-lt-1:19042 + - node-lt-2:19042 "###; let res: Result = serde_yaml::from_slice(conf.as_bytes()); let conf = res.unwrap(); + assert_eq!(conf.is_valid(), true); assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt"))); assert_eq!(&conf.api_bind, "0.0.0.0:3011"); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); assert_eq!( conf.scylla_config_st().hosts().get(1), - Some(&"sf-nube-12:19042".to_string()) + Some(&"node-st-2:19042".to_string()) ); + assert_eq!( + conf.scylla_config_lt().hosts().get(1), + Some(&"node-lt-2:19042".to_string()) + ); + assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); +} + +#[test] +fn parse_config_with_scylla_default() { + let conf = r###" +backend: test_backend +timeout: 10m 3s 45ms +api_bind: "0.0.0.0:3011" +channels: /some/path/file.txt +search: + - 172.26.0.255 + - 172.26.2.255 +postgresql: + host: host.example.com + port: 5432 + user: USER + pass: PASS + name: NAME +scylla: + hosts: + - node1:19042 + - node2:19042 +scylla_st: + keyspace: ks_st +scylla_mt: + keyspace: ks_mt +scylla_st_rf1: + keyspace: ks_st_rf1 +scylla_lt: + keyspace: ks_lt + hosts: + - node3:19042 + - node4:19042 +"###; + let res: Result = serde_yaml::from_slice(conf.as_bytes()); + let conf = res.unwrap(); + assert_eq!(conf.is_valid(), true); + assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt"))); + assert_eq!(&conf.api_bind, "0.0.0.0:3011"); + assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); + assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string())); + assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string())); assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } @@ -259,6 +359,10 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct ScyllaRtConf { + keyspace: String, + hosts: Option>, +} + #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct IngestConfigArchiving { #[serde(default = "bool_true")] diff --git a/scywr/src/config.rs b/scywr/src/config.rs index e8a91b7..53a32e5 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -2,8 +2,8 @@ use serde::Deserialize; #[derive(Debug, Clone, Deserialize)] pub struct ScyllaIngestConfig { - hosts: Vec, keyspace: String, + hosts: Vec, } impl ScyllaIngestConfig { @@ -14,16 +14,16 @@ impl ScyllaIngestConfig { K1: Into, { Self { - hosts: hosts.into_iter().map(Into::into).collect(), keyspace: ks.into(), + hosts: hosts.into_iter().map(Into::into).collect(), } } - pub fn hosts(&self) -> &Vec { - &self.hosts - } - pub fn keyspace(&self) -> &String { &self.keyspace } + + pub fn hosts(&self) -> &Vec { + &self.hosts + } }