diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 194caae..c2334c4 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.4-aa.1" +version = "0.2.4-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 461a929..35eae37 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -407,10 +407,7 @@ impl Daemon { ch_cfg: ChannelConfig, restx: netfetch::ca::conn::CmdResTx, ) -> Result<(), Error> { - // debug!("handle_channel_add {ch:?}"); - self.connset_ctrl - .add_channel(self.ingest_opts.backend().into(), ch_cfg, restx) - .await?; + self.connset_ctrl.add_channel(ch_cfg, restx).await?; Ok(()) } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 3e53301..71eda97 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -140,21 +140,19 @@ pub struct ChannelAddWithAddr { #[derive(Debug, Clone)] pub struct ChannelAddWithStatusId { - backend: String, ch_cfg: ChannelConfig, cssid: ChannelStatusSeriesId, } #[derive(Debug, Clone)] pub struct ChannelAdd { - backend: String, ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx, } impl ChannelAdd { pub fn name(&self) -> &str { - &self.ch_cfg.name() + self.ch_cfg.name() } } @@ -240,13 +238,8 @@ impl CaConnSetCtrl { self.rx.clone() } - pub async fn add_channel( - &self, - backend: String, - ch_cfg: ChannelConfig, - restx: crate::ca::conn::CmdResTx, - ) -> Result<(), Error> { - let cmd = ChannelAdd { backend, ch_cfg, restx }; + pub async fn add_channel(&self, ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> { + let cmd = ChannelAdd { ch_cfg, restx }; let cmd = ConnSetCmd::ChannelAdd(cmd); self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; Ok(()) @@ -550,7 +543,7 @@ impl CaConnSet { let channel_name = cmd.name().into(); let tx = self.channel_info_res_tx.as_ref().get_ref().clone(); let item = ChannelInfoQuery { - backend: cmd.backend, + backend: self.backend.clone(), channel: channel_name, kind: SeriesKind::ChannelStatus, scalar_type: ScalarType::U64, @@ -589,7 +582,6 @@ impl CaConnSet { self.channel_by_cssid .insert(cssid.clone(), Channel::new(res.channel.clone())); let add = ChannelAddWithStatusId { - backend: res.backend, ch_cfg: st.config.clone(), cssid, }; diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 28a3466..1ed4e7a 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -259,6 +259,7 @@ async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Res medium_term: None, long_term: None, is_polled: false, + timestamp: ChannelTimestamp::Archiver, }, }; conf.channels.push(item); @@ -273,9 +274,21 @@ pub struct ChannelConfigParse { archiving_configuration: IngestConfigArchiving, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChannelTimestamp { + Archiver, + IOC, +} + +impl ChannelTimestamp { + fn default_config() -> Self { + Self::Archiver + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IngestConfigArchiving { - #[serde(default, skip_serializing_if = "bool_is_false")] + #[serde(default = "bool_true")] #[serde(with = "serde_replication_bool")] replication: bool, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -289,12 +302,22 @@ pub struct IngestConfigArchiving { long_term: Option, #[serde(default, skip_serializing_if = "bool_is_false")] is_polled: bool, + #[serde(default = "ChannelTimestamp::default_config")] + timestamp: ChannelTimestamp, } fn bool_is_false(x: &bool) -> bool { *x == false } +fn bool_is_true(x: &bool) -> bool { + *x == false +} + +fn bool_true() -> bool { + true +} + mod serde_replication_bool { use serde::de; use serde::Deserializer; @@ -321,7 +344,15 @@ mod serde_replication_bool { type Value = bool; fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "a keyword `Enabled` or `None`, or not this field at all") + write!(fmt, "a keyword `Enabled`, `Disabled`, null, or not this field at all") + } + + fn visit_none(self) -> Result + where + E: de::Error, + { + let e = E::custom(format!("could accept `null` value, but it's not in specification")); + return Err(e); } fn visit_bool(self, _v: bool) -> Result @@ -530,6 +561,7 @@ impl ChannelConfig { medium_term: None, long_term: None, is_polled: false, + timestamp: ChannelTimestamp::Archiver, }, } }