Remove another old stats type
This commit is contained in:
@@ -880,10 +880,8 @@ impl Daemon {
|
||||
);
|
||||
let rres = Arc::new(rres);
|
||||
let metrics_jh = {
|
||||
let conn_set_stats = self.connset_ctrl.stats().clone();
|
||||
let stats_set = StatsSet::new(
|
||||
daemon_stats,
|
||||
conn_set_stats,
|
||||
self.insert_worker_stats.clone(),
|
||||
self.series_by_channel_stats.clone(),
|
||||
self.connset_ctrl.ioc_finder_stats().clone(),
|
||||
|
||||
@@ -44,7 +44,6 @@ use statemap::ConnectionState;
|
||||
use statemap::ConnectionStateValue;
|
||||
use statemap::WithStatusSeriesIdState;
|
||||
use statemap::WithStatusSeriesIdStateInner;
|
||||
use stats::CaConnSetStats;
|
||||
use stats::IocFinderStats;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::VecDeque;
|
||||
@@ -58,6 +57,7 @@ use netpod::OnDrop;
|
||||
use netpod::TsNano;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use series::SeriesId;
|
||||
use stats::mett::CaConnSetMetrics;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
@@ -247,7 +247,6 @@ pub enum CaConnSetItem {
|
||||
pub struct CaConnSetCtrl {
|
||||
tx: Sender<CaConnSetEvent>,
|
||||
rx: Receiver<CaConnSetItem>,
|
||||
stats: Arc<CaConnSetStats>,
|
||||
ioc_finder_stats: Arc<IocFinderStats>,
|
||||
jh: JoinHandle<Result<(), Error>>,
|
||||
}
|
||||
@@ -307,10 +306,6 @@ impl CaConnSetCtrl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> &Arc<CaConnSetStats> {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
|
||||
&self.ioc_finder_stats
|
||||
}
|
||||
@@ -362,11 +357,11 @@ impl CanSendChannelInfoResult for SeriesLookupSender {
|
||||
|
||||
struct StateTransRes<'a> {
|
||||
backend: &'a str,
|
||||
stats: &'a CaConnSetStats,
|
||||
ca_conn_ress: &'a mut HashMap<SocketAddr, CaConnRes>,
|
||||
channel_info_query_qu: &'a mut VecDeque<ChannelInfoQuery>,
|
||||
channel_info_res_tx: Pin<&'a mut Sender<Result<ChannelInfoResult, Error>>>,
|
||||
chst: &'a mut ChannelState,
|
||||
mett: &'a mut CaConnSetMetrics,
|
||||
}
|
||||
|
||||
impl<'a> StateTransRes<'a> {
|
||||
@@ -374,11 +369,11 @@ impl<'a> StateTransRes<'a> {
|
||||
let chst = value.channel_states.get_mut_or_dummy_init(&chname);
|
||||
Self {
|
||||
backend: &value.backend,
|
||||
stats: &value.stats,
|
||||
ca_conn_ress: &mut value.ca_conn_ress,
|
||||
channel_info_query_qu: &mut value.channel_info_query_qu,
|
||||
channel_info_res_tx: value.channel_info_res_tx.as_mut(),
|
||||
chst,
|
||||
mett: &mut value.mett,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -414,13 +409,11 @@ pub struct CaConnSet {
|
||||
shutdown_stopping: bool,
|
||||
shutdown_done: bool,
|
||||
chan_check_next: Option<ChannelName>,
|
||||
stats: Arc<CaConnSetStats>,
|
||||
ioc_finder_jh: JoinHandle<Result<(), crate::ca::finder::Error>>,
|
||||
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
|
||||
thr_msg_storage_len: ThrottleTrace,
|
||||
rogue_channel_count: u64,
|
||||
cssid_latency_max: Duration,
|
||||
ca_connset_metrics: stats::mett::CaConnSetMetrics,
|
||||
mett: stats::mett::CaConnSetMetrics,
|
||||
}
|
||||
|
||||
impl CaConnSet {
|
||||
@@ -448,7 +441,6 @@ impl CaConnSet {
|
||||
)
|
||||
.unwrap();
|
||||
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
|
||||
let stats = Arc::new(CaConnSetStats::new());
|
||||
let connset = Self {
|
||||
ticker: Self::new_self_ticker(),
|
||||
backend,
|
||||
@@ -479,23 +471,20 @@ impl CaConnSet {
|
||||
shutdown_stopping: false,
|
||||
shutdown_done: false,
|
||||
chan_check_next: None,
|
||||
stats: stats.clone(),
|
||||
connset_out_tx: Box::pin(connset_out_tx),
|
||||
connset_out_queue: VecDeque::new(),
|
||||
// connset_out_sender: SenderPolling::new(connset_out_tx),
|
||||
ioc_finder_jh,
|
||||
await_ca_conn_jhs: VecDeque::new(),
|
||||
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
|
||||
rogue_channel_count: 0,
|
||||
cssid_latency_max: Duration::from_millis(2000),
|
||||
ca_connset_metrics: stats::mett::CaConnSetMetrics::new(),
|
||||
mett: stats::mett::CaConnSetMetrics::new(),
|
||||
};
|
||||
// TODO await on jh
|
||||
let jh = tokio::spawn(CaConnSet::run(connset));
|
||||
CaConnSetCtrl {
|
||||
tx: connset_inp_tx,
|
||||
rx: connset_out_rx,
|
||||
stats,
|
||||
ioc_finder_stats,
|
||||
jh,
|
||||
}
|
||||
@@ -577,7 +566,7 @@ impl CaConnSet {
|
||||
ress.channel_info_query_qu.push_back(item);
|
||||
}
|
||||
if let Err(_) = cmd.restx.try_send(Ok(())) {
|
||||
ress.stats.command_reply_fail().inc();
|
||||
ress.mett.cmd_res_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -588,7 +577,7 @@ impl CaConnSet {
|
||||
if cmd.ch_cfg == ress.chst.config {
|
||||
debug!("handle_add_channel_existing config same {}", cmd.name());
|
||||
if let Err(_) = cmd.restx.try_send(Ok(())) {
|
||||
ress.stats.command_reply_fail().inc();
|
||||
ress.mett.cmd_res_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -652,7 +641,7 @@ impl CaConnSet {
|
||||
}
|
||||
}
|
||||
if let Err(_) = cmd.restx.try_send(Ok(())) {
|
||||
ress.stats.command_reply_fail().inc();
|
||||
ress.mett.cmd_res_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -663,7 +652,7 @@ impl CaConnSet {
|
||||
chst.1.touched = 0;
|
||||
}
|
||||
if let Err(_) = cmd.restx.try_send(Ok(())) {
|
||||
self.stats.command_reply_fail().inc();
|
||||
self.mett.cmd_res_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -683,7 +672,7 @@ impl CaConnSet {
|
||||
self.handle_remove_channel(cmd)?;
|
||||
}
|
||||
if let Err(_) = cmd.restx.try_send(Ok(())) {
|
||||
self.stats.command_reply_fail().inc();
|
||||
self.mett.cmd_res_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -697,7 +686,6 @@ impl CaConnSet {
|
||||
info!("handle_add_channel {:?}", cmd);
|
||||
}
|
||||
trace_channel_state!("handle_add_channel {:?}", cmd);
|
||||
self.stats.channel_add().inc();
|
||||
// TODO should I add the transition through ActiveChannelState::Init as well?
|
||||
let chname = ChannelName::new(cmd.name().into());
|
||||
let ress = StateTransRes::init(self, &chname);
|
||||
@@ -720,7 +708,7 @@ impl CaConnSet {
|
||||
CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason),
|
||||
CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name),
|
||||
CaConnEventValue::Metrics(v) => {
|
||||
self.ca_connset_metrics.ca_conn().ingest(v);
|
||||
self.mett.ca_conn().ingest(v);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -766,7 +754,7 @@ impl CaConnSet {
|
||||
debug!("handle_add_channel but shutdown_stopping");
|
||||
return Ok(());
|
||||
}
|
||||
self.stats.channel_status_series_found().inc();
|
||||
self.mett.channel_status_series_found().inc();
|
||||
if series::dbg::dbg_chn(&name) {
|
||||
info!("handle_add_channel_with_status_id {cmd:?}");
|
||||
}
|
||||
@@ -805,17 +793,17 @@ impl CaConnSet {
|
||||
});
|
||||
let qu = IocAddrQuery::cached(name.into());
|
||||
self.find_ioc_query_queue.push_back(qu);
|
||||
self.stats.ioc_search_start().inc();
|
||||
self.mett.ioc_search_start().inc();
|
||||
} else {
|
||||
self.stats.logic_issue().inc();
|
||||
self.mett.logic_err().inc();
|
||||
trace!("TODO have a status series id but no more channel");
|
||||
}
|
||||
} else {
|
||||
self.stats.logic_issue().inc();
|
||||
self.mett.logic_err().inc();
|
||||
trace!("TODO have a status series id but no more channel");
|
||||
}
|
||||
} else {
|
||||
self.stats.logic_issue().inc();
|
||||
self.mett.logic_err().inc();
|
||||
trace!("TODO have a status series id but no more channel");
|
||||
}
|
||||
Ok(())
|
||||
@@ -842,7 +830,7 @@ impl CaConnSet {
|
||||
if let ChannelStateValue::Active(ast) = &mut chst.value {
|
||||
if let ActiveChannelState::WithStatusSeriesId(st3) = ast {
|
||||
trace!("handle_add_channel_with_addr INNER {cmd:?}");
|
||||
self.stats.handle_add_channel_with_addr().inc();
|
||||
self.mett.handle_add_channel_with_addr().inc();
|
||||
let tsnow = SystemTime::now();
|
||||
let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?;
|
||||
let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
|
||||
@@ -958,7 +946,7 @@ impl CaConnSet {
|
||||
if let ChannelStateValue::Active(ast) = &mut chst.value {
|
||||
if let ActiveChannelState::WithStatusSeriesId(st2) = ast {
|
||||
if let Some(addr) = res.addr {
|
||||
self.stats.ioc_addr_found().inc();
|
||||
self.mett.ioc_addr_found().inc();
|
||||
trace!("ioc found {res:?}");
|
||||
let cmd = ChannelAddWithAddr {
|
||||
ch_cfg: chst.config.clone(),
|
||||
@@ -967,21 +955,21 @@ impl CaConnSet {
|
||||
};
|
||||
self.handle_add_channel_with_addr(cmd)?;
|
||||
} else {
|
||||
self.stats.ioc_addr_not_found().inc();
|
||||
self.mett.ioc_addr_not_found().inc();
|
||||
trace!("ioc not found {res:?}");
|
||||
let since = SystemTime::now();
|
||||
st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
|
||||
}
|
||||
} else {
|
||||
self.stats.ioc_addr_result_for_unknown_channel().inc();
|
||||
self.mett.ioc_addr_result_for_unknown_channel().inc();
|
||||
warn!("TODO got address but no longer active");
|
||||
}
|
||||
} else {
|
||||
self.stats.ioc_addr_result_for_unknown_channel().inc();
|
||||
self.mett.ioc_addr_result_for_unknown_channel().inc();
|
||||
warn!("TODO got address but no longer active");
|
||||
}
|
||||
} else {
|
||||
self.stats.ioc_addr_result_for_unknown_channel().inc();
|
||||
self.mett.ioc_addr_result_for_unknown_channel().inc();
|
||||
warn!("ioc addr lookup done but channel no longer here");
|
||||
}
|
||||
}
|
||||
@@ -1021,7 +1009,7 @@ impl CaConnSet {
|
||||
.collect();
|
||||
let item = ChannelStatusesResponse { channels_ca_conn_set };
|
||||
if req.tx.try_send(item).is_err() {
|
||||
self.stats.response_tx_fail.inc();
|
||||
self.mett.chan_send_err().inc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1085,9 +1073,9 @@ impl CaConnSet {
|
||||
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> {
|
||||
trace_health_update!("apply_ca_conn_health_update {addr}");
|
||||
let tsnow = SystemTime::now();
|
||||
self.rogue_channel_count = 0;
|
||||
let mut rogue_channel_count = 0;
|
||||
for (k, v) in res.channel_statuses {
|
||||
trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count);
|
||||
trace_health_update!("self.rogue_channel_count {}", rogue_channel_count);
|
||||
trace_health_update!("apply_ca_conn_health_update {k:?} {v:?}");
|
||||
let ch = if let Some(x) = self.channel_by_cssid.get(&k) {
|
||||
x
|
||||
@@ -1103,30 +1091,30 @@ impl CaConnSet {
|
||||
} = &mut st3.inner
|
||||
{
|
||||
if SocketAddr::V4(*conn_addr) != addr {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
if let WithAddressState::Assigned(st5) = st4 {
|
||||
st5.updated = tsnow;
|
||||
st5.health_update_count += 1;
|
||||
st5.value = ConnectionStateValue::ChannelStateInfo(v);
|
||||
} else {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
} else {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
} else {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
} else {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
} else {
|
||||
self.rogue_channel_count += 1;
|
||||
rogue_channel_count += 1;
|
||||
}
|
||||
}
|
||||
trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count);
|
||||
self.stats.channel_rogue.set(self.rogue_channel_count);
|
||||
trace_health_update!("rogue_channel_count {}", rogue_channel_count);
|
||||
self.mett.channel_rogue().set(rogue_channel_count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1167,10 +1155,10 @@ impl CaConnSet {
|
||||
fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> {
|
||||
debug!("handle_ca_conn_eos {addr} {reason:?}");
|
||||
if let Some(e) = self.ca_conn_ress.remove(&addr) {
|
||||
self.stats.ca_conn_eos_ok().inc();
|
||||
self.mett.ca_conn_eos_ok().inc();
|
||||
self.await_ca_conn_jhs.push_back((addr, e.jh));
|
||||
} else {
|
||||
self.stats.ca_conn_eos_unexpected().inc();
|
||||
self.mett.ca_conn_eos_unexpected().inc();
|
||||
warn!("end-of-stream received for non-existent CaConn {addr}");
|
||||
}
|
||||
{
|
||||
@@ -1299,7 +1287,7 @@ impl CaConnSet {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
|
||||
fn create_ca_conn(&mut self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
|
||||
// TODO should we save this as event?
|
||||
let opts = CaConnOpts::default();
|
||||
let addr = add.addr;
|
||||
@@ -1308,7 +1296,7 @@ impl CaConnSet {
|
||||
} else {
|
||||
return Err(Error::ExpectIpv4);
|
||||
};
|
||||
self.stats.create_ca_conn().inc();
|
||||
self.mett.create_ca_conn().inc();
|
||||
let conn = CaConn::new(
|
||||
opts,
|
||||
self.backend.clone(),
|
||||
@@ -1412,6 +1400,8 @@ impl CaConnSet {
|
||||
|
||||
fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> {
|
||||
let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts();
|
||||
let mut channel_health_timeout_soon = 0;
|
||||
let mut channel_health_timeout_reached = 0;
|
||||
let mut cmd_remove_channel = Vec::new();
|
||||
let mut cmd_add_channel = Vec::new();
|
||||
let k = self.chan_check_next.take();
|
||||
@@ -1429,7 +1419,7 @@ impl CaConnSet {
|
||||
ChannelStateValue::Active(st2) => match st2 {
|
||||
ActiveChannelState::Init { since: _ } => {
|
||||
// TODO no longer used? remove?
|
||||
self.stats.logic_error().inc();
|
||||
self.mett.logic_err().inc();
|
||||
}
|
||||
ActiveChannelState::WaitForStatusSeriesId { since } => {
|
||||
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
|
||||
@@ -1474,10 +1464,10 @@ impl CaConnSet {
|
||||
}
|
||||
Assigned(st4) => {
|
||||
if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < stnow {
|
||||
self.stats.channel_health_timeout_soon().inc();
|
||||
channel_health_timeout_soon += 1;
|
||||
}
|
||||
if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow {
|
||||
self.stats.channel_health_timeout().inc();
|
||||
channel_health_timeout_reached += 1;
|
||||
trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
|
||||
// TODO
|
||||
error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
|
||||
@@ -1555,6 +1545,10 @@ impl CaConnSet {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.mett
|
||||
.channel_health_timeout_reached()
|
||||
.set(channel_health_timeout_reached);
|
||||
self.mett.channel_health_timeout_soon().set(channel_health_timeout_soon);
|
||||
loop {
|
||||
break if search_pending_count >= CURRENT_SEARCH_PENDING_MAX as _ {
|
||||
} else {
|
||||
@@ -1572,7 +1566,7 @@ impl CaConnSet {
|
||||
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
|
||||
let qu = IocAddrQuery::uncached(ch.name().into());
|
||||
self.find_ioc_query_queue.push_back(qu);
|
||||
self.stats.ioc_search_start().inc();
|
||||
self.mett.ioc_search_start().inc();
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
@@ -1600,7 +1594,7 @@ impl CaConnSet {
|
||||
}
|
||||
|
||||
// TODO should use both counters and values
|
||||
fn update_channel_state_counts(&mut self) -> (u64, u64) {
|
||||
fn update_channel_state_counts(&mut self) -> (u32, u32) {
|
||||
let mut unknown_address = 0;
|
||||
let mut search_pending = 0;
|
||||
let mut no_address = 0;
|
||||
@@ -1663,16 +1657,16 @@ impl CaConnSet {
|
||||
ChannelStateValue::InitDummy => {}
|
||||
}
|
||||
}
|
||||
self.stats.channel_unknown_address.set(unknown_address);
|
||||
self.stats.channel_search_pending.set(search_pending);
|
||||
self.stats.channel_no_address.set(no_address);
|
||||
self.stats.channel_unassigned.set(unassigned);
|
||||
// self.stats.channel_backoff.set(backoff);
|
||||
self.stats.channel_assigned.set(assigned);
|
||||
self.stats.channel_connected.set(connected);
|
||||
self.stats.channel_maybe_wrong_address.set(maybe_wrong_address);
|
||||
self.stats
|
||||
.channel_assigned_without_health_update
|
||||
self.mett.channel_unknown_address().set(unknown_address);
|
||||
self.mett.channel_search_pending().set(search_pending);
|
||||
self.mett.channel_no_address().set(no_address);
|
||||
self.mett.channel_unassigned().set(unassigned);
|
||||
// self.mett.channel_backoff().set(backoff);
|
||||
self.mett.channel_assigned().set(assigned);
|
||||
self.mett.channel_connected().set(connected);
|
||||
self.mett.channel_maybe_wrong_address().set(maybe_wrong_address);
|
||||
self.mett
|
||||
.channel_assigned_without_health_update()
|
||||
.set(assigned_without_health_update);
|
||||
(search_pending, assigned_without_health_update)
|
||||
}
|
||||
@@ -1687,7 +1681,7 @@ impl CaConnSet {
|
||||
break if tx.is_sending() {
|
||||
match tx.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
self.stats.try_push_ca_conn_cmds_sent.inc();
|
||||
self.mett.try_push_ca_conn_cmds_sent().inc();
|
||||
have_progress = true;
|
||||
continue;
|
||||
}
|
||||
@@ -1702,7 +1696,7 @@ impl CaConnSet {
|
||||
// Should be nothing to do here.
|
||||
// The connection ended, which CaConnSet notices anyway.
|
||||
// self.handle_connect_fail(addr)?;
|
||||
self.stats.try_push_ca_conn_cmds_closed().inc();
|
||||
self.mett.try_push_ca_conn_cmds_closed().inc();
|
||||
}
|
||||
},
|
||||
Pending => {
|
||||
@@ -1748,7 +1742,7 @@ impl CaConnSet {
|
||||
}
|
||||
{
|
||||
// let item = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new());
|
||||
let item = self.ca_connset_metrics.take_and_reset();
|
||||
let item = self.mett.take_and_reset();
|
||||
let item = CaConnSetItem::Metrics(item);
|
||||
self.connset_out_queue.push_back(item);
|
||||
}
|
||||
@@ -1852,29 +1846,30 @@ impl Stream for CaConnSet {
|
||||
use Poll::*;
|
||||
trace4!("CaConnSet poll begin");
|
||||
let poll_ts1 = Instant::now();
|
||||
self.stats.poll_fn_begin().inc();
|
||||
self.mett.poll_fn_begin().inc();
|
||||
let ret = loop {
|
||||
trace4!("CaConnSet poll loop");
|
||||
self.stats.poll_loop_begin().inc();
|
||||
|
||||
// TODO generalize to all combinations
|
||||
self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _);
|
||||
self.stats
|
||||
.storage_insert_queue_len
|
||||
.set(self.storage_insert_st_qu.len() as _);
|
||||
self.stats
|
||||
.channel_info_query_queue_len
|
||||
.set(self.channel_info_query_qu.len() as _);
|
||||
self.stats
|
||||
.channel_info_query_sender_len
|
||||
.set(self.channel_info_query_sender.len().unwrap_or(0) as _);
|
||||
self.stats
|
||||
.channel_info_res_tx_len
|
||||
.set(self.channel_info_res_tx.len() as _);
|
||||
self.stats
|
||||
.find_ioc_query_sender_len
|
||||
.set(self.find_ioc_query_sender.len().unwrap_or(0) as _);
|
||||
self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _);
|
||||
{
|
||||
let mut self2 = self.as_mut().get_mut();
|
||||
self2.mett.poll_loop_begin().inc();
|
||||
self2
|
||||
.mett
|
||||
.channel_info_query_queue_len()
|
||||
.set(self2.channel_info_query_qu.len() as _);
|
||||
self2
|
||||
.mett
|
||||
.channel_info_query_sender_len()
|
||||
.set(self2.channel_info_query_sender.len().unwrap_or(0) as _);
|
||||
self2
|
||||
.mett
|
||||
.channel_info_res_tx_len()
|
||||
.set(self2.channel_info_res_tx.len() as _);
|
||||
self2
|
||||
.mett
|
||||
.find_ioc_query_sender_len()
|
||||
.set(self2.find_ioc_query_sender.len().unwrap_or(0) as _);
|
||||
self2.mett.ca_conn_res_tx_len().set(self2.ca_conn_res_tx.len() as _);
|
||||
}
|
||||
|
||||
let mut penpro = PendingProgress::new();
|
||||
|
||||
@@ -1910,15 +1905,15 @@ impl Stream for CaConnSet {
|
||||
let left = self.await_ca_conn_jhs.len();
|
||||
match x {
|
||||
Ok(Ok(())) => {
|
||||
self.stats.ca_conn_task_join_done_ok.inc();
|
||||
self.mett.ca_conn_task_join_done_ok().inc();
|
||||
debug!("CaConn {addr} finished well left {left}");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
self.stats.ca_conn_task_join_done_err.inc();
|
||||
self.mett.ca_conn_task_join_done_err().inc();
|
||||
error!("CaConn {addr} task error: {e} left {left}");
|
||||
}
|
||||
Err(e) => {
|
||||
self.stats.ca_conn_task_join_err.inc();
|
||||
self.mett.ca_conn_task_join_err().inc();
|
||||
error!("CaConn {addr} join error: {e} left {left}");
|
||||
}
|
||||
}
|
||||
@@ -1934,7 +1929,7 @@ impl Stream for CaConnSet {
|
||||
let this = self.as_mut().get_mut();
|
||||
let qu = &mut this.storage_insert_st_qu;
|
||||
let tx = this.storage_insert_st_tx.as_mut();
|
||||
let counter = this.stats.storage_insert_queue_send();
|
||||
let counter = this.mett.storage_insert_queue_send();
|
||||
let x = sender_polling_send(qu, tx, cx, || {
|
||||
counter.inc();
|
||||
});
|
||||
@@ -1946,7 +1941,7 @@ impl Stream for CaConnSet {
|
||||
let this = self.as_mut().get_mut();
|
||||
let qu = &mut this.storage_insert_lt_qu;
|
||||
let tx = this.storage_insert_lt_tx.as_mut();
|
||||
let counter = this.stats.storage_insert_queue_send();
|
||||
let counter = this.mett.storage_insert_queue_send();
|
||||
let x = sender_polling_send(qu, tx, cx, || {
|
||||
counter.inc();
|
||||
});
|
||||
@@ -2030,23 +2025,23 @@ impl Stream for CaConnSet {
|
||||
}
|
||||
|
||||
break if self.ready_for_end_of_stream() {
|
||||
self.stats.ready_for_end_of_stream().inc();
|
||||
self.mett.ready_for_end_of_stream().inc();
|
||||
if penpro.progress() {
|
||||
self.stats.ready_for_end_of_stream_with_progress().inc();
|
||||
self.mett.ready_for_end_of_stream_with_progress().inc();
|
||||
continue;
|
||||
} else {
|
||||
Ready(None)
|
||||
}
|
||||
} else {
|
||||
if penpro.progress() {
|
||||
self.stats.poll_reloop().inc();
|
||||
self.mett.poll_reloop().inc();
|
||||
continue;
|
||||
} else {
|
||||
if penpro.pending() {
|
||||
self.stats.poll_pending().inc();
|
||||
self.mett.poll_pending().inc();
|
||||
Pending
|
||||
} else {
|
||||
self.stats.poll_no_progress_no_pending().inc();
|
||||
self.mett.poll_no_progress_no_pending().inc();
|
||||
let e = Error::NoProgressNoPending;
|
||||
Ready(Some(CaConnSetItem::Error(e)))
|
||||
}
|
||||
@@ -2056,7 +2051,7 @@ impl Stream for CaConnSet {
|
||||
trace4!("CaConnSet poll done");
|
||||
let poll_ts2 = Instant::now();
|
||||
let dt = poll_ts2.saturating_duration_since(poll_ts1);
|
||||
self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32);
|
||||
self.mett.poll_all_dt().push_dur_100us(dt);
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@ use scywr::iteminsertqueue::QueryItem;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use stats::CaConnSetStats;
|
||||
use stats::CaProtoStats;
|
||||
use stats::DaemonStats;
|
||||
use stats::InsertWorkerStats;
|
||||
@@ -130,7 +129,6 @@ impl IntoResponse for CustomErrorResponse {
|
||||
#[derive(Clone)]
|
||||
pub struct StatsSet {
|
||||
daemon: Arc<DaemonStats>,
|
||||
ca_conn_set: Arc<CaConnSetStats>,
|
||||
insert_worker_stats: Arc<InsertWorkerStats>,
|
||||
series_by_channel_stats: Arc<SeriesByChannelStats>,
|
||||
ioc_finder_stats: Arc<IocFinderStats>,
|
||||
@@ -140,7 +138,6 @@ pub struct StatsSet {
|
||||
impl StatsSet {
|
||||
pub fn new(
|
||||
daemon: Arc<DaemonStats>,
|
||||
ca_conn_set: Arc<CaConnSetStats>,
|
||||
insert_worker_stats: Arc<InsertWorkerStats>,
|
||||
series_by_channel_stats: Arc<SeriesByChannelStats>,
|
||||
ioc_finder_stats: Arc<IocFinderStats>,
|
||||
@@ -148,7 +145,6 @@ impl StatsSet {
|
||||
) -> Self {
|
||||
Self {
|
||||
daemon,
|
||||
ca_conn_set,
|
||||
insert_worker_stats,
|
||||
series_by_channel_stats,
|
||||
ioc_finder_stats,
|
||||
@@ -354,7 +350,6 @@ fn metricbeat(stats_set: &StatsSet) -> axum::Json<serde_json::Value> {
|
||||
fn metrics(stats_set: &StatsSet) -> String {
|
||||
let ss = [
|
||||
stats_set.daemon.prometheus(),
|
||||
stats_set.ca_conn_set.prometheus(),
|
||||
stats_set.insert_worker_stats.prometheus(),
|
||||
stats_set.series_by_channel_stats.prometheus(),
|
||||
stats_set.ioc_finder_stats.prometheus(),
|
||||
|
||||
@@ -7,7 +7,6 @@ edition = "2024"
|
||||
[dependencies]
|
||||
futures-util = "0.3.28"
|
||||
async-channel = "2.3.1"
|
||||
#scylla = "0.15.1"
|
||||
scylla = "1.1"
|
||||
smallvec = "1.11.0"
|
||||
pin-project = "1.1.5"
|
||||
|
||||
@@ -87,6 +87,54 @@ mod Metrics {
|
||||
type Input = CaConnMetrics;
|
||||
type Name = ca_conn;
|
||||
}
|
||||
enum counters {
|
||||
poll_fn_begin,
|
||||
poll_loop_begin,
|
||||
ready_for_end_of_stream,
|
||||
ready_for_end_of_stream_with_progress,
|
||||
poll_reloop,
|
||||
poll_pending,
|
||||
poll_no_progress_no_pending,
|
||||
ioc_search_start,
|
||||
chan_send_err,
|
||||
cmd_res_send_err,
|
||||
logic_err,
|
||||
channel_status_series_found,
|
||||
ioc_addr_found,
|
||||
ioc_addr_not_found,
|
||||
ioc_addr_result_for_unknown_channel,
|
||||
ca_conn_eos_ok,
|
||||
ca_conn_eos_unexpected,
|
||||
handle_add_channel_with_addr,
|
||||
try_push_ca_conn_cmds_sent,
|
||||
try_push_ca_conn_cmds_closed,
|
||||
create_ca_conn,
|
||||
storage_insert_queue_send,
|
||||
ca_conn_task_join_done_ok,
|
||||
ca_conn_task_join_done_err,
|
||||
ca_conn_task_join_err,
|
||||
}
|
||||
enum values {
|
||||
channel_info_query_queue_len,
|
||||
channel_info_query_sender_len,
|
||||
channel_info_res_tx_len,
|
||||
ca_conn_res_tx_len,
|
||||
find_ioc_query_sender_len,
|
||||
channel_rogue,
|
||||
channel_unknown_address,
|
||||
channel_search_pending,
|
||||
channel_no_address,
|
||||
channel_unassigned,
|
||||
channel_assigned,
|
||||
channel_connected,
|
||||
channel_maybe_wrong_address,
|
||||
channel_assigned_without_health_update,
|
||||
channel_health_timeout_soon,
|
||||
channel_health_timeout_reached,
|
||||
}
|
||||
enum histolog2s {
|
||||
poll_all_dt,
|
||||
}
|
||||
}
|
||||
|
||||
mod Metrics {
|
||||
|
||||
@@ -318,64 +318,6 @@ stats_proc::stats_struct!((
|
||||
),
|
||||
histolog2s(payload_size, data_count, outbuf_len,),
|
||||
),
|
||||
stats_struct(
|
||||
name(CaConnSetStats),
|
||||
prefix(connset),
|
||||
counters(
|
||||
channel_add,
|
||||
channel_status_series_found,
|
||||
channel_health_timeout_soon,
|
||||
channel_health_timeout,
|
||||
ioc_search_start,
|
||||
ioc_addr_found,
|
||||
ioc_addr_not_found,
|
||||
ioc_addr_result_for_unknown_channel,
|
||||
ca_conn_task_begin,
|
||||
ca_conn_task_done,
|
||||
ca_conn_task_join_done_ok,
|
||||
ca_conn_task_join_done_err,
|
||||
ca_conn_task_join_err,
|
||||
ca_conn_eos_ok,
|
||||
ca_conn_eos_err,
|
||||
ca_conn_eos_unexpected,
|
||||
response_tx_fail,
|
||||
try_push_ca_conn_cmds_sent,
|
||||
try_push_ca_conn_cmds_closed,
|
||||
logic_error,
|
||||
logic_issue,
|
||||
ready_for_end_of_stream,
|
||||
ready_for_end_of_stream_with_progress,
|
||||
poll_fn_begin,
|
||||
poll_loop_begin,
|
||||
poll_pending,
|
||||
poll_reloop,
|
||||
poll_no_progress_no_pending,
|
||||
handle_add_channel_with_addr,
|
||||
create_ca_conn,
|
||||
command_reply_fail,
|
||||
storage_insert_queue_send,
|
||||
),
|
||||
values(
|
||||
storage_insert_queue_len,
|
||||
storage_insert_tx_len,
|
||||
channel_info_query_queue_len,
|
||||
channel_info_query_sender_len,
|
||||
channel_info_res_tx_len,
|
||||
find_ioc_query_sender_len,
|
||||
ca_conn_res_tx_len,
|
||||
channel_unknown_address,
|
||||
channel_search_pending,
|
||||
channel_no_address,
|
||||
channel_unassigned,
|
||||
channel_backoff,
|
||||
channel_assigned,
|
||||
channel_connected,
|
||||
channel_maybe_wrong_address,
|
||||
channel_assigned_without_health_update,
|
||||
channel_rogue,
|
||||
),
|
||||
histolog2s(poll_all_dt,),
|
||||
),
|
||||
stats_struct(
|
||||
name(SeriesByChannelStats),
|
||||
prefix(seriesbychannel),
|
||||
|
||||
Reference in New Issue
Block a user