Improve CaConn state handling

This commit is contained in:
Dominik Werder
2024-01-26 00:07:38 +01:00
parent edbbb4f751
commit 2a5b7cbccd
3 changed files with 142 additions and 120 deletions

View File

@@ -100,8 +100,9 @@ macro_rules! trace4 {
#[derive(Debug, ThisError)]
pub enum Error {
ConnectFail,
NoProto,
NoProtocol,
ProtocolError,
IocIssue,
Protocol(#[from] crate::ca::proto::Error),
Writer(#[from] serieswriter::writer::Error),
UnknownCid(Cid),
@@ -114,6 +115,7 @@ pub enum Error {
ClosedSending,
NoProgressNoPending,
ShutdownWithQueuesNoProgressNoPending,
Error,
}
impl err::ToErr for Error {
@@ -440,9 +442,8 @@ enum CaConnState {
Init,
Handshake,
PeerReady,
Shutdown,
Shutdown(EndOfStreamReason),
EndOfStream,
Error,
}
impl fmt::Debug for CaConnState {
@@ -450,12 +451,11 @@ impl fmt::Debug for CaConnState {
match self {
Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(),
Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(),
Self::Init => write!(fmt, "Init"),
Self::Handshake => write!(fmt, "Handshake"),
Self::PeerReady => write!(fmt, "PeerReady"),
Self::Shutdown => write!(fmt, "Shutdown"),
Self::EndOfStream => write!(fmt, "EndOfStream"),
Self::Error => write!(fmt, "Error"),
Self::Init => fmt.debug_tuple("Init").finish(),
Self::Handshake => fmt.debug_tuple("Handshake").finish(),
Self::PeerReady => fmt.debug_tuple("PeerReady").finish(),
Self::Shutdown(v0) => fmt.debug_tuple("Shutdown").field(v0).finish(),
Self::EndOfStream => fmt.debug_tuple("EndOfStream").finish(),
}
}
}
@@ -619,6 +619,29 @@ impl ConnCommandResult {
}
}
#[derive(Debug)]
pub struct CaConnEvent {
pub ts: Instant,
pub value: CaConnEventValue,
}
impl CaConnEvent {
pub fn new(ts: Instant, value: CaConnEventValue) -> Self {
Self { ts, value }
}
pub fn err_now(err: Error) -> Self {
Self::new_now(CaConnEventValue::EndOfStream(EndOfStreamReason::Error(err)))
}
pub fn new_now(value: CaConnEventValue) -> Self {
Self {
ts: Instant::now(),
value,
}
}
}
#[derive(Debug)]
pub enum CaConnEventValue {
None,
@@ -626,14 +649,16 @@ pub enum CaConnEventValue {
ConnCommandResult(ConnCommandResult),
ChannelStatus(ChannelStatusPartial),
ChannelCreateFail(String),
Error(Error),
EndOfStream,
EndOfStream(EndOfStreamReason),
}
#[derive(Debug)]
pub struct CaConnEvent {
pub ts: Instant,
pub value: CaConnEventValue,
pub enum EndOfStreamReason {
UnspecifiedReason,
Error(Error),
ConnectFail,
OnCommand,
RemoteClosed,
}
pub struct CaConnOpts {
@@ -787,7 +812,7 @@ impl CaConn {
}
fn is_shutdown(&self) -> bool {
if let CaConnState::Shutdown = self.state {
if let CaConnState::Shutdown(..) = self.state {
true
} else {
false
@@ -795,25 +820,37 @@ impl CaConn {
}
fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
self.state = CaConnState::Shutdown;
self.proto = None;
match &channel_reason {
ChannelStatusClosedReason::ShutdownCommand => {}
ChannelStatusClosedReason::ChannelRemove => {}
ChannelStatusClosedReason::ProtocolError => {}
ChannelStatusClosedReason::FrequencyQuota => {}
ChannelStatusClosedReason::BandwidthQuota => {}
ChannelStatusClosedReason::InternalError => {}
ChannelStatusClosedReason::IocTimeout => {}
ChannelStatusClosedReason::NoProtocol => {}
ChannelStatusClosedReason::ProtocolDone => {}
ChannelStatusClosedReason::ConnectFail => {
debug!("emit status ConnectFail");
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::Error(Error::ConnectFail),
};
self.ca_conn_event_out_queue.push_back(item);
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
}
ChannelStatusClosedReason::ShutdownCommand => {
self.state = CaConnState::Shutdown(EndOfStreamReason::OnCommand);
}
ChannelStatusClosedReason::ChannelRemove => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
}
ChannelStatusClosedReason::ProtocolError => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError));
}
ChannelStatusClosedReason::FrequencyQuota => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
}
ChannelStatusClosedReason::BandwidthQuota => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
}
ChannelStatusClosedReason::InternalError => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::Error));
}
ChannelStatusClosedReason::IocTimeout => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue));
}
ChannelStatusClosedReason::NoProtocol => {
self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::NoProtocol));
}
ChannelStatusClosedReason::ProtocolDone => {
self.state = CaConnState::Shutdown(EndOfStreamReason::RemoteClosed);
}
}
self.channel_state_on_shutdown(channel_reason);
@@ -1827,7 +1864,7 @@ impl CaConn {
let proto = if let Some(x) = self.proto.as_mut() {
x
} else {
return Ready(Some(Err(Error::NoProto)));
return Ready(Some(Err(Error::NoProtocol)));
};
let res = match proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
@@ -2125,9 +2162,8 @@ impl CaConn {
Pending => Ok(Pending),
}
}
CaConnState::Shutdown => Ok(Ready(None)),
CaConnState::Shutdown(..) => Ok(Ready(None)),
CaConnState::EndOfStream => Ok(Ready(None)),
CaConnState::Error => Ok(Ready(None)),
}
}
@@ -2209,9 +2245,8 @@ impl CaConn {
CaConnState::Init => {}
CaConnState::Handshake => {}
CaConnState::PeerReady => {}
CaConnState::Shutdown => {}
CaConnState::Shutdown(..) => {}
CaConnState::EndOfStream => {}
CaConnState::Error => {}
}
Ok(())
}
@@ -2341,7 +2376,7 @@ macro_rules! flush_queue {
Ok(Pending) => {
*$have.1 |= true;
}
Err(e) => break Ready(Some(Err(e))),
Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
}
};
}
@@ -2361,7 +2396,7 @@ fn send_batched<const N: usize, T>(qu: &mut VecDeque<T>) -> Option<VecDeque<T>>
}
impl Stream for CaConn {
type Item = Result<CaConnEvent, Error>;
type Item = CaConnEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -2387,9 +2422,8 @@ impl Stream for CaConn {
if let CaConnState::EndOfStream = self.state {
break Ready(None);
}
if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(Ok(item)));
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(item));
}
let lts2 = Instant::now();
@@ -2401,7 +2435,7 @@ impl Stream for CaConn {
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
}
{
@@ -2453,7 +2487,7 @@ impl Stream for CaConn {
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
}
let lts5 = Instant::now();
@@ -2466,7 +2500,7 @@ impl Stream for CaConn {
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
Err(e) => break Ready(Some(CaConnEvent::err_now(e))),
}
let lts6 = Instant::now();
@@ -2482,7 +2516,7 @@ impl Stream for CaConn {
Err(e) => {
error!("{e}");
self.state = CaConnState::EndOfStream;
break Ready(Some(Err(e)));
break Ready(Some(CaConnEvent::err_now(e)));
}
}
@@ -2518,8 +2552,11 @@ impl Stream for CaConn {
break if self.is_shutdown() {
if self.queues_out_flushed() {
debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
if let CaConnState::Shutdown(x) = std::mem::replace(&mut self.state, CaConnState::EndOfStream) {
Ready(Some(CaConnEvent::new_now(CaConnEventValue::EndOfStream(x))))
} else {
continue;
}
} else {
if have_progress {
debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg);
@@ -2535,7 +2572,7 @@ impl Stream for CaConn {
error!("shutting down, queues not flushed, no progress, no pending");
self.stats.logic_error().inc();
let e = Error::ShutdownWithQueuesNoProgressNoPending;
Ready(Some(Err(e)))
Ready(Some(CaConnEvent::err_now(e)))
}
}
} else {
@@ -2543,10 +2580,7 @@ impl Stream for CaConn {
if poll_ts1.elapsed() > Duration::from_millis(5) {
self.stats.poll_wake_break().inc();
cx.waker().wake_by_ref();
break Ready(Some(Ok(CaConnEvent {
ts: poll_ts1,
value: CaConnEventValue::None,
})));
break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None)));
} else {
self.stats.poll_reloop().inc();
reloops += 1;
@@ -2558,7 +2592,7 @@ impl Stream for CaConn {
} else {
self.stats.poll_no_progress_no_pending().inc();
let e = Error::NoProgressNoPending;
Ready(Some(Err(e)))
Ready(Some(CaConnEvent::err_now(e)))
}
};
};

