This commit is contained in:
Dominik Werder
2024-11-14 12:14:51 +01:00
parent 1de0060f89
commit 505ea3c6ac
4 changed files with 4 additions and 79 deletions

View File

@@ -26,7 +26,6 @@ regex = "1.8.4"
axum = "0.7.5"
http-body = "1"
url = "2.2"
#hyper = "1.3.1"
chrono = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
@@ -52,8 +51,8 @@ items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" }
items_2 = { path = "../../daqbuf-items-2", package = "daqbuf-items-2" }
streams = { path = "../../daqbuf-streams", package = "daqbuf-streams" }
ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" }
ca-proto-tokio = { path = "../../daqbuf-ca-proto-tokio", package = "daqbuf-ca-proto-tokio" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
mrucache = { path = "../mrucache" }
batchtools = { path = "../batchtools" }

View File

@@ -2,11 +2,11 @@ mod enumfetch;
use crate::conf::ChannelConfig;
use crate::metrics::status::StorageUsage;
use crate::tcpasyncwriteread::TcpAsyncWriteRead;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
use ca_proto::ca::proto;
use ca_proto_tokio::tcpasyncwriteread::TcpAsyncWriteRead;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
@@ -246,7 +246,6 @@ pub struct ChannelStateInfo {
mod ser_instant {
use super::*;
use netpod::DATETIME_FMT_3MS;
use serde::Deserializer;
use serde::Serializer;
pub fn serialize<S>(val: &Option<Instant>, ser: S) -> Result<S::Ok, S::Error>
@@ -280,14 +279,6 @@ mod ser_instant {
None => ser.serialize_none(),
}
}
pub fn deserialize<'de, D>(_de: D) -> Result<Option<Instant>, D::Error>
where
D: Deserializer<'de>,
{
let e = serde::de::Error::custom("todo deserialize for ser_instant");
Err(e)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -1822,7 +1813,7 @@ impl CaConn {
match x.mon2state {
// actually, no differing behavior needed so far.
Monitoring2State::Passive(_) => {}
Monitoring2State::ReadPending(ioid, since) => {}
Monitoring2State::ReadPending(_, _) => {}
}
Some(x.subid.clone())
}
@@ -1988,7 +1979,7 @@ impl CaConn {
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid, since) => {}
Monitoring2State::ReadPending(_, _) => {}
}
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
@@ -2884,16 +2875,6 @@ impl CaConn {
Ok(())
}
// `?` works not in here.
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
use ControlFlow::*;
use Poll::*;
let e = Error::CreateChannelBadState;
// Err(e)?;
let _ = e;
Break(Pending)
}
fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
loop {

View File

@@ -8,7 +8,6 @@ pub mod netbuf;
pub mod polltimer;
pub mod ratelimit;
pub mod rt;
pub mod tcpasyncwriteread;
#[cfg(test)]
pub mod test;
pub mod throttletrace;

View File

@@ -1,54 +0,0 @@
use futures_util::AsyncRead;
use futures_util::AsyncWrite;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio::io::ReadBuf;
use taskrun::tokio::net::TcpStream;
#[pin_project::pin_project]
pub struct TcpAsyncWriteRead {
#[pin]
tcp: TcpStream,
}
impl From<TcpStream> for TcpAsyncWriteRead {
fn from(value: TcpStream) -> Self {
Self { tcp: value }
}
}
impl AsyncWrite for TcpAsyncWriteRead {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
use taskrun::tokio::io::AsyncWrite;
let this = self.project();
this.tcp.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
use taskrun::tokio::io::AsyncWrite;
let this = self.project();
this.tcp.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
use taskrun::tokio::io::AsyncWrite;
let this = self.project();
this.tcp.poll_shutdown(cx)
}
}
impl AsyncRead for TcpAsyncWriteRead {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
use taskrun::tokio::io::AsyncRead;
use Poll::*;
let this = self.project();
let mut readbuf = ReadBuf::new(buf);
match this.tcp.poll_read(cx, &mut readbuf) {
Ready(Ok(())) => Ready(Ok(readbuf.filled().len())),
Ready(Err(e)) => Ready(Err(e)),
Pending => Pending,
}
}
}