From 2a5b7cbccd3dbaccc523446005c773816fedb42c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 26 Jan 2024 00:07:38 +0100 Subject: [PATCH] Improve CaConn state handling --- netfetch/src/ca/conn.rs | 142 ++++++++++++++++++++++-------------- netfetch/src/ca/connset.rs | 110 ++++++++++++---------------- netfetch/src/ca/statemap.rs | 10 ++- 3 files changed, 142 insertions(+), 120 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 99e576e..06793d1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -100,8 +100,9 @@ macro_rules! trace4 { #[derive(Debug, ThisError)] pub enum Error { - ConnectFail, - NoProto, + NoProtocol, + ProtocolError, + IocIssue, Protocol(#[from] crate::ca::proto::Error), Writer(#[from] serieswriter::writer::Error), UnknownCid(Cid), @@ -114,6 +115,7 @@ pub enum Error { ClosedSending, NoProgressNoPending, ShutdownWithQueuesNoProgressNoPending, + Error, } impl err::ToErr for Error { @@ -440,9 +442,8 @@ enum CaConnState { Init, Handshake, PeerReady, - Shutdown, + Shutdown(EndOfStreamReason), EndOfStream, - Error, } impl fmt::Debug for CaConnState { @@ -450,12 +451,11 @@ impl fmt::Debug for CaConnState { match self { Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(), Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(), - Self::Init => write!(fmt, "Init"), - Self::Handshake => write!(fmt, "Handshake"), - Self::PeerReady => write!(fmt, "PeerReady"), - Self::Shutdown => write!(fmt, "Shutdown"), - Self::EndOfStream => write!(fmt, "EndOfStream"), - Self::Error => write!(fmt, "Error"), + Self::Init => fmt.debug_tuple("Init").finish(), + Self::Handshake => fmt.debug_tuple("Handshake").finish(), + Self::PeerReady => fmt.debug_tuple("PeerReady").finish(), + Self::Shutdown(v0) => fmt.debug_tuple("Shutdown").field(v0).finish(), + Self::EndOfStream => fmt.debug_tuple("EndOfStream").finish(), } } } @@ -619,6 +619,29 @@ impl ConnCommandResult { } } +#[derive(Debug)] +pub struct CaConnEvent { + pub ts: Instant, + pub value: CaConnEventValue, +} + +impl CaConnEvent { + pub fn new(ts: Instant, value: CaConnEventValue) -> Self { + Self { ts, value } + } + + pub fn err_now(err: Error) -> Self { + Self::new_now(CaConnEventValue::EndOfStream(EndOfStreamReason::Error(err))) + } + + pub fn new_now(value: CaConnEventValue) -> Self { + Self { + ts: Instant::now(), + value, + } + } +} + #[derive(Debug)] pub enum CaConnEventValue { None, @@ -626,14 +649,16 @@ pub enum CaConnEventValue { ConnCommandResult(ConnCommandResult), ChannelStatus(ChannelStatusPartial), ChannelCreateFail(String), - Error(Error), - EndOfStream, + EndOfStream(EndOfStreamReason), } #[derive(Debug)] -pub struct CaConnEvent { - pub ts: Instant, - pub value: CaConnEventValue, +pub enum EndOfStreamReason { + UnspecifiedReason, + Error(Error), + ConnectFail, + OnCommand, + RemoteClosed, } pub struct CaConnOpts { @@ -787,7 +812,7 @@ impl CaConn { } fn is_shutdown(&self) -> bool { - if let CaConnState::Shutdown = self.state { + if let CaConnState::Shutdown(..) = self.state { true } else { false @@ -795,25 +820,37 @@ impl CaConn { } fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { - self.state = CaConnState::Shutdown; self.proto = None; match &channel_reason { - ChannelStatusClosedReason::ShutdownCommand => {} - ChannelStatusClosedReason::ChannelRemove => {} - ChannelStatusClosedReason::ProtocolError => {} - ChannelStatusClosedReason::FrequencyQuota => {} - ChannelStatusClosedReason::BandwidthQuota => {} - ChannelStatusClosedReason::InternalError => {} - ChannelStatusClosedReason::IocTimeout => {} - ChannelStatusClosedReason::NoProtocol => {} - ChannelStatusClosedReason::ProtocolDone => {} ChannelStatusClosedReason::ConnectFail => { - debug!("emit status ConnectFail"); - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::Error(Error::ConnectFail), - }; - self.ca_conn_event_out_queue.push_back(item); + self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail); + } + ChannelStatusClosedReason::ShutdownCommand => { + self.state = CaConnState::Shutdown(EndOfStreamReason::OnCommand); + } + ChannelStatusClosedReason::ChannelRemove => { + self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail); + } + ChannelStatusClosedReason::ProtocolError => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::ProtocolError)); + } + ChannelStatusClosedReason::FrequencyQuota => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); + } + ChannelStatusClosedReason::BandwidthQuota => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); + } + ChannelStatusClosedReason::InternalError => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::Error)); + } + ChannelStatusClosedReason::IocTimeout => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::IocIssue)); + } + ChannelStatusClosedReason::NoProtocol => { + self.state = CaConnState::Shutdown(EndOfStreamReason::Error(Error::NoProtocol)); + } + ChannelStatusClosedReason::ProtocolDone => { + self.state = CaConnState::Shutdown(EndOfStreamReason::RemoteClosed); } } self.channel_state_on_shutdown(channel_reason); @@ -1827,7 +1864,7 @@ impl CaConn { let proto = if let Some(x) = self.proto.as_mut() { x } else { - return Ready(Some(Err(Error::NoProto))); + return Ready(Some(Err(Error::NoProtocol))); }; let res = match proto.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { @@ -2125,9 +2162,8 @@ impl CaConn { Pending => Ok(Pending), } } - CaConnState::Shutdown => Ok(Ready(None)), + CaConnState::Shutdown(..) => Ok(Ready(None)), CaConnState::EndOfStream => Ok(Ready(None)), - CaConnState::Error => Ok(Ready(None)), } } @@ -2209,9 +2245,8 @@ impl CaConn { CaConnState::Init => {} CaConnState::Handshake => {} CaConnState::PeerReady => {} - CaConnState::Shutdown => {} + CaConnState::Shutdown(..) => {} CaConnState::EndOfStream => {} - CaConnState::Error => {} } Ok(()) } @@ -2341,7 +2376,7 @@ macro_rules! flush_queue { Ok(Pending) => { *$have.1 |= true; } - Err(e) => break Ready(Some(Err(e))), + Err(e) => break Ready(Some(CaConnEvent::err_now(e))), } }; } @@ -2361,7 +2396,7 @@ fn send_batched(qu: &mut VecDeque) -> Option> } impl Stream for CaConn { - type Item = Result; + type Item = CaConnEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -2387,9 +2422,8 @@ impl Stream for CaConn { if let CaConnState::EndOfStream = self.state { break Ready(None); - } - if let Some(item) = self.ca_conn_event_out_queue.pop_front() { - break Ready(Some(Ok(item))); + } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { + break Ready(Some(item)); } let lts2 = Instant::now(); @@ -2401,7 +2435,7 @@ impl Stream for CaConn { Ok(Pending) => { have_pending = true; } - Err(e) => break Ready(Some(Err(e))), + Err(e) => break Ready(Some(CaConnEvent::err_now(e))), } { @@ -2453,7 +2487,7 @@ impl Stream for CaConn { Ok(Pending) => { have_pending = true; } - Err(e) => break Ready(Some(Err(e))), + Err(e) => break Ready(Some(CaConnEvent::err_now(e))), } let lts5 = Instant::now(); @@ -2466,7 +2500,7 @@ impl Stream for CaConn { Ok(Pending) => { have_pending = true; } - Err(e) => break Ready(Some(Err(e))), + Err(e) => break Ready(Some(CaConnEvent::err_now(e))), } let lts6 = Instant::now(); @@ -2482,7 +2516,7 @@ impl Stream for CaConn { Err(e) => { error!("{e}"); self.state = CaConnState::EndOfStream; - break Ready(Some(Err(e))); + break Ready(Some(CaConnEvent::err_now(e))); } } @@ -2518,8 +2552,11 @@ impl Stream for CaConn { break if self.is_shutdown() { if self.queues_out_flushed() { debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg); - self.state = CaConnState::EndOfStream; - Ready(None) + if let CaConnState::Shutdown(x) = std::mem::replace(&mut self.state, CaConnState::EndOfStream) { + Ready(Some(CaConnEvent::new_now(CaConnEventValue::EndOfStream(x)))) + } else { + continue; + } } else { if have_progress { debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg); @@ -2535,7 +2572,7 @@ impl Stream for CaConn { error!("shutting down, queues not flushed, no progress, no pending"); self.stats.logic_error().inc(); let e = Error::ShutdownWithQueuesNoProgressNoPending; - Ready(Some(Err(e))) + Ready(Some(CaConnEvent::err_now(e))) } } } else { @@ -2543,10 +2580,7 @@ impl Stream for CaConn { if poll_ts1.elapsed() > Duration::from_millis(5) { self.stats.poll_wake_break().inc(); cx.waker().wake_by_ref(); - break Ready(Some(Ok(CaConnEvent { - ts: poll_ts1, - value: CaConnEventValue::None, - }))); + break Ready(Some(CaConnEvent::new(self.poll_tsnow, CaConnEventValue::None))); } else { self.stats.poll_reloop().inc(); reloops += 1; @@ -2558,7 +2592,7 @@ impl Stream for CaConn { } else { self.stats.poll_no_progress_no_pending().inc(); let e = Error::NoProgressNoPending; - Ready(Some(Err(e))) + Ready(Some(CaConnEvent::err_now(e))) } }; }; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 406f75c..5a42364 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,3 +1,4 @@ +use super::conn::EndOfStreamReason; use super::findioc::FindIocRes; use crate::ca::conn; use crate::ca::statemap; @@ -84,7 +85,6 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); -const CHANNEL_BACKOFF: Duration = Duration::from_millis(10000); const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; #[allow(unused)] @@ -552,8 +552,7 @@ impl CaConnSet { CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st), - CaConnEventValue::Error(e) => self.handle_ca_conn_err(e, addr), - CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), + CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), } } @@ -889,8 +888,8 @@ impl CaConnSet { Ok(()) } - fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { - debug!("handle_ca_conn_eos {addr}"); + fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> { + debug!("handle_ca_conn_eos {addr} {reason:?}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { self.stats.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); @@ -898,23 +897,26 @@ impl CaConnSet { self.stats.ca_conn_eos_unexpected().inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } + match reason { + EndOfStreamReason::UnspecifiedReason => { + warn!("EndOfStreamReason::UnspecifiedReason"); + self.handle_connect_fail(addr)? + } + EndOfStreamReason::Error(e) => { + warn!("received error {addr} {e}"); + self.handle_connect_fail(addr)? + } + EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?, + EndOfStreamReason::OnCommand => { + warn!("TODO make sure no channel is in state which could trigger health timeout") + } + EndOfStreamReason::RemoteClosed => self.handle_connect_fail(addr)?, + } // self.remove_channel_status_for_addr(addr)?; trace2!("still CaConn left {}", self.ca_conn_ress.len()); Ok(()) } - fn handle_ca_conn_err(&mut self, e: super::conn::Error, addr: SocketAddr) -> Result<(), Error> { - use super::conn::Error as E2; - error!("received error {addr} {e}"); - match e { - E2::ConnectFail => self.handle_connect_fail(addr)?, - _ => { - // TODO others? - } - } - Ok(()) - } - fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { // TODO ideally should only remove on EOS. self.ca_conn_ress.remove(&addr); @@ -966,11 +968,7 @@ impl CaConnSet { match &mut v.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { - WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => { - if SocketAddr::V4(*a2) == addr { - *st4 = WithAddressState::Backoff(Instant::now()); - } - } + WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {} _ => {} }, _ => {} @@ -1044,16 +1042,10 @@ impl CaConnSet { let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr, connstats).await; trace2!("ca_conn_consumer ended {}", addr); match ret { - Ok(()) => { + Ok(x) => { debug!("Sending CaConnEventValue::EndOfStream"); - tx1.send(( - addr, - CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::EndOfStream, - }, - )) - .await?; + tx1.send((addr, CaConnEvent::new_now(CaConnEventValue::EndOfStream(x)))) + .await?; } Err(e) => { error!("ca_conn_item_merge received from inner: {e}"); @@ -1068,39 +1060,37 @@ impl CaConnSet { tx1: Sender<(SocketAddr, CaConnEvent)>, addr: SocketAddr, stats: Arc, - ) -> Result<(), Error> { + ) -> Result { + let mut eos_reason = None; while let Some(item) = conn.next().await { - match item { - Ok(item) => { - stats.item_count.inc(); - match item.value { - CaConnEventValue::None - | CaConnEventValue::EchoTimeout - | CaConnEventValue::ConnCommandResult(..) - | CaConnEventValue::ChannelCreateFail(..) - | CaConnEventValue::ChannelStatus(..) - | CaConnEventValue::Error(..) => { - if let Err(e) = tx1.send((addr, item)).await { - error!("can not deliver error {e}"); - return Err(Error::with_msg_no_trace("can not deliver error")); - } - } - CaConnEventValue::EndOfStream => break, - } - } - Err(e) => { - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::Error(e), - }; + if let Some(x) = eos_reason { + let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}")); + error!("{e}"); + return Err(e); + } + stats.item_count.inc(); + match item.value { + CaConnEventValue::None + | CaConnEventValue::EchoTimeout + | CaConnEventValue::ConnCommandResult(..) + | CaConnEventValue::ChannelCreateFail(..) + | CaConnEventValue::ChannelStatus(..) => { if let Err(e) = tx1.send((addr, item)).await { error!("can not deliver error {e}"); return Err(Error::with_msg_no_trace("can not deliver error")); } } + CaConnEventValue::EndOfStream(reason) => { + eos_reason = Some(reason); + } } } - Ok(()) + if let Some(x) = eos_reason { + Ok(x) + } else { + let e = Error::with_msg_no_trace(format!("CaConn gave no reason {addr}")); + Err(e) + } } fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> { @@ -1354,11 +1344,6 @@ impl CaConnSet { } } } - Backoff(ts) => { - if tsnow.saturating_duration_since(*ts) >= CHANNEL_BACKOFF { - *st4 = Unassigned { since: stnow }; - } - } } } WithStatusSeriesIdStateInner::NoAddress { since } => { @@ -1455,9 +1440,6 @@ impl CaConnSet { } } } - WithAddressState::Backoff(ts) => { - backoff += 1; - } }, WithStatusSeriesIdStateInner::NoAddress { .. } => { no_address += 1; diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index ee2bf67..4b9728b 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -50,12 +50,18 @@ pub struct ConnectionState { #[derive(Debug, Clone, Serialize)] pub enum WithAddressState { Unassigned { - //#[serde(with = "serde_Instant")] #[serde(with = "humantime_serde")] since: SystemTime, }, Assigned(ConnectionState), - Backoff(#[serde(with = "serde_helper::serde_Instant")] Instant), +} + +#[derive(Debug, Clone, Serialize)] +pub struct UnassignedState { + #[serde(with = "humantime_serde")] + since: SystemTime, + #[serde(with = "serde_helper::serde_Instant")] + unused_since_ts: Instant, } #[derive(Debug, Clone, Serialize)]