WIP transition metrics

This commit is contained in:
Dominik Werder
2025-04-07 16:36:11 +02:00
parent ba4b444778
commit db3e243e2e
10 changed files with 270 additions and 147 deletions

View File

@@ -86,7 +86,7 @@ pub struct Daemon {
// TODO
series_conf_by_id_tx: Sender<()>,
iqtx: Option<InsertQueuesTx>,
daemon_metrics: DaemonMetrics,
daemon_metrics: stats::mett::DaemonMetrics,
}
impl Daemon {
@@ -387,7 +387,7 @@ impl Daemon {
channel_info_query_tx,
series_conf_by_id_tx,
iqtx: Some(iqtx2),
daemon_metrics: DaemonMetrics::new(),
daemon_metrics: stats::mett::DaemonMetrics::new(),
};
Ok(ret)
}
@@ -564,8 +564,8 @@ impl Daemon {
error!("error from CaConnSet: {e}");
self.handle_shutdown().await?;
}
Metrics(metrics) => {
self.daemon_metrics.ingest_ca_conn_set(metrics);
Metrics(x) => {
self.daemon_metrics.ca_conn_set().ingest(x);
}
}
Ok(())

View File

@@ -326,7 +326,7 @@ enum Monitoring2State {
ReadPending(Monitoring2ReadPendingState),
}
#[derive(Debug, Clone, Serialize)]
enum MonitorReadCmp {
pub enum MonitorReadCmp {
Equal,
DiffTime,
DiffTimeValue,
@@ -1014,7 +1014,7 @@ pub enum CaConnEventValue {
ChannelCreateFail(String),
EndOfStream(EndOfStreamReason),
ChannelRemoved(String),
Metrics(CaConnMetrics),
Metrics(stats::mett::CaConnMetrics),
}
impl CaConnEventValue {
@@ -1072,6 +1072,9 @@ pub struct CaConn {
state: CaConnState,
ticker: Pin<Box<tokio::time::Sleep>>,
proto: Option<CaProto>,
version_seen: bool,
non_version_seen: bool,
non_version_seen_emitted: bool,
cid_store: CidStore,
subid_store: SubidStore,
channels: HashMap<Cid, ChannelConf>,
@@ -1112,9 +1115,8 @@ pub struct CaConn {
read_ioids: HashMap<Ioid, Cid>,
handler_by_ioid: HashMap<Ioid, Option<Pin<Box<dyn ConnFuture>>>>,
trace_channel_poll: bool,
ca_msg_recv_cnt: u64,
ca_version_recv_count: u64,
ts_channel_status_pong_last: Instant,
mett: stats::mett::CaConnMetrics,
}
impl Drop for CaConn {
@@ -1143,6 +1145,9 @@ impl CaConn {
state: CaConnState::Unconnected(tsnow),
ticker: Self::new_self_ticker(&mut rng),
proto: None,
version_seen: false,
non_version_seen: false,
non_version_seen_emitted: false,
cid_store: CidStore::new_from_time(),
subid_store: SubidStore::new_from_time(),
init_state_count: 0,
@@ -1179,9 +1184,8 @@ impl CaConn {
read_ioids: HashMap::new(),
handler_by_ioid: HashMap::new(),
trace_channel_poll: false,
ca_msg_recv_cnt: 0,
ca_version_recv_count: 0,
ts_channel_status_pong_last: tsnow,
mett: stats::mett::CaConnMetrics::new(),
}
}
@@ -1285,7 +1289,7 @@ impl CaConn {
status: ConnectionStatus::Closing,
};
if self.emit_connection_status_item(item).is_err() {
self.stats.logic_error().inc();
self.mett.logic_error().inc();
}
self.proto = None;
}
@@ -1299,7 +1303,6 @@ impl CaConn {
fn cmd_channel_close(&mut self, name: String) {
self.channel_close(name);
// TODO return the result
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_shutdown(&mut self) {
@@ -1309,7 +1312,7 @@ impl CaConn {
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
self.stats.loop3_count.inc();
self.mett.loop3_count().inc();
if self.is_shutdown() {
Ok(Ready(None))
} else {
@@ -1480,7 +1483,7 @@ impl CaConn {
st2.channel.shape.clone(),
conf.conf.name().into(),
)?;
self.stats.get_series_id_ok.inc();
self.stats.get_series_id_ok().inc();
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
@@ -1568,7 +1571,7 @@ impl CaConn {
}
}
if self.cid_by_name(conf.name()).is_some() {
self.stats.channel_add_exists.inc();
self.mett.channel_add_exists().inc();
if series::dbg::dbg_chn(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
@@ -1576,7 +1579,7 @@ impl CaConn {
} else {
let cid = self.cid_by_name_or_insert(conf.name())?;
if self.channels.contains_key(&cid) {
self.stats.channel_add_exists.inc();
self.mett.channel_add_exists().inc();
if series::dbg::dbg_chn(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
@@ -1610,7 +1613,7 @@ impl CaConn {
};
let qu = Self::channel_status_qu(&mut self.iqdqs);
if conf.wrst.emit_channel_status_item(item, qu).is_err() {
self.stats.logic_error().inc();
self.mett.logic_error().inc();
}
// TODO shutdown the internal writer structures.
if let Some(cst) = conf.state.created_state() {
@@ -1716,7 +1719,7 @@ impl CaConn {
status: ChannelStatus::Closed(channel_reason.clone()),
};
if conf.wrst.emit_channel_status_item(item, status_qu).is_err() {
self.stats.logic_error().inc();
self.mett.logic_error().inc();
}
}
for (_cid, conf) in &mut self.channels {
@@ -1761,7 +1764,7 @@ impl CaConn {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
self.stats.unknown_subid().inc();
self.mett.unknown_subid().inc();
return Ok(());
};
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
@@ -1804,7 +1807,7 @@ impl CaConn {
*x
} else {
if self.thr_msg_poll.is_action() {
self.stats.no_cid_for_subid().inc();
self.mett.no_cid_for_subid().inc();
// debug!("can not find cid for subid {subid:?}");
}
// return Err(Error::with_msg_no_trace());
@@ -1859,19 +1862,19 @@ impl CaConn {
Some(x.subid.clone())
}
ReadingState::StopMonitoringForPolling(_) => {
self.stats.transition_to_polling_bad_state().inc();
self.mett.transition_to_polling_bad_state().inc();
None
}
ReadingState::Polling(_) => {
self.stats.transition_to_polling_already_in().inc();
self.mett.transition_to_polling_already_in().inc();
None
}
};
if let Some(subid) = subid {
self.stats.transition_to_polling().inc();
self.mett.transition_to_polling().inc();
self.transition_to_polling(subid, tsnow)?;
} else {
self.stats.transition_to_polling_bad_state().inc();
self.mett.transition_to_polling_bad_state().inc();
}
return Ok(());
}
@@ -1918,7 +1921,7 @@ impl CaConn {
Monitoring2State::ReadPending(_) => {
// Received EventAdd while still waiting for answer to explicit ReadNotify.
// This is fine.
self.stats.recv_event_add_while_wait_on_read_notify.inc();
self.mett.recv_event_add_while_wait_on_read_notify().inc();
}
}
let crst = &mut st.channel;
@@ -1963,10 +1966,10 @@ impl CaConn {
| ChannelState::FetchEnumDetails(_)
| ChannelState::FetchCaStatusSeries(_)
| ChannelState::MakingSeriesWriter(_) => {
self.stats.recv_read_notify_but_not_init_yet.inc();
self.mett.recv_read_notify_but_not_init_yet().inc();
}
ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => {
self.stats.recv_read_notify_but_no_longer_ready.inc();
self.mett.recv_read_notify_but_no_longer_ready().inc();
}
}
Ok(())
@@ -1979,7 +1982,7 @@ impl CaConn {
*x
} else {
if self.thr_msg_poll.is_action() {
self.stats.no_cid_for_subid().inc();
self.mett.no_cid_for_subid().inc();
// debug!("can not find cid for subid {subid:?}");
}
// return Err(Error::with_msg_no_trace());
@@ -2090,13 +2093,18 @@ impl CaConn {
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_) => {
self.stats.recv_read_notify_while_polling_idle.inc();
self.mett.recv_read_notify_while_polling_idle().inc();
}
PollTickState::Wait(st3) => {
let dt = tsnow.saturating_duration_since(st3.since);
self.stats.caget_lat().ingest_dur_dms(dt);
// TODO STATS
// self.stats.caget_lat().ingest_dur_dms(dt);
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(&st3.ioid);
if self.read_ioids.remove(&st3.ioid).is_some() {
self.mett.ioid_read_done().inc();
} else {
self.mett.ioid_read_error_not_found().inc();
}
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
@@ -2123,9 +2131,10 @@ impl CaConn {
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
if self.read_ioids.remove(&ioid).is_some() {
self.stats.recv_read_notify_state_passive_found_ioid.inc();
self.mett.ioid_read_done().inc();
self.mett.recv_read_notify_state_passive_found_ioid().inc();
} else {
self.stats.recv_read_notify_state_passive.inc();
self.mett.ioid_read_error_not_found().inc();
}
st3.tsbeg = tsnow;
}
@@ -2134,13 +2143,15 @@ impl CaConn {
// So we could be here a little beyond timeout but we don't care about that.
if ioid != st3.ioid {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.stats.recv_read_notify_state_read_pending_bad_ioid.inc();
self.mett.recv_read_notify_state_read_pending_bad_ioid().inc();
} else {
self.stats.recv_read_notify_state_read_pending.inc();
self.mett.recv_read_notify_state_read_pending().inc();
}
let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) {
self.mett.ioid_read_done().inc();
true
} else {
self.mett.ioid_read_error_not_found().inc();
false
};
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState {
@@ -2219,7 +2230,7 @@ impl CaConn {
}
} else {
// warn!("unknown {ioid:?}");
self.stats.unknown_ioid().inc();
self.mett.unknown_ioid().inc();
}
Ok(())
}
@@ -2517,11 +2528,19 @@ impl CaConn {
Monitoring2State::Passive(st4) => {
if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow {
trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout");
self.stats.monitor_stale_read_begin().inc();
self.mett.monitor_stale_read_begin().inc();
// TODO encapsulate and unify with Polling handler
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids.insert(ioid, st2.channel.cid.clone());
self.read_ioids
.entry(ioid)
.and_modify(|e| {
self.mett.ioid_read_error_exists().inc();
})
.or_insert_with(|| {
self.mett.ioid_read_begin().inc();
st2.channel.cid.clone()
});
let msg = CaMsg::from_ty_ts(
CaMsgTy::ReadNotify(ReadNotify {
data_type: st2.channel.ca_dbr_type,
@@ -2535,7 +2554,7 @@ impl CaConn {
self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg);
st3.mon2state =
Monitoring2State::ReadPending(Monitoring2ReadPendingState { tsbeg: tsnow, ioid });
self.stats.caget_issued().inc();
self.mett.caget_issued().inc();
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
@@ -2552,7 +2571,7 @@ impl CaConn {
// Something is wrong with this channel.
// Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
// this or a subset of the subscribed channels no longer give updates.
self.stats.monitor_stale_read_timeout().inc();
self.mett.monitor_stale_read_timeout().inc();
let name = conf.conf.name();
trace_monitor_stale!(
"channel monitor explicit read timeout {} ioid {:?}",
@@ -2600,7 +2619,15 @@ impl CaConn {
if st4.next <= tsnow {
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids.insert(ioid, st2.channel.cid.clone());
self.read_ioids
.entry(ioid)
.and_modify(|e| {
self.mett.ioid_read_error_exists().inc();
})
.or_insert_with(|| {
self.mett.ioid_read_begin().inc();
st2.channel.cid.clone()
});
let msg = CaMsg::from_ty_ts(
CaMsgTy::ReadNotify(ReadNotify {
data_type: st2.channel.ca_dbr_type,
@@ -2617,13 +2644,15 @@ impl CaConn {
since: tsnow,
ioid,
});
self.stats.caget_issued().inc();
self.mett.caget_issued().inc();
}
}
PollTickState::Wait(st4) => {
if st4.since + POLL_READ_TIMEOUT <= tsnow {
self.read_ioids.remove(&st4.ioid);
self.stats.caget_timeout().inc();
if self.read_ioids.remove(&st4.ioid).is_some() {
self.mett.ioid_read_timeout().inc();
}
self.mett.caget_timeout().inc();
let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make poll idle after poll timeout {next:?}");
@@ -2657,7 +2686,7 @@ impl CaConn {
trace3!("check_channels_alive {}", self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started + TIMEOUT_PONG_WAIT < tsnow {
self.stats.pong_timeout().inc();
self.mett.pong_timeout().inc();
info!("pong timeout {}", self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
@@ -2670,13 +2699,13 @@ impl CaConn {
} else {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
self.mett.ping_start().inc();
self.ioc_ping_start = Some(tsnow);
self.ioc_ping_last = Some(self.tmp_ts_poll);
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
} else {
self.stats.ping_no_proto().inc();
self.mett.ping_no_proto().inc();
info!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ShutdownReason::ProtocolMissing);
}
@@ -2709,7 +2738,7 @@ impl CaConn {
}
if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow {
not_alive_count += 1;
self.stats.channel_not_alive_no_activity.inc();
self.mett.channel_not_alive_no_activity().inc();
} else {
alive_count += 1;
}
@@ -2717,9 +2746,10 @@ impl CaConn {
_ => {}
}
}
self.stats.channel_all_count.__set(self.channels.len() as _);
self.stats.channel_alive_count.__set(alive_count as _);
self.stats.channel_not_alive_count.__set(not_alive_count as _);
// TODO STATS
// self.stats.channel_all_count.__set(self.channels.len() as _);
// self.stats.channel_alive_count.__set(alive_count as _);
// self.stats.channel_not_alive_count.__set(not_alive_count as _);
Ok(())
}
@@ -2730,9 +2760,10 @@ impl CaConn {
let mut ts1 = Instant::now();
// TODO unify with Listen state where protocol gets polled as well.
let ts2 = Instant::now();
self.stats
.time_check_channels_state_init
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
// TODO STATS
// self.mett
// .time_check_channels_state_init
// .add((ts2.duration_since(ts1) * 1000).as_secs());
ts1 = ts2;
let _ = ts1;
let tsnow = Instant::now();
@@ -2745,6 +2776,35 @@ impl CaConn {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(camsg) => {
match &camsg.ty {
CaMsgTy::Version => {
if !self.version_seen {
self.version_seen = true;
if self.non_version_seen {
self.mett.ca_proto_version_later().inc();
}
}
}
CaMsgTy::VersionRes(_) => {
if !self.version_seen {
self.version_seen = true;
if self.non_version_seen {
self.mett.ca_proto_version_later().inc();
}
}
}
_ => {
if !self.non_version_seen {
self.non_version_seen = true;
}
if !self.version_seen {
if !self.non_version_seen_emitted {
self.non_version_seen_emitted = true;
self.mett.ca_proto_no_version_as_first().inc();
}
}
}
}
match camsg.ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
@@ -2759,7 +2819,7 @@ impl CaConn {
}
CaMsgTy::EventAddRes(ev) => {
trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
self.stats.event_add_res_recv.inc();
self.mett.event_add_res_recv().inc();
Self::handle_event_add_res(self, ev, tsnow)?
}
CaMsgTy::EventAddResEmpty(ev) => {
@@ -2771,7 +2831,8 @@ impl CaConn {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
let dt = started.elapsed();
self.stats.pong_recv_lat().ingest_dur_dms(dt);
// TODO STATS
// self.stats.pong_recv_lat().ingest_dur_dms(dt);
} else {
let addr = &self.remote_addr_dbg;
warn!("received Echo even though we didn't asked for it {addr:?}");
@@ -2811,9 +2872,10 @@ impl CaConn {
}
CaMsgTy::VersionRes(_) => {
// TODO must check that version is only ever the first message.
if false {
self.stats.ca_proto_version_later().inc();
if self.non_version_seen {
self.mett.ca_proto_version_later().inc();
}
self.version_seen = true;
}
CaMsgTy::ChannelCloseRes(x) => {
self.handle_channel_close_res(x, tsnow)?;
@@ -2825,11 +2887,7 @@ impl CaConn {
}
CaItem::Empty => {}
}
if false {
// TODO use flags to keep track of whether we have seen version or other msgs.
self.stats.ca_proto_no_version_as_first().inc();
}
self.ca_msg_recv_cnt = self.ca_msg_recv_cnt.saturating_add(1);
self.mett.ca_msg_recv().inc();
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
@@ -2982,7 +3040,7 @@ impl CaConn {
Ready(connect_result) => {
match connect_result {
Ok(Ok(tcp)) => {
self.stats.tcp_connected.inc();
self.mett.tcp_connected().inc();
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
@@ -3085,7 +3143,7 @@ impl CaConn {
let mut have_progress = false;
let mut have_pending = false;
for _ in 0..64 {
self.stats.loop2_count.inc();
self.mett.loop2_count().inc();
if self.is_shutdown() {
break;
} else if self.iqdqs.len() >= self.opts.insert_queue_max {
@@ -3186,12 +3244,7 @@ impl CaConn {
fn housekeeping_self(&mut self) {}
fn metrics_emit(&mut self) {
let ca_msg_recv_cnt = self.ca_msg_recv_cnt;
self.ca_msg_recv_cnt = 0;
let item = CaConnMetrics {
ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(),
ca_msg_recv_cnt,
};
let item = self.mett.take_and_reset();
let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item));
self.ca_conn_event_out_queue.push_back(item);
}
@@ -3216,7 +3269,7 @@ impl CaConn {
// What to do if limit reached?
// Increase some error counter.
if self.ca_conn_event_out_queue.len() > self.ca_conn_event_out_queue_max {
self.stats.out_queue_full().inc();
self.mett.out_queue_full().inc();
} else {
self.ca_conn_event_out_queue.push_back(item);
}
@@ -3300,7 +3353,7 @@ impl CaConn {
Self::channel_status_pong_qu(&mut self.iqdqs)
};
if ch.wrst.emit_channel_status_item(item, qu).is_err() {
self.stats.logic_error().inc();
self.mett.logic_error().inc();
}
}
ChannelState::Closing(_) => {}
@@ -3475,17 +3528,17 @@ impl Stream for CaConn {
self.poll_tsnow = Instant::now();
self.tmp_ts_poll = SystemTime::now();
let poll_ts1 = Instant::now();
self.stats.poll_fn_begin().inc();
self.mett.poll_fn_begin().inc();
let mut reloops: u32 = 0;
let ret = loop {
let lts1 = Instant::now();
self.stats.poll_loop_begin().inc();
self.mett.poll_loop_begin().inc();
let qlen = self.iqdqs.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
self.mett.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
self.stats.insert_item_queue_full().inc();
self.mett.insert_item_queue_full().inc();
}
let mut have_pending = false;
@@ -3511,12 +3564,14 @@ impl Stream for CaConn {
{
let n = self.iqdqs.len();
// TODO STATS
self.stats.iiq_len().ingest(n as u32);
}
{
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
// TODO STATS
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
@@ -3533,6 +3588,7 @@ impl Stream for CaConn {
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
// TODO STATS
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
@@ -3549,6 +3605,7 @@ impl Stream for CaConn {
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
// TODO STATS
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
@@ -3565,6 +3622,7 @@ impl Stream for CaConn {
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
// TODO STATS
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
@@ -3581,6 +3639,7 @@ impl Stream for CaConn {
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
// TODO STATS
stats2.iiq_batch_len().ingest(item.len() as u32);
};
flush_queue_dqs!(
@@ -3663,6 +3722,7 @@ impl Stream for CaConn {
debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32());
}
let dt = lts3.saturating_duration_since(lts2);
// TODO STATS
self.stats.poll_op3_dt().ingest_dur_dms(dt);
if dt > max {
debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32());
@@ -3695,18 +3755,18 @@ impl Stream for CaConn {
} else {
if have_progress {
debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg);
self.stats.poll_reloop().inc();
self.mett.poll_reloop().inc();
reloops += 1;
continue;
} else if have_pending {
debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg);
self.log_queues_summary();
self.stats.poll_pending().inc();
self.mett.poll_pending().inc();
Pending
} else {
// TODO error
error!("shutting down, queues not flushed, no progress, no pending");
self.stats.logic_error().inc();
self.mett.logic_error().inc();
let e = Error::ShutdownWithQueuesNoProgressNoPending;
Ready(Some(CaConnEvent::err_now(e)))
}
@@ -3714,19 +3774,19 @@ impl Stream for CaConn {
} else {
if have_progress {
if poll_ts1.elapsed() > Duration::from_millis(5) {
self.stats.poll_wake_break().inc();
self.mett.poll_wake_break().inc();
cx.waker().wake_by_ref();
break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None)));
} else {
self.stats.poll_reloop().inc();
self.mett.poll_reloop().inc();
reloops += 1;
continue;
}
} else if have_pending {
self.stats.poll_pending().inc();
self.mett.poll_pending().inc();
Pending
} else {
self.stats.poll_no_progress_no_pending().inc();
self.mett.poll_no_progress_no_pending().inc();
let e = Error::NoProgressNoPending;
Ready(Some(CaConnEvent::err_now(e)))
}
@@ -3735,22 +3795,19 @@ impl Stream for CaConn {
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
if self.trace_channel_poll {
// TODO STATS
self.stats.poll_all_dt().ingest_dur_dms(dt);
if dt >= Duration::from_millis(10) {
trace!("long poll {dt:?}");
} else if dt >= Duration::from_micros(400) {
// TODO STATS
let v = self.stats.poll_all_dt.to_display();
let ip = self.remote_addr_dbg;
trace!("poll_all_dt {ip} {v}");
}
}
self.stats.read_ioids_len().set(self.read_ioids.len() as u64);
let n = match &self.proto {
Some(x) => x.proto_out_len() as u64,
None => 0,
};
self.stats.proto_out_len().set(n);
self.stats.poll_reloops().ingest(reloops);
// TODO STATS
// self.stats.poll_reloops().ingest(reloops);
ret
}
}

View File

@@ -259,7 +259,7 @@ impl CaConnSetEvent {
pub enum CaConnSetItem {
Error(Error),
Healthy,
Metrics(crate::metrics::types::CaConnSetMetrics),
Metrics(stats::mett::CaConnSetMetrics),
}
pub struct CaConnSetCtrl {
@@ -770,7 +770,7 @@ impl CaConnSet {
CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason),
CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name),
CaConnEventValue::Metrics(v) => {
self.ca_connset_metrics.ca_conn_agg.ingest(v);
self.ca_connset_metrics.mett.ca_conn().ingest(v);
Ok(())
}
}
@@ -1882,8 +1882,9 @@ impl CaConnSet {
}
}
{
let metrics = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new());
let item = CaConnSetItem::Metrics(metrics);
// let item = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new());
let item = self.ca_connset_metrics.mett.take_and_reset();
let item = CaConnSetItem::Metrics(item);
self.connset_out_queue.push_back(item);
}
Ok(())

View File

@@ -15,9 +15,9 @@ use std::collections::VecDeque;
use std::net::Ipv4Addr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -188,26 +188,28 @@ impl FindIocStream {
}
unsafe fn create_socket() -> Result<SockBox, Error> {
let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0);
let ec = unsafe { libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0) };
if ec == -1 {
return Err(Error::SocketCreate);
}
let sock = SockBox(ec);
{
let opt: libc::c_int = 1;
let ec = libc::setsockopt(
sock.0,
libc::SOL_SOCKET,
libc::SO_BROADCAST,
&opt as *const _ as _,
std::mem::size_of::<libc::c_int>() as _,
);
let ec = unsafe {
libc::setsockopt(
sock.0,
libc::SOL_SOCKET,
libc::SO_BROADCAST,
&opt as *const _ as _,
std::mem::size_of::<libc::c_int>() as _,
)
};
if ec == -1 {
return Err(Error::BroadcastEnable);
}
}
{
let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK);
let ec = unsafe { libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK) };
if ec == -1 {
return Err(Error::NonblockEnable);
}
@@ -222,7 +224,7 @@ impl FindIocStream {
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _);
let ec = unsafe { libc::bind(sock.0, &addr as *const _ as _, addr_len as _) };
if ec == -1 {
return Err(Error::SocketBind);
}
@@ -234,7 +236,7 @@ impl FindIocStream {
sin_zero: [0; 8],
};
let mut addr_len = std::mem::size_of::<libc::sockaddr_in>();
let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _);
let ec = unsafe { libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _) };
if ec == -1 {
error!("getsockname {ec}");
return Err(Error::SocketConvertTokio);
@@ -261,16 +263,18 @@ impl FindIocStream {
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
let ec = libc::sendto(
sock,
&buf[0] as *const _ as _,
buf.len() as _,
0,
&addr as *const _ as _,
addr_len as _,
);
let ec = unsafe {
libc::sendto(
sock,
&buf[0] as *const _ as _,
buf.len() as _,
0,
&addr as *const _ as _,
addr_len as _,
)
};
if ec == -1 {
let errno = *libc::__errno_location();
let errno = unsafe { *libc::__errno_location() };
if errno == libc::EAGAIN {
return Poll::Pending;
} else {
@@ -288,16 +292,18 @@ impl FindIocStream {
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
let mut buf = vec![0u8; 1024];
let ec = libc::recvfrom(
sock,
buf.as_mut_ptr() as _,
buf.len() as _,
libc::O_NONBLOCK,
&mut saddr_mem as *mut _ as _,
&mut saddr_len as *mut _ as _,
);
let ec = unsafe {
libc::recvfrom(
sock,
buf.as_mut_ptr() as _,
buf.len() as _,
libc::O_NONBLOCK,
&mut saddr_mem as *mut _ as _,
&mut saddr_len as *mut _ as _,
)
};
if ec == -1 {
let errno = *libc::__errno_location();
let errno = unsafe { *libc::__errno_location() };
if errno == libc::EAGAIN {
return Poll::Pending;
} else {
@@ -312,7 +318,7 @@ impl FindIocStream {
Poll::Ready(Err(Error::ReadEmpty))
} else {
stats.ca_udp_io_recv().inc();
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let saddr2: libc::sockaddr_in = unsafe { std::mem::transmute_copy(&saddr_mem) };
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
if false {

View File

@@ -62,12 +62,14 @@ impl CaConnMetricsAggAgg {
#[derive(Debug, Serialize)]
pub struct CaConnSetMetrics {
pub ca_conn_agg: CaConnMetricsAgg,
pub mett: stats::mett::CaConnSetMetrics,
}
impl CaConnSetMetrics {
pub fn new() -> Self {
Self {
ca_conn_agg: CaConnMetricsAgg::new(),
mett: stats::mett::CaConnSetMetrics::new(),
}
}
}
@@ -117,23 +119,6 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
}
}
#[derive(Debug, Serialize)]
pub struct DaemonMetrics {
pub ca_conn_set_agg: CaConnSetAggMetrics,
}
impl DaemonMetrics {
pub fn new() -> Self {
Self {
ca_conn_set_agg: CaConnSetAggMetrics::new(),
}
}
pub fn ingest_ca_conn_set(&mut self, inp: CaConnSetMetrics) {
self.ca_conn_set_agg.ingest(inp);
}
}
#[derive(Debug, Serialize)]
pub struct MetricsPrometheusShort {
// derived from values
@@ -157,8 +142,8 @@ impl MetricsPrometheusShort {
}
}
impl From<&DaemonMetrics> for MetricsPrometheusShort {
fn from(value: &DaemonMetrics) -> Self {
impl From<&stats::mett::DaemonMetrics> for MetricsPrometheusShort {
fn from(value: &stats::mett::DaemonMetrics) -> Self {
Self {
ca_conn_event_out_queue_len_max: value.ca_conn_set_agg.ca_conn_agg_agg.ca_conn_event_out_queue_len_max,
ca_msg_recv_cnt_all: value.ca_conn_set_agg.ca_conn_agg_agg.ca_msg_recv_cnt_all,

View File

@@ -8,7 +8,9 @@ edition = "2021"
path = "src/stats.rs"
[dependencies]
serde = { version = "1", features = ["derive"] }
rand_xoshiro = "0.6.0"
stats_types = { path = "../stats_types" }
stats_proc = { path = "../stats_proc" }
log = { path = "../log" }
mettrics = { version = "0.0.5", path = "../../mettrics" }

68
stats/mettdecl.rs Normal file
View File

@@ -0,0 +1,68 @@
mod Metrics {
type StructName = CaConnMetrics;
enum counters {
ioid_read_begin,
ioid_read_done,
ioid_read_timeout,
ioid_read_error_exists,
ioid_read_error_not_found,
recv_read_notify_state_passive_found_ioid,
proto_out_push,
logic_error,
poll_fn_begin,
poll_loop_begin,
poll_no_progress_no_pending,
poll_pending,
poll_reloop,
poll_wake_break,
insert_item_queue_full,
insert_item_queue_pressure,
out_queue_full,
loop2_count,
loop3_count,
tcp_connected,
ca_proto_no_version_as_first,
ca_proto_version_later,
ca_msg_recv,
event_add_res_recv,
time_check_channels_state_init,
channel_not_alive_no_activity,
ping_no_proto,
ping_start,
pong_timeout,
caget_timeout,
caget_issued,
monitor_stale_read_timeout,
monitor_stale_read_begin,
unknown_ioid,
recv_read_notify_state_read_pending,
recv_read_notify_state_read_pending_bad_ioid,
recv_read_notify_while_polling_idle,
no_cid_for_subid,
recv_read_notify_but_no_longer_ready,
recv_read_notify_but_not_init_yet,
recv_event_add_while_wait_on_read_notify,
transition_to_polling,
transition_to_polling_bad_state,
transition_to_polling_already_in,
unknown_subid,
get_series_id_ok,
channel_add_exists,
}
}
mod Metrics {
type StructName = CaConnSetMetrics;
mod Compose {
type Input = CaConnMetrics;
type Name = ca_conn;
}
}
mod Metrics {
type StructName = DaemonMetrics;
mod Compose {
type Input = CaConnSetMetrics;
type Name = ca_conn_set;
}
}

3
stats/src/mett.rs Normal file
View File

@@ -0,0 +1,3 @@
use mettrics::types::CounterU32;
mettrics::macros::make_metrics!("mettdecl.rs");

View File

@@ -1,3 +1,5 @@
pub mod mett;
pub use rand_xoshiro;
use std::sync::atomic::AtomicU64;
@@ -410,7 +412,6 @@ stats_proc::stats_struct!((
ca_proto_version_later,
no_cid_for_subid,
),
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
histolog2s(
poll_all_dt,
poll_op3_dt,

View File

@@ -636,7 +636,7 @@ impl syn::parse::Parse for StatsTreeDef {
let diff_def = DiffStructDef::from_args(fa.args)?;
diff_defs.push(diff_def);
} else {
return Err(syn::Error::new(fa.name.span(), "Unexpected"));
return Err(syn::Error::new(fa.name.span(), "unexpected"));
}
}
let ret = StatsTreeDef {