Remove one old stats type

This commit is contained in:
Dominik Werder
2025-04-30 10:03:56 +02:00
parent 6cd821378b
commit 7a536ef7f6
7 changed files with 41 additions and 193 deletions

View File

@@ -824,7 +824,7 @@ impl Daemon {
ret
}
fn spawn_ticker(tx: Sender<DaemonEvent>, stats: Arc<DaemonStats>) {
fn spawn_ticker(tx: Sender<DaemonEvent>) {
let (ticker_inp_tx, ticker_inp_rx) = async_channel::bounded::<u32>(1);
let ticker = {
async move {
@@ -850,7 +850,6 @@ impl Daemon {
Ok(_) => {}
Err(_) => {
panic!("can not acquire timer ticker token");
break;
}
}
}
@@ -865,7 +864,6 @@ impl Daemon {
let tx = self.tx.clone();
let daemon_stats = self.stats().clone();
let connset_cmd_tx = self.connset_ctrl.sender().clone();
let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone();
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let rres = RoutesResources::new(
self.ingest_opts.backend().into(),
@@ -886,7 +884,6 @@ impl Daemon {
let stats_set = StatsSet::new(
daemon_stats,
conn_set_stats,
ca_conn_stats,
self.insert_worker_stats.clone(),
self.series_by_channel_stats.clone(),
self.connset_ctrl.ioc_finder_stats().clone(),
@@ -908,7 +905,7 @@ impl Daemon {
pub async fn daemon(mut self) -> Result<(), Error> {
self.spawn_metrics().await?;
Self::spawn_ticker(self.tx.clone(), self.stats.clone());
Self::spawn_ticker(self.tx.clone());
loop {
if self.shutting_down {
break;
@@ -1001,11 +998,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
}
info!("database check done");
// TODO use a new stats type:
//let store_stats = Arc::new(CaConnStats::new());
//let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
//let metrics_agg_jh = tokio::spawn(metrics_agg_fut);
let channels_config = if opts.test_bsread_addr.is_some() {
None
} else {

View File

@@ -60,9 +60,8 @@ use serieswriter::fixgridwriter::ChannelStatusWriteState;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use stats::mett::CaConnMetrics;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
@@ -71,7 +70,6 @@ use std::collections::VecDeque;
use std::fmt;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::task::Context;
@@ -958,11 +956,6 @@ impl ConnCommandResult {
pub fn id(&self) -> usize {
self.id
}
fn make_id() -> usize {
static ID: AtomicUsize = AtomicUsize::new(0);
ID.fetch_add(1, atomic::Ordering::AcqRel)
}
}
#[derive(Debug)]
@@ -1090,7 +1083,6 @@ pub struct CaConn {
iqdqs: InsertDeques,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
stats: Arc<CaConnStats>,
conn_command_tx: Pin<Box<Sender<ConnCommand>>>,
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
conn_backoff: f32,
@@ -1136,7 +1128,6 @@ impl CaConn {
local_epics_hostname: String,
iqtx: InsertQueuesTx,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
) -> Self {
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded(32);
@@ -1162,7 +1153,6 @@ impl CaConn {
iqdqs: InsertDeques::new(),
remote_addr_dbg,
local_epics_hostname,
stats,
conn_command_tx: Box::pin(cq_tx),
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
@@ -1486,7 +1476,7 @@ impl CaConn {
st2.channel.shape.clone(),
conf.conf.name().into(),
)?;
self.stats.get_series_id_ok().inc();
self.mett.get_series_id_ok().inc();
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
@@ -1564,10 +1554,6 @@ impl CaConn {
}
}
pub fn stats(&self) -> Arc<CaConnStats> {
self.stats.clone()
}
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
debug!("channel_add {conf:?} {cssid:?}");
if false {
@@ -1907,7 +1893,7 @@ impl CaConn {
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
let mett = &mut self.mett;
Self::event_add_ingest(
ev.payload_len,
ev.value,
@@ -1920,7 +1906,7 @@ impl CaConn {
stnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
mett,
&mut self.rng,
)?;
}
@@ -1939,7 +1925,7 @@ impl CaConn {
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
let mett = &mut self.mett;
st2.monitoring_event_last = Some(ev.clone());
Self::event_add_ingest(
ev.payload_len,
@@ -1953,7 +1939,7 @@ impl CaConn {
stnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
mett,
&mut self.rng,
)?;
}
@@ -2118,7 +2104,7 @@ impl CaConn {
}
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
let mett = &mut self.mett;
Self::read_notify_res_for_write(
ev,
ch_wrst,
@@ -2128,13 +2114,13 @@ impl CaConn {
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
mett,
&mut self.rng,
)?;
}
},
ReadingState::EnableMonitoring(_) => {
self.stats.recv_read_notify_while_enabling_monitoring.inc();
self.mett.recv_read_notify_while_enabling_monitoring().inc();
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
@@ -2184,7 +2170,7 @@ impl CaConn {
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
let mett = &mut self.mett;
// NOTE we do not update the last value in this ev handler.
{
if let Some(lst) = st2.monitoring_event_last.as_ref() {
@@ -2221,7 +2207,7 @@ impl CaConn {
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
mett,
&mut self.rng,
)?;
}
@@ -2254,7 +2240,7 @@ impl CaConn {
tsnow: Instant,
tscaproto: Instant,
use_ioc_time: bool,
stats: &CaConnStats,
mett: &mut CaConnMetrics,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
let crst = &mut st.channel;
@@ -2272,7 +2258,7 @@ impl CaConn {
stnow,
tscaproto,
use_ioc_time,
stats,
mett,
rng,
)?;
Ok(())
@@ -2290,7 +2276,7 @@ impl CaConn {
stnow: SystemTime,
tscaproto: Instant,
use_ioc_time: bool,
stats: &CaConnStats,
mett: &mut CaConnMetrics,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
{
@@ -2327,7 +2313,7 @@ impl CaConn {
.emit_channel_status_item(item, Self::channel_status_qu(iqdqs))
.is_err()
{
stats.logic_error().inc();
mett.logic_error().inc();
}
}
}
@@ -2335,7 +2321,8 @@ impl CaConn {
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(tsev_local.ns());
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
let ts_diff_dur = Duration::from_nanos(ts_diff);
mett.ca_ts_off().push_dur_100us(ts_diff_dur);
}
{
let tsev = if use_ioc_time {
@@ -2370,7 +2357,6 @@ impl CaConn {
}
if false {
// TODO record stats on drop with the new filter
stats.channel_fast_item_drop.inc();
{
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
@@ -2546,7 +2532,7 @@ impl CaConn {
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids
.entry(ioid)
.and_modify(|e| {
.and_modify(|_| {
self.mett.ioid_read_error_exists().inc();
})
.or_insert_with(|| {
@@ -2631,7 +2617,7 @@ impl CaConn {
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids
.entry(ioid)
.and_modify(|e| {
.and_modify(|_| {
self.mett.ioid_read_error_exists().inc();
})
.or_insert_with(|| {
@@ -3757,7 +3743,7 @@ impl Stream for CaConn {
}
let dt = lts3.saturating_duration_since(lts2);
// TODO STATS
self.stats.poll_op3_dt().ingest_dur_dms(dt);
self.mett.poll_op3_dt().push_dur_100us(dt);
if dt > max {
debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32());
}

View File

@@ -21,8 +21,6 @@ use proto::CaProto;
use scywr::insertqueues::InsertDeques;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::QueryItem;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use std::collections::VecDeque;
use std::fmt;
@@ -90,7 +88,7 @@ struct CaConn {
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
rng: Xoshiro128PlusPlus,
stats: Arc<CaConnStats>,
mett: stats::mett::CaConnMetrics,
}
impl CaConn {
@@ -101,8 +99,6 @@ impl CaConn {
local_epics_hostname: String,
iqtx: InsertQueuesTx,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded::<ConnCommand>(32);
@@ -115,7 +111,7 @@ impl CaConn {
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
rng,
stats,
mett: stats::mett::CaConnMetrics::new(),
}
}
@@ -160,16 +156,14 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut durs = DurationMeasureSteps::new();
// TODO STATS
// self.stats.poll_fn_begin().inc();
self.mett.poll_fn_begin().inc();
let ret = loop {
// TODO STATS
// self.stats.poll_loop_begin().inc();
self.mett.poll_loop_begin().inc();
let qlen = self.iqdqs.len();
if qlen >= self.opts.insert_queue_max * 2 / 3 {
self.stats.insert_item_queue_pressure().inc();
self.mett.insert_item_queue_pressure().inc();
} else if qlen >= self.opts.insert_queue_max {
self.stats.insert_item_queue_full().inc();
self.mett.insert_item_queue_full().inc();
}
let mut hppv = HaveProgressPending::new();
let hpp = &mut hppv;

View File

@@ -45,8 +45,6 @@ use statemap::ConnectionStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
@@ -134,18 +132,11 @@ pub struct CmdId(SocketAddrV4, usize);
pub struct CaConnRes {
sender: Pin<Box<SenderPolling<ConnCommand>>>,
stats: Arc<CaConnStats>,
cmd_queue: VecDeque<ConnCommand>,
// TODO await on jh
jh: JoinHandle<Result<(), Error>>,
}
impl CaConnRes {
pub fn stats(&self) -> &Arc<CaConnStats> {
&self.stats
}
}
#[derive(Debug, Clone)]
pub struct ChannelAddWithAddr {
ch_cfg: ChannelConfig,
@@ -257,7 +248,6 @@ pub struct CaConnSetCtrl {
tx: Sender<CaConnSetEvent>,
rx: Receiver<CaConnSetItem>,
stats: Arc<CaConnSetStats>,
ca_conn_stats: Arc<CaConnStats>,
ioc_finder_stats: Arc<IocFinderStats>,
jh: JoinHandle<Result<(), Error>>,
}
@@ -321,10 +311,6 @@ impl CaConnSetCtrl {
&self.stats
}
pub fn ca_conn_stats(&self) -> &Arc<CaConnStats> {
&self.ca_conn_stats
}
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
&self.ioc_finder_stats
}
@@ -429,7 +415,6 @@ pub struct CaConnSet {
shutdown_done: bool,
chan_check_next: Option<ChannelName>,
stats: Arc<CaConnSetStats>,
ca_conn_stats: Arc<CaConnStats>,
ioc_finder_jh: JoinHandle<Result<(), crate::ca::finder::Error>>,
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
thr_msg_storage_len: ThrottleTrace,
@@ -464,8 +449,6 @@ impl CaConnSet {
.unwrap();
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
let stats = Arc::new(CaConnSetStats::new());
let ca_proto_stats = Arc::new(CaProtoStats::new());
let ca_conn_stats = Arc::new(CaConnStats::new());
let connset = Self {
ticker: Self::new_self_ticker(),
backend,
@@ -497,7 +480,6 @@ impl CaConnSet {
shutdown_done: false,
chan_check_next: None,
stats: stats.clone(),
ca_conn_stats: ca_conn_stats.clone(),
connset_out_tx: Box::pin(connset_out_tx),
connset_out_queue: VecDeque::new(),
// connset_out_sender: SenderPolling::new(connset_out_tx),
@@ -514,7 +496,6 @@ impl CaConnSet {
tx: connset_inp_tx,
rx: connset_out_rx,
stats,
ca_conn_stats,
ioc_finder_stats,
jh,
}
@@ -1337,10 +1318,8 @@ impl CaConnSet {
self.channel_info_query_tx
.clone()
.ok_or_else(|| Error::MissingChannelInfoChannelTx)?,
self.ca_conn_stats.clone(),
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
let tx1 = self.ca_conn_res_tx.as_ref().get_ref().clone();
let log_level = "trace";
let logspan = if log_level == "trace" {
@@ -1352,12 +1331,11 @@ impl CaConnSet {
} else {
tracing::Span::none()
};
let fut = Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone());
let fut = Self::ca_conn_item_merge(conn, tx1, addr);
let fut = fut.instrument(logspan);
let jh = tokio::spawn(fut);
let ca_conn_res = CaConnRes {
sender: Box::pin(conn_tx.into()),
stats: conn_stats,
cmd_queue: VecDeque::new(),
jh,
};
@@ -1368,12 +1346,9 @@ impl CaConnSet {
conn: CaConn,
tx1: Sender<(SocketAddr, CaConnEvent)>,
addr: SocketAddr,
stats: Arc<CaConnSetStats>,
) -> Result<(), Error> {
stats.ca_conn_task_begin().inc();
trace2!("ca_conn_consumer begin {}", addr);
let connstats = conn.stats();
let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr, connstats).await;
let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr).await;
trace2!("ca_conn_consumer ended {}", addr);
match ret {
Ok(x) => {
@@ -1385,7 +1360,6 @@ impl CaConnSet {
error!("ca_conn_item_merge received from inner: {e}");
}
}
stats.ca_conn_task_done().inc();
Ok(())
}
@@ -1393,7 +1367,6 @@ impl CaConnSet {
mut conn: Pin<Box<CaConn>>,
tx1: Sender<(SocketAddr, CaConnEvent)>,
addr: SocketAddr,
stats: Arc<CaConnStats>,
) -> Result<EndOfStreamReason, Error> {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
@@ -1405,7 +1378,6 @@ impl CaConnSet {
// return Err(e);
warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]");
}
stats.item_count.inc();
match item.value {
CaConnEventValue::None
| CaConnEventValue::EchoTimeout

View File

@@ -34,9 +34,6 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaConnStatsAgg;
use stats::CaConnStatsAggDiff;
use stats::CaProtoStats;
use stats::DaemonStats;
use stats::InsertWorkerStats;
@@ -134,7 +131,6 @@ impl IntoResponse for CustomErrorResponse {
pub struct StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
@@ -145,7 +141,6 @@ impl StatsSet {
pub fn new(
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
@@ -154,7 +149,6 @@ impl StatsSet {
Self {
daemon,
ca_conn_set,
ca_conn,
insert_worker_stats,
series_by_channel_stats,
ioc_finder_stats,
@@ -358,13 +352,14 @@ fn metricbeat(stats_set: &StatsSet) -> axum::Json<serde_json::Value> {
}
fn metrics(stats_set: &StatsSet) -> String {
let s1 = stats_set.daemon.prometheus();
let s2 = stats_set.ca_conn_set.prometheus();
let s3 = stats_set.insert_worker_stats.prometheus();
let s4 = stats_set.ca_conn.prometheus();
let s5 = stats_set.series_by_channel_stats.prometheus();
let s7 = stats_set.ioc_finder_stats.prometheus();
[s1, s2, s3, s4, s5, s7].join("")
let ss = [
stats_set.daemon.prometheus(),
stats_set.ca_conn_set.prometheus(),
stats_set.insert_worker_stats.prometheus(),
stats_set.series_by_channel_stats.prometheus(),
stats_set.ioc_finder_stats.prometheus(),
];
ss.join("")
}
pub struct RoutesResources {
@@ -684,18 +679,3 @@ pub async fn metrics_service(
.await?;
Ok(())
}
pub async fn metrics_agg_task(local_stats: Arc<CaConnStats>, store_stats: Arc<CaConnStats>) -> Result<(), Error> {
use stats::rand_xoshiro::rand_core::RngCore;
let mut rng = stats::xoshiro_from_time();
let mut agg_last = CaConnStatsAgg::new();
loop {
let dt = rng.next_u32();
tokio::time::sleep(Duration::from_millis(500 + (dt as u64 & 0x7f))).await;
let agg = CaConnStatsAgg::new();
agg.push(&local_stats);
agg.push(&store_stats);
trace!("TODO metrics_agg_task");
agg_last = agg;
}
}

View File

@@ -53,6 +53,7 @@ mod Metrics {
recv_read_notify_state_read_pending,
recv_read_notify_state_read_pending_bad_ioid,
recv_read_notify_while_polling_idle,
recv_read_notify_while_enabling_monitoring,
no_cid_for_subid,
recv_read_notify_but_no_longer_ready,
recv_read_notify_but_not_init_yet,
@@ -69,8 +70,10 @@ mod Metrics {
caget_lat,
poll_reloops,
poll_all_dt,
poll_op3_dt,
iiq_batch_len,
pong_recv_lat,
ca_ts_off,
}
mod Compose {
type Input = ca_proto::mett::CaProtoMetrics;

View File

@@ -303,83 +303,6 @@ stats_proc::stats_struct!((
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
stats_struct(
name(CaConnStats),
prefix(caconn),
counters(
insert_item_create,
inserts_val,
inserts_msp,
inserts_msp_grid,
inserts_queue_pop_for_global,
inserts_queue_push,
insert_item_queue_pressure,
insert_item_queue_full,
out_queue_full,
channel_fast_item_drop,
logic_error,
// TODO maybe rename: this is now only the recv of the intermediate queue:
store_worker_item_recv,
// TODO rename to make clear that this drop is voluntary because of user config choice:
// store_worker_fraction_drop,
// store_worker_ratelimit_drop,
// store_worker_insert_done,
// store_worker_insert_binned_done,
// store_worker_insert_overload,
// store_worker_insert_timeout,
// store_worker_insert_unavailable,
// store_worker_insert_error,
connection_status_insert_done,
channel_status_insert_done,
channel_info_insert_done,
ivl_insert_done,
mute_insert_done,
command_can_not_reply,
time_handle_conn_listen,
time_handle_peer_ready,
time_check_channels_state_init,
tcp_connected,
get_series_id_ok,
item_count,
stream_ready,
stream_pending,
channel_all_count,
channel_alive_count,
channel_not_alive_count,
channel_series_lookup_already_pending,
ping_start,
ping_no_proto,
pong_timeout,
storage_queue_send,
storage_queue_pending,
event_add_res_recv,
caget_issued,
caget_timeout,
unknown_subid,
unknown_ioid,
transition_to_polling,
transition_to_polling_already_in,
transition_to_polling_bad_state,
channel_add_exists,
recv_event_add_while_wait_on_read_notify,
recv_read_notify_state_passive_found_ioid,
recv_read_notify_state_passive,
recv_read_notify_state_read_pending_bad_ioid,
recv_read_notify_state_read_pending,
recv_read_notify_but_not_init_yet,
recv_read_notify_but_no_longer_ready,
recv_read_notify_while_enabling_monitoring,
recv_read_notify_while_polling_idle,
monitor_stale_read_begin,
monitor_stale_read_timeout,
ca_proto_no_version_as_first,
ca_proto_version_later,
no_cid_for_subid,
),
histolog2s(poll_op3_dt, ca_ts_off,),
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
stats_struct(
name(CaProtoStats),
prefix(ca_proto),
@@ -453,8 +376,6 @@ stats_proc::stats_struct!((
),
histolog2s(poll_all_dt,),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
// diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
stats_struct(
name(SeriesByChannelStats),
prefix(seriesbychannel),