Add scylla default hosts config option

This commit is contained in:
Dominik Werder
2025-05-06 11:24:12 +02:00
parent 6c789e54b4
commit af3b550a43
4 changed files with 145 additions and 30 deletions

View File

@@ -164,10 +164,10 @@ async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(),
info!("start scylla schema {}", opstr);
scywr::schema::migrate_scylla_data_schema_all_rt(
[
opts.scylla_config_st(),
opts.scylla_config_mt(),
opts.scylla_config_lt(),
opts.scylla_config_st_rf1(),
&opts.scylla_config_st(),
&opts.scylla_config_mt(),
&opts.scylla_config_lt(),
&opts.scylla_config_st_rf1(),
],
do_change,
)

View File

@@ -983,10 +983,10 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
info!("start scylla schema check");
scywr::schema::migrate_scylla_data_schema_all_rt(
[
opts.scylla_config_st(),
opts.scylla_config_mt(),
opts.scylla_config_lt(),
opts.scylla_config_st_rf1(),
&opts.scylla_config_st(),
&opts.scylla_config_mt(),
&opts.scylla_config_lt(),
&opts.scylla_config_st_rf1(),
],
false,
)

View File

@@ -27,10 +27,11 @@ pub struct CaIngestOpts {
#[serde(default, with = "humantime_serde")]
timeout: Option<Duration>,
postgresql: Database,
scylla_st: ScyllaIngestConfig,
scylla_mt: ScyllaIngestConfig,
scylla_lt: ScyllaIngestConfig,
scylla_st_rf1: ScyllaIngestConfig,
scylla: Option<ScyllaDefaultHosts>,
scylla_st: ScyllaRtConf,
scylla_mt: ScyllaRtConf,
scylla_lt: ScyllaRtConf,
scylla_st_rf1: ScyllaRtConf,
array_truncate: Option<u64>,
insert_worker_count: Option<usize>,
insert_worker_concurrency: Option<usize>,
@@ -67,20 +68,44 @@ impl CaIngestOpts {
&self.postgresql
}
pub fn scylla_config_st(&self) -> &ScyllaIngestConfig {
&self.scylla_st
/// Returns the effective Scylla configuration for the `scylla_st` section.
///
/// Host resolution order: the section's own `hosts` if present, otherwise
/// the `hosts` of the top-level `scylla` defaults section, otherwise an
/// empty list. The keyspace always comes from the section itself.
pub fn scylla_config_st(&self) -> ScyllaIngestConfig {
    let c = &self.scylla_st;
    // Resolve the fallback lazily so the default host list is only cloned
    // when the section does not provide its own hosts.
    let hosts = c
        .hosts
        .clone()
        .or_else(|| self.scylla.as_ref().map(|d| d.hosts.clone()))
        .unwrap_or_default();
    ScyllaIngestConfig::new(hosts, c.keyspace.clone())
}
pub fn scylla_config_mt(&self) -> &ScyllaIngestConfig {
&self.scylla_mt
/// Returns the effective Scylla configuration for the `scylla_mt` section.
///
/// Host resolution order: the section's own `hosts` if present, otherwise
/// the `hosts` of the top-level `scylla` defaults section, otherwise an
/// empty list. The keyspace always comes from the section itself.
pub fn scylla_config_mt(&self) -> ScyllaIngestConfig {
    let c = &self.scylla_mt;
    // Resolve the fallback lazily so the default host list is only cloned
    // when the section does not provide its own hosts.
    let hosts = c
        .hosts
        .clone()
        .or_else(|| self.scylla.as_ref().map(|d| d.hosts.clone()))
        .unwrap_or_default();
    ScyllaIngestConfig::new(hosts, c.keyspace.clone())
}
pub fn scylla_config_lt(&self) -> &ScyllaIngestConfig {
&self.scylla_lt
/// Returns the effective Scylla configuration for the `scylla_lt` section.
///
/// Host resolution order: the section's own `hosts` if present, otherwise
/// the `hosts` of the top-level `scylla` defaults section, otherwise an
/// empty list. The keyspace always comes from the section itself.
pub fn scylla_config_lt(&self) -> ScyllaIngestConfig {
    let c = &self.scylla_lt;
    // Resolve the fallback lazily so the default host list is only cloned
    // when the section does not provide its own hosts.
    let hosts = c
        .hosts
        .clone()
        .or_else(|| self.scylla.as_ref().map(|d| d.hosts.clone()))
        .unwrap_or_default();
    ScyllaIngestConfig::new(hosts, c.keyspace.clone())
}
pub fn scylla_config_st_rf1(&self) -> &ScyllaIngestConfig {
&self.scylla_st_rf1
/// Returns the effective Scylla configuration for the `scylla_st_rf1` section.
///
/// Host resolution order: the section's own `hosts` if present, otherwise
/// the `hosts` of the top-level `scylla` defaults section, otherwise an
/// empty list. The keyspace always comes from the section itself.
pub fn scylla_config_st_rf1(&self) -> ScyllaIngestConfig {
    let c = &self.scylla_st_rf1;
    // Resolve the fallback lazily so the default host list is only cloned
    // when the section does not provide its own hosts.
    let hosts = c
        .hosts
        .clone()
        .or_else(|| self.scylla.as_ref().map(|d| d.hosts.clone()))
        .unwrap_or_default();
    ScyllaIngestConfig::new(hosts, c.keyspace.clone())
}
pub fn search(&self) -> &Vec<String> {
@@ -134,6 +159,18 @@ impl CaIngestOpts {
pub fn scylla_ignore_writes(&self) -> bool {
self.scylla_ignore_writes
}
/// Checks that every Scylla section can resolve a host list.
///
/// A section passes when it lists its own `hosts`, or when the top-level
/// `scylla` defaults section is present. Returns `false` if any section
/// fails; a warning naming each failing section is logged so the operator
/// can see exactly what to fix.
pub fn is_valid(&self) -> bool {
    let has_default_hosts = self.scylla.is_some();
    let confs = [
        ("scylla_st", &self.scylla_st),
        ("scylla_mt", &self.scylla_mt),
        ("scylla_lt", &self.scylla_lt),
        ("scylla_st_rf1", &self.scylla_st_rf1),
    ];
    let mut valid = true;
    for (name, c) in confs.iter() {
        if c.hosts.is_none() && !has_default_hosts {
            // Keep scanning instead of returning early so that all
            // offending sections are reported in one run.
            warn!("scylla config is missing hosts: {}", name);
            valid = false;
        }
    }
    valid
}
}
#[test]
@@ -153,20 +190,83 @@ postgresql:
pass: PASS
name: NAME
scylla_st:
keyspace: ks_st
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
keyspace: ks1
- node-st-1:19042
- node-st-2:19042
scylla_mt:
keyspace: ks_mt
hosts:
- node-mt-1:19042
- node-mt-2:19042
scylla_st_rf1:
keyspace: ks_st_rf1
hosts:
- node-st-rf1-1:19042
- node-st-rf1-2:19042
scylla_lt:
keyspace: ks_lt
hosts:
- node-lt-1:19042
- node-lt-2:19042
"###;
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
let conf = res.unwrap();
assert_eq!(conf.is_valid(), true);
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(
conf.scylla_config_st().hosts().get(1),
Some(&"sf-nube-12:19042".to_string())
Some(&"node-st-2:19042".to_string())
);
assert_eq!(
conf.scylla_config_lt().hosts().get(1),
Some(&"node-lt-2:19042".to_string())
);
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}
// Parses a config in which the top-level `scylla` section supplies the host
// list (node1, node2). `scylla_st`, `scylla_mt` and `scylla_st_rf1` give only
// a keyspace, while `scylla_lt` lists its own hosts (node3, node4).
#[test]
fn parse_config_with_scylla_default() {
let conf = r###"
backend: test_backend
timeout: 10m 3s 45ms
api_bind: "0.0.0.0:3011"
channels: /some/path/file.txt
search:
- 172.26.0.255
- 172.26.2.255
postgresql:
host: host.example.com
port: 5432
user: USER
pass: PASS
name: NAME
scylla:
hosts:
- node1:19042
- node2:19042
scylla_st:
keyspace: ks_st
scylla_mt:
keyspace: ks_mt
scylla_st_rf1:
keyspace: ks_st_rf1
scylla_lt:
keyspace: ks_lt
hosts:
- node3:19042
- node4:19042
"###;
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
let conf = res.unwrap();
// Sections without their own hosts are still valid because `scylla` is present.
assert_eq!(conf.is_valid(), true);
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
// `scylla_st` has no hosts of its own, so it resolves to the default list.
assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string()));
// `scylla_lt` lists its own hosts, which take precedence over the default list.
assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string()));
// 10m 3s 45ms expressed in milliseconds.
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}
@@ -259,6 +359,10 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<Chann
file.read_to_end(&mut buf).await?;
let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(Error::from_string)?;
drop(file);
if !conf.is_valid() {
let e = Error::with_msg_no_trace(format!("invalid config file"));
return Err(e);
}
// let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?;
// let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?;
let channels = parse_channels(conf.channels.clone()).await?;
@@ -316,6 +420,17 @@ impl ChannelTimestamp {
}
}
/// Deserialized form of the optional top-level `scylla` config section.
#[derive(Debug, Clone, Deserialize)]
struct ScyllaDefaultHosts {
// Host list of the `scylla` section; required whenever the section is present.
hosts: Vec<String>,
}
/// Deserialized form of one per-section Scylla config entry
/// (a keyspace plus an optional host list).
#[derive(Debug, Clone, Deserialize)]
struct ScyllaRtConf {
// Keyspace name; required.
keyspace: String,
// Optional host list; `None` when the key is omitted from the config.
hosts: Option<Vec<String>>,
}
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct IngestConfigArchiving {
#[serde(default = "bool_true")]

View File

@@ -2,8 +2,8 @@ use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
pub struct ScyllaIngestConfig {
hosts: Vec<String>,
keyspace: String,
hosts: Vec<String>,
}
impl ScyllaIngestConfig {
@@ -14,16 +14,16 @@ impl ScyllaIngestConfig {
K1: Into<String>,
{
Self {
hosts: hosts.into_iter().map(Into::into).collect(),
keyspace: ks.into(),
hosts: hosts.into_iter().map(Into::into).collect(),
}
}
pub fn hosts(&self) -> &Vec<String> {
&self.hosts
}
pub fn keyspace(&self) -> &String {
&self.keyspace
}
pub fn hosts(&self) -> &Vec<String> {
&self.hosts
}
}