diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index fcb9634..1302641 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -782,26 +782,26 @@ impl Daemon { } async fn check_channel_states(&mut self) -> Result<(), Error> { - let mut currently_search_pending = 0; + let mut search_pending_count = 0; { + let mut unknown_address_count = 0; let mut with_address_count = 0; - let mut without_address_count = 0; + let mut no_address_count = 0; for (_ch, st) in &self.channel_states { match &st.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { .. } => { - without_address_count += 1; + unknown_address_count += 1; } ActiveChannelState::WaitForStatusSeriesId { .. } => { - without_address_count += 1; + unknown_address_count += 1; } ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { - without_address_count += 1; + unknown_address_count += 1; } WithStatusSeriesIdStateInner::SearchPending { .. } => { - currently_search_pending += 1; - without_address_count += 1; + search_pending_count += 1; } WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { WithAddressState::Unassigned { .. } => { @@ -812,21 +812,27 @@ impl Daemon { } }, WithStatusSeriesIdStateInner::NoAddress { .. } => { - without_address_count += 1; + no_address_count += 1; } }, }, ChannelStateValue::ToRemove { .. } => { - with_address_count += 1; + unknown_address_count += 1; } } } + self.stats + .channel_unknown_address + .store(unknown_address_count, atomic::Ordering::Release); self.stats .channel_with_address .store(with_address_count, atomic::Ordering::Release); self.stats - .channel_without_address - .store(without_address_count, atomic::Ordering::Release); + .channel_search_pending + .store(search_pending_count as u64, atomic::Ordering::Release); + self.stats + .channel_no_address + .store(no_address_count, atomic::Ordering::Release); } let k = self.chan_check_next.take(); trace!("------------ check_chans start at {:?}", k); @@ -880,10 +886,12 @@ impl Daemon { }; } Err(e) => { - error!("could not get a status series id"); + error!("could not get a status series id {ch:?} {e}"); } }, - Err(e) => {} + Err(_) => { + // TODO should maybe not attempt receive on each channel check. + } } } } @@ -895,8 +903,8 @@ impl Daemon { let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > UNKNOWN_ADDRESS_STAY { //info!("UnknownAddress {} {:?}", i, ch); - if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { - currently_search_pending += 1; + if search_pending_count < CURRENT_SEARCH_PENDING_MAX { + search_pending_count += 1; state.inner = WithStatusSeriesIdStateInner::SearchPending { since: tsnow, did_send: false, @@ -911,7 +919,7 @@ impl Daemon { if dt > SEARCH_PENDING_TIMEOUT { info!("Search timeout for {ch:?}"); state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; - currently_search_pending -= 1; + search_pending_count -= 1; } } WithStatusSeriesIdStateInner::WithAddress { addr, state } => { @@ -1334,6 +1342,7 @@ impl Daemon { WithStatusSeriesIdStateInner::SearchPending { .. } => {} WithStatusSeriesIdStateInner::WithAddress { addr, .. } => { if addr == &conn_addr { + self.stats.caconn_done_channel_state_reset_inc(); // TODO reset channel, emit log event for the connection addr only //info!("ca conn down, reset {k:?}"); *v = ChannelState { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index c52c6ca..59aa710 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -927,7 +927,7 @@ impl CaConn { // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); let cssid = match ch_s { - ChannelState::Created(_series, st2) => st2.cssid.clone(), + ChannelState::FetchingSeriesId(st2) => st2.cssid.clone(), _ => { let name = self.name_by_cid.get(&cid); let e = Error::with_msg_no_trace(format!("channel_to_evented bad state {name:?} {ch_s:?}")); @@ -1381,7 +1381,7 @@ impl CaConn { let cssid = match ch_s { ChannelState::Creating { cssid, .. } => cssid.clone(), _ => { - let e = Error::with_msg_no_trace("channel_to_evented bad state"); + let e = Error::with_msg_no_trace("handle_peer_ready bad state"); return Ready(Some(Err(e))); } }; diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index f201a92..817b11e 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -119,7 +119,10 @@ impl DbUpdateWorker { .unwrap(); } else { warn!("Duplicate for {}", item.channel); - let sql="with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1) update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.addr is not distinct from $3 and t.ctid != q1.ctid"; + let sql = concat!( + "with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)", + " update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1", + ); pg_client.execute(sql, &[&backend, &item.channel, &addr]).await.unwrap(); pg_client .execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 2efcc2f..acf129d 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -158,7 +158,7 @@ impl BsreadClient { ts: u64, pulse: u64, ) -> Result<(), Error> { - let chname = "SAR-CVME-TIFALL4:EvtSet"; + let chname = "SAR-CVME-TIFALL5:EvtSet"; // Test the bool set write let mut i3 = usize::MAX; for (i, ch) in bm.head_b.channels.iter().enumerate() { @@ -675,7 +675,8 @@ impl Zmtp { complete: false, socket_type, conn, - conn_state: ConnState::LockScan(1), + //conn_state: ConnState::LockScan(1), + conn_state: ConnState::InitSend, buf: NetBuf::new(1024 * 128), outbuf: NetBuf::new(1024 * 128), out_enable: false, diff --git a/stats/src/stats.rs b/stats/src/stats.rs index e9b9436..593105b 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -286,8 +286,14 @@ stats_proc::stats_struct!(( ca_conn_status_feedback_recv, ca_conn_status_feedback_no_dst, ca_echo_timeout_total, + caconn_done_channel_state_reset, + ), + values( + channel_unknown_address, + channel_search_pending, + channel_with_address, + channel_no_address ), - values(channel_without_address, channel_with_address), ), agg(name(DaemonStatsAgg), parent(DaemonStats)), diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),