This commit is contained in:
Dominik Werder
2023-02-16 13:16:55 +01:00
parent 7a10a740f6
commit cd13cc8374
3 changed files with 54 additions and 38 deletions

View File

@@ -750,36 +750,38 @@ impl Daemon {
!self.shutting_down
}
async fn check_chans(&mut self) -> Result<(), Error> {
{
let tsnow = Instant::now();
for (k, v) in &mut self.connection_states {
match v.value {
CaConnStateValue::Fresh => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
fn check_connection_states(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
for (k, v) in &mut self.connection_states {
match v.value {
CaConnStateValue::Fresh => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO Fresh timeout send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
CaConnStateValue::HadFeedback => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::HadFeedback => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
error!("TODO HadFeedback timeout send connection-close for {k:?}");
self.stats.ca_conn_status_feedback_timeout_inc();
v.value = CaConnStateValue::Shutdown { since: tsnow };
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) {
self.stats.critical_error_inc();
error!("Shutdown of CaConn to {} failed", k);
}
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) {
self.stats.critical_error_inc();
error!("Shutdown of CaConn to {} failed", k);
}
}
}
}
Ok(())
}
async fn check_channel_states(&mut self) -> Result<(), Error> {
let mut currently_search_pending = 0;
{
let mut with_address_count = 0;
@@ -863,7 +865,7 @@ impl Daemon {
ActiveChannelState::WaitForStatusSeriesId { since, rx } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > Duration::from_millis(5000) {
warn!("timeout can not get status series id");
warn!("timeout can not get status series id for {ch:?}");
*st2 = ActiveChannelState::Init { since: tsnow };
} else {
match rx.try_recv() {
@@ -942,13 +944,19 @@ impl Daemon {
value: ConnectionStateValue::Unconnected,
};
*state = WithAddressState::Assigned(cs);
self.connection_states.entry(*addr).or_insert_with(|| {
let t = CaConnState {
last_feedback: Instant::now(),
value: CaConnStateValue::Fresh,
};
t
});
self.connection_states
.entry(*addr)
.and_modify(|_| {
// TODO may be count for metrics.
// Nothing else to do.
})
.or_insert_with(|| {
let t = CaConnState {
last_feedback: Instant::now(),
value: CaConnStateValue::Fresh,
};
t
});
// TODO move await out of here
if let Some(tx) = self.ingest_commons.insert_item_queue.sender() {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
@@ -1105,7 +1113,8 @@ impl Daemon {
warn!("Received SIGTERM");
SIGTERM.store(2, atomic::Ordering::Release);
}
self.check_chans().await?;
self.check_connection_states()?;
self.check_channel_states().await?;
let dt = ts1.elapsed();
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
@@ -1332,6 +1341,8 @@ impl Daemon {
since: SystemTime::now(),
}),
};
} else {
// nothing to do
}
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {}
@@ -1415,6 +1426,7 @@ impl Daemon {
let item_summary = item.summary();
let ret = match item {
TimerTick => {
let ts1 = Instant::now();
let ret = self.handle_timer_tick().await;
match ticker_inp_tx.send(42).await {
Ok(_) => {}
@@ -1424,6 +1436,8 @@ impl Daemon {
return Err(Error::with_msg_no_trace("can not send ticker token"));
}
}
// TODO collect timer tick min/max/avg metrics.
let _ = ts1.elapsed();
ret
}
ChannelAdd(ch) => self.handle_channel_add(ch),

View File

@@ -818,11 +818,11 @@ impl CaConn {
} else {
self.ioc_ping_start = Some(Instant::now());
if let Some(proto) = &mut self.proto {
trace!("push echo to {}", self.remote_addr_dbg);
info!("push echo to {}", self.remote_addr_dbg);
let msg = CaMsg { ty: CaMsgTy::Echo };
proto.push_out(msg);
} else {
warn!("can not push echo, no proto");
warn!("can not push echo, no proto {}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
}
}
@@ -1443,7 +1443,7 @@ impl CaConn {
},
Err(e) => {
// TODO count only
error!("can not receive series lookup result {e}");
error!("can not receive series lookup result for {name} {e}");
Err(Error::with_msg_no_trace("can not receive lookup result"))
}
}

View File

@@ -109,6 +109,7 @@ impl CaConnSet {
let conn_fut = async move {
let stats = conn.stats();
let mut conn = conn;
let mut ret = Ok(());
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
@@ -117,10 +118,11 @@ impl CaConnSet {
}
Err(e) => {
error!("CaConn gives error: {e:?}");
return Err(e);
ret = Err(e);
}
}
}
info!("CaConn stream ended {}", addr);
Self::conn_remove(&ca_conn_ress, addr).await?;
conn_item_tx
.send((
@@ -131,7 +133,7 @@ impl CaConnSet {
},
))
.await?;
Ok(())
ret
};
let jh = tokio::spawn(conn_fut);
let ca_conn_ress = CaConnRess {