diff --git a/Cargo.toml b/Cargo.toml index 9204bbf..57fbefc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ slidebuf = "0.0.1" thiserror = "=0.0.1" series = { path = "../daqbuf-series", package = "daqbuf-series" } netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } +mettrics = { version = "0.0.6", path = "../mettrics" } [patch.crates-io] thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/metrics.rs b/metrics.rs new file mode 100644 index 0000000..a8af981 --- /dev/null +++ b/metrics.rs @@ -0,0 +1,19 @@ +mod Metrics { + type StructName = CaProtoMetrics; + enum counters { + metrics_emit, + tcp_recv_count, + tcp_recv_bytes, + protocol_issue, + payload_std_too_large, + payload_ext_but_small, + payload_ext_very_large, + out_msg_placed, + out_bytes, + } + enum histolog2s { + payload_size, + data_count, + outbuf_len, + } +} diff --git a/src/ca/proto.rs b/src/ca/proto.rs index 873a8c5..7057d9d 100644 --- a/src/ca/proto.rs +++ b/src/ca/proto.rs @@ -1277,7 +1277,7 @@ pub trait AsyncWriteRead: AsyncWrite + AsyncRead + Send + 'static {} impl AsyncWriteRead for T where T: AsyncWrite + AsyncRead + Send + 'static {} -pub struct CaProto { +pub struct CaProto { tcp: Pin>, tcp_eof: bool, remote_name: String, @@ -1286,7 +1286,7 @@ pub struct CaProto { outbuf: SlideBuf, out: VecDeque, array_truncate: usize, - stats: STATS, + mett: crate::mett::CaProtoMetrics, resqu: VecDeque, event_add_res_cnt: u32, bytes_recv_testing: u32, @@ -1303,7 +1303,6 @@ impl fmt::Debug for CaProto { .field("outbuf", &self.outbuf) .field("out", &self.out) .field("array_truncate", &self.array_truncate) - .field("stats", &self.stats) .field("resqu", &self.resqu) .field("event_add_res_cnt", &self.event_add_res_cnt) .field("bytes_recv_testing", &self.bytes_recv_testing) @@ -1311,16 +1310,8 @@ impl fmt::Debug for CaProto { } } -impl CaProto -where - STATS: CaProtoStatsRecv, -{ - pub fn new( - tcp: T, - remote_name: String, - array_truncate: usize, - stats: STATS, - ) -> Self { +impl CaProto { + pub fn new(tcp: T, remote_name: String, array_truncate: usize) -> Self { Self { tcp: Box::pin(tcp), tcp_eof: false, @@ -1330,13 +1321,17 @@ where outbuf: SlideBuf::new(1024 * 256), out: VecDeque::new(), array_truncate, - stats, + mett: crate::mett::CaProtoMetrics::new(), resqu: VecDeque::with_capacity(256), event_add_res_cnt: 0, bytes_recv_testing: 0, } } + pub fn mett(&mut self) -> &mut crate::mett::CaProtoMetrics { + &mut self.mett + } + pub fn proto_out_len(&self) -> usize { self.out.len() } @@ -1369,7 +1364,7 @@ where Ready(k) => match k { Ok(k) => match self.outbuf.adv(k) { Ok(()) => { - self.stats.out_bytes().add(k as u64); + self.mett.out_bytes().add(k as u32); Ready(Ok(k)) } Err(e) => { @@ -1393,7 +1388,7 @@ where let tsnow = Instant::now(); { let g = self.outbuf.len(); - self.stats.outbuf_len().ingest(g as u32); + self.mett.outbuf_len().push_val(g as u32); } while let Some((msg, buf)) = self.out_msg_buf() { let msglen = msg.len(); @@ -1403,7 +1398,7 @@ where msg.place_into(&mut buf[..msglen]); self.outbuf.wadv(msglen)?; self.out.pop_front(); - self.stats.out_msg_placed().inc(); + self.mett.out_msg_placed().inc(); } while self.outbuf.len() != 0 { match Self::attempt_output(self.as_mut(), cx)? { @@ -1470,8 +1465,8 @@ where self.buf.wadv(nf)?; } have_progress = true; - self.stats.tcp_recv_count().inc(); - self.stats.tcp_recv_bytes().add(nf as _); + self.mett.tcp_recv_count().inc(); + self.mett.tcp_recv_bytes().add(nf as _); continue; } } @@ -1516,13 +1511,13 @@ where let hi = HeadInfo::from_netbuf(&mut self.buf)?; if hi.cmdid > 26 { // TODO count as logic error - self.stats.protocol_issue().inc(); + self.mett.protocol_issue().inc(); } if hi.payload_size == 0xffff { self.state = CaState::ExtHead(hi); Ok(None) } else { - self.stats.payload_size().ingest(hi.payload_len() as u32); + self.mett.payload_size().push_val(hi.payload_len() as u32); if hi.payload_size == 0 { self.state = CaState::StdHead; let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?; @@ -1536,9 +1531,9 @@ where CaState::ExtHead(hi) => { let payload_size = self.buf.read_u32_be()?; let data_count = self.buf.read_u32_be()?; - self.stats.payload_size().ingest(hi.payload_len() as u32); + self.mett.payload_size().push_val(hi.payload_len() as u32); if payload_size > PAYLOAD_LEN_MAX { - self.stats.payload_ext_very_large().inc(); + self.mett.payload_ext_very_large().inc(); if false { warn!( "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", @@ -1548,7 +1543,7 @@ where } if payload_size <= 0x3ff0 { // NOTE can happen even with zero payload, just because data-count exceeds u16. - self.stats.payload_ext_but_small().inc(); + self.mett.payload_ext_but_small().inc(); if false { warn!( "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", @@ -1566,7 +1561,7 @@ where // data-count is only reasonable for event messages let ret = match &msg.ty { CaMsgTy::EventAddRes(..) => { - self.stats.data_count().ingest(hi.data_count() as u32); + self.mett.data_count().push_val(hi.data_count() as u32); if TESTING_UNRESPONSIVE_TODO_REMOVE { if self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX { self.event_add_res_cnt += 1; diff --git a/src/lib.rs b/src/lib.rs index ca75baf..ec468bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,2 @@ pub mod ca; +pub mod mett; diff --git a/src/mett.rs b/src/mett.rs new file mode 100644 index 0000000..1d170cf --- /dev/null +++ b/src/mett.rs @@ -0,0 +1,4 @@ +use mettrics::types::CounterU32; +use mettrics::types::HistoLog2; + +mettrics::macros::make_metrics!("metrics.rs");