Handle Java implementation bug better

This commit is contained in:
Dominik Werder
2024-09-27 13:25:21 +02:00
parent a2906590be
commit ca8ff71238
4 changed files with 302 additions and 269 deletions

View File

@@ -1049,7 +1049,6 @@ pub struct CaConn {
ca_conn_event_out_queue_max: usize,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
rng: Xoshiro128PlusPlus,
channel_info_query_qu: VecDeque<ChannelInfoQuery>,
channel_info_query_tx: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
@@ -1063,6 +1062,8 @@ pub struct CaConn {
read_ioids: HashMap<Ioid, Cid>,
handler_by_ioid: HashMap<Ioid, Option<Pin<Box<dyn ConnFuture>>>>,
trace_channel_poll: bool,
ca_msg_recv_count: u64,
ca_version_recv_count: u64,
}
impl Drop for CaConn {
@@ -1117,7 +1118,6 @@ impl CaConn {
ca_conn_event_out_queue_max: 2000,
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
ca_proto_stats,
weird_count: 0,
rng,
channel_info_query_qu: VecDeque::new(),
channel_info_query_tx: Box::pin(SenderPolling::new(channel_info_query_tx)),
@@ -1128,6 +1128,8 @@ impl CaConn {
read_ioids: HashMap::new(),
handler_by_ioid: HashMap::new(),
trace_channel_poll: false,
ca_msg_recv_count: 0,
ca_version_recv_count: 0,
}
}
@@ -2589,11 +2591,10 @@ impl CaConn {
warn!("CaConn sees: {msg:?}");
}
}
CaMsgTy::VersionRes(x) => {
debug!("VersionRes({x})");
self.weird_count += 1;
if self.weird_count > 200 {
// std::process::exit(13);
CaMsgTy::VersionRes(_) => {
if self.ca_msg_recv_count != 0 {
self.stats.ca_proto_version_later().inc();
// TODO emit log or count stats
}
}
CaMsgTy::ChannelCloseRes(x) => {
@@ -2606,6 +2607,12 @@ impl CaConn {
}
CaItem::Empty => {}
}
if self.ca_msg_recv_count == 0 {
if self.ca_version_recv_count == 0 {
self.stats.ca_proto_no_version_as_first().inc();
}
}
self.ca_msg_recv_count += 1;
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
@@ -2751,114 +2758,116 @@ impl CaConn {
fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
match &mut self.state {
CaConnState::Unconnected(_since) => {
let addr = self.remote_addr_dbg.clone();
loop {
break match &mut self.state {
CaConnState::Unconnected(_since) => {
let addr = self.remote_addr_dbg.clone();
// TODO issue a TCP-connect event (and later a "connected")
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
// TODO issue a TCP-connect event (and later a "connected")
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(Instant::now(), addr, Box::pin(fut));
Ok(Ready(Some(())))
}
CaConnState::Connecting(_since, addr, fut) => {
match fut.poll_unpin(cx) {
Ready(connect_result) => {
match connect_result {
Ok(Ok(tcp)) => {
self.stats.tcp_connected.inc();
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
})?;
self.backoff_reset();
let proto = CaProto::new(
tcp,
self.remote_addr_dbg.to_string(),
self.opts.array_truncate,
self.ca_proto_stats.clone(),
);
self.state = CaConnState::Init;
self.proto = Some(proto);
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
use std::io::ErrorKind;
debug!("error connect to {addr} {e}");
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
})?;
let reason = match e.kind() {
ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
_ => ShutdownReason::IoError,
};
self.trigger_shutdown(reason);
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
debug!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
})?;
self.trigger_shutdown(ShutdownReason::ConnectTimeout);
Ok(Ready(Some(())))
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(Instant::now(), addr, Box::pin(fut));
Ok(Ready(Some(())))
}
CaConnState::Connecting(_since, addr, fut) => {
match fut.poll_unpin(cx) {
Ready(connect_result) => {
match connect_result {
Ok(Ok(tcp)) => {
self.stats.tcp_connected.inc();
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
})?;
self.backoff_reset();
let proto = CaProto::new(
tcp,
self.remote_addr_dbg.to_string(),
self.opts.array_truncate,
self.ca_proto_stats.clone(),
);
self.state = CaConnState::Init;
self.proto = Some(proto);
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
use std::io::ErrorKind;
debug!("error connect to {addr} {e}");
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
})?;
let reason = match e.kind() {
ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
_ => ShutdownReason::IoError,
};
self.trigger_shutdown(reason);
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
debug!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
})?;
self.trigger_shutdown(ShutdownReason::ConnectTimeout);
Ok(Ready(Some(())))
}
}
}
Pending => Ok(Pending),
}
Pending => Ok(Pending),
}
}
CaConnState::Init => {
trace4!("Init");
let hostname = self.local_epics_hostname.clone();
let proto = self.proto.as_mut().unwrap();
let msg = CaMsg::from_ty_ts(CaMsgTy::Version, tsnow);
proto.push_out(msg);
let msg = CaMsg::from_ty_ts(CaMsgTy::ClientName, tsnow);
proto.push_out(msg);
let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow);
proto.push_out(msg);
self.state = CaConnState::Handshake;
Ok(Ready(Some(())))
}
CaConnState::Handshake => {
if true {
// because of bad java clients which do not send a version, skip the handshake.
self.state = CaConnState::PeerReady;
self.handle_conn_state(tsnow, cx)
} else {
match {
let res = self.handle_handshake(cx);
res
} {
CaConnState::Init => {
trace4!("Init");
let hostname = self.local_epics_hostname.clone();
let proto = self.proto.as_mut().unwrap();
let msg = CaMsg::from_ty_ts(CaMsgTy::Version, tsnow);
proto.push_out(msg);
let msg = CaMsg::from_ty_ts(CaMsgTy::ClientName, tsnow);
proto.push_out(msg);
let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow);
proto.push_out(msg);
self.state = CaConnState::Handshake;
Ok(Ready(Some(())))
}
CaConnState::Handshake => {
if true {
// because of bad java clients which do not send a version, skip the handshake.
self.state = CaConnState::PeerReady;
continue;
} else {
match {
let res = self.handle_handshake(cx);
res
} {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
}
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
match res {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
}
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
match res {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(Ready(Some(()))),
Pending => Ok(Pending),
}
}
CaConnState::Shutdown(..) => Ok(Ready(None)),
CaConnState::EndOfStream => Ok(Ready(None)),
CaConnState::Shutdown(..) => Ok(Ready(None)),
CaConnState::EndOfStream => Ok(Ready(None)),
};
}
}
@@ -2866,6 +2875,7 @@ impl CaConn {
use Poll::*;
let tsnow = Instant::now();
let mut have_progress = false;
let mut have_pending = false;
for _ in 0..64 {
self.stats.loop2_count.inc();
if self.is_shutdown() {
@@ -2883,7 +2893,10 @@ impl CaConn {
error!("handle_conn_state yields {x:?}");
return Err(Error::LoopInnerLogicError);
}
Pending => return Ok(Pending),
Pending => {
have_pending = true;
break;
}
},
Err(e) => return Err(e),
}
@@ -2891,6 +2904,8 @@ impl CaConn {
}
if have_progress {
Ok(Ready(Some(())))
} else if have_pending {
Ok(Pending)
} else {
Ok(Ready(None))
}
@@ -2923,6 +2938,7 @@ impl CaConn {
}
// cx.waker().wake_by_ref();
}
self.housekeeping_self();
self.check_channels_state_init(tsnow, cx)?;
self.check_channels_state_poll(tsnow, cx)?;
self.check_channels_alive(tsnow, cx)?;
@@ -2953,6 +2969,14 @@ impl CaConn {
Ok(())
}
/// Keeps the received-message counter away from the top of the `u64` range.
///
/// Once `ca_msg_recv_count` exceeds the threshold, its upper 28 bits are
/// cleared and only the low 36 bits are kept. Any value strictly greater
/// than the threshold has at least one of those low bits set, so this
/// reduction never brings the counter back to zero.
fn housekeeping_self(&mut self) {
    // Upper 28 bits set, lower 36 bits clear.
    const RECV_COUNT_LIMIT: u64 = 0xfffffff000000000;
    if self.ca_msg_recv_count > RECV_COUNT_LIMIT {
        // Drop the saturated high bits, retain the low 36 bits.
        self.ca_msg_recv_count &= !RECV_COUNT_LIMIT;
    }
}
fn emit_channel_status(&mut self) -> Result<(), Error> {
let stnow = SystemTime::now();
let mut channel_statuses = BTreeMap::new();