Support channel removal on config reload

This commit is contained in:
Dominik Werder
2024-10-10 16:38:45 +02:00
parent 87e6dfdcaa
commit 6febac3033
5 changed files with 90 additions and 6 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.4-aa.2"
version = "0.2.4-aa.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -504,7 +504,14 @@ impl Daemon {
async fn handle_config_reload_inner(&mut self) -> Result<(), Error> {
let channels_dir = self.ingest_opts.channels();
let channels = match netfetch::conf::parse_channels(channels_dir).await {
Ok(x) => x,
Ok(x) => {
if let Some(x) = &x {
info!("parsed {} channels", x.len());
} else {
info!("config does not specify channels");
}
x
}
Err(e) => {
return Err(Error::with_msg_no_trace(format!(
"could not reload channel config {e}"
@@ -512,9 +519,13 @@ impl Daemon {
}
};
if let Some(channels) = channels {
debug!("channels config reloaded");
// TODO
// Send a marker flag-clear to CaConnSet.
if true {
let (tx, rx) = async_channel::bounded(10);
self.connset_ctrl.channel_config_flag_reset(tx).await?;
rx.recv().await??;
}
// Send all the channel-add commands.
let mut i = 0;
for ch_cfg in channels.channels() {
@@ -523,7 +534,12 @@ impl Daemon {
rx.recv().await??;
i += 1;
}
debug!("channel add send n {i}");
if true {
let (tx, rx) = async_channel::bounded(10);
self.connset_ctrl.channel_config_remove_unflagged(tx).await?;
rx.recv().await??;
}
info!("config reload done, applied {} channels", i);
// Send a marker remove-cleared to CaConnSet (must impl that on CaConnSet to remove those channels)
Ok(())
} else {
@@ -533,7 +549,7 @@ impl Daemon {
async fn handle_config_reload(&mut self, tx: async_channel::Sender<u64>) -> Result<(), Error> {
match self.handle_config_reload_inner().await {
Ok(x) => {
Ok(()) => {
if tx.send(0).await.is_err() {
self.stats.channel_send_err().inc();
}

View File

@@ -189,6 +189,16 @@ pub struct ChannelAddWithStatusId {
cssid: ChannelStatusSeriesId,
}
#[derive(Debug, Clone)]
pub struct ChannelConfigFlagReset {
restx: crate::ca::conn::CmdResTx,
}
#[derive(Debug, Clone)]
pub struct ChannelConfigRemoveUnflagged {
restx: crate::ca::conn::CmdResTx,
}
#[derive(Debug, Clone)]
pub struct ChannelAdd {
ch_cfg: ChannelConfig,
@@ -241,6 +251,8 @@ impl fmt::Debug for ChannelStatusesRequest {
#[derive(Debug)]
pub enum ConnSetCmd {
ChannelConfigFlagReset(ChannelConfigFlagReset),
ChannelConfigRemoveUnflagged(ChannelConfigRemoveUnflagged),
ChannelAdd(ChannelAdd),
ChannelRemove(ChannelRemove),
Shutdown,
@@ -283,6 +295,20 @@ impl CaConnSetCtrl {
self.rx.clone()
}
pub async fn channel_config_flag_reset(&self, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> {
let cmd = ChannelConfigFlagReset { restx };
let cmd = ConnSetCmd::ChannelConfigFlagReset(cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
}
pub async fn channel_config_remove_unflagged(&self, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> {
let cmd = ChannelConfigRemoveUnflagged { restx };
let cmd = ConnSetCmd::ChannelConfigRemoveUnflagged(cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
}
pub async fn add_channel(&self, ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx) -> Result<(), Error> {
let cmd = ChannelAdd { ch_cfg, restx };
let cmd = ConnSetCmd::ChannelAdd(cmd);
@@ -568,6 +594,8 @@ impl CaConnSet {
fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
match ev {
CaConnSetEvent::ConnSetCmd(cmd) => match cmd {
ConnSetCmd::ChannelConfigFlagReset(x) => self.handle_channel_config_flag_reset(x),
ConnSetCmd::ChannelConfigRemoveUnflagged(x) => self.handle_channel_config_remove_unflagged(x),
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x),
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x),
ConnSetCmd::Shutdown => self.handle_shutdown(),
@@ -608,6 +636,7 @@ impl CaConnSet {
fn handle_add_channel_existing(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> {
let tsnow = Instant::now();
ress.chst.touched = 1;
if cmd.ch_cfg == ress.chst.config {
debug!("handle_add_channel_existing config same {}", cmd.name());
if let Err(_) = cmd.restx.try_send(Ok(())) {
@@ -678,6 +707,36 @@ impl CaConnSet {
}
}
fn handle_channel_config_flag_reset(&mut self, cmd: ChannelConfigFlagReset) -> Result<(), Error> {
for chst in self.channel_states.iter_mut() {
chst.1.touched = 0;
}
if let Err(_) = cmd.restx.try_send(Ok(())) {
self.stats.command_reply_fail().inc();
}
Ok(())
}
fn handle_channel_config_remove_unflagged(&mut self, cmd: ChannelConfigRemoveUnflagged) -> Result<(), Error> {
let mut cmds = VecDeque::new();
for chst in self.channel_states.iter_mut() {
if chst.1.touched == 0 {
let cmd = ChannelRemove {
name: chst.0.name().into(),
};
cmds.push_back(cmd);
}
}
for cmd in cmds {
debug!("call handle_remove_channel {cmd:?}");
self.handle_remove_channel(cmd)?;
}
if let Err(_) = cmd.restx.try_send(Ok(())) {
self.stats.command_reply_fail().inc();
}
Ok(())
}
fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> {
if self.shutdown_stopping {
trace3!("handle_add_channel but shutdown_stopping");
@@ -895,6 +954,13 @@ impl CaConnSet {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => {
debug!("send remove {ch:?} to {addr}");
let conn_ress = self
.ca_conn_ress
.get_mut(&SocketAddr::V4(addr.clone()))
.ok_or_else(|| Error::ChannelAssignedWithoutConnRess)?;
let item = ConnCommand::channel_close(ch.name().into());
conn_ress.cmd_queue.push_back(item);
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};

View File

@@ -183,6 +183,7 @@ async fn always_error(params: HashMap<String, String>) -> Result<axum::Json<bool
}
async fn config_reload(dcom: Arc<DaemonComm>) -> Result<axum::Json<serde_json::Value>, Response> {
info!("api config reload request");
let (tx, rx) = async_channel::bounded(10);
let item = DaemonEvent::ConfigReload(tx);
dcom.tx.send(item).await;

View File

@@ -64,7 +64,7 @@ impl BinWriter {
let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40;
let end = u64::MAX - margin;
let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), DtMs::from_ms_u64(1000 * 10));
let binner = BinnedEventsTimeweight::new(range);
let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero();
let ret = Self {
rt,
cssid,
@@ -113,6 +113,7 @@ impl BinWriter {
for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() {
if fnl == false {
debug!("non final bin");
} else if cnt == 0 {
} else {
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
let div = if bin_len == DtMs::from_ms_u64(1000 * 10) {