From 40715983946c790af40291e7dcb8dc78fe912973 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 16 Apr 2025 15:37:39 +0200 Subject: [PATCH] Expose raw fd --- metrics.rs | 2 ++ src/ca/proto.rs | 40 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/metrics.rs b/metrics.rs index a8af981..7aa9e99 100644 --- a/metrics.rs +++ b/metrics.rs @@ -10,6 +10,8 @@ mod Metrics { payload_ext_very_large, out_msg_placed, out_bytes, + fionread_inc, + fionread_dec, } enum histolog2s { payload_size, diff --git a/src/ca/proto.rs b/src/ca/proto.rs index 6be18e2..0e0efec 100644 --- a/src/ca/proto.rs +++ b/src/ca/proto.rs @@ -733,15 +733,37 @@ macro_rules! convert_wave_value { }}; } +#[derive(Debug)] +pub struct CaMsgCommon { + ts: Instant, +} + +impl CaMsgCommon { + pub fn new(ts: Instant) -> Self { + Self { ts } + } + + pub fn ts(&self) -> Instant { + self.ts + } +} + #[derive(Debug)] pub struct CaMsg { pub ty: CaMsgTy, - pub ts: Instant, + common: CaMsgCommon, } impl CaMsg { pub fn from_ty_ts(ty: CaMsgTy, ts: Instant) -> Self { - Self { ty, ts } + Self { + ty, + common: CaMsgCommon::new(ts), + } + } + + pub fn into_parts(self) -> (CaMsgCommon, CaMsgTy) { + (self.common, self.ty) } fn len(&self) -> usize { @@ -1200,6 +1222,7 @@ impl AsyncWriteRead for T where T: AsyncWrite + AsyncRead + Send + 'static {} pub struct CaProto { tcp: Pin>, + raw_socket_fd: Option, tcp_eof: bool, remote_name: String, state: CaState, @@ -1217,6 +1240,7 @@ impl fmt::Debug for CaProto { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("CaProto") // .field("tcp", &self.tcp) + .field("raw_socket_fd", &self.raw_socket_fd) .field("tcp_eof", &self.tcp_eof) .field("remote_name", &self.remote_name) .field("state", &self.state) @@ -1232,9 +1256,15 @@ impl fmt::Debug for CaProto { } impl CaProto { - pub fn new(tcp: T, remote_name: String, array_truncate: usize) -> Self { + pub fn new( + tcp: T, + raw_socket_fd: Option, + remote_name: String, + array_truncate: usize, + ) -> Self { Self { tcp: Box::pin(tcp), + raw_socket_fd, tcp_eof: false, remote_name, state: CaState::StdHead, @@ -1503,6 +1533,10 @@ impl CaProto { CaState::Done => Err(Error::ParseAttemptInDoneState), } } + + pub fn get_raw_socket_fd(&self) -> Option { + self.raw_socket_fd.clone() + } } impl Stream for CaProto {