From cb2231e58d55b70f8f220735c8f4446f7c030a9f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 16 Feb 2024 21:52:26 +0100 Subject: [PATCH] Parse channel yml conf --- daqingest/src/bin/daqingest.rs | 3 +- netfetch/src/conf.rs | 276 ++++++++++++++++++++++++++++++--- 2 files changed, 255 insertions(+), 24 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 2ebad51..30ad114 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -56,7 +56,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { ChannelAccess::CaIngest(k) => { info!("daqingest version {}", clap::crate_version!()); let (conf, channels) = parse_config(k.config.into()).await?; - daqingest::daemon::run(conf, channels).await? + todo!(); + // daqingest::daemon::run(conf, channels).await? } }, #[cfg(feature = "bsread")] diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 8211079..8d2abda 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -2,8 +2,11 @@ use err::Error; use netpod::log::*; use netpod::Database; use netpod::ScyllaConfig; +use regex::Regex; use serde::Deserialize; use serde::Serialize; +use std::collections::BTreeMap; +use std::path::Path; use std::path::PathBuf; use std::time::Duration; use taskrun::tokio; @@ -181,40 +184,267 @@ fn test_duration_parse() { assert_eq!(a.dur, Duration::from_millis(3170)); } -pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option>), Error> { +pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option), Error> { let mut file = OpenOptions::new().read(true).open(config).await?; let mut buf = Vec::new(); file.read_to_end(&mut buf).await?; let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?; drop(file); - let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-whitelisted--".into()))?; - let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-blacklisted--".into()))?; + let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?; + let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?; let channels = if let Some(fname) = conf.channels.as_ref() { - let mut file = OpenOptions::new().read(true).open(fname).await?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).await?; - let lines = buf.split(|&x| x == 0x0a); - let mut channels = Vec::new(); - for line in lines { - let line = String::from_utf8_lossy(line); - let line = line.trim(); - let use_line = if line.is_empty() { - false - } else if let Some(_cs) = re_p.captures(&line) { - true - } else if re_n.is_match(&line) { - false + if fname.ends_with(".txt") { + Some(parse_channel_config_txt(fname, re_p, re_n).await?) + } else if fname.ends_with(".yml") { + let e = Error::with_msg_no_trace("unsupported channe config file"); + return Err(e); + } else { + let meta = tokio::fs::metadata(fname).await?; + if meta.is_dir() { + Some(parse_config_dir(&fname).await?) } else { - true - }; - if use_line { - channels.push(line.into()); + let e = Error::with_msg_no_trace("unsupported channe config file"); + return Err(e); } } - info!("Parsed {} channels", channels.len()); - Some(channels) } else { None }; Ok((conf, channels)) } + +async fn parse_config_dir(dir: &Path) -> Result { + let mut ret = ChannelsConfig::new(); + let mut rd = tokio::fs::read_dir(dir).await?; + loop { + let e = rd.next_entry().await?; + let e = if let Some(x) = e { + x + } else { + break; + }; + + // TODO parse the yml file at this path and compile a merged configuration + e.path(); + todo!(); + } + Ok(ret) +} + +async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Result { + let mut file = OpenOptions::new().read(true).open(fname).await?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + let lines = buf.split(|&x| x == 0x0a); + let mut conf = ChannelsConfig::new(); + for line in lines { + let line = String::from_utf8_lossy(line); + let line = line.trim(); + let use_line = if line.is_empty() { + false + } else if let Some(_cs) = re_p.captures(&line) { + true + } else if re_n.is_match(&line) { + false + } else { + true + }; + if use_line { + let item = ChannelConfig { + name: line.into(), + arch: IngestConfigArchiving { + replication: true, + short_term: Some(ChannelReadConfig::Monitor), + medium_term: None, + long_term: None, + }, + }; + conf.channels.push(item); + } + } + info!("Parsed {} channels", conf.channels.len()); + Ok(conf) +} + +#[derive(Debug, Deserialize)] +pub struct ChannelConfigParse { + archiving_configuration: IngestConfigArchiving, +} + +#[derive(Debug, Deserialize)] +pub struct IngestConfigArchiving { + #[serde(default, with = "serde_replication_bool")] + replication: bool, + #[serde(default, with = "serde_option_channel_read_config")] + short_term: Option, + #[serde(default, with = "serde_option_channel_read_config")] + medium_term: Option, + #[serde(default, with = "serde_option_channel_read_config")] + long_term: Option, +} + +mod serde_replication_bool { + use serde::de; + use serde::Deserializer; + use std::fmt; + + pub fn deserialize<'de, D>(de: D) -> Result + where + D: Deserializer<'de>, + { + de.deserialize_any(Vis) + } + + struct Vis; + + impl<'de> de::Visitor<'de> for Vis { + type Value = bool; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "a keyword `Enabled` or `None`, or not this field at all") + } + + fn visit_bool(self, _v: bool) -> Result + where + E: de::Error, + { + let e = E::custom(format!("could accept bool, but it's not in specification")); + return Err(e); + } + + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + let ret = if v == "Enabled" { + true + } else if v == "Disabled" { + false + } else { + let e = E::custom(format!("unexpected value {v:?}")); + return Err(e); + }; + Ok(ret) + } + } +} + +mod serde_option_channel_read_config { + use super::ChannelReadConfig; + use serde::de; + use serde::Deserializer; + use std::fmt; + use std::time::Duration; + + pub fn deserialize<'de, D>(de: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + de.deserialize_any(Vis) + } + + struct Vis; + + impl<'de> de::Visitor<'de> for Vis { + type Value = Option; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "keyword `Monitor`, an integer, or not this field at all") + } + + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + let ret = if v == "Monitor" { + Some(ChannelReadConfig::Monitor) + } else if v == "None" { + None + } else { + let e = E::custom(format!("unexpected value {v:?}")); + return Err(e); + }; + Ok(ret) + } + + fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + if v < 1 || v > 108000 { + let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000")); + return Err(e); + } + Ok(Some(ChannelReadConfig::Poll(Duration::from_secs(v as u64)))) + } + + fn visit_i64(self, v: i64) -> Result + where + E: de::Error, + { + if v < 1 || v > 108000 { + let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000")); + return Err(e); + } + self.visit_u64(v as u64) + } + } +} + +#[derive(Debug, Deserialize)] +pub enum ChannelReadConfig { + Monitor, + Poll(Duration), +} + +#[test] +fn test_channel_config_00() { + let inp = r###" +CH-00: + archiving_configuration: + replication: Enabled + short_term: Monitor + medium_term: 60 + long_term: 3600 +CH-01: + archiving_configuration: + replication: Enabled + short_term: 1 + medium_term: 10 + long_term: 60 +CH-02: + archiving_configuration: + short_term: Monitor +CH-03: + archiving_configuration: +"###; + let x: BTreeMap = serde_yaml::from_str(inp).unwrap(); +} + +pub struct ChannelsConfig { + channels: Vec, +} + +impl ChannelsConfig { + fn new() -> Self { + Self { channels: Vec::new() } + } +} + +impl From> for ChannelsConfig { + fn from(value: BTreeMap) -> Self { + let channels = value + .into_iter() + .map(|(k, v)| ChannelConfig { + name: k, + arch: v.archiving_configuration, + }) + .collect(); + ChannelsConfig { channels } + } +} + +pub struct ChannelConfig { + name: String, + arch: IngestConfigArchiving, +}