From cd13cc83743df91592857186c3030cd3aed21296 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 16 Feb 2023 13:16:55 +0100 Subject: [PATCH] WIP --- daqingest/src/daemon.rs | 80 ++++++++++++++++++++++---------------- netfetch/src/ca/conn.rs | 6 +-- netfetch/src/ca/connset.rs | 6 ++- 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 5d0f6df..fcb9634 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -750,36 +750,38 @@ impl Daemon { !self.shutting_down } - async fn check_chans(&mut self) -> Result<(), Error> { - { - let tsnow = Instant::now(); - for (k, v) in &mut self.connection_states { - match v.value { - CaConnStateValue::Fresh => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { - error!("TODO send connection-close for {k:?}"); - self.stats.ca_conn_status_feedback_timeout_inc(); - v.value = CaConnStateValue::Shutdown { since: tsnow }; - } + fn check_connection_states(&mut self) -> Result<(), Error> { + let tsnow = Instant::now(); + for (k, v) in &mut self.connection_states { + match v.value { + CaConnStateValue::Fresh => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { + error!("TODO Fresh timeout send connection-close for {k:?}"); + self.stats.ca_conn_status_feedback_timeout_inc(); + v.value = CaConnStateValue::Shutdown { since: tsnow }; } - CaConnStateValue::HadFeedback => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { - error!("TODO send connection-close for {k:?}"); - self.stats.ca_conn_status_feedback_timeout_inc(); - v.value = CaConnStateValue::Shutdown { since: tsnow }; - } + } + CaConnStateValue::HadFeedback => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { + error!("TODO HadFeedback timeout send connection-close for {k:?}"); + self.stats.ca_conn_status_feedback_timeout_inc(); + v.value = CaConnStateValue::Shutdown { since: tsnow }; } - CaConnStateValue::Shutdown { since } => { - if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) { - self.stats.critical_error_inc(); - error!("Shutdown of CaConn to {} failed", k); - } + } + CaConnStateValue::Shutdown { since } => { + if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) { + self.stats.critical_error_inc(); + error!("Shutdown of CaConn to {} failed", k); } } } } + Ok(()) + } + + async fn check_channel_states(&mut self) -> Result<(), Error> { let mut currently_search_pending = 0; { let mut with_address_count = 0; @@ -863,7 +865,7 @@ impl Daemon { ActiveChannelState::WaitForStatusSeriesId { since, rx } => { let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > Duration::from_millis(5000) { - warn!("timeout can not get status series id"); + warn!("timeout can not get status series id for {ch:?}"); *st2 = ActiveChannelState::Init { since: tsnow }; } else { match rx.try_recv() { @@ -942,13 +944,19 @@ impl Daemon { value: ConnectionStateValue::Unconnected, }; *state = WithAddressState::Assigned(cs); - self.connection_states.entry(*addr).or_insert_with(|| { - let t = CaConnState { - last_feedback: Instant::now(), - value: CaConnStateValue::Fresh, - }; - t - }); + self.connection_states + .entry(*addr) + .and_modify(|_| { + // TODO may be count for metrics. + // Nothing else to do. + }) + .or_insert_with(|| { + let t = CaConnState { + last_feedback: Instant::now(), + value: CaConnStateValue::Fresh, + }; + t + }); // TODO move await out of here if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { let item = QueryItem::ChannelStatus(ChannelStatusItem { @@ -1105,7 +1113,8 @@ impl Daemon { warn!("Received SIGTERM"); SIGTERM.store(2, atomic::Ordering::Release); } - self.check_chans().await?; + self.check_connection_states()?; + self.check_channel_states().await?; let dt = ts1.elapsed(); if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); @@ -1332,6 +1341,8 @@ impl Daemon { since: SystemTime::now(), }), }; + } else { + // nothing to do } } WithStatusSeriesIdStateInner::NoAddress { .. } => {} @@ -1415,6 +1426,7 @@ impl Daemon { let item_summary = item.summary(); let ret = match item { TimerTick => { + let ts1 = Instant::now(); let ret = self.handle_timer_tick().await; match ticker_inp_tx.send(42).await { Ok(_) => {} @@ -1424,6 +1436,8 @@ impl Daemon { return Err(Error::with_msg_no_trace("can not send ticker token")); } } + // TODO collect timer tick min/max/avg metrics. + let _ = ts1.elapsed(); ret } ChannelAdd(ch) => self.handle_channel_add(ch), diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d574af8..4f1ceb3 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -818,11 +818,11 @@ impl CaConn { } else { self.ioc_ping_start = Some(Instant::now()); if let Some(proto) = &mut self.proto { - trace!("push echo to {}", self.remote_addr_dbg); + info!("push echo to {}", self.remote_addr_dbg); let msg = CaMsg { ty: CaMsgTy::Echo }; proto.push_out(msg); } else { - warn!("can not push echo, no proto"); + warn!("can not push echo, no proto {}", self.remote_addr_dbg); self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol); } } @@ -1443,7 +1443,7 @@ impl CaConn { }, Err(e) => { // TODO count only - error!("can not receive series lookup result {e}"); + error!("can not receive series lookup result for {name} {e}"); Err(Error::with_msg_no_trace("can not receive lookup result")) } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 9b197e5..45f984a 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -109,6 +109,7 @@ impl CaConnSet { let conn_fut = async move { let stats = conn.stats(); let mut conn = conn; + let mut ret = Ok(()); while let Some(item) = conn.next().await { match item { Ok(item) => { @@ -117,10 +118,11 @@ impl CaConnSet { } Err(e) => { error!("CaConn gives error: {e:?}"); - return Err(e); + ret = Err(e); } } } + info!("CaConn stream ended {}", addr); Self::conn_remove(&ca_conn_ress, addr).await?; conn_item_tx .send(( @@ -131,7 +133,7 @@ impl CaConnSet { }, )) .await?; - Ok(()) + ret }; let jh = tokio::spawn(conn_fut); let ca_conn_ress = CaConnRess {