From db3e243e2ee5a7eb848043262dc7b466e6c96c07 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Apr 2025 16:36:11 +0200 Subject: [PATCH] WIP transition metrics --- daqingest/src/daemon.rs | 8 +- netfetch/src/ca/conn.rs | 231 +++++++++++++++++++++------------- netfetch/src/ca/connset.rs | 9 +- netfetch/src/ca/findioc.rs | 68 +++++----- netfetch/src/metrics/types.rs | 23 +--- stats/Cargo.toml | 2 + stats/mettdecl.rs | 68 ++++++++++ stats/src/mett.rs | 3 + stats/src/stats.rs | 3 +- stats_proc/src/stats_proc.rs | 2 +- 10 files changed, 270 insertions(+), 147 deletions(-) create mode 100644 stats/mettdecl.rs create mode 100644 stats/src/mett.rs diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 99b0070..cf019a6 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -86,7 +86,7 @@ pub struct Daemon { // TODO series_conf_by_id_tx: Sender<()>, iqtx: Option, - daemon_metrics: DaemonMetrics, + daemon_metrics: stats::mett::DaemonMetrics, } impl Daemon { @@ -387,7 +387,7 @@ impl Daemon { channel_info_query_tx, series_conf_by_id_tx, iqtx: Some(iqtx2), - daemon_metrics: DaemonMetrics::new(), + daemon_metrics: stats::mett::DaemonMetrics::new(), }; Ok(ret) } @@ -564,8 +564,8 @@ impl Daemon { error!("error from CaConnSet: {e}"); self.handle_shutdown().await?; } - Metrics(metrics) => { - self.daemon_metrics.ingest_ca_conn_set(metrics); + Metrics(x) => { + self.daemon_metrics.ca_conn_set().ingest(x); } } Ok(()) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 54994e1..175c25e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -326,7 +326,7 @@ enum Monitoring2State { ReadPending(Monitoring2ReadPendingState), } #[derive(Debug, Clone, Serialize)] -enum MonitorReadCmp { +pub enum MonitorReadCmp { Equal, DiffTime, DiffTimeValue, @@ -1014,7 +1014,7 @@ pub enum CaConnEventValue { ChannelCreateFail(String), EndOfStream(EndOfStreamReason), ChannelRemoved(String), - Metrics(CaConnMetrics), + Metrics(stats::mett::CaConnMetrics), } impl CaConnEventValue { @@ -1072,6 +1072,9 @@ pub struct CaConn { state: CaConnState, ticker: Pin>, proto: Option, + version_seen: bool, + non_version_seen: bool, + non_version_seen_emitted: bool, cid_store: CidStore, subid_store: SubidStore, channels: HashMap, @@ -1112,9 +1115,8 @@ pub struct CaConn { read_ioids: HashMap, handler_by_ioid: HashMap>>>, trace_channel_poll: bool, - ca_msg_recv_cnt: u64, - ca_version_recv_count: u64, ts_channel_status_pong_last: Instant, + mett: stats::mett::CaConnMetrics, } impl Drop for CaConn { @@ -1143,6 +1145,9 @@ impl CaConn { state: CaConnState::Unconnected(tsnow), ticker: Self::new_self_ticker(&mut rng), proto: None, + version_seen: false, + non_version_seen: false, + non_version_seen_emitted: false, cid_store: CidStore::new_from_time(), subid_store: SubidStore::new_from_time(), init_state_count: 0, @@ -1179,9 +1184,8 @@ impl CaConn { read_ioids: HashMap::new(), handler_by_ioid: HashMap::new(), trace_channel_poll: false, - ca_msg_recv_cnt: 0, - ca_version_recv_count: 0, ts_channel_status_pong_last: tsnow, + mett: stats::mett::CaConnMetrics::new(), } } @@ -1285,7 +1289,7 @@ impl CaConn { status: ConnectionStatus::Closing, }; if self.emit_connection_status_item(item).is_err() { - self.stats.logic_error().inc(); + self.mett.logic_error().inc(); } self.proto = None; } @@ -1299,7 +1303,6 @@ impl CaConn { fn cmd_channel_close(&mut self, name: String) { self.channel_close(name); // TODO return the result - //self.stats.caconn_command_can_not_reply.inc(); } fn cmd_shutdown(&mut self) { @@ -1309,7 +1312,7 @@ impl CaConn { fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; - self.stats.loop3_count.inc(); + self.mett.loop3_count().inc(); if self.is_shutdown() { Ok(Ready(None)) } else { @@ -1480,7 +1483,7 @@ impl CaConn { st2.channel.shape.clone(), conf.conf.name().into(), )?; - self.stats.get_series_id_ok.inc(); + self.stats.get_series_id_ok().inc(); { let item = ChannelStatusItem { ts: self.tmp_ts_poll, @@ -1568,7 +1571,7 @@ impl CaConn { } } if self.cid_by_name(conf.name()).is_some() { - self.stats.channel_add_exists.inc(); + self.mett.channel_add_exists().inc(); if series::dbg::dbg_chn(&conf.name()) { error!("logic error channel already exists {conf:?}"); } @@ -1576,7 +1579,7 @@ impl CaConn { } else { let cid = self.cid_by_name_or_insert(conf.name())?; if self.channels.contains_key(&cid) { - self.stats.channel_add_exists.inc(); + self.mett.channel_add_exists().inc(); if series::dbg::dbg_chn(&conf.name()) { error!("logic error channel already exists {conf:?}"); } @@ -1610,7 +1613,7 @@ impl CaConn { }; let qu = Self::channel_status_qu(&mut self.iqdqs); if conf.wrst.emit_channel_status_item(item, qu).is_err() { - self.stats.logic_error().inc(); + self.mett.logic_error().inc(); } // TODO shutdown the internal writer structures. if let Some(cst) = conf.state.created_state() { @@ -1716,7 +1719,7 @@ impl CaConn { status: ChannelStatus::Closed(channel_reason.clone()), }; if conf.wrst.emit_channel_status_item(item, status_qu).is_err() { - self.stats.logic_error().inc(); + self.mett.logic_error().inc(); } } for (_cid, conf) in &mut self.channels { @@ -1761,7 +1764,7 @@ impl CaConn { let cid = if let Some(x) = self.cid_by_subid.get(&subid) { *x } else { - self.stats.unknown_subid().inc(); + self.mett.unknown_subid().inc(); return Ok(()); }; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { @@ -1804,7 +1807,7 @@ impl CaConn { *x } else { if self.thr_msg_poll.is_action() { - self.stats.no_cid_for_subid().inc(); + self.mett.no_cid_for_subid().inc(); // debug!("can not find cid for subid {subid:?}"); } // return Err(Error::with_msg_no_trace()); @@ -1859,19 +1862,19 @@ impl CaConn { Some(x.subid.clone()) } ReadingState::StopMonitoringForPolling(_) => { - self.stats.transition_to_polling_bad_state().inc(); + self.mett.transition_to_polling_bad_state().inc(); None } ReadingState::Polling(_) => { - self.stats.transition_to_polling_already_in().inc(); + self.mett.transition_to_polling_already_in().inc(); None } }; if let Some(subid) = subid { - self.stats.transition_to_polling().inc(); + self.mett.transition_to_polling().inc(); self.transition_to_polling(subid, tsnow)?; } else { - self.stats.transition_to_polling_bad_state().inc(); + self.mett.transition_to_polling_bad_state().inc(); } return Ok(()); } @@ -1918,7 +1921,7 @@ impl CaConn { Monitoring2State::ReadPending(_) => { // Received EventAdd while still waiting for answer to explicit ReadNotify. // This is fine. - self.stats.recv_event_add_while_wait_on_read_notify.inc(); + self.mett.recv_event_add_while_wait_on_read_notify().inc(); } } let crst = &mut st.channel; @@ -1963,10 +1966,10 @@ impl CaConn { | ChannelState::FetchEnumDetails(_) | ChannelState::FetchCaStatusSeries(_) | ChannelState::MakingSeriesWriter(_) => { - self.stats.recv_read_notify_but_not_init_yet.inc(); + self.mett.recv_read_notify_but_not_init_yet().inc(); } ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => { - self.stats.recv_read_notify_but_no_longer_ready.inc(); + self.mett.recv_read_notify_but_no_longer_ready().inc(); } } Ok(()) @@ -1979,7 +1982,7 @@ impl CaConn { *x } else { if self.thr_msg_poll.is_action() { - self.stats.no_cid_for_subid().inc(); + self.mett.no_cid_for_subid().inc(); // debug!("can not find cid for subid {subid:?}"); } // return Err(Error::with_msg_no_trace()); @@ -2090,13 +2093,18 @@ impl CaConn { match &mut st.reading { ReadingState::Polling(st2) => match &mut st2.tick { PollTickState::Idle(_) => { - self.stats.recv_read_notify_while_polling_idle.inc(); + self.mett.recv_read_notify_while_polling_idle().inc(); } PollTickState::Wait(st3) => { let dt = tsnow.saturating_duration_since(st3.since); - self.stats.caget_lat().ingest_dur_dms(dt); + // TODO STATS + // self.stats.caget_lat().ingest_dur_dms(dt); // TODO maintain histogram of read-notify latencies - self.read_ioids.remove(&st3.ioid); + if self.read_ioids.remove(&st3.ioid).is_some() { + self.mett.ioid_read_done().inc(); + } else { + self.mett.ioid_read_error_not_found().inc(); + } let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow); if self.trace_channel_poll { trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); @@ -2123,9 +2131,10 @@ impl CaConn { ReadingState::Monitoring(st2) => match &mut st2.mon2state { Monitoring2State::Passive(st3) => { if self.read_ioids.remove(&ioid).is_some() { - self.stats.recv_read_notify_state_passive_found_ioid.inc(); + self.mett.ioid_read_done().inc(); + self.mett.recv_read_notify_state_passive_found_ioid().inc(); } else { - self.stats.recv_read_notify_state_passive.inc(); + self.mett.ioid_read_error_not_found().inc(); } st3.tsbeg = tsnow; } @@ -2134,13 +2143,15 @@ impl CaConn { // So we could be here a little beyond timeout but we don't care about that. if ioid != st3.ioid { // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); - self.stats.recv_read_notify_state_read_pending_bad_ioid.inc(); + self.mett.recv_read_notify_state_read_pending_bad_ioid().inc(); } else { - self.stats.recv_read_notify_state_read_pending.inc(); + self.mett.recv_read_notify_state_read_pending().inc(); } let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) { + self.mett.ioid_read_done().inc(); true } else { + self.mett.ioid_read_error_not_found().inc(); false }; st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { @@ -2219,7 +2230,7 @@ impl CaConn { } } else { // warn!("unknown {ioid:?}"); - self.stats.unknown_ioid().inc(); + self.mett.unknown_ioid().inc(); } Ok(()) } @@ -2517,11 +2528,19 @@ impl CaConn { Monitoring2State::Passive(st4) => { if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow { trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout"); - self.stats.monitor_stale_read_begin().inc(); + self.mett.monitor_stale_read_begin().inc(); // TODO encapsulate and unify with Polling handler let ioid = Ioid(self.ioid); self.ioid = self.ioid.wrapping_add(1); - self.read_ioids.insert(ioid, st2.channel.cid.clone()); + self.read_ioids + .entry(ioid) + .and_modify(|e| { + self.mett.ioid_read_error_exists().inc(); + }) + .or_insert_with(|| { + self.mett.ioid_read_begin().inc(); + st2.channel.cid.clone() + }); let msg = CaMsg::from_ty_ts( CaMsgTy::ReadNotify(ReadNotify { data_type: st2.channel.ca_dbr_type, @@ -2535,7 +2554,7 @@ impl CaConn { self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg); st3.mon2state = Monitoring2State::ReadPending(Monitoring2ReadPendingState { tsbeg: tsnow, ioid }); - self.stats.caget_issued().inc(); + self.mett.caget_issued().inc(); { let item = ChannelStatusItem { ts: self.tmp_ts_poll, @@ -2552,7 +2571,7 @@ impl CaConn { // Something is wrong with this channel. // Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only // this or a subset of the subscribed channels no longer give updates. - self.stats.monitor_stale_read_timeout().inc(); + self.mett.monitor_stale_read_timeout().inc(); let name = conf.conf.name(); trace_monitor_stale!( "channel monitor explicit read timeout {} ioid {:?}", @@ -2600,7 +2619,15 @@ impl CaConn { if st4.next <= tsnow { let ioid = Ioid(self.ioid); self.ioid = self.ioid.wrapping_add(1); - self.read_ioids.insert(ioid, st2.channel.cid.clone()); + self.read_ioids + .entry(ioid) + .and_modify(|e| { + self.mett.ioid_read_error_exists().inc(); + }) + .or_insert_with(|| { + self.mett.ioid_read_begin().inc(); + st2.channel.cid.clone() + }); let msg = CaMsg::from_ty_ts( CaMsgTy::ReadNotify(ReadNotify { data_type: st2.channel.ca_dbr_type, @@ -2617,13 +2644,15 @@ impl CaConn { since: tsnow, ioid, }); - self.stats.caget_issued().inc(); + self.mett.caget_issued().inc(); } } PollTickState::Wait(st4) => { if st4.since + POLL_READ_TIMEOUT <= tsnow { - self.read_ioids.remove(&st4.ioid); - self.stats.caget_timeout().inc(); + if self.read_ioids.remove(&st4.ioid).is_some() { + self.mett.ioid_read_timeout().inc(); + } + self.mett.caget_timeout().inc(); let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow); if self.trace_channel_poll { trace!("make poll idle after poll timeout {next:?}"); @@ -2657,7 +2686,7 @@ impl CaConn { trace3!("check_channels_alive {}", self.remote_addr_dbg); if let Some(started) = self.ioc_ping_start { if started + TIMEOUT_PONG_WAIT < tsnow { - self.stats.pong_timeout().inc(); + self.mett.pong_timeout().inc(); info!("pong timeout {}", self.remote_addr_dbg); self.ioc_ping_start = None; let item = CaConnEvent { @@ -2670,13 +2699,13 @@ impl CaConn { } else { if self.ioc_ping_next < tsnow { if let Some(proto) = &mut self.proto { - self.stats.ping_start().inc(); + self.mett.ping_start().inc(); self.ioc_ping_start = Some(tsnow); self.ioc_ping_last = Some(self.tmp_ts_poll); let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); } else { - self.stats.ping_no_proto().inc(); + self.mett.ping_no_proto().inc(); info!("can not ping {} no proto", self.remote_addr_dbg); self.trigger_shutdown(ShutdownReason::ProtocolMissing); } @@ -2709,7 +2738,7 @@ impl CaConn { } if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow { not_alive_count += 1; - self.stats.channel_not_alive_no_activity.inc(); + self.mett.channel_not_alive_no_activity().inc(); } else { alive_count += 1; } @@ -2717,9 +2746,10 @@ impl CaConn { _ => {} } } - self.stats.channel_all_count.__set(self.channels.len() as _); - self.stats.channel_alive_count.__set(alive_count as _); - self.stats.channel_not_alive_count.__set(not_alive_count as _); + // TODO STATS + // self.stats.channel_all_count.__set(self.channels.len() as _); + // self.stats.channel_alive_count.__set(alive_count as _); + // self.stats.channel_not_alive_count.__set(not_alive_count as _); Ok(()) } @@ -2730,9 +2760,10 @@ impl CaConn { let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. let ts2 = Instant::now(); - self.stats - .time_check_channels_state_init - .add((ts2.duration_since(ts1) * MS as u32).as_secs()); + // TODO STATS + // self.mett + // .time_check_channels_state_init + // .add((ts2.duration_since(ts1) * 1000).as_secs()); ts1 = ts2; let _ = ts1; let tsnow = Instant::now(); @@ -2745,6 +2776,35 @@ impl CaConn { Ready(Some(Ok(k))) => { match k { CaItem::Msg(camsg) => { + match &camsg.ty { + CaMsgTy::Version => { + if !self.version_seen { + self.version_seen = true; + if self.non_version_seen { + self.mett.ca_proto_version_later().inc(); + } + } + } + CaMsgTy::VersionRes(_) => { + if !self.version_seen { + self.version_seen = true; + if self.non_version_seen { + self.mett.ca_proto_version_later().inc(); + } + } + } + _ => { + if !self.non_version_seen { + self.non_version_seen = true; + } + if !self.version_seen { + if !self.non_version_seen_emitted { + self.non_version_seen_emitted = true; + self.mett.ca_proto_no_version_as_first().inc(); + } + } + } + } match camsg.ty { CaMsgTy::SearchRes(k) => { let a = k.addr.to_be_bytes(); @@ -2759,7 +2819,7 @@ impl CaConn { } CaMsgTy::EventAddRes(ev) => { trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); - self.stats.event_add_res_recv.inc(); + self.mett.event_add_res_recv().inc(); Self::handle_event_add_res(self, ev, tsnow)? } CaMsgTy::EventAddResEmpty(ev) => { @@ -2771,7 +2831,8 @@ impl CaConn { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { let dt = started.elapsed(); - self.stats.pong_recv_lat().ingest_dur_dms(dt); + // TODO STATS + // self.stats.pong_recv_lat().ingest_dur_dms(dt); } else { let addr = &self.remote_addr_dbg; warn!("received Echo even though we didn't asked for it {addr:?}"); @@ -2811,9 +2872,10 @@ impl CaConn { } CaMsgTy::VersionRes(_) => { // TODO must check that version is only ever the first message. - if false { - self.stats.ca_proto_version_later().inc(); + if self.non_version_seen { + self.mett.ca_proto_version_later().inc(); } + self.version_seen = true; } CaMsgTy::ChannelCloseRes(x) => { self.handle_channel_close_res(x, tsnow)?; @@ -2825,11 +2887,7 @@ impl CaConn { } CaItem::Empty => {} } - if false { - // TODO use flags to keep track of whether we have seen version or other msgs. - self.stats.ca_proto_no_version_as_first().inc(); - } - self.ca_msg_recv_cnt = self.ca_msg_recv_cnt.saturating_add(1); + self.mett.ca_msg_recv().inc(); Ready(Some(Ok(()))) } Ready(Some(Err(e))) => { @@ -2982,7 +3040,7 @@ impl CaConn { Ready(connect_result) => { match connect_result { Ok(Ok(tcp)) => { - self.stats.tcp_connected.inc(); + self.mett.tcp_connected().inc(); let addr = addr.clone(); self.emit_connection_status_item(ConnectionStatusItem { ts: self.tmp_ts_poll, @@ -3085,7 +3143,7 @@ impl CaConn { let mut have_progress = false; let mut have_pending = false; for _ in 0..64 { - self.stats.loop2_count.inc(); + self.mett.loop2_count().inc(); if self.is_shutdown() { break; } else if self.iqdqs.len() >= self.opts.insert_queue_max { @@ -3186,12 +3244,7 @@ impl CaConn { fn housekeeping_self(&mut self) {} fn metrics_emit(&mut self) { - let ca_msg_recv_cnt = self.ca_msg_recv_cnt; - self.ca_msg_recv_cnt = 0; - let item = CaConnMetrics { - ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(), - ca_msg_recv_cnt, - }; + let item = self.mett.take_and_reset(); let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item)); self.ca_conn_event_out_queue.push_back(item); } @@ -3216,7 +3269,7 @@ impl CaConn { // What to do if limit reached? // Increase some error counter. if self.ca_conn_event_out_queue.len() > self.ca_conn_event_out_queue_max { - self.stats.out_queue_full().inc(); + self.mett.out_queue_full().inc(); } else { self.ca_conn_event_out_queue.push_back(item); } @@ -3300,7 +3353,7 @@ impl CaConn { Self::channel_status_pong_qu(&mut self.iqdqs) }; if ch.wrst.emit_channel_status_item(item, qu).is_err() { - self.stats.logic_error().inc(); + self.mett.logic_error().inc(); } } ChannelState::Closing(_) => {} @@ -3475,17 +3528,17 @@ impl Stream for CaConn { self.poll_tsnow = Instant::now(); self.tmp_ts_poll = SystemTime::now(); let poll_ts1 = Instant::now(); - self.stats.poll_fn_begin().inc(); + self.mett.poll_fn_begin().inc(); let mut reloops: u32 = 0; let ret = loop { let lts1 = Instant::now(); - self.stats.poll_loop_begin().inc(); + self.mett.poll_loop_begin().inc(); let qlen = self.iqdqs.len(); if qlen >= self.opts.insert_queue_max * 2 / 3 { - self.stats.insert_item_queue_pressure().inc(); + self.mett.insert_item_queue_pressure().inc(); } else if qlen >= self.opts.insert_queue_max { - self.stats.insert_item_queue_full().inc(); + self.mett.insert_item_queue_full().inc(); } let mut have_pending = false; @@ -3511,12 +3564,14 @@ impl Stream for CaConn { { let n = self.iqdqs.len(); + // TODO STATS self.stats.iiq_len().ingest(n as u32); } { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { + // TODO STATS stats2.iiq_batch_len().ingest(item.len() as u32); }; flush_queue_dqs!( @@ -3533,6 +3588,7 @@ impl Stream for CaConn { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { + // TODO STATS stats2.iiq_batch_len().ingest(item.len() as u32); }; flush_queue_dqs!( @@ -3549,6 +3605,7 @@ impl Stream for CaConn { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { + // TODO STATS stats2.iiq_batch_len().ingest(item.len() as u32); }; flush_queue_dqs!( @@ -3565,6 +3622,7 @@ impl Stream for CaConn { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { + // TODO STATS stats2.iiq_batch_len().ingest(item.len() as u32); }; flush_queue_dqs!( @@ -3581,6 +3639,7 @@ impl Stream for CaConn { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { + // TODO STATS stats2.iiq_batch_len().ingest(item.len() as u32); }; flush_queue_dqs!( @@ -3663,6 +3722,7 @@ impl Stream for CaConn { debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32()); } let dt = lts3.saturating_duration_since(lts2); + // TODO STATS self.stats.poll_op3_dt().ingest_dur_dms(dt); if dt > max { debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32()); @@ -3695,18 +3755,18 @@ impl Stream for CaConn { } else { if have_progress { debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg); - self.stats.poll_reloop().inc(); + self.mett.poll_reloop().inc(); reloops += 1; continue; } else if have_pending { debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg); self.log_queues_summary(); - self.stats.poll_pending().inc(); + self.mett.poll_pending().inc(); Pending } else { // TODO error error!("shutting down, queues not flushed, no progress, no pending"); - self.stats.logic_error().inc(); + self.mett.logic_error().inc(); let e = Error::ShutdownWithQueuesNoProgressNoPending; Ready(Some(CaConnEvent::err_now(e))) } @@ -3714,19 +3774,19 @@ impl Stream for CaConn { } else { if have_progress { if poll_ts1.elapsed() > Duration::from_millis(5) { - self.stats.poll_wake_break().inc(); + self.mett.poll_wake_break().inc(); cx.waker().wake_by_ref(); break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None))); } else { - self.stats.poll_reloop().inc(); + self.mett.poll_reloop().inc(); reloops += 1; continue; } } else if have_pending { - self.stats.poll_pending().inc(); + self.mett.poll_pending().inc(); Pending } else { - self.stats.poll_no_progress_no_pending().inc(); + self.mett.poll_no_progress_no_pending().inc(); let e = Error::NoProgressNoPending; Ready(Some(CaConnEvent::err_now(e))) } @@ -3735,22 +3795,19 @@ impl Stream for CaConn { let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); if self.trace_channel_poll { + // TODO STATS self.stats.poll_all_dt().ingest_dur_dms(dt); if dt >= Duration::from_millis(10) { trace!("long poll {dt:?}"); } else if dt >= Duration::from_micros(400) { + // TODO STATS let v = self.stats.poll_all_dt.to_display(); let ip = self.remote_addr_dbg; trace!("poll_all_dt {ip} {v}"); } } - self.stats.read_ioids_len().set(self.read_ioids.len() as u64); - let n = match &self.proto { - Some(x) => x.proto_out_len() as u64, - None => 0, - }; - self.stats.proto_out_len().set(n); - self.stats.poll_reloops().ingest(reloops); + // TODO STATS + // self.stats.poll_reloops().ingest(reloops); ret } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index a029bdd..6b60dd0 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -259,7 +259,7 @@ impl CaConnSetEvent { pub enum CaConnSetItem { Error(Error), Healthy, - Metrics(crate::metrics::types::CaConnSetMetrics), + Metrics(stats::mett::CaConnSetMetrics), } pub struct CaConnSetCtrl { @@ -770,7 +770,7 @@ impl CaConnSet { CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name), CaConnEventValue::Metrics(v) => { - self.ca_connset_metrics.ca_conn_agg.ingest(v); + self.ca_connset_metrics.mett.ca_conn().ingest(v); Ok(()) } } @@ -1882,8 +1882,9 @@ impl CaConnSet { } } { - let metrics = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new()); - let item = CaConnSetItem::Metrics(metrics); + // let item = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new()); + let item = self.ca_connset_metrics.mett.take_and_reset(); + let item = CaConnSetItem::Metrics(item); self.connset_out_queue.push_back(item); } Ok(()) diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index ffdcd5a..dabb7cc 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -15,9 +15,9 @@ use std::collections::VecDeque; use std::net::Ipv4Addr; use std::net::SocketAddrV4; use std::pin::Pin; +use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -188,26 +188,28 @@ impl FindIocStream { } unsafe fn create_socket() -> Result { - let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); + let ec = unsafe { libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0) }; if ec == -1 { return Err(Error::SocketCreate); } let sock = SockBox(ec); { let opt: libc::c_int = 1; - let ec = libc::setsockopt( - sock.0, - libc::SOL_SOCKET, - libc::SO_BROADCAST, - &opt as *const _ as _, - std::mem::size_of::() as _, - ); + let ec = unsafe { + libc::setsockopt( + sock.0, + libc::SOL_SOCKET, + libc::SO_BROADCAST, + &opt as *const _ as _, + std::mem::size_of::() as _, + ) + }; if ec == -1 { return Err(Error::BroadcastEnable); } } { - let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK); + let ec = unsafe { libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK) }; if ec == -1 { return Err(Error::NonblockEnable); } @@ -222,7 +224,7 @@ impl FindIocStream { sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); - let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); + let ec = unsafe { libc::bind(sock.0, &addr as *const _ as _, addr_len as _) }; if ec == -1 { return Err(Error::SocketBind); } @@ -234,7 +236,7 @@ impl FindIocStream { sin_zero: [0; 8], }; let mut addr_len = std::mem::size_of::(); - let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _); + let ec = unsafe { libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _) }; if ec == -1 { error!("getsockname {ec}"); return Err(Error::SocketConvertTokio); @@ -261,16 +263,18 @@ impl FindIocStream { sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); - let ec = libc::sendto( - sock, - &buf[0] as *const _ as _, - buf.len() as _, - 0, - &addr as *const _ as _, - addr_len as _, - ); + let ec = unsafe { + libc::sendto( + sock, + &buf[0] as *const _ as _, + buf.len() as _, + 0, + &addr as *const _ as _, + addr_len as _, + ) + }; if ec == -1 { - let errno = *libc::__errno_location(); + let errno = unsafe { *libc::__errno_location() }; if errno == libc::EAGAIN { return Poll::Pending; } else { @@ -288,16 +292,18 @@ impl FindIocStream { let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; let mut buf = vec![0u8; 1024]; - let ec = libc::recvfrom( - sock, - buf.as_mut_ptr() as _, - buf.len() as _, - libc::O_NONBLOCK, - &mut saddr_mem as *mut _ as _, - &mut saddr_len as *mut _ as _, - ); + let ec = unsafe { + libc::recvfrom( + sock, + buf.as_mut_ptr() as _, + buf.len() as _, + libc::O_NONBLOCK, + &mut saddr_mem as *mut _ as _, + &mut saddr_len as *mut _ as _, + ) + }; if ec == -1 { - let errno = *libc::__errno_location(); + let errno = unsafe { *libc::__errno_location() }; if errno == libc::EAGAIN { return Poll::Pending; } else { @@ -312,7 +318,7 @@ impl FindIocStream { Poll::Ready(Err(Error::ReadEmpty)) } else { stats.ca_udp_io_recv().inc(); - let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); + let saddr2: libc::sockaddr_in = unsafe { std::mem::transmute_copy(&saddr_mem) }; let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); if false { diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs index a60f543..43b10b7 100644 --- a/netfetch/src/metrics/types.rs +++ b/netfetch/src/metrics/types.rs @@ -62,12 +62,14 @@ impl CaConnMetricsAggAgg { #[derive(Debug, Serialize)] pub struct CaConnSetMetrics { pub ca_conn_agg: CaConnMetricsAgg, + pub mett: stats::mett::CaConnSetMetrics, } impl CaConnSetMetrics { pub fn new() -> Self { Self { ca_conn_agg: CaConnMetricsAgg::new(), + mett: stats::mett::CaConnSetMetrics::new(), } } } @@ -117,23 +119,6 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { } } -#[derive(Debug, Serialize)] -pub struct DaemonMetrics { - pub ca_conn_set_agg: CaConnSetAggMetrics, -} - -impl DaemonMetrics { - pub fn new() -> Self { - Self { - ca_conn_set_agg: CaConnSetAggMetrics::new(), - } - } - - pub fn ingest_ca_conn_set(&mut self, inp: CaConnSetMetrics) { - self.ca_conn_set_agg.ingest(inp); - } -} - #[derive(Debug, Serialize)] pub struct MetricsPrometheusShort { // derived from values @@ -157,8 +142,8 @@ impl MetricsPrometheusShort { } } -impl From<&DaemonMetrics> for MetricsPrometheusShort { - fn from(value: &DaemonMetrics) -> Self { +impl From<&stats::mett::DaemonMetrics> for MetricsPrometheusShort { + fn from(value: &stats::mett::DaemonMetrics) -> Self { Self { ca_conn_event_out_queue_len_max: value.ca_conn_set_agg.ca_conn_agg_agg.ca_conn_event_out_queue_len_max, ca_msg_recv_cnt_all: value.ca_conn_set_agg.ca_conn_agg_agg.ca_msg_recv_cnt_all, diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 0e18976..d98f292 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -8,7 +8,9 @@ edition = "2021" path = "src/stats.rs" [dependencies] +serde = { version = "1", features = ["derive"] } rand_xoshiro = "0.6.0" stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } +mettrics = { version = "0.0.5", path = "../../mettrics" } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs new file mode 100644 index 0000000..0ee7b5f --- /dev/null +++ b/stats/mettdecl.rs @@ -0,0 +1,68 @@ +mod Metrics { + type StructName = CaConnMetrics; + enum counters { + ioid_read_begin, + ioid_read_done, + ioid_read_timeout, + ioid_read_error_exists, + ioid_read_error_not_found, + recv_read_notify_state_passive_found_ioid, + proto_out_push, + logic_error, + poll_fn_begin, + poll_loop_begin, + poll_no_progress_no_pending, + poll_pending, + poll_reloop, + poll_wake_break, + insert_item_queue_full, + insert_item_queue_pressure, + out_queue_full, + loop2_count, + loop3_count, + tcp_connected, + ca_proto_no_version_as_first, + ca_proto_version_later, + ca_msg_recv, + event_add_res_recv, + time_check_channels_state_init, + channel_not_alive_no_activity, + ping_no_proto, + ping_start, + pong_timeout, + caget_timeout, + caget_issued, + monitor_stale_read_timeout, + monitor_stale_read_begin, + unknown_ioid, + recv_read_notify_state_read_pending, + recv_read_notify_state_read_pending_bad_ioid, + recv_read_notify_while_polling_idle, + no_cid_for_subid, + recv_read_notify_but_no_longer_ready, + recv_read_notify_but_not_init_yet, + recv_event_add_while_wait_on_read_notify, + transition_to_polling, + transition_to_polling_bad_state, + transition_to_polling_already_in, + unknown_subid, + get_series_id_ok, + channel_add_exists, + } +} + +mod Metrics { + type StructName = CaConnSetMetrics; + mod Compose { + type Input = CaConnMetrics; + type Name = ca_conn; + } +} + +mod Metrics { + type StructName = DaemonMetrics; + mod Compose { + type Input = CaConnSetMetrics; + type Name = ca_conn_set; + } +} diff --git a/stats/src/mett.rs b/stats/src/mett.rs new file mode 100644 index 0000000..3800997 --- /dev/null +++ b/stats/src/mett.rs @@ -0,0 +1,3 @@ +use mettrics::types::CounterU32; + +mettrics::macros::make_metrics!("mettdecl.rs"); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index a902ff8..92f9911 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,3 +1,5 @@ +pub mod mett; + pub use rand_xoshiro; use std::sync::atomic::AtomicU64; @@ -410,7 +412,6 @@ stats_proc::stats_struct!(( ca_proto_version_later, no_cid_for_subid, ), - values(inter_ivl_ema, read_ioids_len, proto_out_len,), histolog2s( poll_all_dt, poll_op3_dt, diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index 5418947..eadea98 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -636,7 +636,7 @@ impl syn::parse::Parse for StatsTreeDef { let diff_def = DiffStructDef::from_args(fa.args)?; diff_defs.push(diff_def); } else { - return Err(syn::Error::new(fa.name.span(), "Unexpected")); + return Err(syn::Error::new(fa.name.span(), "unexpected")); } } let ret = StatsTreeDef {