From 2e90ec19a0df1d44c92d923c8dd7dd519802ae73 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 10 Apr 2025 11:59:12 +0200 Subject: [PATCH] WIP refactor metrics --- daqingest/Cargo.toml | 2 +- daqingest/src/daemon.rs | 42 ++++++++++++++ daqingest/src/tools.rs | 35 ++++++------ netfetch/src/ca/conn.rs | 103 ++++++++++++++-------------------- netfetch/src/ca/conn2/conn.rs | 13 ++--- netfetch/src/daemon_common.rs | 2 + netfetch/src/metrics/types.rs | 6 +- scywr/src/insertworker.rs | 32 ++++++++++- stats/Cargo.toml | 3 +- stats/mettdecl.rs | 25 +++++++++ stats/src/mett.rs | 1 + stats/src/stats.rs | 26 +-------- 12 files changed, 173 insertions(+), 117 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index c757a3e..31a83e9 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-aa.0" +version = "0.3.0-aa.1" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index abf3114..8536aa6 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -200,6 +200,7 @@ impl Daemon { let ignore_writes = ingest_opts.scylla_ignore_writes(); + let (insert_worker_output_tx, insert_worker_output_rx) = async_channel::bounded(256); let mut insert_worker_jhs = Vec::new(); if ingest_opts.scylla_disable() { @@ -209,6 +210,7 @@ impl Daemon { iqrx.st_rf1_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -219,6 +221,7 @@ impl Daemon { iqrx.st_rf3_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -229,6 +232,7 @@ impl Daemon { iqrx.mt_rf3_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -239,6 +243,7 @@ impl Daemon { iqrx.lt_rf3_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -249,6 +254,7 @@ impl Daemon { iqrx.lt_rf3_lat5_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -265,6 +271,7 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -281,6 +288,7 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -297,6 +305,7 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -315,6 +324,7 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, + insert_worker_output_tx.clone(), ) .await .map_err(Error::from_string)?; @@ -356,6 +366,29 @@ impl Daemon { //jh.await.map_err(|e| e.to_string()).map_err(Error::from)??; } + { + // TODO join the task + let tx = daemon_ev_tx.clone(); + tokio::task::spawn(async move { + loop { + match insert_worker_output_rx.recv().await { + Ok(x) => { + match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await { + Ok(()) => {} + Err(_) => { + // TODO + break; + } + } + } + Err(_) => { + // TODO + break; + } + } + } + }); + } let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); let ret = Self { @@ -709,6 +742,15 @@ impl Daemon { } Ok(()) } + ScyllaInsertWorkerOutput(x) => { + use scywr::insertworker::InsertWorkerOutputItem::*; + match x { + Metrics(x) => { + self.daemon_metrics.scy_inswork().ingest(x); + Ok(()) + } + } + } }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index a674300..9a04620 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -6,18 +6,16 @@ use crate::opts::RemoveOlderAll; use chrono::DateTime; use chrono::Utc; use dbpg::conn::PgClient; -use err::thiserror; -use err::ThisError; -use futures_util::future; -use futures_util::stream; use futures_util::StreamExt; use futures_util::TryStreamExt; +use futures_util::future; +use futures_util::stream; use log::*; -use netpod::ttl::RetentionTime; use netpod::Database; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; +use netpod::ttl::RetentionTime; use scywr::config::ScyllaIngestConfig; use scywr::scylla::prepared_statement::PreparedStatement; use scywr::scylla::transport::errors::QueryError; @@ -28,19 +26,20 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; -#[derive(Debug, ThisError)] -#[cstm(name = "DaqingestTools")] -pub enum Error { - PgConn(#[from] dbpg::err::Error), - Postgres(#[from] dbpg::postgres::Error), - ScyllaSession(#[from] scywr::session::Error), - ScyllaQuery(#[from] QueryError), - ScyllaNextRowError(#[from] NextRowError), - ScyllaSchema(#[from] scywr::schema::Error), - ScyllaTypeCheck(#[from] scywr::scylla::deserialize::TypeCheckError), - ParseError(String), - InvalidValue, -} +autoerr::create_error_v1!( + name(Error, "DaqingestTools"), + enum variants { + PgConn(#[from] dbpg::err::Error), + Postgres(#[from] dbpg::postgres::Error), + ScyllaSession(#[from] scywr::session::Error), + ScyllaQuery(#[from] QueryError), + ScyllaNextRowError(#[from] NextRowError), + ScyllaSchema(#[from] scywr::schema::Error), + ScyllaTypeCheck(#[from] scywr::scylla::deserialize::TypeCheckError), + ParseError(String), + InvalidValue, + }, +); pub async fn remove_older( backend: String, diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 175c25e..a958222 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -95,6 +95,7 @@ const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200); const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const DO_RATE_CHECK: bool = false; const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60); +const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1); macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); } @@ -1117,6 +1118,7 @@ pub struct CaConn { trace_channel_poll: bool, ts_channel_status_pong_last: Instant, mett: stats::mett::CaConnMetrics, + metrics_emit_last: Instant, } impl Drop for CaConn { @@ -1186,6 +1188,7 @@ impl CaConn { trace_channel_poll: false, ts_channel_status_pong_last: tsnow, mett: stats::mett::CaConnMetrics::new(), + metrics_emit_last: tsnow, } } @@ -2096,15 +2099,13 @@ impl CaConn { self.mett.recv_read_notify_while_polling_idle().inc(); } PollTickState::Wait(st3) => { - let dt = tsnow.saturating_duration_since(st3.since); - // TODO STATS - // self.stats.caget_lat().ingest_dur_dms(dt); - // TODO maintain histogram of read-notify latencies if self.read_ioids.remove(&st3.ioid).is_some() { self.mett.ioid_read_done().inc(); } else { self.mett.ioid_read_error_not_found().inc(); } + let dt = tsnow.saturating_duration_since(st3.since); + self.mett.caget_lat().push_dur_100us(dt); let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow); if self.trace_channel_poll { trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); @@ -3052,8 +3053,6 @@ impl CaConn { TcpAsyncWriteRead::from(tcp), self.remote_addr_dbg.to_string(), self.opts.array_truncate, - // self.ca_proto_stats.clone(), - (), ); self.state = CaConnState::Init; self.proto = Some(proto); @@ -3237,13 +3236,23 @@ impl CaConn { CaConnState::EndOfStream => {} } self.iqdqs.housekeeping(); - self.metrics_emit(); + if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow { + self.metrics_emit_last = tsnow; + self.metrics_emit(); + } Ok(()) } fn housekeeping_self(&mut self) {} fn metrics_emit(&mut self) { + if let Some(x) = self.proto.as_mut() { + let mett = x.mett(); + mett.metrics_emit().inc(); + let m = mett.take_and_reset(); + self.mett.proto().ingest(m); + } + self.mett.metrics_emit().inc(); let item = self.mett.take_and_reset(); let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item)); self.ca_conn_event_out_queue.push_back(item); @@ -3393,12 +3402,13 @@ impl CaConn { loop_max: u32, cx: &mut Context, id: &str, - stats: FS, + mut stats: FS, + mett: &mut stats::mett::CaConnMetrics, ) -> Result>, Error> where Q: Unpin, FB: Fn(&mut VecDeque) -> Option, - FS: Fn(&Q), + FS: for<'a, 'b> FnMut(&'a Q, &'b mut stats::mett::CaConnMetrics), { let self_name = "attempt_flush_queue"; use Poll::*; @@ -3417,7 +3427,7 @@ impl CaConn { } if sp.is_idle() { if let Some(item) = qu_to_si(qu) { - stats(&item); + stats(&item, mett); sp.as_mut().send_pin(item); } else { break; @@ -3469,7 +3479,8 @@ macro_rules! flush_queue { qu.shrink_to(qu.capacity() * 7 / 10); } let sp = obj.$sp.as_mut(); - match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { + let mett = &mut obj.mett; + match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats, mett) { Ok(Ready(Some(()))) => { *$have.0 |= true; } @@ -3493,7 +3504,8 @@ macro_rules! flush_queue_dqs { qu.shrink_to(qu.capacity() * 7 / 10); } let sp = obj.iqsp.as_mut().$sp(); - match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { + let mett = &mut obj.mett; + match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats, mett) { Ok(Ready(Some(()))) => { *$have.0 |= true; } @@ -3563,17 +3575,6 @@ impl Stream for CaConn { } { - let n = self.iqdqs.len(); - // TODO STATS - self.stats.iiq_len().ingest(n as u32); - } - - { - let stats2 = self.stats.clone(); - let stats_fn = move |item: &VecDeque| { - // TODO STATS - stats2.iiq_batch_len().ingest(item.len() as u32); - }; flush_queue_dqs!( self, st_rf1_qu, @@ -3583,14 +3584,11 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "st_rf1_rx", cx, - stats_fn + |item: &VecDeque, mett: &mut stats::mett::CaConnMetrics| { + mett.iiq_batch_len().push_val(item.len() as u32); + } ); - let stats2 = self.stats.clone(); - let stats_fn = move |item: &VecDeque| { - // TODO STATS - stats2.iiq_batch_len().ingest(item.len() as u32); - }; flush_queue_dqs!( self, st_rf3_qu, @@ -3600,14 +3598,11 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "st_rf3_rx", cx, - stats_fn + |item: &VecDeque, mett: &mut stats::mett::CaConnMetrics| { + mett.iiq_batch_len().push_val(item.len() as u32); + } ); - let stats2 = self.stats.clone(); - let stats_fn = move |item: &VecDeque| { - // TODO STATS - stats2.iiq_batch_len().ingest(item.len() as u32); - }; flush_queue_dqs!( self, mt_rf3_qu, @@ -3617,14 +3612,11 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "mt_rf3_rx", cx, - stats_fn + |item: &VecDeque, mett: &mut stats::mett::CaConnMetrics| { + mett.iiq_batch_len().push_val(item.len() as u32); + } ); - let stats2 = self.stats.clone(); - let stats_fn = move |item: &VecDeque| { - // TODO STATS - stats2.iiq_batch_len().ingest(item.len() as u32); - }; flush_queue_dqs!( self, lt_rf3_qu, @@ -3634,14 +3626,11 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "lt_rf3_rx", cx, - stats_fn + |item: &VecDeque, mett: &mut stats::mett::CaConnMetrics| { + mett.iiq_batch_len().push_val(item.len() as u32); + } ); - let stats2 = self.stats.clone(); - let stats_fn = move |item: &VecDeque| { - // TODO STATS - stats2.iiq_batch_len().ingest(item.len() as u32); - }; flush_queue_dqs!( self, lt_rf3_lat5_qu, @@ -3651,7 +3640,9 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "lt_rf3_lat5_rx", cx, - stats_fn + |item: &VecDeque, mett: &mut stats::mett::CaConnMetrics| { + mett.iiq_batch_len().push_val(item.len() as u32); + } ); } @@ -3667,7 +3658,7 @@ impl Stream for CaConn { (&mut have_progress, &mut have_pending), "chinf", cx, - |_| {} + |_, _| {} ); } @@ -3795,19 +3786,9 @@ impl Stream for CaConn { let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); if self.trace_channel_poll { - // TODO STATS - self.stats.poll_all_dt().ingest_dur_dms(dt); - if dt >= Duration::from_millis(10) { - trace!("long poll {dt:?}"); - } else if dt >= Duration::from_micros(400) { - // TODO STATS - let v = self.stats.poll_all_dt.to_display(); - let ip = self.remote_addr_dbg; - trace!("poll_all_dt {ip} {v}"); - } + self.mett.poll_all_dt().push_dur_100us(dt); } - // TODO STATS - // self.stats.poll_reloops().ingest(reloops); + self.mett.poll_reloops().push_val(reloops); ret } } diff --git a/netfetch/src/ca/conn2/conn.rs b/netfetch/src/ca/conn2/conn.rs index f2c862e..d35dffc 100644 --- a/netfetch/src/ca/conn2/conn.rs +++ b/netfetch/src/ca/conn2/conn.rs @@ -21,9 +21,9 @@ use proto::CaProto; use scywr::insertqueues::InsertDeques; use scywr::insertqueues::InsertQueuesTx; use scywr::iteminsertqueue::QueryItem; -use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::CaConnStats; use stats::CaProtoStats; +use stats::rand_xoshiro::Xoshiro128PlusPlus; use std::collections::VecDeque; use std::fmt; use std::net::SocketAddrV4; @@ -160,9 +160,11 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; let mut durs = DurationMeasureSteps::new(); - self.stats.poll_fn_begin().inc(); + // TODO STATS + // self.stats.poll_fn_begin().inc(); let ret = loop { - self.stats.poll_loop_begin().inc(); + // TODO STATS + // self.stats.poll_loop_begin().inc(); let qlen = self.iqdqs.len(); if qlen >= self.opts.insert_queue_max * 2 / 3 { self.stats.insert_item_queue_pressure().inc(); @@ -191,11 +193,6 @@ impl Stream for CaConn { } } - { - let n = self.iqdqs.len(); - self.stats.iiq_len().ingest(n as u32); - } - { // let stats2 = self.stats.clone(); // let stats_fn = move |item: &VecDeque| { diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 4a22423..5c9987d 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -29,6 +29,7 @@ pub enum DaemonEvent { Shutdown, ConfigReload(Sender), GetMetrics(Sender), + ScyllaInsertWorkerOutput(scywr::insertworker::InsertWorkerOutputItem), } impl DaemonEvent { @@ -43,6 +44,7 @@ impl DaemonEvent { Shutdown => format!("Shutdown"), ConfigReload(..) => format!("ConfigReload"), GetMetrics(..) => format!("GetMetrics"), + ScyllaInsertWorkerOutput(..) => format!("ScyllaInsertWorkerOutput"), } } } diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs index 1447afd..f0520b9 100644 --- a/netfetch/src/metrics/types.rs +++ b/netfetch/src/metrics/types.rs @@ -121,7 +121,7 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { #[derive(Debug, Serialize)] pub struct MetricsPrometheusShort { - counters: Vec<(String, u64)>, + counters: Vec, } impl MetricsPrometheusShort { @@ -129,7 +129,7 @@ impl MetricsPrometheusShort { use std::fmt::Write; let mut s = String::new(); for e in self.counters.iter() { - write!(&mut s, "{} {}\n", e.0, e.1).unwrap(); + write!(&mut s, "{}\n", e).unwrap(); } s } @@ -138,7 +138,7 @@ impl MetricsPrometheusShort { impl From<&stats::mett::DaemonMetrics> for MetricsPrometheusShort { fn from(value: &stats::mett::DaemonMetrics) -> Self { Self { - counters: value.to_flatten_prometheus(), + counters: value.to_flatten_prometheus("daemon"), } } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index c3998d3..f91f395 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -11,6 +11,7 @@ use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::store::DataStore; use async_channel::Receiver; +use async_channel::Sender; use atomic::AtomicU64; use futures_util::Stream; use futures_util::StreamExt; @@ -100,6 +101,11 @@ async fn back_off_sleep(backoff_dt: &mut Duration) { tokio::time::sleep(*backoff_dt).await; } +#[derive(Debug)] +pub enum InsertWorkerOutputItem { + Metrics(stats::mett::ScyllaInsertWorker), +} + pub struct InsertWorkerOpts { pub store_workers_rate: Arc, pub insert_workers_running: Arc, @@ -118,6 +124,7 @@ pub async fn spawn_scylla_insert_workers( store_stats: Arc, use_rate_limit_queue: bool, ignore_writes: bool, + tx: Sender, ) -> Result>>, Error> { let item_inp = if use_rate_limit_queue { crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp) @@ -140,6 +147,7 @@ pub async fn spawn_scylla_insert_workers( Some(data_store), ignore_writes, store_stats.clone(), + tx.clone(), )); jhs.push(jh); } @@ -152,6 +160,7 @@ pub async fn spawn_scylla_insert_workers_dummy( item_inp: Receiver>, insert_worker_opts: Arc, store_stats: Arc, + tx: Sender, ) -> Result>>, Error> { let mut jhs = Vec::new(); for worker_ix in 0..insert_worker_count { @@ -164,6 +173,7 @@ pub async fn spawn_scylla_insert_workers_dummy( data_store, true, store_stats.clone(), + tx.clone(), )); jhs.push(jh); } @@ -178,9 +188,13 @@ async fn worker_streamed( data_store: Option>, ignore_writes: bool, stats: Arc, + tx: Sender, ) -> Result<(), Error> { debug_setup!("worker_streamed begin"); - stats.worker_start().inc(); + let tsnow = Instant::now(); + let mut mett = stats::mett::ScyllaInsertWorker::new(); + let mut mett_emit_last = tsnow; + let metrics_ivl = Duration::from_millis(1000); insert_worker_opts .insert_workers_running .fetch_add(1, atomic::Ordering::AcqRel); @@ -199,9 +213,10 @@ async fn worker_streamed( debug_setup!("waiting for item"); while let Some(item) = stream.next().await { trace_item_execute!("see item"); + let tsnow = Instant::now(); match item { Ok(_) => { - stats.inserted_values().inc(); + mett.job_ok().inc(); // TODO compute the insert latency bin and count. } Err(e) => { @@ -215,9 +230,22 @@ async fn worker_streamed( }, _ => e.into(), }; + mett.job_err().inc(); stats_inc_for_err(&stats, &e); } } + if mett_emit_last + metrics_ivl <= tsnow { + mett_emit_last = tsnow; + let m = mett.take_and_reset(); + let item = InsertWorkerOutputItem::Metrics(m); + match tx.send(item).await { + Ok(()) => {} + Err(_) => { + error!("insert worker can not emit metrics"); + break; + } + } + } } } else { let mut stream = Box::pin(stream); diff --git a/stats/Cargo.toml b/stats/Cargo.toml index d98f292..055158a 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -13,4 +13,5 @@ rand_xoshiro = "0.6.0" stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } -mettrics = { version = "0.0.5", path = "../../mettrics" } +mettrics = { version = "0.0.6", path = "../../mettrics" } +ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 0ee7b5f..5520b39 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -1,6 +1,16 @@ +mod Metrics { + type StructName = ScyllaInsertWorker; + enum counters { + metrics_emit, + job_ok, + job_err, + } +} + mod Metrics { type StructName = CaConnMetrics; enum counters { + metrics_emit, ioid_read_begin, ioid_read_done, ioid_read_timeout, @@ -49,6 +59,17 @@ mod Metrics { get_series_id_ok, channel_add_exists, } + enum histolog2s { + clock_ioc_diff_abs, + caget_lat, + poll_reloops, + poll_all_dt, + iiq_batch_len, + } + mod Compose { + type Input = ca_proto::mett::CaProtoMetrics; + type Name = proto; + } } mod Metrics { @@ -65,4 +86,8 @@ mod Metrics { type Input = CaConnSetMetrics; type Name = ca_conn_set; } + mod Compose { + type Input = ScyllaInsertWorker; + type Name = scy_inswork; + } } diff --git a/stats/src/mett.rs b/stats/src/mett.rs index 3800997..821f7ab 100644 --- a/stats/src/mett.rs +++ b/stats/src/mett.rs @@ -1,3 +1,4 @@ use mettrics::types::CounterU32; +use mettrics::types::HistoLog2; mettrics::macros::make_metrics!("mettdecl.rs"); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 92f9911..5f96356 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -337,7 +337,6 @@ stats_proc::stats_struct!(( inserts_msp_grid, inserts_queue_pop_for_global, inserts_queue_push, - inserts_queue_drop, insert_item_queue_pressure, insert_item_queue_full, out_queue_full, @@ -359,10 +358,6 @@ stats_proc::stats_struct!(( channel_info_insert_done, ivl_insert_done, mute_insert_done, - loop1_count, - loop2_count, - loop3_count, - loop4_count, command_can_not_reply, time_handle_conn_listen, time_handle_peer_ready, @@ -379,12 +374,6 @@ stats_proc::stats_struct!(( ping_start, ping_no_proto, pong_timeout, - poll_fn_begin, - poll_loop_begin, - poll_reloop, - poll_pending, - poll_no_progress_no_pending, - poll_wake_break, storage_queue_send, storage_queue_pending, event_add_res_recv, @@ -412,16 +401,7 @@ stats_proc::stats_struct!(( ca_proto_version_later, no_cid_for_subid, ), - histolog2s( - poll_all_dt, - poll_op3_dt, - poll_reloops, - pong_recv_lat, - ca_ts_off, - iiq_len, - iiq_batch_len, - caget_lat, - ), + histolog2s(poll_op3_dt, pong_recv_lat, ca_ts_off,), ), agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), @@ -429,8 +409,8 @@ stats_proc::stats_struct!(( name(CaProtoStats), prefix(ca_proto), counters( - tcp_recv_count, - tcp_recv_bytes, + // tcp_recv_count, + // tcp_recv_bytes, protocol_issue, payload_std_too_large, payload_ext_but_small,