diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index c2334c4..add5ccf 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.4-aa.2" +version = "0.2.4-aa.4" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index b49c772..5b5f3e1 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -504,7 +504,14 @@ impl Daemon { async fn handle_config_reload_inner(&mut self) -> Result<(), Error> { let channels_dir = self.ingest_opts.channels(); let channels = match netfetch::conf::parse_channels(channels_dir).await { - Ok(x) => x, + Ok(x) => { + if let Some(x) = &x { + info!("parsed {} channels", x.len()); + } else { + info!("config does not specify channels"); + } + x + } Err(e) => { return Err(Error::with_msg_no_trace(format!( "could not reload channel config {e}" @@ -512,9 +519,13 @@ impl Daemon { } }; if let Some(channels) = channels { - debug!("channels config reloaded"); // TODO // Send a marker flag-clear to CaConnSet. + if true { + let (tx, rx) = async_channel::bounded(10); + self.connset_ctrl.channel_config_flag_reset(tx).await?; + rx.recv().await??; + } // Send all the channel-add commands. let mut i = 0; for ch_cfg in channels.channels() { @@ -523,7 +534,12 @@ impl Daemon { rx.recv().await??; i += 1; } - debug!("channel add send n {i}"); + if true { + let (tx, rx) = async_channel::bounded(10); + self.connset_ctrl.channel_config_remove_unflagged(tx).await?; + rx.recv().await??; + } + info!("config reload done, applied {} channels", i); // Send a marker remove-cleared to CaConnSet (must impl that on CaConnSet to remove those channels) Ok(()) } else { @@ -533,7 +549,7 @@ impl Daemon { async fn handle_config_reload(&mut self, tx: async_channel::Sender) -> Result<(), Error> { match self.handle_config_reload_inner().await { - Ok(x) => { + Ok(()) => { if tx.send(0).await.is_err() { self.stats.channel_send_err().inc(); } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 43348ec..b896810 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -189,6 +189,16 @@ pub struct ChannelAddWithStatusId { cssid: ChannelStatusSeriesId, } +#[derive(Debug, Clone)] +pub struct ChannelConfigFlagReset { + restx: crate::ca::conn::CmdResTx, +} + +#[derive(Debug, Clone)] +pub struct ChannelConfigRemoveUnflagged { + restx: crate::ca::conn::CmdResTx, +} + #[derive(Debug, Clone)] pub struct ChannelAdd { ch_cfg: ChannelConfig, @@ -241,6 +251,8 @@ impl fmt::Debug for ChannelStatusesRequest { #[derive(Debug)] pub enum ConnSetCmd { + ChannelConfigFlagReset(ChannelConfigFlagReset), + ChannelConfigRemoveUnflagged(ChannelConfigRemoveUnflagged), ChannelAdd(ChannelAdd), ChannelRemove(ChannelRemove), Shutdown, @@ -283,6 +295,20 @@ impl CaConnSetCtrl { self.rx.clone() } + pub async fn channel_config_flag_reset(&self, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> { + let cmd = ChannelConfigFlagReset { restx }; + let cmd = ConnSetCmd::ChannelConfigFlagReset(cmd); + self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; + Ok(()) + } + + pub async fn channel_config_remove_unflagged(&self, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> { + let cmd = ChannelConfigRemoveUnflagged { restx }; + let cmd = ConnSetCmd::ChannelConfigRemoveUnflagged(cmd); + self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; + Ok(()) + } + pub async fn add_channel(&self, ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> { let cmd = ChannelAdd { ch_cfg, restx }; let cmd = ConnSetCmd::ChannelAdd(cmd); @@ -568,6 +594,8 @@ impl CaConnSet { fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { match ev { CaConnSetEvent::ConnSetCmd(cmd) => match cmd { + ConnSetCmd::ChannelConfigFlagReset(x) => self.handle_channel_config_flag_reset(x), + ConnSetCmd::ChannelConfigRemoveUnflagged(x) => self.handle_channel_config_remove_unflagged(x), ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x), ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), ConnSetCmd::Shutdown => self.handle_shutdown(), @@ -608,6 +636,7 @@ impl CaConnSet { fn handle_add_channel_existing(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> { let tsnow = Instant::now(); + ress.chst.touched = 1; if cmd.ch_cfg == ress.chst.config { debug!("handle_add_channel_existing config same {}", cmd.name()); if let Err(_) = cmd.restx.try_send(Ok(())) { @@ -678,6 +707,36 @@ impl CaConnSet { } } + fn handle_channel_config_flag_reset(&mut self, cmd: ChannelConfigFlagReset) -> Result<(), Error> { + for chst in self.channel_states.iter_mut() { + chst.1.touched = 0; + } + if let Err(_) = cmd.restx.try_send(Ok(())) { + self.stats.command_reply_fail().inc(); + } + Ok(()) + } + + fn handle_channel_config_remove_unflagged(&mut self, cmd: ChannelConfigRemoveUnflagged) -> Result<(), Error> { + let mut cmds = VecDeque::new(); + for chst in self.channel_states.iter_mut() { + if chst.1.touched == 0 { + let cmd = ChannelRemove { + name: chst.0.name().into(), + }; + cmds.push_back(cmd); + } + } + for cmd in cmds { + debug!("call handle_remove_channel {cmd:?}"); + self.handle_remove_channel(cmd)?; + } + if let Err(_) = cmd.restx.try_send(Ok(())) { + self.stats.command_reply_fail().inc(); + } + Ok(()) + } + fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> { if self.shutdown_stopping { trace3!("handle_add_channel but shutdown_stopping"); @@ -895,6 +954,13 @@ impl CaConnSet { k.value = ChannelStateValue::ToRemove { addr: None }; } WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => { + debug!("send remove {ch:?} to {addr}"); + let conn_ress = self + .ca_conn_ress + .get_mut(&SocketAddr::V4(addr.clone())) + .ok_or_else(|| Error::ChannelAssignedWithoutConnRess)?; + let item = ConnCommand::channel_close(ch.name().into()); + conn_ress.cmd_queue.push_back(item); k.value = ChannelStateValue::ToRemove { addr: Some(addr.clone()), }; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 4cd489b..92e8867 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -183,6 +183,7 @@ async fn always_error(params: HashMap) -> Result) -> Result, Response> { + info!("api config reload request"); let (tx, rx) = async_channel::bounded(10); let item = DaemonEvent::ConfigReload(tx); dcom.tx.send(item).await; diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index ad4ee08..e650a18 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -64,7 +64,7 @@ impl BinWriter { let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40; let end = u64::MAX - margin; let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), DtMs::from_ms_u64(1000 * 10)); - let binner = BinnedEventsTimeweight::new(range); + let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero(); let ret = Self { rt, cssid, @@ -113,6 +113,7 @@ impl BinWriter { for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { if fnl == false { debug!("non final bin"); + } else if cnt == 0 { } else { let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); let div = if bin_len == DtMs::from_ms_u64(1000 * 10) {