From 505ea3c6ac01e88039df942dc7598fcad43392dc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 14 Nov 2024 12:14:51 +0100 Subject: [PATCH] Cleanup --- netfetch/Cargo.toml | 3 +- netfetch/src/ca/conn.rs | 25 ++------------ netfetch/src/lib.rs | 1 - netfetch/src/tcpasyncwriteread.rs | 54 ------------------------------- 4 files changed, 4 insertions(+), 79 deletions(-) delete mode 100644 netfetch/src/tcpasyncwriteread.rs diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index bd1a8b6..618499b 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -26,7 +26,6 @@ regex = "1.8.4" axum = "0.7.5" http-body = "1" url = "2.2" -#hyper = "1.3.1" chrono = "0.4" humantime = "2.1.0" humantime-serde = "1.1.1" @@ -52,8 +51,8 @@ items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../../daqbuf-items-2", package = "daqbuf-items-2" } streams = { path = "../../daqbuf-streams", package = "daqbuf-streams" } ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" } +ca-proto-tokio = { path = "../../daqbuf-ca-proto-tokio", package = "daqbuf-ca-proto-tokio" } taskrun = { path = "../../daqbuffer/crates/taskrun" } -#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } mrucache = { path = "../mrucache" } batchtools = { path = "../batchtools" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index b20d2b5..03cfbde 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2,11 +2,11 @@ mod enumfetch; use crate::conf::ChannelConfig; use crate::metrics::status::StorageUsage; -use crate::tcpasyncwriteread::TcpAsyncWriteRead; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; use ca_proto::ca::proto; +use ca_proto_tokio::tcpasyncwriteread::TcpAsyncWriteRead; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; @@ -246,7 +246,6 @@ pub struct ChannelStateInfo { mod ser_instant { use super::*; use netpod::DATETIME_FMT_3MS; - use serde::Deserializer; use serde::Serializer; pub fn serialize(val: &Option, ser: S) -> Result @@ -280,14 +279,6 @@ mod ser_instant { None => ser.serialize_none(), } } - - pub fn deserialize<'de, D>(_de: D) -> Result, D::Error> - where - D: Deserializer<'de>, - { - let e = serde::de::Error::custom("todo deserialize for ser_instant"); - Err(e) - } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -1822,7 +1813,7 @@ impl CaConn { match x.mon2state { // actually, no differing behavior needed so far. Monitoring2State::Passive(_) => {} - Monitoring2State::ReadPending(ioid, since) => {} + Monitoring2State::ReadPending(_, _) => {} } Some(x.subid.clone()) } @@ -1988,7 +1979,7 @@ impl CaConn { Monitoring2State::Passive(st3) => { st3.tsbeg = tsnow; } - Monitoring2State::ReadPending(ioid, since) => {} + Monitoring2State::ReadPending(_, _) => {} } let name = self.name_by_cid(cid); warn!("received event-cancel but channel {name:?} in wrong state"); @@ -2884,16 +2875,6 @@ impl CaConn { Ok(()) } - // `?` works not in here. - fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow>> { - use ControlFlow::*; - use Poll::*; - let e = Error::CreateChannelBadState; - // Err(e)?; - let _ = e; - Break(Pending) - } - fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result>, Error> { use Poll::*; loop { diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 6b58354..2edae26 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -8,7 +8,6 @@ pub mod netbuf; pub mod polltimer; pub mod ratelimit; pub mod rt; -pub mod tcpasyncwriteread; #[cfg(test)] pub mod test; pub mod throttletrace; diff --git a/netfetch/src/tcpasyncwriteread.rs b/netfetch/src/tcpasyncwriteread.rs deleted file mode 100644 index 33df685..0000000 --- a/netfetch/src/tcpasyncwriteread.rs +++ /dev/null @@ -1,54 +0,0 @@ -use futures_util::AsyncRead; -use futures_util::AsyncWrite; -use std::io; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use taskrun::tokio::io::ReadBuf; -use taskrun::tokio::net::TcpStream; - -#[pin_project::pin_project] -pub struct TcpAsyncWriteRead { - #[pin] - tcp: TcpStream, -} - -impl From for TcpAsyncWriteRead { - fn from(value: TcpStream) -> Self { - Self { tcp: value } - } -} - -impl AsyncWrite for TcpAsyncWriteRead { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - use taskrun::tokio::io::AsyncWrite; - let this = self.project(); - this.tcp.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use taskrun::tokio::io::AsyncWrite; - let this = self.project(); - this.tcp.poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use taskrun::tokio::io::AsyncWrite; - let this = self.project(); - this.tcp.poll_shutdown(cx) - } -} - -impl AsyncRead for TcpAsyncWriteRead { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - use taskrun::tokio::io::AsyncRead; - use Poll::*; - let this = self.project(); - let mut readbuf = ReadBuf::new(buf); - match this.tcp.poll_read(cx, &mut readbuf) { - Ready(Ok(())) => Ready(Ok(readbuf.filled().len())), - Ready(Err(e)) => Ready(Err(e)), - Pending => Pending, - } - } -}