diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 30ad114..31c44b0 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -55,9 +55,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } ChannelAccess::CaIngest(k) => { info!("daqingest version {}", clap::crate_version!()); - let (conf, channels) = parse_config(k.config.into()).await?; - todo!(); - // daqingest::daemon::run(conf, channels).await? + let (conf, channels_config) = parse_config(k.config.into()).await?; + daqingest::daemon::run(conf, channels_config).await? } }, #[cfg(feature = "bsread")] diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 4692251..fe4f870 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -10,6 +10,8 @@ use netfetch::ca::connset::CaConnSet; use netfetch::ca::connset::CaConnSetCtrl; use netfetch::ca::connset::CaConnSetItem; use netfetch::conf::CaIngestOpts; +use netfetch::conf::ChannelConfig; +use netfetch::conf::ChannelsConfig; use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::StatsSet; @@ -276,9 +278,13 @@ impl Daemon { } self.stats.handle_timer_tick_count.inc(); let tsnow = SystemTime::now(); - if SIGINT.load(atomic::Ordering::Acquire) == 1 { - warn!("Received SIGINT"); - SIGINT.store(2, atomic::Ordering::Release); + { + let n = SIGINT.load(atomic::Ordering::Acquire); + let m = SIGINT_CONFIRM.load(atomic::Ordering::Acquire); + if m != n { + warn!("Received SIGINT"); + SIGINT_CONFIRM.store(n, atomic::Ordering::Release); + } } if SIGTERM.load(atomic::Ordering::Acquire) == 1 { warn!("Received SIGTERM"); @@ -306,16 +312,20 @@ impl Daemon { Ok(()) } - async fn handle_channel_add(&mut self, ch: Channel, restx: netfetch::ca::conn::CmdResTx) -> Result<(), Error> { + async fn handle_channel_add( + &mut self, + ch_cfg: ChannelConfig, + restx: netfetch::ca::conn::CmdResTx, + ) -> Result<(), Error> { // debug!("handle_channel_add {ch:?}"); self.connset_ctrl - .add_channel(self.ingest_opts.backend().into(), ch.id().into(), restx) + .add_channel(self.ingest_opts.backend().into(), ch_cfg, restx) .await?; Ok(()) } async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> { - self.connset_ctrl.remove_channel(ch.id().into()).await?; + self.connset_ctrl.remove_channel(ch.name().into()).await?; Ok(()) } @@ -445,7 +455,7 @@ impl Daemon { }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { - warn!("handle_event slow {}ms {}", dt.as_secs_f32() * 1e3, item_summary); + warn!("handle_event slow {} ms {}", dt.as_secs_f32() * 1e3, item_summary); } ret } @@ -531,13 +541,16 @@ impl Daemon { } static SIGINT: AtomicUsize = AtomicUsize::new(0); +static SIGINT_CONFIRM: AtomicUsize = AtomicUsize::new(0); static SIGTERM: AtomicUsize = AtomicUsize::new(0); static SHUTDOWN_SENT: AtomicUsize = AtomicUsize::new(0); fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { - std::process::exit(13); - SIGINT.store(1, atomic::Ordering::Release); - let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); + let n = SIGINT.fetch_add(1, atomic::Ordering::AcqRel); + if n >= 2 { + let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); + std::process::exit(13); + } } fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { @@ -545,8 +558,9 @@ fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc: let _ = ingest_linux::signal::unset_signal_handler(libc::SIGTERM); } -pub async fn run(opts: CaIngestOpts, channels: Option>) -> Result<(), Error> { +pub async fn run(opts: CaIngestOpts, channels_config: Option) -> Result<(), Error> { info!("start up {opts:?}"); + debug!("channels_config {channels_config:?}"); ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; @@ -567,10 +581,11 @@ pub async fn run(opts: CaIngestOpts, channels: Option>) -> Result<() //let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); //let metrics_agg_jh = tokio::spawn(metrics_agg_fut); - let mut channels = channels; - if opts.test_bsread_addr.is_some() { - channels = None; - } + let channels_config = if opts.test_bsread_addr.is_some() { + None + } else { + channels_config + }; let insert_frac = Arc::new(AtomicU64::new(opts.insert_frac())); let store_workers_rate = Arc::new(AtomicU64::new(opts.store_workers_rate())); @@ -616,13 +631,15 @@ pub async fn run(opts: CaIngestOpts, channels: Option>) -> Result<() let daemon_jh = taskrun::spawn(daemon.daemon()); - if let Some(channels) = channels { - debug!("will configure {} channels", channels.len()); + if let Some(channels_config) = channels_config { + debug!("will configure {} channels", channels_config.len()); let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000)); let mut i = 0; - for s in &channels { - let ch = Channel::new(s.into()); - match tx.send(DaemonEvent::ChannelAdd(ch, async_channel::bounded(1).0)).await { + for ch_cfg in channels_config.channels() { + match tx + .send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0)) + .await + { Ok(()) => {} Err(e) => { error!("{e}"); @@ -632,7 +649,7 @@ pub async fn run(opts: CaIngestOpts, channels: Option>) -> Result<() thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]); i += 1; } - debug!("{} configured channels applied", channels.len()); + debug!("{} configured channels applied", channels_config.len()); } daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; info!("Daemon joined."); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 278addf..74ad137 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -3,6 +3,7 @@ use super::proto::CaEventValue; use super::proto::ReadNotify; use super::ExtraInsertsConf; use crate::ca::proto::EventCancel; +use crate::conf::ChannelConfig; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; @@ -126,7 +127,7 @@ impl err::ToErr for Error { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize)] pub enum ChannelConnectedInfo { Disconnected, Connecting, @@ -134,7 +135,7 @@ pub enum ChannelConnectedInfo { Error, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize)] pub struct ChannelStateInfo { pub cssid: ChannelStatusSeriesId, pub addr: SocketAddrV4, @@ -154,6 +155,7 @@ pub struct ChannelStateInfo { // #[serde(skip_serializing_if = "Option::is_none")] pub item_recv_ivl_ema: Option, pub interest_score: f32, + pub conf: ChannelConfig, } mod ser_instant { @@ -360,8 +362,14 @@ enum ChannelState { Ended(ChannelStatusSeriesId), } +#[derive(Debug)] +struct ChannelConf { + conf: ChannelConfig, + state: ChannelState, +} + impl ChannelState { - fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4) -> ChannelStateInfo { + fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, @@ -423,6 +431,7 @@ impl ChannelState { recv_bytes, item_recv_ivl_ema, interest_score, + conf, } } @@ -557,7 +566,7 @@ pub type CmdResTx = Sender>; #[derive(Debug)] pub enum ConnCommandKind { - ChannelAdd(String, ChannelStatusSeriesId), + ChannelAdd(ChannelConfig, ChannelStatusSeriesId), ChannelClose(String), Shutdown, } @@ -569,10 +578,10 @@ pub struct ConnCommand { } impl ConnCommand { - pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self { + pub fn channel_add(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self { Self { id: Self::make_id(), - kind: ConnCommandKind::ChannelAdd(name, cssid), + kind: ConnCommandKind::ChannelAdd(conf, cssid), } } @@ -698,12 +707,11 @@ pub struct CaConn { proto: Option, cid_store: CidStore, subid_store: SubidStore, - channels: HashMap, + channels: HashMap, // btree because require order: cid_by_name: BTreeMap, cid_by_subid: HashMap, cid_by_sid: HashMap, - name_by_cid: HashMap, channel_status_emit_last: Instant, tick_last_writer: Instant, init_state_count: u64, @@ -716,7 +724,6 @@ pub struct CaConn { conn_command_rx: Pin>>, conn_backoff: f32, conn_backoff_beg: f32, - extra_inserts_conf: ExtraInsertsConf, ioc_ping_last: Instant, ioc_ping_next: Instant, ioc_ping_start: Option, @@ -773,7 +780,6 @@ impl CaConn { cid_by_name: BTreeMap::new(), cid_by_subid: HashMap::new(), cid_by_sid: HashMap::new(), - name_by_cid: HashMap::new(), channel_status_emit_last: tsnow, tick_last_writer: tsnow, insert_item_queue: VecDeque::new(), @@ -785,7 +791,6 @@ impl CaConn { conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, conn_backoff_beg: 0.02, - extra_inserts_conf: ExtraInsertsConf::new(), ioc_ping_last: tsnow, ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, @@ -928,51 +933,7 @@ impl CaConn { // } } - fn cmd_find_channel(&self, pattern: &str) { - let res = if let Ok(re) = regex::Regex::new(&pattern) { - self.name_by_cid - .values() - .filter(|x| re.is_match(x)) - .map(ToString::to_string) - .collect() - } else { - Vec::new() - }; - // TODO return the result - } - - fn cmd_channel_state(&self, name: String) { - let res = match self.cid_by_name(&name) { - Some(cid) => match self.channels.get(&cid) { - Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())), - None => None, - }, - None => None, - }; - let msg = (self.remote_addr_dbg.clone(), res); - if msg.1.is_some() { - info!("Sending back {msg:?}"); - } - // TODO return the result - } - - fn cmd_channel_states_all(&self) { - let res: Vec<_> = self - .channels - .iter() - .map(|(cid, state)| { - // let name = self - // .name_by_cid - // .get(cid) - // .map_or("--unknown--".into(), |x| x.to_string()); - state.to_info(state.cssid(), self.remote_addr_dbg.clone()) - }) - .collect(); - let msg = (self.remote_addr_dbg.clone(), res); - // TODO return the result - } - - fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) { + fn cmd_channel_add(&mut self, name: ChannelConfig, cssid: ChannelStatusSeriesId) { self.channel_add(name, cssid); } @@ -987,16 +948,6 @@ impl CaConn { self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand); } - fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) { - self.extra_inserts_conf = extra_inserts_conf; - // TODO return the result - } - - fn cmd_save_conn_info(&mut self) { - let res = self.emit_channel_info_insert_items(); - // TODO return the result - } - fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; self.stats.loop3_count.inc(); @@ -1008,8 +959,8 @@ impl CaConn { Ready(Some(a)) => { trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); match a.kind { - ConnCommandKind::ChannelAdd(name, cssid) => { - self.cmd_channel_add(name, cssid); + ConnCommandKind::ChannelAdd(conf, cssid) => { + self.cmd_channel_add(conf, cssid); Ok(Ready(Some(()))) } ConnCommandKind::ChannelClose(name) => { @@ -1064,7 +1015,8 @@ impl CaConn { // Store the writer with the channel state. // Create a monitor for the channel. // NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled! - if let Some(chst) = self.channels.get_mut(&cid) { + if let Some(conf) = self.channels.get_mut(&cid) { + let chst = &mut conf.state; if let ChannelState::MakingSeriesWriter(st2) = chst { self.stats.get_series_id_ok.inc(); { @@ -1075,7 +1027,7 @@ impl CaConn { }); self.insert_item_queue.push_back(item); } - let name = self.name_by_cid.get(&st2.channel.cid).map(|x| x.as_str()).unwrap_or(""); + let name = conf.conf.name(); if name.starts_with("TEST:PEAKING:") { let created_state = WritableState { tsbeg: self.poll_tsnow, @@ -1140,16 +1092,20 @@ impl CaConn { self.stats.clone() } - pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) { - if self.cid_by_name(&channel).is_some() { + pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) { + if self.cid_by_name(conf.name()).is_some() { // TODO count for metrics return; } - let cid = self.cid_by_name_or_insert(&channel); + let cid = self.cid_by_name_or_insert(conf.name()); if self.channels.contains_key(&cid) { - error!("logic error channel already exists {channel}"); + error!("logic error channel already exists {conf:?}"); } else { - self.channels.insert(cid, ChannelState::Init(cssid)); + let conf = ChannelConf { + conf, + state: ChannelState::Init(cssid), + }; + self.channels.insert(cid, conf); // TODO do not count, use separate queue for those channels. self.init_state_count += 1; } @@ -1169,7 +1125,6 @@ impl CaConn { fn channel_remove_by_cid(&mut self, cid: Cid) { self.cid_by_name.retain(|_, v| *v != cid); - self.name_by_cid.remove(&cid); self.channels.remove(&cid); } @@ -1183,13 +1138,12 @@ impl CaConn { } else { let cid = self.cid_store.next(); self.cid_by_name.insert(name.into(), cid); - self.name_by_cid.insert(cid, name.into()); cid } } fn name_by_cid(&self, cid: Cid) -> Option<&str> { - self.name_by_cid.get(&cid).map(|x| x.as_str()) + self.channels.get(&cid).map(|x| x.conf.name()) } fn backoff_next(&mut self) -> u64 { @@ -1205,7 +1159,8 @@ impl CaConn { fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { // TODO can I reuse emit_channel_info_insert_items ? trace!("channel_state_on_shutdown channels {}", self.channels.len()); - for (_cid, chst) in &mut self.channels { + for (_cid, conf) in &mut self.channels { + let chst = &mut conf.state; match chst { ChannelState::Init(cssid) => { *chst = ChannelState::Ended(cssid.clone()); @@ -1269,7 +1224,8 @@ impl CaConn { } let mut alive_count = 0; let mut not_alive_count = 0; - for (_, st) in &self.channels { + for (_, conf) in &self.channels { + let st = &conf.state; match st { ChannelState::Writable(st2) => { if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) { @@ -1290,7 +1246,8 @@ impl CaConn { fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> { let timenow = self.tmp_ts_poll; - for (_, st) in &mut self.channels { + for (_, conf) in &mut self.channels { + let st = &mut conf.state; match st { ChannelState::Init(..) => { // TODO need last-save-ts for this state. @@ -1334,7 +1291,7 @@ impl CaConn { return Ok(()); }; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { - x + &mut x.state } else { // TODO return better as error and let caller decide (with more structured errors) // TODO @@ -1377,7 +1334,7 @@ impl CaConn { return Ok(()); }; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { - x + &mut x.state } else { // TODO return better as error and let caller decide (with more structured errors) warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); @@ -1481,7 +1438,7 @@ impl CaConn { return Ok(()); }; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { - x + &mut x.state } else { // TODO return better as error and let caller decide (with more structured errors) warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); @@ -1532,7 +1489,7 @@ impl CaConn { let ioid = Ioid(ev.ioid); if let Some(cid) = self.read_ioids.get(&ioid) { let ch_s = if let Some(x) = self.channels.get_mut(cid) { - x + &mut x.state } else { warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); return Ok(()); @@ -1621,7 +1578,7 @@ impl CaConn { let series = writer.sid(); // TODO should attach these counters already to Writable state. let ts_local = { - let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap(); + let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; let ts = value.ts; @@ -1722,7 +1679,8 @@ impl CaConn { */ fn handle_handshake(&mut self, cx: &mut Context) -> Poll>> { use Poll::*; - match self.proto.as_mut().unwrap().poll_next_unpin(cx) { + let proto = self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?; + match proto.poll_next_unpin(cx) { Ready(Some(k)) => match k { Ok(k) => match k { CaItem::Empty => { @@ -1770,16 +1728,16 @@ impl CaConn { if self.init_state_count == 0 { return Ok(()); } - let keys: Vec = self.channels.keys().map(|x| *x).collect(); + let channels = &mut self.channels; + let proto = self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?; + let keys: Vec = channels.keys().map(|x| *x).collect(); for cid in keys { - match self.channels.get(&cid).unwrap() { + let conf = channels.get(&cid).ok_or_else(|| Error::UnknownCid(cid))?; + let st = &conf.state; + match st { ChannelState::Init(cssid) => { let cssid = cssid.clone(); - let name = self.name_by_cid(cid).ok_or_else(|| Error::UnknownCid(cid)); - let name = match name { - Ok(k) => k.to_string(), - Err(e) => return Err(e), - }; + let name = conf.conf.name(); let msg = CaMsg::from_ty_ts( CaMsgTy::CreateChan(CreateChan { cid: cid.0, @@ -1788,10 +1746,10 @@ impl CaConn { tsnow, ); do_wake_again = true; - self.proto.as_mut().unwrap().push_out(msg); - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - *ch_s = ChannelState::Creating(CreatingState { + proto.push_out(msg); + // TODO handle not-found error, just count and continue? + let ch_s = channels.get_mut(&cid).ok_or_else(|| Error::UnknownCid(cid))?; + ch_s.state = ChannelState::Creating(CreatingState { tsbeg: tsnow, cssid, cid, @@ -1810,8 +1768,9 @@ impl CaConn { fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> { let mut do_wake_again = false; let channels = &mut self.channels; - for (_k, v) in channels { - match v { + for (_k, conf) in channels { + let chst = &mut conf.state; + match chst { ChannelState::Init(_) => {} ChannelState::Creating(_) => {} ChannelState::MakingSeriesWriter(_) => {} @@ -1924,7 +1883,8 @@ impl CaConn { // The channel status must be "Fail" so that ConnSet can decide to re-search. // TODO how to transition the channel state? Any invariants or simply write to the map? let cid = Cid(msg.cid); - if let Some(name) = self.name_by_cid.get(&cid) { + if let Some(conf) = self.channels.get(&cid) { + let name = conf.conf.name(); debug!("queue event to notive channel create fail {name}"); let item = CaConnEvent { ts: tsnow, @@ -1987,18 +1947,15 @@ impl CaConn { fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { let cid = Cid(k.cid); let sid = Sid(k.sid); - let channels = &mut self.channels; - let name_by_cid = &self.name_by_cid; - // TODO handle cid-not-found which can also indicate peer error. - let name = if let Some(x) = name_by_cid.get(&cid) { - x.to_string() + let conf = if let Some(x) = self.channels.get_mut(&cid) { + x } else { - return Err(Error::NoNameForCid(cid)); + // TODO handle not-found error: just count for metrics? + warn!("CreateChanRes {:?} unknown", cid); + return Ok(()); }; - trace!("handle_create_chan_res {k:?} {name:?}"); - // TODO handle not-found error: - let ch_s = channels.get_mut(&cid).unwrap(); - let cssid = match ch_s { + let chst = &mut conf.state; + let cssid = match chst { ChannelState::Creating(st) => st.cssid.clone(), _ => { // TODO handle in better way: @@ -2043,11 +2000,11 @@ impl CaConn { account_count: 0, account_bytes: 0, }; - *ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); + *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); let job = EstablishWorkerJob::new( JobId(cid.0 as _), self.backend.clone(), - name.into(), + conf.conf.name().into(), scalar_type, shape, self.writer_tx.clone(), @@ -2260,10 +2217,10 @@ impl CaConn { fn emit_channel_status(&mut self) -> Result<(), Error> { let mut channel_statuses = BTreeMap::new(); - for e in self.channels.iter() { - let ch = &e.1; - let chinfo = ch.to_info(ch.cssid(), self.remote_addr_dbg); - channel_statuses.insert(ch.cssid(), chinfo); + for (_, conf) in self.channels.iter() { + let chst = &conf.state; + let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone()); + channel_statuses.insert(chst.cssid(), chinfo); } trace!("emit_channel_status {}", channel_statuses.len()); let val = ChannelStatusPartial { channel_statuses }; @@ -2287,7 +2244,8 @@ impl CaConn { let stnow = self.tmp_ts_poll; let ts_sec = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); let ts_sec_snap = ts_sec / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; - for (_k, st0) in self.channels.iter_mut() { + for (_k, chconf) in self.channels.iter_mut() { + let st0 = &mut chconf.state; match st0 { ChannelState::Writable(st1) => { let ch = &mut st1.channel; @@ -2317,8 +2275,9 @@ impl CaConn { } fn tick_writers(&mut self) -> Result<(), Error> { - for (k, st) in &mut self.channels { - if let ChannelState::Writable(st2) = st { + for (_, chconf) in &mut self.channels { + let chst = &mut chconf.state; + if let ChannelState::Writable(st2) = chst { st2.writer.tick(&mut self.insert_item_queue)?; } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0541efd..d9f6c2a 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -6,6 +6,7 @@ use crate::ca::statemap::CaConnState; use crate::ca::statemap::MaybeWrongAddressState; use crate::ca::statemap::WithAddressState; use crate::conf::CaIngestOpts; +use crate::conf::ChannelConfig; use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; @@ -61,7 +62,7 @@ use std::collections::VecDeque; use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; -use std::sync::atomic; + use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -138,7 +139,7 @@ impl CaConnRes { #[derive(Debug, Clone)] pub struct ChannelAddWithAddr { backend: String, - name: String, + ch_cfg: ChannelConfig, cssid: ChannelStatusSeriesId, addr: SocketAddr, } @@ -146,17 +147,23 @@ pub struct ChannelAddWithAddr { #[derive(Debug, Clone)] pub struct ChannelAddWithStatusId { backend: String, - name: String, + ch_cfg: ChannelConfig, cssid: ChannelStatusSeriesId, } #[derive(Debug, Clone)] pub struct ChannelAdd { backend: String, - name: String, + ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx, } +impl ChannelAdd { + pub fn name(&self) -> &str { + &self.ch_cfg.name() + } +} + #[derive(Debug, Clone)] pub struct ChannelRemove { name: String, @@ -246,10 +253,10 @@ impl CaConnSetCtrl { pub async fn add_channel( &self, backend: String, - name: String, + ch_cfg: ChannelConfig, restx: crate::ca::conn::CmdResTx, ) -> Result<(), Error> { - let cmd = ChannelAdd { backend, name, restx }; + let cmd = ChannelAdd { backend, ch_cfg, restx }; let cmd = ConnSetCmd::ChannelAdd(cmd); self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; Ok(()) @@ -519,13 +526,10 @@ impl CaConnSet { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } - trace3!("handle_add_channel {}", cmd.name); - if trigger.contains(&cmd.name.as_str()) { - debug!("handle_add_channel {cmd:?}"); - } + trace3!("handle_add_channel {:?}", cmd); self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? - let ch = Channel::new(cmd.name.clone()); + let ch = Channel::new(cmd.name().into()); let _st = if let Some(e) = self.channel_states.get_mut(&ch) { e } else { @@ -533,14 +537,16 @@ impl CaConnSet { value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { since: SystemTime::now(), }), + config: cmd.ch_cfg.clone(), }; self.channel_states.insert(ch.clone(), item); self.channel_states.get_mut(&ch).unwrap() }; + let channel_name = cmd.name().into(); let tx = self.channel_info_res_tx.as_ref().get_ref().clone(); let item = ChannelInfoQuery { backend: cmd.backend, - channel: cmd.name, + channel: channel_name, kind: SeriesKind::ChannelStatus, scalar_type: ScalarType::ChannelStatus, shape: Shape::Scalar, @@ -571,34 +577,44 @@ impl CaConnSet { } match res { Ok(res) => { - let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); - self.channel_by_cssid - .insert(cssid.clone(), Channel::new(res.channel.clone())); - let add = ChannelAddWithStatusId { - backend: res.backend, - name: res.channel, - cssid, - }; - self.handle_add_channel_with_status_id(add)?; + let channel = Channel::new(res.channel.clone()); + // TODO must not depend on purely informative `self.channel_state` + if let Some(st) = self.channel_states.get_mut(&channel) { + let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); + self.channel_by_cssid + .insert(cssid.clone(), Channel::new(res.channel.clone())); + let add = ChannelAddWithStatusId { + backend: res.backend, + ch_cfg: st.config.clone(), + cssid, + }; + self.handle_add_channel_with_status_id(add)?; + Ok(()) + } else { + // TODO count for metrics + warn!("received series id for unknown channel"); + Ok(()) + } } Err(e) => { warn!("TODO handle error {e}"); + Ok(()) } } - Ok(()) } fn handle_add_channel_with_status_id(&mut self, cmd: ChannelAddWithStatusId) -> Result<(), Error> { - trace3!("handle_add_channel_with_status_id {}", cmd.name); + let name = cmd.ch_cfg.name(); + trace3!("handle_add_channel_with_status_id {}", name); if self.shutdown_stopping { debug!("handle_add_channel but shutdown_stopping"); return Ok(()); } self.stats.channel_status_series_found().inc(); - if trigger.contains(&cmd.name.as_str()) { + if trigger.contains(&name) { debug!("handle_add_channel_with_status_id {cmd:?}"); } - let ch = Channel::new(cmd.name.clone()); + let ch = Channel::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { if let ActiveChannelState::WaitForStatusSeriesId { since } = chst2 { @@ -614,7 +630,7 @@ impl CaConnSet { since: SystemTime::now(), }, }); - let qu = IocAddrQuery::cached(cmd.name); + let qu = IocAddrQuery::cached(name.into()); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); } else { @@ -633,6 +649,7 @@ impl CaConnSet { } fn handle_add_channel_with_addr(&mut self, cmd: ChannelAddWithAddr) -> Result<(), Error> { + let name = cmd.ch_cfg.name(); if self.shutdown_stopping { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); @@ -642,10 +659,10 @@ impl CaConnSet { } else { return Err(Error::with_msg_no_trace("ipv4 for epics")); }; - if trigger.contains(&cmd.name.as_str()) { + if trigger.contains(&name) { debug!("handle_add_channel_with_addr {cmd:?}"); } - let ch = Channel::new(cmd.name.clone()); + let ch = Channel::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st3) = ast { @@ -673,7 +690,7 @@ impl CaConnSet { self.ca_conn_ress.insert(addr, c); } let conn_ress = self.ca_conn_ress.get_mut(&addr).unwrap(); - let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid); + let cmd = ConnCommand::channel_add(cmd.ch_cfg, cmd.cssid); conn_ress.cmd_queue.push_back(cmd); } } @@ -728,7 +745,7 @@ impl CaConnSet { } for res in results { let ch = Channel::new(res.channel.clone()); - if trigger.contains(&ch.id()) { + if trigger.contains(&ch.name()) { trace!("handle_ioc_query_result {res:?}"); } if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -739,7 +756,7 @@ impl CaConnSet { trace!("ioc found {res:?}"); let cmd = ChannelAddWithAddr { backend: self.backend.clone(), - name: res.channel, + ch_cfg: chst.config.clone(), addr: SocketAddr::V4(addr), cssid: st2.cssid.clone(), }; @@ -793,8 +810,8 @@ impl CaConnSet { let channels_ca_conn_set = self .channel_states .iter() - .filter(|(k, _)| reg1.is_match(k.id())) - .map(|(k, v)| (k.id().to_string(), v.clone())) + .filter(|(k, _)| reg1.is_match(k.name())) + .map(|(k, v)| (k.name().to_string(), v.clone())) .collect(); let item = ChannelStatusesResponse { channels_ca_conn_set }; if req.tx.try_send(item).is_err() { @@ -942,9 +959,9 @@ impl CaConnSet { } = &mut st3.inner { if SocketAddr::V4(*addr_ch) == addr { - if trigger.contains(&ch.id()) { + if trigger.contains(&ch.name()) { self.connect_fail_count += 1; - debug!(" connect fail, maybe wrong address for {} {}", addr, ch.id()); + debug!(" connect fail, maybe wrong address for {} {}", addr, ch.name()); } if self.connect_fail_count > 400 { std::process::exit(1); @@ -1290,7 +1307,7 @@ impl CaConnSet { } else { search_pending_count += 1; st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; - let qu = IocAddrQuery::uncached(ch.id().into()); + let qu = IocAddrQuery::uncached(ch.name().into()); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); } @@ -1317,7 +1334,7 @@ impl CaConnSet { assigned_without_health_update += 1; let cmd = ChannelAddWithAddr { backend: self.backend.clone(), - name: ch.id().into(), + ch_cfg: st.config.clone(), cssid: st3.cssid.clone(), addr: SocketAddr::V4(*addr_v4), }; @@ -1358,12 +1375,12 @@ impl CaConnSet { if st4.since + st4.backoff_dt < stnow { if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { trace!("try again channel after MaybeWrongAddress"); - if trigger.contains(&ch.id()) { - debug!("issue ioc search for {}", ch.id()); + if trigger.contains(&ch.name()) { + debug!("issue ioc search for {}", ch.name()); } search_pending_count += 1; st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; - let qu = IocAddrQuery::uncached(ch.id().into()); + let qu = IocAddrQuery::uncached(ch.name().into()); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); } @@ -1385,10 +1402,10 @@ impl CaConnSet { } for (addr, ch) in cmd_remove_channel { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { - let cmd = ConnCommand::channel_close(ch.id().into()); + let cmd = ConnCommand::channel_close(ch.name().into()); g.cmd_queue.push_back(cmd); } - let cmd = ChannelRemove { name: ch.id().into() }; + let cmd = ChannelRemove { name: ch.name().into() }; self.handle_remove_channel(cmd)?; } for cmd in cmd_add_channel { diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 8ceec88..b60f880 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -1,4 +1,5 @@ use crate::ca::conn::ChannelStateInfo; +use crate::conf::ChannelConfig; use crate::daemon_common::Channel; use dashmap::DashMap; use serde::Serialize; @@ -133,6 +134,7 @@ pub enum ChannelStateValue { #[derive(Debug, Clone, Serialize)] pub struct ChannelState { pub value: ChannelStateValue, + pub config: ChannelConfig, } #[derive(Debug, Clone, Serialize)] diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 8d2abda..d823269 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -188,7 +188,7 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option Result { } else { break; }; - - // TODO parse the yml file at this path and compile a merged configuration - e.path(); - todo!(); + let fnp = e.path(); + let fns = fnp.to_str().unwrap(); + if fns.ends_with(".yml") || fns.ends_with(".yaml") { + let buf = tokio::fs::read(e.path()).await?; + let conf: BTreeMap = + serde_yaml::from_slice(&buf).map_err(Error::from_string)?; + ret.push_from_parsed(&conf); + } else { + debug!("ignore channel config file {:?}", e.path()); + } } Ok(ret) } @@ -271,23 +277,39 @@ pub struct ChannelConfigParse { archiving_configuration: IngestConfigArchiving, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct IngestConfigArchiving { - #[serde(default, with = "serde_replication_bool")] + #[serde(default, skip_serializing_if = "bool_is_false")] + #[serde(with = "serde_replication_bool")] replication: bool, - #[serde(default, with = "serde_option_channel_read_config")] + #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(with = "serde_option_channel_read_config")] short_term: Option, - #[serde(default, with = "serde_option_channel_read_config")] + #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(with = "serde_option_channel_read_config")] medium_term: Option, - #[serde(default, with = "serde_option_channel_read_config")] + #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(with = "serde_option_channel_read_config")] long_term: Option, } +fn bool_is_false(x: &bool) -> bool { + *x == false +} + mod serde_replication_bool { use serde::de; use serde::Deserializer; + use serde::Serializer; use std::fmt; + pub fn serialize(v: &bool, ser: S) -> Result + where + S: Serializer, + { + ser.serialize_bool(*v) + } + pub fn deserialize<'de, D>(de: D) -> Result where D: Deserializer<'de>, @@ -333,9 +355,23 @@ mod serde_option_channel_read_config { use super::ChannelReadConfig; use serde::de; use serde::Deserializer; + use serde::Serializer; use std::fmt; use std::time::Duration; + pub fn serialize(v: &Option, ser: S) -> Result + where + S: Serializer, + { + match v { + Some(x) => match x { + ChannelReadConfig::Monitor => ser.serialize_str("Monitor"), + ChannelReadConfig::Poll(n) => ser.serialize_u32(n.as_secs() as u32), + }, + None => ser.serialize_none(), + } + } + pub fn deserialize<'de, D>(de: D) -> Result, D::Error> where D: Deserializer<'de>, @@ -391,7 +427,7 @@ mod serde_option_channel_read_config { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ChannelReadConfig { Monitor, Poll(Duration), @@ -421,6 +457,7 @@ CH-03: let x: BTreeMap = serde_yaml::from_str(inp).unwrap(); } +#[derive(Debug)] pub struct ChannelsConfig { channels: Vec, } @@ -429,6 +466,24 @@ impl ChannelsConfig { fn new() -> Self { Self { channels: Vec::new() } } + + pub fn len(&self) -> usize { + self.channels.len() + } + + pub fn channels(&self) -> &Vec { + &self.channels + } + + fn push_from_parsed(&mut self, rhs: &BTreeMap) { + for (k, v) in rhs.iter() { + let item = ChannelConfig { + name: k.into(), + arch: v.archiving_configuration.clone(), + }; + self.channels.push(item); + } + } } impl From> for ChannelsConfig { @@ -444,7 +499,26 @@ impl From> for ChannelsConfig { } } +#[derive(Debug, Clone, Serialize)] pub struct ChannelConfig { name: String, arch: IngestConfigArchiving, } + +impl ChannelConfig { + pub fn st_monitor>(name: S) -> Self { + Self { + name: name.into(), + arch: IngestConfigArchiving { + replication: true, + short_term: Some(ChannelReadConfig::Monitor), + medium_term: None, + long_term: None, + }, + } + } + + pub fn name(&self) -> &str { + &self.name + } +} diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 19596b7..781d2c2 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -1,26 +1,27 @@ use crate::ca::connset::CaConnSetItem; +use crate::conf::ChannelConfig; use async_channel::Sender; use serde::Serialize; #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord, Hash)] pub struct Channel { - id: String, + name: String, } impl Channel { - pub fn new(id: String) -> Self { - Self { id } + pub fn new(name: String) -> Self { + Self { name } } - pub fn id(&self) -> &str { - &self.id + pub fn name(&self) -> &str { + &self.name } } #[derive(Debug, Clone)] pub enum DaemonEvent { TimerTick(u32, Sender), - ChannelAdd(Channel, crate::ca::conn::CmdResTx), + ChannelAdd(ChannelConfig, crate::ca::conn::CmdResTx), ChannelRemove(Channel), CaConnSetItem(CaConnSetItem), Shutdown, diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f70dced..f7cb124 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -6,6 +6,7 @@ use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ChannelStatusesResponse; use crate::ca::connset::ConnSetCmd; use crate::ca::statemap::ChannelState; +use crate::conf::ChannelConfig; use crate::daemon_common::DaemonEvent; use async_channel::Receiver; use async_channel::Sender; @@ -140,9 +141,10 @@ async fn find_channel( async fn channel_add_inner(params: HashMap, dcom: Arc) -> Result<(), Error> { if let Some(name) = params.get("name") { - let ch = crate::daemon_common::Channel::new(name.into()); + // let ch = crate::daemon_common::Channel::new(name.into()); + let ch_cfg = ChannelConfig::st_monitor(name); let (tx, rx) = async_channel::bounded(1); - let ev = DaemonEvent::ChannelAdd(ch, tx); + let ev = DaemonEvent::ChannelAdd(ch_cfg, tx); dcom.tx.send(ev).await?; match rx.recv().await { Ok(Ok(())) => Ok(()), diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index f3dcd42..6855861 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -1,6 +1,7 @@ use crate::ca::connset::CaConnSetEvent; use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ConnSetCmd; +use crate::conf::ChannelConfig; use async_channel::Sender; use serde::Serialize; use std::collections::BTreeMap; @@ -16,7 +17,7 @@ pub struct ChannelStates { struct ChannelState { ioc_address: Option, connection: ConnectionState, - archive_settings: ArchiveSettings, + archiving_configuration: ChannelConfig, } #[derive(Debug, Serialize)] @@ -28,23 +29,6 @@ enum ConnectionState { Error, } -#[derive(Debug, Serialize)] -struct ArchiveSettings { - short_term: Option, - medium_term: Option, - long_term: Option, -} - -impl ArchiveSettings { - fn dummy() -> Self { - Self { - short_term: None, - medium_term: None, - long_term: None, - } - } -} - // ChannelStatusesResponse // BTreeMap pub async fn channel_states(params: HashMap, tx: Sender) -> axum::Json { @@ -73,7 +57,7 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender