From edbbb4f7510613dd2004c6d07246f29236cc7be4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 24 Jan 2024 16:32:16 +0100 Subject: [PATCH] State transitions --- netfetch/Cargo.toml | 1 + netfetch/src/ca/conn.rs | 376 ++++++++++++++++++------------------ netfetch/src/ca/connset.rs | 367 ++++++++++++++++++----------------- netfetch/src/ca/proto.rs | 41 ++-- netfetch/src/ca/statemap.rs | 7 +- serde_helper/src/lib.rs | 2 +- stats/src/stats.rs | 253 ++++++++++++------------ 7 files changed, 535 insertions(+), 512 deletions(-) diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index ac2209a..d70e312 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -39,6 +39,7 @@ serieswriter = { path = "../serieswriter" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } +serde_helper = { path = "../serde_helper" } ingest-linux = { path = "../ingest-linux" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d280a9c..99e576e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -9,7 +9,8 @@ use async_channel::Receiver; use async_channel::Sender; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -97,6 +98,30 @@ macro_rules! trace4 { }; } +#[derive(Debug, ThisError)] +pub enum Error { + ConnectFail, + NoProto, + Protocol(#[from] crate::ca::proto::Error), + Writer(#[from] serieswriter::writer::Error), + UnknownCid(Cid), + NoNameForCid(Cid), + CreateChannelBadState, + CommonError(#[from] err::Error), + LoopInnerLogicError, + NoSender, + NotSending, + ClosedSending, + NoProgressNoPending, + ShutdownWithQueuesNoProgressNoPending, +} + +impl err::ToErr for Error { + fn to_err(self) -> err::Error { + err::Error::with_msg_no_trace(self.to_string()) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ChannelConnectedInfo { Disconnected, @@ -415,9 +440,9 @@ enum CaConnState { Init, Handshake, PeerReady, - Wait(Pin + Send>>), Shutdown, EndOfStream, + Error, } impl fmt::Debug for CaConnState { @@ -428,9 +453,9 @@ impl fmt::Debug for CaConnState { Self::Init => write!(fmt, "Init"), Self::Handshake => write!(fmt, "Handshake"), Self::PeerReady => write!(fmt, "PeerReady"), - Self::Wait(_) => fmt.debug_tuple("Wait").finish(), Self::Shutdown => write!(fmt, "Shutdown"), Self::EndOfStream => write!(fmt, "EndOfStream"), + Self::Error => write!(fmt, "Error"), } } } @@ -600,10 +625,9 @@ pub enum CaConnEventValue { EchoTimeout, ConnCommandResult(ConnCommandResult), ChannelStatus(ChannelStatusPartial), - QueryItem(QueryItem), ChannelCreateFail(String), + Error(Error), EndOfStream, - ConnectFail, } #[derive(Debug)] @@ -787,7 +811,7 @@ impl CaConn { debug!("emit status ConnectFail"); let item = CaConnEvent { ts: Instant::now(), - value: CaConnEventValue::ConnectFail, + value: CaConnEventValue::Error(Error::ConnectFail), }; self.ca_conn_event_out_queue.push_back(item); } @@ -974,13 +998,9 @@ impl CaConn { let jobid = res.0; // by convention: let cid = Cid(jobid.0 as _); - match res.1 { - Ok(wr) => { - self.handle_writer_establish_inner(cid, wr)?; - Ok(Ready(Some(()))) - } - Err(e) => Err(Error::from_string(e.to_string())), - } + let wr = res.1?; + self.handle_writer_establish_inner(cid, wr)?; + Ok(Ready(Some(()))) } Ready(None) => { error!("writer_establish queue closed"); @@ -1010,41 +1030,57 @@ impl CaConn { }); self.insert_item_queue.push_back(item); } - let subid = { - let subid = self.subid_store.next(); - self.cid_by_subid.insert(subid, cid); - trace!( - "new {:?} for {:?} chst {:?} {:?}", - subid, - cid, - st2.channel.cid, - st2.channel.sid - ); - subid - }; - { - trace!("send out EventAdd for {cid:?}"); - let ty = CaMsgTy::EventAdd(EventAdd { - sid: st2.channel.sid.to_u32(), - data_type: st2.channel.ca_dbr_type, - data_count: st2.channel.ca_dbr_count, - subid: subid.to_u32(), - }); - let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); - let proto = self.proto.as_mut().unwrap(); - proto.push_out(msg); - } - let created_state = WritableState { - tsbeg: self.poll_tsnow, - channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), - writer, - reading: ReadingState::EnableMonitoring(EnableMonitoringState { + let name = self.name_by_cid.get(&st2.channel.cid).map(|x| x.as_str()).unwrap_or(""); + if name.starts_with("TEST:PEAKING:") { + let created_state = WritableState { tsbeg: self.poll_tsnow, - subid, - }), - }; - *chst = ChannelState::Writable(created_state); - Ok(()) + channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), + writer, + reading: ReadingState::Polling(PollingState { + tsbeg: self.poll_tsnow, + poll_ivl: Duration::from_millis(1000), + tick: PollTickState::Idle(self.poll_tsnow), + }), + }; + *chst = ChannelState::Writable(created_state); + Ok(()) + } else { + let subid = { + let subid = self.subid_store.next(); + self.cid_by_subid.insert(subid, cid); + trace!( + "new {:?} for {:?} chst {:?} {:?}", + subid, + cid, + st2.channel.cid, + st2.channel.sid + ); + subid + }; + { + trace!("send out EventAdd for {cid:?}"); + let ty = CaMsgTy::EventAdd(EventAdd { + sid: st2.channel.sid.to_u32(), + data_type: st2.channel.ca_dbr_type, + data_count: st2.channel.ca_dbr_count, + subid: subid.to_u32(), + }); + let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); + let proto = self.proto.as_mut().unwrap(); + proto.push_out(msg); + } + let created_state = WritableState { + tsbeg: self.poll_tsnow, + channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), + writer, + reading: ReadingState::EnableMonitoring(EnableMonitoringState { + tsbeg: self.poll_tsnow, + subid, + }), + }; + *chst = ChannelState::Writable(created_state); + Ok(()) + } } else { warn!("TODO handle_series_lookup_result channel in bad state, reset"); Ok(()) @@ -1249,9 +1285,8 @@ impl CaConn { let cid = if let Some(x) = self.cid_by_subid.get(&subid) { *x } else { - let e = Error::with_msg_no_trace("unknown {subid:?}"); - error!("{e}"); - return Err(e); + self.stats.unknown_subid().inc(); + return Ok(()); }; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { x @@ -1263,9 +1298,7 @@ impl CaConn { // If we don't have it in the "closed" btree, then close connection to the IOC and count // as logic error. // Close connection to the IOC. Cout as logic error. - let e = Error::with_msg_no_trace(format!( - "TODO handle_event_add_res can not find channel for {cid:?} {subid:?}" - )); + let e = Error::UnknownCid(cid); error!("{e}"); return Err(e); }; @@ -1317,7 +1350,7 @@ impl CaConn { ChannelState::Writable(st) => { let stnow = self.tmp_ts_poll; let crst = &mut st.channel; - let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 10; + let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 4; if crst.stwin_ts != stwin_ts { crst.stwin_ts = stwin_ts; crst.stwin_count = 0; @@ -1325,15 +1358,24 @@ impl CaConn { { crst.stwin_count += 1; crst.stwin_bytes += ev.payload_len; - if crst.stwin_count > 5 || crst.stwin_bytes > 1024 * 1024 * 1 { + if crst.stwin_count > 30000 || crst.stwin_bytes > 1024 * 1024 * 500 { let subid = match &mut st.reading { ReadingState::EnableMonitoring(x) => Some(x.subid.clone()), ReadingState::Monitoring(x) => Some(x.subid.clone()), - ReadingState::StopMonitoringForPolling(_) => None, - ReadingState::Polling(_) => None, + ReadingState::StopMonitoringForPolling(_) => { + self.stats.transition_to_polling_bad_state().inc(); + None + } + ReadingState::Polling(_) => { + self.stats.transition_to_polling_already_in().inc(); + None + } }; if let Some(subid) = subid { + self.stats.transition_to_polling().inc(); self.transition_to_polling(subid, tsnow)?; + } else { + self.stats.transition_to_polling_bad_state().inc(); } return Ok(()); } @@ -1413,7 +1455,7 @@ impl CaConn { ReadingState::StopMonitoringForPolling(..) => { st.reading = ReadingState::Polling(PollingState { tsbeg: tsnow, - poll_ivl: Duration::from_millis(2000), + poll_ivl: Duration::from_millis(1000), tick: PollTickState::Idle(tsnow), }); } @@ -1511,7 +1553,8 @@ impl CaConn { } } } else { - warn!("unknown {ioid:?}"); + // warn!("unknown {ioid:?}"); + self.stats.unknown_ioid().inc(); } Ok(()) } @@ -1552,35 +1595,36 @@ impl CaConn { Self::check_ev_value_data(&value.data, writer.scalar_type())?; { let val: DataValue = value.data.into(); - writer - .write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq) - .map_err(|e| Error::from_string(e))?; + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)?; } Ok(()) } else { stats.channel_fast_item_drop.inc(); - if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { - crst.insert_recv_ivl_last = tsnow; - let ema = crst.insert_item_ivl_ema.ema(); - let item = IvlItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - iiq.push_back(QueryItem::Ivl(item)); + // TODO + if false { + if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { + crst.insert_recv_ivl_last = tsnow; + let ema = crst.insert_item_ivl_ema.ema(); + let item = IvlItem { + series: series.clone(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + iiq.push_back(QueryItem::Ivl(item)); + } + if false && crst.muted_before == 0 { + let ema = crst.insert_item_ivl_ema.ema(); + let item = MuteItem { + series: series.clone(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + iiq.push_back(QueryItem::Mute(item)); + } + crst.muted_before = 1; } - if false && crst.muted_before == 0 { - let ema = crst.insert_item_ivl_ema.ema(); - let item = MuteItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - iiq.push_back(QueryItem::Mute(item)); - } - crst.muted_before = 1; Ok(()) } } @@ -1637,7 +1681,7 @@ impl CaConn { } CaItem::Msg(msg) => match msg.ty { CaMsgTy::VersionRes(n) => { - debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg); + // debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg); if n < 12 || n > 13 { error!("See some unexpected version {n} channel search may not work."); Ready(Some(Ok(()))) @@ -1657,13 +1701,13 @@ impl CaConn { }, Err(e) => { error!("got error item from CaProto {e:?}"); - Ready(Some(Err(e.to_string().into()))) + Ready(Some(Err(e.into()))) } }, Ready(None) => { warn!("handle_conn_listen CaProto is done {:?}", self.remote_addr_dbg); - self.state = CaConnState::Wait(wait_fut(self.backoff_next())); self.proto = None; + self.state = CaConnState::EndOfStream; Ready(None) } Pending => Pending, @@ -1681,9 +1725,7 @@ impl CaConn { match self.channels.get(&cid).unwrap() { ChannelState::Init(cssid) => { let cssid = cssid.clone(); - let name = self - .name_by_cid(cid) - .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); + let name = self.name_by_cid(cid).ok_or_else(|| Error::UnknownCid(cid)); let name = match name { Ok(k) => k.to_string(), Err(e) => return Err(e), @@ -1745,6 +1787,7 @@ impl CaConn { do_wake_again = true; self.proto.as_mut().unwrap().push_out(msg); st3.tick = PollTickState::Wait(tsnow, ioid); + self.stats.caget_issued().inc(); } } PollTickState::Wait(x, ioid) => { @@ -1753,6 +1796,7 @@ impl CaConn { self.stats.caget_timeout().inc(); // warn!("channel caget timeout"); // std::process::exit(1); + st3.tick = PollTickState::Idle(tsnow); } } }, @@ -1783,7 +1827,7 @@ impl CaConn { let proto = if let Some(x) = self.proto.as_mut() { x } else { - return Ready(Some(Err(Error::with_msg_no_trace("handle_peer_ready but no proto")))); + return Ready(Some(Err(Error::NoProto))); }; let res = match proto.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { @@ -1897,7 +1941,7 @@ impl CaConn { } Pending => Pending, }; - res.map_err(|e| Error::from(e.to_string())) + res.map_err(Into::into) } fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { @@ -1909,7 +1953,7 @@ impl CaConn { let name = if let Some(x) = name_by_cid.get(&cid) { x.to_string() } else { - return Err(Error::with_msg_no_trace(format!("no name for {cid:?}"))); + return Err(Error::NoNameForCid(cid)); }; trace!("handle_create_chan_res {k:?} {name:?}"); // TODO handle not-found error: @@ -1919,7 +1963,7 @@ impl CaConn { _ => { // TODO handle in better way: // Remove channel and emit notice that channel is removed with reason. - let e = Error::with_msg_no_trace("handle_peer_ready bad state"); + let e = Error::CreateChannelBadState; return Err(e); } }; @@ -1974,8 +2018,8 @@ impl CaConn { fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow>> { use ControlFlow::*; use Poll::*; - let e = Error::with_msg_no_trace(format!("test")); - //Err(e)?; + let e = Error::CreateChannelBadState; + // Err(e)?; let _ = e; Break(Pending) } @@ -2018,43 +2062,28 @@ impl CaConn { Ok(Ready(Some(()))) } Ok(Err(e)) => { - trace!("error connect to {addr} {e}"); - if true { - let addr = addr.clone(); - self.insert_item_queue.push_back(QueryItem::ConnectionStatus( - ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - status: ConnectionStatus::ConnectError, - }, - )); - self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); - } else { - // TODO log with exponential backoff - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; - } + debug!("error connect to {addr} {e}"); + let addr = addr.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + status: ConnectionStatus::ConnectError, + })); + self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); Ok(Ready(Some(()))) } Err(e) => { // TODO log with exponential backoff - trace!("timeout connect to {addr} {e}"); - if true { - let addr = addr.clone(); - self.insert_item_queue.push_back(QueryItem::ConnectionStatus( - ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - status: ConnectionStatus::ConnectTimeout, - }, - )); - self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); - } else { - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; - } + debug!("timeout connect to {addr} {e}"); + let addr = addr.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + status: ConnectionStatus::ConnectTimeout, + })); + self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); Ok(Ready(Some(()))) } } @@ -2096,25 +2125,9 @@ impl CaConn { Pending => Ok(Pending), } } - CaConnState::Wait(inst) => { - trace4!("Wait"); - match inst.poll_unpin(cx) { - Ready(_) => { - self.state = CaConnState::Unconnected(Instant::now()); - self.proto = None; - Ok(Ready(Some(()))) - } - Pending => Ok(Pending), - } - } - CaConnState::Shutdown => { - trace4!("Shutdown"); - Ok(Ready(None)) - } - CaConnState::EndOfStream => { - trace4!("EndOfStream"); - Ok(Ready(None)) - } + CaConnState::Shutdown => Ok(Ready(None)), + CaConnState::EndOfStream => Ok(Ready(None)), + CaConnState::Error => Ok(Ready(None)), } } @@ -2137,7 +2150,7 @@ impl CaConn { } Ready(None) => { error!("handle_conn_state yields {x:?}"); - return Err(Error::with_msg_no_trace("logic error")); + return Err(Error::LoopInnerLogicError); } Pending => return Ok(Pending), }, @@ -2196,9 +2209,9 @@ impl CaConn { CaConnState::Init => {} CaConnState::Handshake => {} CaConnState::PeerReady => {} - CaConnState::Wait(_) => {} CaConnState::Shutdown => {} CaConnState::EndOfStream => {} + CaConnState::Error => {} } Ok(()) } @@ -2231,9 +2244,7 @@ impl CaConn { fn tick_writers(&mut self) -> Result<(), Error> { for (k, st) in &mut self.channels { if let ChannelState::Writable(st2) = st { - st2.writer - .tick(&mut self.insert_item_queue) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + st2.writer.tick(&mut self.insert_item_queue)?; } } Ok(()) @@ -2244,13 +2255,15 @@ impl CaConn { } fn queues_out_flushed(&self) -> bool { - self.queues_async_out_flushed() && self.ca_conn_event_out_queue.is_empty() - } - - fn queues_async_out_flushed(&self) -> bool { - // self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle() - // TODO re-enable later - self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle() + debug!( + "async out flushed iiq {} {} caout {}", + self.insert_item_queue.is_empty(), + self.storage_insert_sender.is_idle(), + self.ca_conn_event_out_queue.is_empty() + ); + self.insert_item_queue.is_empty() + && self.storage_insert_sender.is_idle() + && self.ca_conn_event_out_queue.is_empty() } fn attempt_flush_queue( @@ -2276,7 +2289,7 @@ impl CaConn { break; } if !sp.has_sender() { - return Err(Error::with_msg_no_trace(format!("flush queue {id} no sender"))); + return Err(Error::NoSender); } if sp.is_idle() { if let Some(item) = qu_to_si(qu) { @@ -2292,16 +2305,18 @@ impl CaConn { have_progress = true; } Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("flush queue {id} {e}")); - return Err(e); + use crate::senderpolling::Error as SpErr; + match e { + SpErr::NoSendInProgress => return Err(Error::NotSending), + SpErr::Closed(_) => return Err(Error::ClosedSending), + } } Pending => { return Ok(Pending); } } } else { - let e = Error::with_msg_no_trace(format!("flush queue {id} not sending")); - return Err(e); + return Err(Error::NotSending); } } if have_progress { @@ -2390,22 +2405,11 @@ impl Stream for CaConn { } { - // TODO use stats histogram type to test the native prometheus histogram feature - let qu = &self.insert_item_queue; - let stats = &self.stats; - let n = qu.len(); - if n >= 128 { - stats.storage_queue_above_128().inc(); - } else if n >= 32 { - stats.storage_queue_above_32().inc(); - } else if n >= 8 { - stats.storage_queue_above_8().inc(); - } + let iiq = &self.insert_item_queue; + self.stats.iiq_len().ingest(iiq.len() as u32); } - let lts3; - - if !self.is_shutdown() { + { let stats2 = self.stats.clone(); let stats_fn = move |item: &VecDeque| { stats2.iiq_batch_len().ingest(item.len() as u32); @@ -2421,8 +2425,11 @@ impl Stream for CaConn { cx, stats_fn ); - lts3 = Instant::now(); + } + let lts3 = Instant::now(); + + if !self.is_shutdown() { flush_queue!( self, writer_establish_qu, @@ -2434,8 +2441,6 @@ impl Stream for CaConn { cx, |_| {} ); - } else { - lts3 = Instant::now(); } let lts4 = Instant::now(); @@ -2512,23 +2517,24 @@ impl Stream for CaConn { break if self.is_shutdown() { if self.queues_out_flushed() { - // debug!("end of stream {}", self.remote_addr_dbg); + debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg); self.state = CaConnState::EndOfStream; Ready(None) } else { - // debug!("queues_out_flushed false"); if have_progress { + debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg); self.stats.poll_reloop().inc(); reloops += 1; continue; } else if have_pending { + debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg); self.stats.poll_pending().inc(); Pending } else { // TODO error error!("shutting down, queues not flushed, no progress, no pending"); self.stats.logic_error().inc(); - let e = Error::with_msg_no_trace("shutting down, queues not flushed, no progress, no pending"); + let e = Error::ShutdownWithQueuesNoProgressNoPending; Ready(Some(Err(e))) } } @@ -2551,7 +2557,7 @@ impl Stream for CaConn { Pending } else { self.stats.poll_no_progress_no_pending().inc(); - let e = Error::with_msg_no_trace("no progress no pending"); + let e = Error::NoProgressNoPending; Ready(Some(Err(e))) } }; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index d6e4ca6..406f75c 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -11,7 +11,6 @@ use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; -use atomic::AtomicUsize; use conn::CaConn; use conn::CaConnEvent; use conn::CaConnEventValue; @@ -85,6 +84,7 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); +const CHANNEL_BACKOFF: Duration = Duration::from_millis(10000); const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; #[allow(unused)] @@ -550,68 +550,15 @@ impl CaConnSet { CaConnEventValue::None => Ok(()), CaConnEventValue::EchoTimeout => Ok(()), CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), - CaConnEventValue::QueryItem(item) => { - error!("TODO remove this insert case"); - // self.storage_insert_queue.push_back(item); - Ok(()) - } CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), + CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st), + CaConnEventValue::Error(e) => self.handle_ca_conn_err(e, addr), CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), - CaConnEventValue::ConnectFail => self.handle_connect_fail(addr), - CaConnEventValue::ChannelStatus(st) => { - self.apply_ca_conn_health_update(addr, st)?; - - // let sst = &mut self.channel_states; - // for (k, v) in st.channel_statuses { - // if let Some(ch) = self.channel_by_cssid.get(&k) { - // // Only when the channel is active we expect to receive status updates. - // if let Some(st) = sst.get_mut(ch) { - // if let ChannelStateValue::Active(st2) = &mut st.value { - // if let ActiveChannelState::WithStatusSeriesId { - // status_series_id, - // state: st3, - // } = st2 - // { - // if let WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } = - // &mut st3.inner - // { - // if let WithAddressState::Assigned(st5) = st4 { - // } else { - // } - // } else { - // } - // } else { - // } - // } else { - // } - // st.value = ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { - // status_series_id: (), - // state: WithStatusSeriesIdState { - // addr_find_backoff: todo!(), - // inner: todo!(), - // }, - // }); - // } else { - // // TODO this should be an error. - // } - // match v.channel_connected_info { - // conn::ChannelConnectedInfo::Disconnected => {} - // conn::ChannelConnectedInfo::Connecting => todo!(), - // conn::ChannelConnectedInfo::Connected => todo!(), - // conn::ChannelConnectedInfo::Error => todo!(), - // conn::ChannelConnectedInfo::Ended => todo!(), - // } - // } else { - // warn!("we do not know {:?}", k); - // } - // } - - Ok(()) - } } } fn handle_series_lookup_result(&mut self, res: Result) -> Result<(), Error> { + debug!("handle_series_lookup_result {res:?}"); if self.shutdown_stopping { return Ok(()); } @@ -649,15 +596,13 @@ impl CaConnSet { if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 { - *chst2 = ActiveChannelState::WithStatusSeriesId { - status_series_id: cmd.cssid, - state: WithStatusSeriesIdState { - addr_find_backoff: 0, - inner: WithStatusSeriesIdStateInner::AddrSearchPending { - since: SystemTime::now(), - }, + *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { + cssid: cmd.cssid, + addr_find_backoff: 0, + inner: WithStatusSeriesIdStateInner::AddrSearchPending { + since: SystemTime::now(), }, - }; + }); let qu = IocAddrQuery::cached(cmd.name); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); @@ -692,14 +637,12 @@ impl CaConnSet { let ch = Channel::new(cmd.name.clone()); if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { - if let ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state: st3, - } = ast - { + if let ActiveChannelState::WithStatusSeriesId(st3) = ast { + debug!("handle_add_channel_with_addr INNER {cmd:?}"); self.stats.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); *st3 = WithStatusSeriesIdState { + cssid: cmd.cssid.clone(), addr_find_backoff: 0, inner: WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, @@ -710,11 +653,15 @@ impl CaConnSet { }), }, }; - if !self.ca_conn_ress.contains_key(&cmd.addr) { + let addr = cmd.addr; + if self.ca_conn_ress.contains_key(&addr) { + debug!("ca_conn_ress has already {addr:?}"); + } else { + debug!("ca_conn_ress NEW {addr:?}"); let c = self.create_ca_conn(cmd.clone())?; - self.ca_conn_ress.insert(cmd.addr, c); + self.ca_conn_ress.insert(addr, c); } - let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap(); + let conn_ress = self.ca_conn_ress.get_mut(&addr).unwrap(); let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid); conn_ress.cmd_queue.push_back(cmd); } @@ -737,10 +684,7 @@ impl CaConnSet { ActiveChannelState::WaitForStatusSeriesId { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } - ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state, - } => match &state.inner { + ActiveChannelState::WithStatusSeriesId(state) => match &state.inner { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } @@ -767,10 +711,10 @@ impl CaConnSet { } fn handle_ioc_query_result(&mut self, results: VecDeque) -> Result<(), Error> { + debug!("handle_ioc_query_result {results:?}"); if self.shutdown_stopping { return Ok(()); } - trace3!("handle_ioc_query_result"); for res in results { let ch = Channel::new(res.channel.clone()); if trigger.contains(&ch.id()) { @@ -778,34 +722,31 @@ impl CaConnSet { } if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { - if let ActiveChannelState::WithStatusSeriesId { - status_series_id, - state, - } = ast - { + if let ActiveChannelState::WithStatusSeriesId(st2) = ast { if let Some(addr) = res.addr { self.stats.ioc_addr_found().inc(); - trace3!("ioc found {res:?}"); - let since = SystemTime::now(); - state.addr_find_backoff = 0; - state.inner = WithStatusSeriesIdStateInner::WithAddress { - addr, - state: WithAddressState::Unassigned { since }, - }; + debug!("ioc found {res:?}"); if false { + let since = SystemTime::now(); + st2.addr_find_backoff = 0; + st2.inner = WithStatusSeriesIdStateInner::WithAddress { + addr, + state: WithAddressState::Unassigned { since }, + }; + } else { let cmd = ChannelAddWithAddr { backend: self.backend.clone(), name: res.channel, addr: SocketAddr::V4(addr), - cssid: status_series_id.clone(), + cssid: st2.cssid.clone(), }; self.handle_add_channel_with_addr(cmd)?; } } else { self.stats.ioc_addr_not_found().inc(); - trace3!("ioc not found {res:?}"); + debug!("ioc not found {res:?}"); let since = SystemTime::now(); - state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; + st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; } } else { self.stats.ioc_addr_result_for_unknown_channel().inc(); @@ -824,6 +765,8 @@ impl CaConnSet { } fn handle_check_health(&mut self) -> Result<(), Error> { + let tsnow = Instant::now(); + let stnow = SystemTime::now(); trace2!("handle_check_health"); if self.shutdown_stopping { Ok(()) @@ -832,7 +775,7 @@ impl CaConnSet { self.thr_msg_storage_len .trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]); } - self.check_channel_states()?; + self.check_channel_states(tsnow, stnow)?; let item = CaConnSetItem::Healthy; self.connset_out_queue.push_back(item); Ok(()) @@ -877,7 +820,7 @@ impl CaConnSet { Ok(()) } - fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> { + fn handle_conn_command_result(&mut self, _addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> { use crate::ca::conn::ConnCommandResultKind::*; match res.kind { Unused => Ok(()), @@ -897,11 +840,7 @@ impl CaConnSet { }; if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { - if let ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state: st3, - } = st2 - { + if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { if let WithStatusSeriesIdStateInner::WithAddress { addr: conn_addr, state: st4, @@ -940,11 +879,7 @@ impl CaConnSet { let ch = Channel::new(name); if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { - if let ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state: st3, - } = st2 - { + if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress"); st3.addr_find_backoff += 1; st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; @@ -955,7 +890,7 @@ impl CaConnSet { } fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { - trace2!("handle_ca_conn_eos {addr}"); + debug!("handle_ca_conn_eos {addr}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { self.stats.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); @@ -963,12 +898,31 @@ impl CaConnSet { self.stats.ca_conn_eos_unexpected().inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } - self.remove_status_for_addr(addr)?; + // self.remove_channel_status_for_addr(addr)?; trace2!("still CaConn left {}", self.ca_conn_ress.len()); Ok(()) } + fn handle_ca_conn_err(&mut self, e: super::conn::Error, addr: SocketAddr) -> Result<(), Error> { + use super::conn::Error as E2; + error!("received error {addr} {e}"); + match e { + E2::ConnectFail => self.handle_connect_fail(addr)?, + _ => { + // TODO others? + } + } + Ok(()) + } + fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { + // TODO ideally should only remove on EOS. + self.ca_conn_ress.remove(&addr); + self.transition_channels_to_maybe_wrong_address(addr)?; + Ok(()) + } + + fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> { trace2!("handle_connect_fail {addr}"); let tsnow = SystemTime::now(); for (ch, st1) in self.channel_states.iter_mut() { @@ -976,10 +930,7 @@ impl CaConnSet { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { since: _ } => {} ActiveChannelState::WaitForStatusSeriesId { since: _ } => {} - ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state: st3, - } => { + ActiveChannelState::WithStatusSeriesId(st3) => { if let WithStatusSeriesIdStateInner::WithAddress { addr: addr_ch, state: _st4, @@ -1005,7 +956,28 @@ impl CaConnSet { Ok(()) } - fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { + fn remove_channel_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { + debug!("TODO remove_channel_status_for_addr"); + if true { + let e = Error::with_msg_no_trace("TODO remove_channel_status_for_addr"); + return Err(e); + } + for (_, v) in self.channel_states.iter_mut() { + match &mut v.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { + WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => { + if SocketAddr::V4(*a2) == addr { + *st4 = WithAddressState::Backoff(Instant::now()); + } + } + _ => {} + }, + _ => {} + }, + ChannelStateValue::ToRemove { .. } => {} + } + } Ok(()) } @@ -1062,60 +1034,73 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, tx1: Sender<(SocketAddr, CaConnEvent)>, - tx2: Sender>, + _tx2: Sender>, addr: SocketAddr, stats: Arc, ) -> Result<(), Error> { stats.ca_conn_task_begin().inc(); trace2!("ca_conn_consumer begin {}", addr); let connstats = conn.stats(); - let mut conn = Box::pin(conn); - let mut ret = Ok(()); + let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr, connstats).await; + trace2!("ca_conn_consumer ended {}", addr); + match ret { + Ok(()) => { + debug!("Sending CaConnEventValue::EndOfStream"); + tx1.send(( + addr, + CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::EndOfStream, + }, + )) + .await?; + } + Err(e) => { + error!("ca_conn_item_merge received from inner: {e}"); + } + } + stats.ca_conn_task_done().inc(); + Ok(()) + } + + async fn ca_conn_item_merge_inner( + mut conn: Pin>, + tx1: Sender<(SocketAddr, CaConnEvent)>, + addr: SocketAddr, + stats: Arc, + ) -> Result<(), Error> { while let Some(item) = conn.next().await { match item { Ok(item) => { - connstats.item_count.inc(); + stats.item_count.inc(); match item.value { - CaConnEventValue::QueryItem(x) => { - warn!("ca_conn_item_merge should not go here often"); - let mut v = VecDeque::new(); - v.push_back(x); - if let Err(_) = tx2.send(v).await { - break; - } - } CaConnEventValue::None | CaConnEventValue::EchoTimeout | CaConnEventValue::ConnCommandResult(..) | CaConnEventValue::ChannelCreateFail(..) - | CaConnEventValue::EndOfStream - | CaConnEventValue::ConnectFail - | CaConnEventValue::ChannelStatus(..) => { - if let Err(_) = tx1.send((addr, item)).await { - break; + | CaConnEventValue::ChannelStatus(..) + | CaConnEventValue::Error(..) => { + if let Err(e) = tx1.send((addr, item)).await { + error!("can not deliver error {e}"); + return Err(Error::with_msg_no_trace("can not deliver error")); } } + CaConnEventValue::EndOfStream => break, } } Err(e) => { - error!("CaConn gives error: {e:?}"); - ret = Err(e); - break; + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::Error(e), + }; + if let Err(e) = tx1.send((addr, item)).await { + error!("can not deliver error {e}"); + return Err(Error::with_msg_no_trace("can not deliver error")); + } } } } - trace2!("ca_conn_consumer ended {}", addr); - tx1.send(( - addr, - CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::EndOfStream, - }, - )) - .await?; - trace2!("ca_conn_consumer signaled {}", addr); - stats.ca_conn_task_done().inc(); - ret + Ok(()) } fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> { @@ -1264,7 +1249,7 @@ impl CaConnSet { Ok(()) } - fn check_channel_states(&mut self) -> Result<(), Error> { + fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> { let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts(); let mut cmd_remove_channel = Vec::new(); let mut cmd_add_channel = Vec::new(); @@ -1276,8 +1261,6 @@ impl CaConnSet { } else { self.channel_states.range_mut(..) }; - - let tsnow = SystemTime::now(); for (i, (ch, st)) in it.enumerate() { match &mut st.value { ChannelStateValue::Active(st2) => match st2 { @@ -1286,51 +1269,61 @@ impl CaConnSet { self.stats.logic_error().inc(); } ActiveChannelState::WaitForStatusSeriesId { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > Duration::from_millis(5000) { warn!("timeout can not get status series id for {ch:?}"); - *st2 = ActiveChannelState::Init { since: tsnow }; + *st2 = ActiveChannelState::Init { since: stnow }; } else { // TODO } } - ActiveChannelState::WithStatusSeriesId { - status_series_id, - state, - } => match &mut state.inner { + ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { WithStatusSeriesIdStateInner::UnknownAddress { since } => { if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { - if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < tsnow { + if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < stnow { if false { - // TODO + error!("TODO trigger address search from state UnknownAddress"); + if true { + std::process::exit(1); + } + if false { + // TODO + search_pending_count += 1; + st3.inner = + WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; + } + } else { search_pending_count += 1; - state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow }; + st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; + let qu = IocAddrQuery::uncached(ch.id().into()); + self.find_ioc_query_queue.push_back(qu); + self.stats.ioc_search_start().inc(); } } } } WithStatusSeriesIdStateInner::AddrSearchPending { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > SEARCH_PENDING_TIMEOUT { debug!("TODO should receive some error indication instead of timeout for {ch:?}"); - state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; + st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow }; search_pending_count -= 1; } } WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, - state: st3, + state: st4, } => { use WithAddressState::*; - match st3 { + match st4 { Unassigned { since } => { if assigned_without_health_update < CHANNEL_MAX_WITHOUT_HEALTH_UPDATE as _ { - if *since + CHANNEL_UNASSIGNED_TIMEOUT < tsnow { + if *since + CHANNEL_UNASSIGNED_TIMEOUT < stnow { assigned_without_health_update += 1; let cmd = ChannelAddWithAddr { backend: self.backend.clone(), name: ch.id().into(), - cssid: status_series_id.clone(), + cssid: st3.cssid.clone(), addr: SocketAddr::V4(*addr_v4), }; cmd_add_channel.push(cmd); @@ -1338,47 +1331,55 @@ impl CaConnSet { } } Assigned(st4) => { - if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < tsnow { - warn!("soon health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < stnow { + self.stats.channel_health_timeout_soon().inc(); } - if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow { + if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow { self.stats.channel_health_timeout().inc(); trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); // TODO error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); - std::process::exit(1); + if true { + std::process::exit(1); + } let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); if st.health_timeout_count < 3 { - state.addr_find_backoff += 1; - state.inner = - WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; - let item = ChannelStatusItem::new_closed_conn_timeout( - tsnow, - status_series_id.clone(), - ); + st3.addr_find_backoff += 1; + st3.inner = + WithStatusSeriesIdStateInner::MaybeWrongAddress { since: stnow }; + let item = + ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); channel_status_items.push(item); } } } + Backoff(ts) => { + if tsnow.saturating_duration_since(*ts) >= CHANNEL_BACKOFF { + *st4 = Unassigned { since: stnow }; + } + } } } WithStatusSeriesIdStateInner::NoAddress { since } => { - if *since + NO_ADDRESS_STAY < tsnow { - state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }; + if *since + NO_ADDRESS_STAY < stnow { + st3.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: stnow }; } } WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => { - if *since + (MAYBE_WRONG_ADDRESS_STAY * state.addr_find_backoff.min(10).max(1)) < tsnow { + if *since + (MAYBE_WRONG_ADDRESS_STAY * st3.addr_find_backoff.max(1).min(10)) < stnow { if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { + debug!("try again channel after MaybeWrongAddress"); if trigger.contains(&ch.id()) { debug!("issue ioc search for {}", ch.id()); } search_pending_count += 1; - state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow }; + st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; let qu = IocAddrQuery::uncached(ch.id().into()); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); + } else { + debug!("try again channel after MaybeWrongAddress NOT YET"); } } } @@ -1416,6 +1417,7 @@ impl CaConnSet { let mut search_pending = 0; let mut no_address = 0; let mut unassigned = 0; + let mut backoff = 0; let mut assigned = 0; let mut connected = 0; let mut maybe_wrong_address = 0; @@ -1429,7 +1431,7 @@ impl CaConnSet { ActiveChannelState::WaitForStatusSeriesId { .. } => { unknown_address += 1; } - ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { + ActiveChannelState::WithStatusSeriesId(st3) => match &st3.inner { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { unknown_address += 1; } @@ -1453,6 +1455,9 @@ impl CaConnSet { } } } + WithAddressState::Backoff(ts) => { + backoff += 1; + } }, WithStatusSeriesIdStateInner::NoAddress { .. } => { no_address += 1; @@ -1471,6 +1476,7 @@ impl CaConnSet { self.stats.channel_search_pending.set(search_pending); self.stats.channel_no_address.set(no_address); self.stats.channel_unassigned.set(unassigned); + self.stats.channel_backoff.set(backoff); self.stats.channel_assigned.set(assigned); self.stats.channel_connected.set(connected); self.stats.channel_maybe_wrong_address.set(maybe_wrong_address); @@ -1634,6 +1640,7 @@ impl Stream for CaConnSet { if self.find_ioc_query_sender.is_idle() { if let Some(item) = self.find_ioc_query_queue.pop_front() { + debug!("push find item {item:?}"); self.find_ioc_query_sender.as_mut().send_pin(item); } } @@ -1682,7 +1689,9 @@ impl Stream for CaConnSet { } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, - Ready(None) => {} + Ready(None) => { + // TODO trigger shutdown because of error + } Pending => { have_pending = true; } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 1f0f773..ed14fb9 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -6,12 +6,9 @@ use log::*; use netpod::timeunits::*; use slidebuf::SlideBuf; use stats::CaProtoStats; -use std::collections::HashMap; use std::collections::VecDeque; use std::io; use std::net::SocketAddrV4; -use std::num::NonZeroU16; -use std::num::NonZeroU64; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -47,6 +44,7 @@ pub enum Error { ExtendedHeaderBadCount, NoReadBufferSpace, NeitherPendingNorProgress, + OutputBufferTooSmall, } const CA_PROTO_VERSION: u16 = 13; @@ -421,24 +419,24 @@ impl CaMsgTy { Search(_) => CA_PROTO_VERSION, SearchRes(_) => 0, CreateChan(_) => 0, - CreateChanRes(x) => { + CreateChanRes(..) => { panic!(); - x.data_count as _ + // x.data_count as _ } CreateChanFail(_) => 0, AccessRightsRes(_) => 0, EventAdd(x) => x.data_count, - EventAddRes(x) => { + EventAddRes(..) => { panic!(); - x.data_count as _ + // x.data_count as _ } EventAddResEmpty(_) => 0, EventCancel(x) => x.data_count, - EventCancelRes(x) => 0, + EventCancelRes(..) => 0, ReadNotify(x) => x.data_count, - ReadNotifyRes(x) => { + ReadNotifyRes(..) => { panic!(); - x.data_count as _ + // x.data_count as _ } Echo => 0, } @@ -963,7 +961,6 @@ pub struct CaProto { outbuf: SlideBuf, out: VecDeque, array_truncate: usize, - logged_proto_error_for_cid: HashMap, stats: Arc, resqu: VecDeque, } @@ -978,7 +975,6 @@ impl CaProto { outbuf: SlideBuf::new(1024 * 128), out: VecDeque::new(), array_truncate, - logged_proto_error_for_cid: HashMap::new(), stats, resqu: VecDeque::with_capacity(256), } @@ -1023,7 +1019,10 @@ impl CaProto { match w.poll_write(cx, b) { Ready(k) => match k { Ok(k) => match self.outbuf.adv(k) { - Ok(()) => Ready(Ok(k)), + Ok(()) => { + self.stats.out_bytes().add(k as u64); + Ready(Ok(k)) + } Err(e) => { error!("advance error {:?}", e); Ready(Err(e.into())) @@ -1043,16 +1042,22 @@ impl CaProto { let mut have_pending = false; let mut have_progress = false; let tsnow = Instant::now(); + { + let g = self.outbuf.len(); + self.stats.outbuf_len().ingest(g as u32); + } 'l1: while self.out.len() != 0 { while let Some((msg, buf)) = self.out_msg_buf() { let msglen = msg.len(); if msglen > buf.len() { error!("got output buffer but too small"); - break; + let e = Error::OutputBufferTooSmall; + return Err(e); } else { msg.place_into(&mut buf[..msglen]); self.outbuf.wadv(msglen)?; self.out.pop_front(); + self.stats.out_msg_placed().inc(); } } while self.outbuf.len() != 0 { @@ -1105,14 +1110,15 @@ impl CaProto { Ok(()) => { let nf = rbuf.filled().len(); if nf == 0 { - info!( - "EOF peer {:?} {:?} {:?}", + debug!( + "peer done {:?} {:?} {:?}", self.tcp.peer_addr(), self.remote_addr_dbg, self.state ); // TODO may need another state, if not yet done when input is EOF. self.state = CaState::Done; + have_progress = true; } else { if false { info!("received {} bytes", rbuf.filled().len()); @@ -1166,7 +1172,6 @@ impl CaProto { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; if hi.cmdid == 1 || hi.cmdid == 15 { - let sid = hi.param1; if hi.payload_size == 0xffff { if hi.data_count != 0 { warn!("protocol error: {hi:?}"); @@ -1226,7 +1231,7 @@ impl CaProto { let g = self.buf.read_bytes(hi.payload_len())?; let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; // data-count is only reasonable for event messages - if let CaMsgTy::EventAddRes(e) = &msg.ty { + if let CaMsgTy::EventAddRes(..) = &msg.ty { self.stats.data_count().ingest(hi.data_count() as u32); } self.state = CaState::StdHead; diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index cbb2353..ee2bf67 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -55,6 +55,7 @@ pub enum WithAddressState { since: SystemTime, }, Assigned(ConnectionState), + Backoff(#[serde(with = "serde_helper::serde_Instant")] Instant), } #[derive(Debug, Clone, Serialize)] @@ -83,6 +84,7 @@ pub enum WithStatusSeriesIdStateInner { #[derive(Debug, Clone, Serialize)] pub struct WithStatusSeriesIdState { + pub cssid: ChannelStatusSeriesId, pub addr_find_backoff: u32, pub inner: WithStatusSeriesIdStateInner, } @@ -97,10 +99,7 @@ pub enum ActiveChannelState { #[serde(with = "humantime_serde")] since: SystemTime, }, - WithStatusSeriesId { - status_series_id: ChannelStatusSeriesId, - state: WithStatusSeriesIdState, - }, + WithStatusSeriesId(WithStatusSeriesIdState), } #[derive(Debug, Clone, Serialize)] diff --git a/serde_helper/src/lib.rs b/serde_helper/src/lib.rs index 3eb4d34..54b71b5 100644 --- a/serde_helper/src/lib.rs +++ b/serde_helper/src/lib.rs @@ -1,5 +1,5 @@ #[allow(non_snake_case)] -mod serde_Instant { +pub mod serde_Instant { use serde::Serializer; use std::time::Instant; diff --git a/stats/src/stats.rs b/stats/src/stats.rs index a52ac2b..4ae9ada 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -240,6 +240,128 @@ impl XorShift32 { } stats_proc::stats_struct!(( + stats_struct( + name(DaemonStats), + prefix(daemon), + counters( + critical_error, + todo_mark, + ticker_token_acquire_error, + ticker_token_release_error, + handle_timer_tick_count, + ioc_search_err, + ioc_search_some, + ioc_search_none, + lookupaddr_ok, + events, + event_ca_conn, + ca_conn_status_done, + ca_conn_status_feedback_timeout, + ca_conn_status_feedback_recv, + ca_conn_status_feedback_no_dst, + ca_echo_timeout_total, + caconn_done_channel_state_reset, + insert_worker_spawned, + insert_worker_join_ok, + insert_worker_join_ok_err, + insert_worker_join_err, + caconnset_health_response, + ), + values( + channel_unknown_address, + channel_search_pending, + channel_with_address, + channel_no_address, + connset_health_lat_ema, + ), + ), + agg(name(DaemonStatsAgg), parent(DaemonStats)), + diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), + stats_struct( + name(CaConnStats), + prefix(caconn), + counters( + insert_item_create, + inserts_val, + inserts_msp, + inserts_msp_grid, + inserts_queue_pop_for_global, + inserts_queue_push, + inserts_queue_drop, + insert_item_queue_pressure, + insert_item_queue_full, + out_queue_full, + channel_fast_item_drop, + logic_error, + // TODO maybe rename: this is now only the recv of the intermediate queue: + store_worker_item_recv, + // TODO rename to make clear that this drop is voluntary because of user config choice: + // store_worker_fraction_drop, + // store_worker_ratelimit_drop, + // store_worker_insert_done, + // store_worker_insert_binned_done, + // store_worker_insert_overload, + // store_worker_insert_timeout, + // store_worker_insert_unavailable, + // store_worker_insert_error, + connection_status_insert_done, + channel_status_insert_done, + channel_info_insert_done, + ivl_insert_done, + mute_insert_done, + poll_count, + loop1_count, + loop2_count, + loop3_count, + loop4_count, + command_can_not_reply, + time_handle_conn_listen, + time_handle_peer_ready, + time_check_channels_state_init, + time_handle_event_add_res, + tcp_connected, + get_series_id_ok, + item_count, + stream_ready, + stream_pending, + channel_all_count, + channel_alive_count, + channel_not_alive_count, + channel_series_lookup_already_pending, + ping_start, + ping_no_proto, + pong_timeout, + poll_fn_begin, + poll_loop_begin, + poll_reloop, + poll_pending, + poll_no_progress_no_pending, + poll_wake_break, + storage_queue_send, + storage_queue_pending, + event_add_res_recv, + caget_issued, + caget_timeout, + unknown_subid, + unknown_ioid, + transition_to_polling, + transition_to_polling_already_in, + transition_to_polling_bad_state, + ), + values(inter_ivl_ema, read_ioids_len, proto_out_len,), + histolog2s( + poll_all_dt, + poll_op3_dt, + poll_reloops, + pong_recv_lat, + ca_ts_off, + iiq_len, + iiq_batch_len, + caget_lat, + ), + ), + agg(name(CaConnStatsAgg), parent(CaConnStats)), + diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), stats_struct( name(CaProtoStats), prefix(ca_proto), @@ -250,8 +372,10 @@ stats_proc::stats_struct!(( payload_std_too_large, payload_ext_but_small, payload_ext_very_large, + out_msg_placed, + out_bytes, ), - histolog2s(payload_size, data_count,), + histolog2s(payload_size, data_count, outbuf_len,), ), stats_struct( name(CaConnSetStats), @@ -259,6 +383,7 @@ stats_proc::stats_struct!(( counters( channel_add, channel_status_series_found, + channel_health_timeout_soon, channel_health_timeout, ioc_search_start, ioc_addr_found, @@ -270,6 +395,7 @@ stats_proc::stats_struct!(( ca_conn_task_join_done_err, ca_conn_task_join_err, ca_conn_eos_ok, + ca_conn_eos_err, ca_conn_eos_unexpected, response_tx_fail, try_push_ca_conn_cmds_sent, @@ -300,6 +426,7 @@ stats_proc::stats_struct!(( channel_search_pending, channel_no_address, channel_unassigned, + channel_backoff, channel_assigned, channel_connected, channel_maybe_wrong_address, @@ -370,130 +497,6 @@ stats_proc::stats_struct!(( stats_struct(name(SeriesWriterEstablishStats), prefix(wrest), counters(job_recv,),), )); -stats_proc::stats_struct!(( - stats_struct( - name(CaConnStats), - prefix(caconn), - counters( - insert_item_create, - inserts_val, - inserts_msp, - inserts_msp_grid, - inserts_queue_pop_for_global, - inserts_queue_push, - inserts_queue_drop, - insert_item_queue_pressure, - insert_item_queue_full, - out_queue_full, - channel_fast_item_drop, - logic_error, - // TODO maybe rename: this is now only the recv of the intermediate queue: - store_worker_item_recv, - // TODO rename to make clear that this drop is voluntary because of user config choice: - // store_worker_fraction_drop, - // store_worker_ratelimit_drop, - // store_worker_insert_done, - // store_worker_insert_binned_done, - // store_worker_insert_overload, - // store_worker_insert_timeout, - // store_worker_insert_unavailable, - // store_worker_insert_error, - connection_status_insert_done, - channel_status_insert_done, - channel_info_insert_done, - ivl_insert_done, - mute_insert_done, - poll_count, - loop1_count, - loop2_count, - loop3_count, - loop4_count, - command_can_not_reply, - time_handle_conn_listen, - time_handle_peer_ready, - time_check_channels_state_init, - time_handle_event_add_res, - tcp_connected, - get_series_id_ok, - item_count, - stream_ready, - stream_pending, - channel_all_count, - channel_alive_count, - channel_not_alive_count, - channel_series_lookup_already_pending, - ping_start, - ping_no_proto, - pong_timeout, - poll_fn_begin, - poll_loop_begin, - poll_reloop, - poll_pending, - poll_no_progress_no_pending, - poll_wake_break, - storage_queue_send, - storage_queue_pending, - storage_queue_above_8, - storage_queue_above_32, - storage_queue_above_128, - event_add_res_recv, - caget_timeout, - ), - values(inter_ivl_ema, read_ioids_len, proto_out_len,), - histolog2s( - poll_all_dt, - poll_op3_dt, - poll_reloops, - pong_recv_lat, - ca_ts_off, - iiq_batch_len, - caget_lat, - ), - ), - agg(name(CaConnStatsAgg), parent(CaConnStats)), - diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), -)); - -stats_proc::stats_struct!(( - stats_struct( - name(DaemonStats), - prefix(daemon), - counters( - critical_error, - todo_mark, - ticker_token_acquire_error, - ticker_token_release_error, - handle_timer_tick_count, - ioc_search_err, - ioc_search_some, - ioc_search_none, - lookupaddr_ok, - events, - event_ca_conn, - ca_conn_status_done, - ca_conn_status_feedback_timeout, - ca_conn_status_feedback_recv, - ca_conn_status_feedback_no_dst, - ca_echo_timeout_total, - caconn_done_channel_state_reset, - insert_worker_spawned, - insert_worker_join_ok, - insert_worker_join_ok_err, - insert_worker_join_err, - caconnset_health_response, - ), - values( - channel_unknown_address, - channel_search_pending, - channel_with_address, - channel_no_address, - connset_health_lat_ema, - ), - ), - agg(name(DaemonStatsAgg), parent(DaemonStats)), - diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), -)); - stats_proc::stats_struct!(( stats_struct(name(TestStats0), counters(count0,), values(val0),), diff(name(TestStats0Diff), input(TestStats0)),