View File

@@ -1,3 +1,4 @@
use super::conn::EndOfStreamReason;
use super::findioc::FindIocRes;
use crate::ca::conn;
use crate::ca::statemap;
@@ -84,7 +85,6 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0);
const CHANNEL_BACKOFF: Duration = Duration::from_millis(10000);
const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000;
#[allow(unused)]
@@ -552,8 +552,7 @@ impl CaConnSet {
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st),
CaConnEventValue::Error(e) => self.handle_ca_conn_err(e, addr),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason),
}
}
@@ -889,8 +888,8 @@ impl CaConnSet {
Ok(())
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr}");
fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr} {reason:?}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
self.stats.ca_conn_eos_ok().inc();
self.await_ca_conn_jhs.push_back((addr, e.jh));
@@ -898,23 +897,26 @@ impl CaConnSet {
self.stats.ca_conn_eos_unexpected().inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
match reason {
EndOfStreamReason::UnspecifiedReason => {
warn!("EndOfStreamReason::UnspecifiedReason");
self.handle_connect_fail(addr)?
}
EndOfStreamReason::Error(e) => {
warn!("received error {addr} {e}");
self.handle_connect_fail(addr)?
}
EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?,
EndOfStreamReason::OnCommand => {
warn!("TODO make sure no channel is in state which could trigger health timeout")
}
EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?,
}
// self.remove_channel_status_for_addr(addr)?;
trace2!("still CaConn left {}", self.ca_conn_ress.len());
Ok(())
}
fn handle_ca_conn_err(&mut self, e: super::conn::Error, addr: SocketAddr) -> Result<(), Error> {
use super::conn::Error as E2;
error!("received error {addr} {e}");
match e {
E2::ConnectFail => self.handle_connect_fail(addr)?,
_ => {
// TODO others?
}
}
Ok(())
}
fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> {
// TODO ideally should only remove on EOS.
self.ca_conn_ress.remove(&addr);
@@ -966,11 +968,7 @@ impl CaConnSet {
match &mut v.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner {
WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {
if SocketAddr::V4(*a2) == addr {
*st4 = WithAddressState::Backoff(Instant::now());
}
}
WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {}
_ => {}
},
_ => {}
@@ -1044,16 +1042,10 @@ impl CaConnSet {
let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr, connstats).await;
trace2!("ca_conn_consumer ended {}", addr);
match ret {
Ok(()) => {
Ok(x) => {
debug!("Sending CaConnEventValue::EndOfStream");
tx1.send((
addr,
CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EndOfStream,
},
))
.await?;
tx1.send((addr, CaConnEvent::new_now(CaConnEventValue::EndOfStream(x))))
.await?;
}
Err(e) => {
error!("ca_conn_item_merge received from inner: {e}");
@@ -1068,39 +1060,37 @@ impl CaConnSet {
tx1: Sender<(SocketAddr, CaConnEvent)>,
addr: SocketAddr,
stats: Arc<CaConnStats>,
) -> Result<(), Error> {
) -> Result<EndOfStreamReason, Error> {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
stats.item_count.inc();
match item.value {
CaConnEventValue::None
| CaConnEventValue::EchoTimeout
| CaConnEventValue::ConnCommandResult(..)
| CaConnEventValue::ChannelCreateFail(..)
| CaConnEventValue::ChannelStatus(..)
| CaConnEventValue::Error(..) => {
if let Err(e) = tx1.send((addr, item)).await {
error!("can not deliver error {e}");
return Err(Error::with_msg_no_trace("can not deliver error"));
}
}
CaConnEventValue::EndOfStream => break,
}
}
Err(e) => {
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::Error(e),
};
if let Some(x) = eos_reason {
let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
error!("{e}");
return Err(e);
}
stats.item_count.inc();
match item.value {
CaConnEventValue::None
| CaConnEventValue::EchoTimeout
| CaConnEventValue::ConnCommandResult(..)
| CaConnEventValue::ChannelCreateFail(..)
| CaConnEventValue::ChannelStatus(..) => {
if let Err(e) = tx1.send((addr, item)).await {
error!("can not deliver error {e}");
return Err(Error::with_msg_no_trace("can not deliver error"));
}
}
CaConnEventValue::EndOfStream(reason) => {
eos_reason = Some(reason);
}
}
}
Ok(())
if let Some(x) = eos_reason {
Ok(x)
} else {
let e = Error::with_msg_no_trace(format!("CaConn gave no reason {addr}"));
Err(e)
}
}
fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> {
@@ -1354,11 +1344,6 @@ impl CaConnSet {
}
}
}
Backoff(ts) => {
if tsnow.saturating_duration_since(*ts) >= CHANNEL_BACKOFF {
*st4 = Unassigned { since: stnow };
}
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
@@ -1455,9 +1440,6 @@ impl CaConnSet {
}
}
}
WithAddressState::Backoff(ts) => {
backoff += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address += 1;

View File

@@ -50,12 +50,18 @@ pub struct ConnectionState {
#[derive(Debug, Clone, Serialize)]
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
#[serde(with = "humantime_serde")]
since: SystemTime,
},
Assigned(ConnectionState),
Backoff(#[serde(with = "serde_helper::serde_Instant")] Instant),
}
#[derive(Debug, Clone, Serialize)]
pub struct UnassignedState {
#[serde(with = "humantime_serde")]
since: SystemTime,
#[serde(with = "serde_helper::serde_Instant")]
unused_since_ts: Instant,
}
#[derive(Debug, Clone, Serialize)]