Ingest EvtSet with zmtp init

This commit is contained in:
Dominik Werder
2023-02-20 16:36:23 +01:00
parent 1ba74e1342
commit c3ac621cc6
5 changed files with 41 additions and 22 deletions

View File

@@ -782,26 +782,26 @@ impl Daemon {
}
async fn check_channel_states(&mut self) -> Result<(), Error> {
let mut currently_search_pending = 0;
let mut search_pending_count = 0;
{
let mut unknown_address_count = 0;
let mut with_address_count = 0;
let mut without_address_count = 0;
let mut no_address_count = 0;
for (_ch, st) in &self.channel_states {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {
without_address_count += 1;
unknown_address_count += 1;
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
without_address_count += 1;
unknown_address_count += 1;
}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
without_address_count += 1;
unknown_address_count += 1;
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
currently_search_pending += 1;
without_address_count += 1;
search_pending_count += 1;
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
@@ -812,21 +812,27 @@ impl Daemon {
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
without_address_count += 1;
no_address_count += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {
with_address_count += 1;
unknown_address_count += 1;
}
}
}
self.stats
.channel_unknown_address
.store(unknown_address_count, atomic::Ordering::Release);
self.stats
.channel_with_address
.store(with_address_count, atomic::Ordering::Release);
self.stats
.channel_without_address
.store(without_address_count, atomic::Ordering::Release);
.channel_search_pending
.store(search_pending_count as u64, atomic::Ordering::Release);
self.stats
.channel_no_address
.store(no_address_count, atomic::Ordering::Release);
}
let k = self.chan_check_next.take();
trace!("------------ check_chans start at {:?}", k);
@@ -880,10 +886,12 @@ impl Daemon {
};
}
Err(e) => {
error!("could not get a status series id");
error!("could not get a status series id {ch:?} {e}");
}
},
Err(e) => {}
Err(_) => {
// TODO should maybe not attempt receive on each channel check.
}
}
}
}
@@ -895,8 +903,8 @@ impl Daemon {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if currently_search_pending < CURRENT_SEARCH_PENDING_MAX {
currently_search_pending += 1;
if search_pending_count < CURRENT_SEARCH_PENDING_MAX {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending {
since: tsnow,
did_send: false,
@@ -911,7 +919,7 @@ impl Daemon {
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
currently_search_pending -= 1;
search_pending_count -= 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { addr, state } => {
@@ -1334,6 +1342,7 @@ impl Daemon {
WithStatusSeriesIdStateInner::SearchPending { .. } => {}
WithStatusSeriesIdStateInner::WithAddress { addr, .. } => {
if addr == &conn_addr {
self.stats.caconn_done_channel_state_reset_inc();
// TODO reset channel, emit log event for the connection addr only
//info!("ca conn down, reset {k:?}");
*v = ChannelState {

View File

@@ -927,7 +927,7 @@ impl CaConn {
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Created(_series, st2) => st2.cssid.clone(),
ChannelState::FetchingSeriesId(st2) => st2.cssid.clone(),
_ => {
let name = self.name_by_cid.get(&cid);
let e = Error::with_msg_no_trace(format!("channel_to_evented bad state {name:?} {ch_s:?}"));
@@ -1381,7 +1381,7 @@ impl CaConn {
let cssid = match ch_s {
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
let e = Error::with_msg_no_trace("channel_to_evented bad state");
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Ready(Some(Err(e)));
}
};

View File

@@ -119,7 +119,10 @@ impl DbUpdateWorker {
.unwrap();
} else {
warn!("Duplicate for {}", item.channel);
let sql="with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1) update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.addr is not distinct from $3 and t.ctid != q1.ctid";
let sql = concat!(
"with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)",
" update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1",
);
pg_client.execute(sql, &[&backend, &item.channel, &addr]).await.unwrap();
pg_client
.execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr])

View File

@@ -158,7 +158,7 @@ impl BsreadClient {
ts: u64,
pulse: u64,
) -> Result<(), Error> {
let chname = "SAR-CVME-TIFALL4:EvtSet";
let chname = "SAR-CVME-TIFALL5:EvtSet";
// Test the bool set write
let mut i3 = usize::MAX;
for (i, ch) in bm.head_b.channels.iter().enumerate() {
@@ -675,7 +675,8 @@ impl Zmtp {
complete: false,
socket_type,
conn,
conn_state: ConnState::LockScan(1),
//conn_state: ConnState::LockScan(1),
conn_state: ConnState::InitSend,
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
out_enable: false,

View File

@@ -286,8 +286,14 @@ stats_proc::stats_struct!((
ca_conn_status_feedback_recv,
ca_conn_status_feedback_no_dst,
ca_echo_timeout_total,
caconn_done_channel_state_reset,
),
values(
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address
),
values(channel_without_address, channel_with_address),
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),