Use independent metrics type

This commit is contained in:
Dominik Werder
2025-04-10 11:58:28 +02:00
parent d212613b2d
commit 30dfbfd796
5 changed files with 45 additions and 25 deletions

View File

@@ -19,6 +19,7 @@ slidebuf = "0.0.1"
thiserror = "=0.0.1"
series = { path = "../daqbuf-series", package = "daqbuf-series" }
netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }
mettrics = { version = "0.0.6", path = "../mettrics" }
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

19
metrics.rs Normal file
View File

@@ -0,0 +1,19 @@
mod Metrics {
type StructName = CaProtoMetrics;
enum counters {
metrics_emit,
tcp_recv_count,
tcp_recv_bytes,
protocol_issue,
payload_std_too_large,
payload_ext_but_small,
payload_ext_very_large,
out_msg_placed,
out_bytes,
}
enum histolog2s {
payload_size,
data_count,
outbuf_len,
}
}

View File

@@ -1277,7 +1277,7 @@ pub trait AsyncWriteRead: AsyncWrite + AsyncRead + Send + 'static {}
impl<T> AsyncWriteRead for T where T: AsyncWrite + AsyncRead + Send + 'static {}
pub struct CaProto<STATS = ()> {
pub struct CaProto {
tcp: Pin<Box<dyn AsyncWriteRead>>,
tcp_eof: bool,
remote_name: String,
@@ -1286,7 +1286,7 @@ pub struct CaProto<STATS = ()> {
outbuf: SlideBuf,
out: VecDeque<CaMsg>,
array_truncate: usize,
stats: STATS,
mett: crate::mett::CaProtoMetrics,
resqu: VecDeque<CaItem>,
event_add_res_cnt: u32,
bytes_recv_testing: u32,
@@ -1303,7 +1303,6 @@ impl fmt::Debug for CaProto {
.field("outbuf", &self.outbuf)
.field("out", &self.out)
.field("array_truncate", &self.array_truncate)
.field("stats", &self.stats)
.field("resqu", &self.resqu)
.field("event_add_res_cnt", &self.event_add_res_cnt)
.field("bytes_recv_testing", &self.bytes_recv_testing)
@@ -1311,16 +1310,8 @@ impl fmt::Debug for CaProto {
}
}
impl<STATS> CaProto<STATS>
where
STATS: CaProtoStatsRecv,
{
pub fn new<T: AsyncWriteRead>(
tcp: T,
remote_name: String,
array_truncate: usize,
stats: STATS,
) -> Self {
impl CaProto {
pub fn new<T: AsyncWriteRead>(tcp: T, remote_name: String, array_truncate: usize) -> Self {
Self {
tcp: Box::pin(tcp),
tcp_eof: false,
@@ -1330,13 +1321,17 @@ where
outbuf: SlideBuf::new(1024 * 256),
out: VecDeque::new(),
array_truncate,
stats,
mett: crate::mett::CaProtoMetrics::new(),
resqu: VecDeque::with_capacity(256),
event_add_res_cnt: 0,
bytes_recv_testing: 0,
}
}
pub fn mett(&mut self) -> &mut crate::mett::CaProtoMetrics {
&mut self.mett
}
pub fn proto_out_len(&self) -> usize {
self.out.len()
}
@@ -1369,7 +1364,7 @@ where
Ready(k) => match k {
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => {
self.stats.out_bytes().add(k as u64);
self.mett.out_bytes().add(k as u32);
Ready(Ok(k))
}
Err(e) => {
@@ -1393,7 +1388,7 @@ where
let tsnow = Instant::now();
{
let g = self.outbuf.len();
self.stats.outbuf_len().ingest(g as u32);
self.mett.outbuf_len().push_val(g as u32);
}
while let Some((msg, buf)) = self.out_msg_buf() {
let msglen = msg.len();
@@ -1403,7 +1398,7 @@ where
msg.place_into(&mut buf[..msglen]);
self.outbuf.wadv(msglen)?;
self.out.pop_front();
self.stats.out_msg_placed().inc();
self.mett.out_msg_placed().inc();
}
while self.outbuf.len() != 0 {
match Self::attempt_output(self.as_mut(), cx)? {
@@ -1470,8 +1465,8 @@ where
self.buf.wadv(nf)?;
}
have_progress = true;
self.stats.tcp_recv_count().inc();
self.stats.tcp_recv_bytes().add(nf as _);
self.mett.tcp_recv_count().inc();
self.mett.tcp_recv_bytes().add(nf as _);
continue;
}
}
@@ -1516,13 +1511,13 @@ where
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid > 26 {
// TODO count as logic error
self.stats.protocol_issue().inc();
self.mett.protocol_issue().inc();
}
if hi.payload_size == 0xffff {
self.state = CaState::ExtHead(hi);
Ok(None)
} else {
self.stats.payload_size().ingest(hi.payload_len() as u32);
self.mett.payload_size().push_val(hi.payload_len() as u32);
if hi.payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?;
@@ -1536,9 +1531,9 @@ where
CaState::ExtHead(hi) => {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
self.stats.payload_size().ingest(hi.payload_len() as u32);
self.mett.payload_size().push_val(hi.payload_len() as u32);
if payload_size > PAYLOAD_LEN_MAX {
self.stats.payload_ext_very_large().inc();
self.mett.payload_ext_very_large().inc();
if false {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
@@ -1548,7 +1543,7 @@ where
}
if payload_size <= 0x3ff0 {
// NOTE can happen even with zero payload, just because data-count exceeds u16.
self.stats.payload_ext_but_small().inc();
self.mett.payload_ext_but_small().inc();
if false {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
@@ -1566,7 +1561,7 @@ where
// data-count is only reasonable for event messages
let ret = match &msg.ty {
CaMsgTy::EventAddRes(..) => {
self.stats.data_count().ingest(hi.data_count() as u32);
self.mett.data_count().push_val(hi.data_count() as u32);
if TESTING_UNRESPONSIVE_TODO_REMOVE {
if self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
self.event_add_res_cnt += 1;

View File

@@ -1 +1,2 @@
pub mod ca;
pub mod mett;

4
src/mett.rs Normal file
View File

@@ -0,0 +1,4 @@
use mettrics::types::CounterU32;
use mettrics::types::HistoLog2;
mettrics::macros::make_metrics!("metrics.rs");