Parse channel yml conf

This commit is contained in:
Dominik Werder
2024-02-16 21:52:26 +01:00
parent e1599ab0b7
commit cb2231e58d
2 changed files with 255 additions and 24 deletions

View File

@@ -56,7 +56,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
ChannelAccess::CaIngest(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels).await?
todo!();
// daqingest::daemon::run(conf, channels).await?
}
},
#[cfg(feature = "bsread")]

View File

@@ -2,8 +2,11 @@ use err::Error;
use netpod::log::*;
use netpod::Database;
use netpod::ScyllaConfig;
use regex::Regex;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use taskrun::tokio;
@@ -181,40 +184,267 @@ fn test_duration_parse() {
assert_eq!(a.dur, Duration::from_millis(3170));
}
pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<Vec<String>>), Error> {
pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option<ChannelsConfig>), Error> {
let mut file = OpenOptions::new().read(true).open(config).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
drop(file);
let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-whitelisted--".into()))?;
let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-blacklisted--".into()))?;
let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?;
let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?;
let channels = if let Some(fname) = conf.channels.as_ref() {
let mut file = OpenOptions::new().read(true).open(fname).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let lines = buf.split(|&x| x == 0x0a);
let mut channels = Vec::new();
for line in lines {
let line = String::from_utf8_lossy(line);
let line = line.trim();
let use_line = if line.is_empty() {
false
} else if let Some(_cs) = re_p.captures(&line) {
true
} else if re_n.is_match(&line) {
false
if fname.ends_with(".txt") {
Some(parse_channel_config_txt(fname, re_p, re_n).await?)
} else if fname.ends_with(".yml") {
let e = Error::with_msg_no_trace("unsupported channe config file");
return Err(e);
} else {
let meta = tokio::fs::metadata(fname).await?;
if meta.is_dir() {
Some(parse_config_dir(&fname).await?)
} else {
true
};
if use_line {
channels.push(line.into());
let e = Error::with_msg_no_trace("unsupported channe config file");
return Err(e);
}
}
info!("Parsed {} channels", channels.len());
Some(channels)
} else {
None
};
Ok((conf, channels))
}
async fn parse_config_dir(dir: &Path) -> Result<ChannelsConfig, Error> {
let mut ret = ChannelsConfig::new();
let mut rd = tokio::fs::read_dir(dir).await?;
loop {
let e = rd.next_entry().await?;
let e = if let Some(x) = e {
x
} else {
break;
};
// TODO parse the yml file at this path and compile a merged configuration
e.path();
todo!();
}
Ok(ret)
}
async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Result<ChannelsConfig, Error> {
let mut file = OpenOptions::new().read(true).open(fname).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
let lines = buf.split(|&x| x == 0x0a);
let mut conf = ChannelsConfig::new();
for line in lines {
let line = String::from_utf8_lossy(line);
let line = line.trim();
let use_line = if line.is_empty() {
false
} else if let Some(_cs) = re_p.captures(&line) {
true
} else if re_n.is_match(&line) {
false
} else {
true
};
if use_line {
let item = ChannelConfig {
name: line.into(),
arch: IngestConfigArchiving {
replication: true,
short_term: Some(ChannelReadConfig::Monitor),
medium_term: None,
long_term: None,
},
};
conf.channels.push(item);
}
}
info!("Parsed {} channels", conf.channels.len());
Ok(conf)
}
#[derive(Debug, Deserialize)]
pub struct ChannelConfigParse {
archiving_configuration: IngestConfigArchiving,
}
#[derive(Debug, Deserialize)]
pub struct IngestConfigArchiving {
#[serde(default, with = "serde_replication_bool")]
replication: bool,
#[serde(default, with = "serde_option_channel_read_config")]
short_term: Option<ChannelReadConfig>,
#[serde(default, with = "serde_option_channel_read_config")]
medium_term: Option<ChannelReadConfig>,
#[serde(default, with = "serde_option_channel_read_config")]
long_term: Option<ChannelReadConfig>,
}
mod serde_replication_bool {
use serde::de;
use serde::Deserializer;
use std::fmt;
pub fn deserialize<'de, D>(de: D) -> Result<bool, D::Error>
where
D: Deserializer<'de>,
{
de.deserialize_any(Vis)
}
struct Vis;
impl<'de> de::Visitor<'de> for Vis {
type Value = bool;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "a keyword `Enabled` or `None`, or not this field at all")
}
fn visit_bool<E>(self, _v: bool) -> Result<Self::Value, E>
where
E: de::Error,
{
let e = E::custom(format!("could accept bool, but it's not in specification"));
return Err(e);
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let ret = if v == "Enabled" {
true
} else if v == "Disabled" {
false
} else {
let e = E::custom(format!("unexpected value {v:?}"));
return Err(e);
};
Ok(ret)
}
}
}
mod serde_option_channel_read_config {
use super::ChannelReadConfig;
use serde::de;
use serde::Deserializer;
use std::fmt;
use std::time::Duration;
pub fn deserialize<'de, D>(de: D) -> Result<Option<ChannelReadConfig>, D::Error>
where
D: Deserializer<'de>,
{
de.deserialize_any(Vis)
}
struct Vis;
impl<'de> de::Visitor<'de> for Vis {
type Value = Option<ChannelReadConfig>;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "keyword `Monitor`, an integer, or not this field at all")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let ret = if v == "Monitor" {
Some(ChannelReadConfig::Monitor)
} else if v == "None" {
None
} else {
let e = E::custom(format!("unexpected value {v:?}"));
return Err(e);
};
Ok(ret)
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
if v < 1 || v > 108000 {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000"));
return Err(e);
}
Ok(Some(ChannelReadConfig::Poll(Duration::from_secs(v as u64))))
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: de::Error,
{
if v < 1 || v > 108000 {
let e = E::custom(format!("unsupported value {v:?}, polling must be in range 1..108000"));
return Err(e);
}
self.visit_u64(v as u64)
}
}
}
#[derive(Debug, Deserialize)]
pub enum ChannelReadConfig {
Monitor,
Poll(Duration),
}
#[test]
fn test_channel_config_00() {
let inp = r###"
CH-00:
archiving_configuration:
replication: Enabled
short_term: Monitor
medium_term: 60
long_term: 3600
CH-01:
archiving_configuration:
replication: Enabled
short_term: 1
medium_term: 10
long_term: 60
CH-02:
archiving_configuration:
short_term: Monitor
CH-03:
archiving_configuration:
"###;
let x: BTreeMap<String, ChannelConfigParse> = serde_yaml::from_str(inp).unwrap();
}
pub struct ChannelsConfig {
channels: Vec<ChannelConfig>,
}
impl ChannelsConfig {
fn new() -> Self {
Self { channels: Vec::new() }
}
}
impl From<BTreeMap<String, ChannelConfigParse>> for ChannelsConfig {
fn from(value: BTreeMap<String, ChannelConfigParse>) -> Self {
let channels = value
.into_iter()
.map(|(k, v)| ChannelConfig {
name: k,
arch: v.archiving_configuration,
})
.collect();
ChannelsConfig { channels }
}
}
pub struct ChannelConfig {
name: String,
arch: IngestConfigArchiving,
}