From 9a81e25625aaf177510496ea13e5f0cfd982450c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 17 Feb 2025 17:55:58 +0100 Subject: [PATCH] WIP conn2 --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn2/conn.rs | 6 ++++-- netfetch/src/ca/conn2/conn/connected.rs | 2 +- netfetch/src/ca/conn2/conn/connecting.rs | 4 ++++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 98651d9..4523fe6 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.7-aa.0" +version = "0.2.7-aa.1" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs index 92a64c1..f2c862e 100644 --- a/netfetch/src/ca/conn2/conn.rs +++ b/netfetch/src/ca/conn2/conn.rs @@ -77,7 +77,7 @@ enum ConnectedState { #[derive(Debug)] enum CaConnState { Connecting(Connecting), - Connected(CaProto), + Connected(Connected), Shutdown(EndOfStreamReason), Done, } @@ -313,12 +313,14 @@ impl Stream for CaConn { // } // } + let tsnow4 = Instant::now(); + match &mut self.state { CaConnState::Connecting(st2) => match st2.poll_unpin(cx) { Ready(x) => match x { Ok(Some(x)) => { hpp.have_progress(); - self.state = CaConnState::Connected(Connected::new(x)); + self.state = CaConnState::Connected(Connected::new(st2.addr(), x, tsnow4)); } Ok(None) => { // TODO diff --git a/netfetch/src/ca/conn2/conn/connected.rs b/netfetch/src/ca/conn2/conn/connected.rs index 846fe8d..bddf9f8 100644 --- a/netfetch/src/ca/conn2/conn/connected.rs +++ b/netfetch/src/ca/conn2/conn/connected.rs @@ -42,7 +42,7 @@ impl fmt::Debug for Connected { } impl Connected { - pub fn new(tcp: TcpStream) -> Self { + pub fn new(remote_addr: SocketAddrV4, tcp: TcpStream, tsnow: Instant) -> Self { Self { tsbeg: tsnow, addr: remote_addr, diff --git a/netfetch/src/ca/conn2/conn/connecting.rs b/netfetch/src/ca/conn2/conn/connecting.rs index 81fa51c..a6793a0 100644 --- a/netfetch/src/ca/conn2/conn/connecting.rs +++ b/netfetch/src/ca/conn2/conn/connecting.rs @@ -50,6 +50,10 @@ impl Connecting { } } + pub fn addr(&self) -> SocketAddrV4 { + self.addr + } + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Error>> { use Poll::*; match self.fut.poll_unpin(cx) {