Reenable metrics output
This commit is contained in:
@@ -87,9 +87,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.61"
|
||||
version = "0.1.62"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282"
|
||||
checksum = "689894c2db1ea643a50834b999abf1c110887402542955ff5451dab8f861f9ed"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -209,9 +209,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.11.1"
|
||||
version = "3.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
|
||||
checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
@@ -400,9 +400,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx"
|
||||
version = "1.0.86"
|
||||
version = "1.0.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d1075c37807dcf850c379432f0df05ba52cc30f279c5cfc43cc221ce7f8579"
|
||||
checksum = "b61a7545f753a88bcbe0a70de1fcc0221e10bfc752f576754fa91e663db1622e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cxxbridge-flags",
|
||||
@@ -412,9 +412,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxx-build"
|
||||
version = "1.0.86"
|
||||
version = "1.0.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5044281f61b27bc598f2f6647d480aed48d2bf52d6eb0b627d84c0361b17aa70"
|
||||
checksum = "f464457d494b5ed6905c63b0c4704842aba319084a0a3561cdc1359536b53200"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"codespan-reporting",
|
||||
@@ -427,15 +427,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-flags"
|
||||
version = "1.0.86"
|
||||
version = "1.0.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61b50bc93ba22c27b0d31128d2d130a0a6b3d267ae27ef7e4fae2167dfe8781c"
|
||||
checksum = "43c7119ce3a3701ed81aca8410b9acf6fc399d2629d057b87e2efa4e63a3aaea"
|
||||
|
||||
[[package]]
|
||||
name = "cxxbridge-macro"
|
||||
version = "1.0.86"
|
||||
version = "1.0.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39e61fda7e62115119469c7b3591fd913ecca96fb766cfd3f2e2502ab7bc87a5"
|
||||
checksum = "65e07508b90551e610910fa648a1878991d367064997a596135b86df30daf07e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1412,9 +1412,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.49"
|
||||
version = "1.0.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
|
||||
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
@@ -1556,9 +1556,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.36.6"
|
||||
version = "0.36.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549"
|
||||
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"errno",
|
||||
@@ -1719,9 +1719,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_yaml"
|
||||
version = "0.9.16"
|
||||
version = "0.9.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834"
|
||||
checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"itoa",
|
||||
@@ -1984,9 +1984,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.24.1"
|
||||
version = "1.24.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae"
|
||||
checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
@@ -2073,9 +2073,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.5.10"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f"
|
||||
checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
@@ -2261,9 +2261,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-bidi"
|
||||
version = "0.3.8"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
|
||||
checksum = "0046be40136ef78dc325e0edefccf84ccddacd0afcc1ca54103fa3c61bbdab1d"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
|
||||
@@ -4,6 +4,7 @@ use err::Error;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netfetch::ca::conn::CaConnEvent;
|
||||
use netfetch::ca::conn::ConnCommand;
|
||||
use netfetch::ca::connset::CaConnSet;
|
||||
use netfetch::ca::findioc::FindIocRes;
|
||||
@@ -17,10 +18,12 @@ use netfetch::daemon_common::DaemonEvent;
|
||||
use netfetch::errconv::ErrConv;
|
||||
use netfetch::insertworker::Ttls;
|
||||
use netfetch::metrics::ExtraInsertsConf;
|
||||
use netfetch::metrics::StatsSet;
|
||||
use netfetch::store::CommonInsertItemQueue;
|
||||
use netpod::Database;
|
||||
use netpod::ScyllaConfig;
|
||||
use serde::Serialize;
|
||||
use stats::DaemonStats;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
@@ -153,17 +156,19 @@ pub struct ChannelState {
|
||||
value: ChannelStateValue,
|
||||
}
|
||||
|
||||
/// Lifecycle state the daemon tracks for one `CaConn`.
///
/// The enum is a plain tag (plus one timestamp), so it is cheap to copy and
/// compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CaConnStateValue {
    /// Value assigned when the daemon first creates the state entry for an address.
    Fresh,
    /// NOTE(review): presumably meant to be set once the first health-check reply
    /// has arrived; no code visible here assigns it yet — confirm.
    HadFeedback,
    /// The daemon decided to close the connection at `since`
    /// (set when status feedback has been missing for too long).
    Shutdown { since: Instant },
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CaConnState {
|
||||
last_feedback: Instant,
|
||||
#[allow(unused)]
|
||||
value: CaConnStateValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DaemonOpts {
|
||||
backend: String,
|
||||
local_epics_hostname: String,
|
||||
@@ -248,6 +253,7 @@ pub struct Daemon {
|
||||
pg_client: Arc<PgClient>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
caconn_last_channel_check: Instant,
|
||||
stats: Arc<DaemonStats>,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
@@ -409,6 +415,7 @@ impl Daemon {
|
||||
pg_client,
|
||||
ingest_commons,
|
||||
caconn_last_channel_check: Instant::now(),
|
||||
stats: Arc::new(DaemonStats::new()),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -662,23 +669,77 @@ impl Daemon {
|
||||
(qtx, ioc_finder_jh)
|
||||
}
|
||||
|
||||
/// Borrow the shared daemon statistics handle.
///
/// Callers that need to keep the counters alive independently of the daemon
/// clone the returned `Arc`.
fn stats(&self) -> &Arc<DaemonStats> {
    let handle = &self.stats;
    handle
}
|
||||
|
||||
async fn check_chans(&mut self) -> Result<(), Error> {
|
||||
{
|
||||
let tsnow = Instant::now();
|
||||
for (k, v) in &self.connection_states {
|
||||
// TODO check for delta t since last issued status command.
|
||||
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
|
||||
error!("CaConn without status feedback {k:?}");
|
||||
for (k, v) in &mut self.connection_states {
|
||||
match v.value {
|
||||
CaConnStateValue::Fresh => {
|
||||
// TODO check for delta t since last issued status command.
|
||||
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
|
||||
error!("TODO send connection-close for {k:?}");
|
||||
self.stats.ca_conn_status_feedback_timeout_inc();
|
||||
v.value = CaConnStateValue::Shutdown { since: tsnow };
|
||||
}
|
||||
}
|
||||
CaConnStateValue::HadFeedback => {
|
||||
// TODO check for delta t since last issued status command.
|
||||
if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) {
|
||||
error!("TODO send connection-close for {k:?}");
|
||||
self.stats.ca_conn_status_feedback_timeout_inc();
|
||||
v.value = CaConnStateValue::Shutdown { since: tsnow };
|
||||
}
|
||||
}
|
||||
CaConnStateValue::Shutdown { since } => {
|
||||
if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) {
|
||||
self.stats.critical_error_inc();
|
||||
error!("Shutdown of CaConn to {} failed", k);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut currently_search_pending = 0;
|
||||
{
|
||||
let mut with_address_count = 0;
|
||||
let mut without_address_count = 0;
|
||||
for (_ch, st) in &self.channel_states {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value {
|
||||
currently_search_pending += 1;
|
||||
match &st.value {
|
||||
ChannelStateValue::Active(st2) => match st2 {
|
||||
ActiveChannelState::UnknownAddress { since: _ } => {
|
||||
without_address_count += 1;
|
||||
}
|
||||
ActiveChannelState::SearchPending { since: _, did_send: _ } => {
|
||||
currently_search_pending += 1;
|
||||
without_address_count += 1;
|
||||
}
|
||||
ActiveChannelState::WithAddress { addr: _, state } => match state {
|
||||
WithAddressState::Unassigned { assign_at: _ } => {
|
||||
with_address_count += 1;
|
||||
}
|
||||
WithAddressState::Assigned(_) => {
|
||||
with_address_count += 1;
|
||||
}
|
||||
},
|
||||
ActiveChannelState::NoAddress { since: _ } => {
|
||||
without_address_count += 1;
|
||||
}
|
||||
},
|
||||
ChannelStateValue::ToRemove { addr: _ } => {
|
||||
with_address_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.stats
|
||||
.channel_with_address
|
||||
.store(with_address_count, atomic::Ordering::Release);
|
||||
self.stats
|
||||
.channel_without_address
|
||||
.store(without_address_count, atomic::Ordering::Release);
|
||||
}
|
||||
let k = self.chan_check_next.take();
|
||||
trace!("------------ check_chans start at {:?}", k);
|
||||
@@ -745,6 +806,13 @@ impl Daemon {
|
||||
value: ConnectionStateValue::Unconnected,
|
||||
};
|
||||
*state = WithAddressState::Assigned(cs);
|
||||
self.connection_states.entry(*addr).or_insert_with(|| {
|
||||
let t = CaConnState {
|
||||
last_feedback: Instant::now(),
|
||||
value: CaConnStateValue::Fresh,
|
||||
};
|
||||
t
|
||||
});
|
||||
}
|
||||
}
|
||||
Assigned(_) => {
|
||||
@@ -838,7 +906,17 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ca_conn_send_shutdown(&mut self) -> Result<(), Error> {
|
||||
warn!("send shutdown to all ca connections");
|
||||
self.ingest_commons
|
||||
.ca_conn_set
|
||||
.enqueue_command_to_all(|| ConnCommand::shutdown())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
|
||||
self.stats.handle_timer_tick_count_inc();
|
||||
let ts1 = Instant::now();
|
||||
let tsnow = SystemTime::now();
|
||||
if SIGINT.load(atomic::Ordering::Acquire) == 1 {
|
||||
@@ -936,6 +1014,7 @@ impl Daemon {
|
||||
SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel);
|
||||
for res in ress {
|
||||
if let Some(addr) = &res.addr {
|
||||
self.stats.ioc_search_some_inc();
|
||||
let ch = Channel::new(res.channel);
|
||||
if let Some(st) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) =
|
||||
@@ -989,6 +1068,7 @@ impl Daemon {
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
self.stats.ioc_search_err_inc();
|
||||
error!("error from search: {e}");
|
||||
}
|
||||
}
|
||||
@@ -998,7 +1078,7 @@ impl Daemon {
|
||||
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
|
||||
info!("handle_ca_conn_done {conn_addr:?}");
|
||||
self.connection_states.remove(&conn_addr);
|
||||
for (k, v) in self.channel_states.iter_mut() {
|
||||
for (_k, v) in self.channel_states.iter_mut() {
|
||||
match &v.value {
|
||||
ChannelStateValue::Active(st2) => match st2 {
|
||||
ActiveChannelState::UnknownAddress { .. } => {}
|
||||
@@ -1022,8 +1102,45 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_ca_conn_event(&mut self, addr: SocketAddrV4, item: CaConnEvent) -> Result<(), Error> {
|
||||
self.stats.event_ca_conn_inc();
|
||||
use netfetch::ca::conn::CaConnEventValue::*;
|
||||
match item.value {
|
||||
None => {
|
||||
// TODO count, maybe reduce.
|
||||
Ok(())
|
||||
}
|
||||
EchoTimeout => {
|
||||
self.stats.ca_echo_timeout_total_inc();
|
||||
error!("TODO on EchoTimeout remove the CaConn and reset channels");
|
||||
Ok(())
|
||||
}
|
||||
ConnCommandResult(item) => {
|
||||
self.stats.todo_mark_inc();
|
||||
use netfetch::ca::conn::ConnCommandResultKind::*;
|
||||
match &item.kind {
|
||||
CheckHealth => {
|
||||
if let Some(st) = self.connection_states.get_mut(&addr) {
|
||||
self.stats.ca_conn_status_feedback_recv_inc();
|
||||
st.last_feedback = Instant::now();
|
||||
Ok(())
|
||||
} else {
|
||||
self.stats.ca_conn_status_feedback_no_dst_inc();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EndOfStream => {
|
||||
self.stats.ca_conn_status_done_inc();
|
||||
self.handle_ca_conn_done(addr).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(&mut self, item: DaemonEvent, ticker_inp_tx: &Sender<u32>) -> Result<(), Error> {
|
||||
use DaemonEvent::*;
|
||||
self.stats.events_inc();
|
||||
let ts1 = Instant::now();
|
||||
let item_summary = item.summary();
|
||||
let ret = match item {
|
||||
@@ -1032,6 +1149,7 @@ impl Daemon {
|
||||
match ticker_inp_tx.send(42).await {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
self.stats.ticker_token_release_error_inc();
|
||||
error!("can not send ticker token");
|
||||
return Err(Error::with_msg_no_trace("can not send ticker token"));
|
||||
}
|
||||
@@ -1041,34 +1159,7 @@ impl Daemon {
|
||||
ChannelAdd(ch) => self.handle_channel_add(ch),
|
||||
ChannelRemove(ch) => self.handle_channel_remove(ch),
|
||||
SearchDone(item) => self.handle_search_done(item).await,
|
||||
CaConnEvent(addr, item) => {
|
||||
use netfetch::ca::conn::CaConnEventValue::*;
|
||||
match item.value {
|
||||
None => {
|
||||
// TODO count, maybe reduce.
|
||||
Ok(())
|
||||
}
|
||||
EchoTimeout => {
|
||||
error!("TODO on EchoTimeout remove the CaConn and reset channels");
|
||||
Ok(())
|
||||
}
|
||||
HealthCheckDone => {
|
||||
if let Some(st) = self.connection_states.get_mut(&addr) {
|
||||
st.last_feedback = Instant::now();
|
||||
Ok(())
|
||||
} else {
|
||||
error!("received HealthCheckDone for unknown CaConn");
|
||||
// TODO
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
ConnCommandResult(item) => {
|
||||
info!("TODO handle ConnCommandResult {item:?}");
|
||||
Ok(())
|
||||
}
|
||||
EndOfStream => self.handle_ca_conn_done(addr).await,
|
||||
}
|
||||
}
|
||||
CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await,
|
||||
};
|
||||
let dt = ts1.elapsed();
|
||||
if dt > Duration::from_millis(200) {
|
||||
@@ -1081,6 +1172,7 @@ impl Daemon {
|
||||
let (ticker_inp_tx, ticker_inp_rx) = async_channel::bounded::<u32>(1);
|
||||
let ticker = {
|
||||
let tx = self.tx.clone();
|
||||
let stats = self.stats.clone();
|
||||
async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
@@ -1092,7 +1184,10 @@ impl Daemon {
|
||||
for _ in 0..c {
|
||||
match ticker_inp_rx.recv().await {
|
||||
Ok(_) => {}
|
||||
Err(_) => break,
|
||||
Err(_) => {
|
||||
stats.ticker_token_acquire_error_inc();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1161,9 +1256,14 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
};
|
||||
let mut daemon = Daemon::new(opts2).await?;
|
||||
let tx = daemon.tx.clone();
|
||||
let daemon_stats = daemon.stats().clone();
|
||||
|
||||
let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy());
|
||||
netfetch::metrics::start_metrics_service(opts.api_bind(), dcom);
|
||||
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
|
||||
let metrics_jh = {
|
||||
let stats_set = StatsSet::new(daemon_stats);
|
||||
let fut = netfetch::metrics::start_metrics_service(opts.api_bind(), dcom, stats_set);
|
||||
tokio::task::spawn(fut)
|
||||
};
|
||||
|
||||
let daemon_jh = taskrun::spawn(async move {
|
||||
// TODO handle Err
|
||||
@@ -1175,5 +1275,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
}
|
||||
info!("all channels sent to daemon");
|
||||
daemon_jh.await.unwrap();
|
||||
if false {
|
||||
metrics_jh.await.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -338,15 +338,14 @@ pub enum ConnCommandResultKind {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnCommandResult {
|
||||
id: usize,
|
||||
kind: ConnCommandResultKind,
|
||||
pub id: usize,
|
||||
pub kind: ConnCommandResultKind,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CaConnEventValue {
|
||||
None,
|
||||
EchoTimeout,
|
||||
HealthCheckDone,
|
||||
ConnCommandResult(ConnCommandResult),
|
||||
EndOfStream,
|
||||
}
|
||||
@@ -423,6 +422,7 @@ pub struct CaConn {
|
||||
ioc_ping_last: Instant,
|
||||
ioc_ping_start: Option<Instant>,
|
||||
cmd_res_queue: VecDeque<ConnCommandResult>,
|
||||
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
|
||||
channel_set_ops: Arc<ChannelSetOps>,
|
||||
channel_info_query_tx: Sender<ChannelInfoQuery>,
|
||||
series_lookup_schedule: BTreeMap<Cid, ChannelInfoQuery>,
|
||||
@@ -474,6 +474,7 @@ impl CaConn {
|
||||
ioc_ping_last: Instant::now(),
|
||||
ioc_ping_start: None,
|
||||
cmd_res_queue: VecDeque::new(),
|
||||
ca_conn_event_out_queue: VecDeque::new(),
|
||||
channel_set_ops: Arc::new(ChannelSetOps {
|
||||
ops: StdMutex::new(BTreeMap::new()),
|
||||
flag: AtomicUsize::new(0),
|
||||
@@ -528,8 +529,13 @@ impl CaConn {
|
||||
self.trigger_shutdown();
|
||||
}
|
||||
}
|
||||
//self.stats.caconn_command_can_not_reply_inc();
|
||||
// TODO return the result
|
||||
let res = ConnCommandResult {
|
||||
id: 0,
|
||||
kind: ConnCommandResultKind::CheckHealth,
|
||||
};
|
||||
self.cmd_res_queue.push_back(res);
|
||||
//self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
|
||||
fn cmd_find_channel(&self, pattern: &str) {
|
||||
@@ -811,6 +817,11 @@ impl CaConn {
|
||||
if let Some(started) = self.ioc_ping_start {
|
||||
if started.elapsed() > Duration::from_millis(4000) {
|
||||
warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg);
|
||||
let item = CaConnEvent {
|
||||
ts: Instant::now(),
|
||||
value: CaConnEventValue::EchoTimeout,
|
||||
};
|
||||
self.ca_conn_event_out_queue.push_back(item);
|
||||
self.trigger_shutdown();
|
||||
}
|
||||
} else {
|
||||
@@ -1703,6 +1714,8 @@ impl Stream for CaConn {
|
||||
ts: Instant::now(),
|
||||
value: CaConnEventValue::ConnCommandResult(item),
|
||||
})))
|
||||
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
|
||||
Ready(Some(Ok(item)))
|
||||
} else {
|
||||
let mut i1 = 0;
|
||||
let ret = loop {
|
||||
|
||||
@@ -11,12 +11,23 @@ use serde::Serialize;
|
||||
use stats::CaConnStats;
|
||||
use stats::CaConnStatsAgg;
|
||||
use stats::CaConnStatsAggDiff;
|
||||
use stats::DaemonStats;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct StatsSet {
|
||||
daemon: Arc<DaemonStats>,
|
||||
}
|
||||
|
||||
impl StatsSet {
|
||||
pub fn new(daemon: Arc<DaemonStats>) -> Self {
|
||||
Self { daemon }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ExtraInsertsConf {
|
||||
pub copies: Vec<(u64, u64)>,
|
||||
@@ -112,19 +123,17 @@ struct DummyQuery {
|
||||
age: usize,
|
||||
}
|
||||
|
||||
pub struct DaemonComm {}
|
||||
pub struct DaemonComm {
|
||||
tx: Sender<DaemonEvent>,
|
||||
}
|
||||
|
||||
impl DaemonComm {
|
||||
pub fn new(tx: Sender<DaemonEvent>) -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub fn dummy() -> Self {
|
||||
Self {}
|
||||
Self { tx }
|
||||
}
|
||||
}
|
||||
|
||||
fn make_routes(dcom: Arc<DaemonComm>) -> axum::Router {
|
||||
fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
|
||||
use axum::extract;
|
||||
use axum::routing::get;
|
||||
use axum::routing::put;
|
||||
@@ -147,17 +156,12 @@ fn make_routes(dcom: Arc<DaemonComm>) -> axum::Router {
|
||||
)
|
||||
.route(
|
||||
"/metrics",
|
||||
get(|| async {
|
||||
let stats = crate::ca::METRICS.lock().unwrap();
|
||||
match stats.as_ref() {
|
||||
Some(s) => {
|
||||
trace!("Metrics");
|
||||
s.prometheus()
|
||||
}
|
||||
None => {
|
||||
trace!("Metrics empty");
|
||||
String::new()
|
||||
}
|
||||
get({
|
||||
//
|
||||
|| async move {
|
||||
info!("metrics");
|
||||
let s1 = stats_set.daemon.prometheus();
|
||||
s1
|
||||
}
|
||||
}),
|
||||
)
|
||||
@@ -238,9 +242,9 @@ fn make_routes(dcom: Arc<DaemonComm>) -> axum::Router {
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, dcom: Arc<DaemonComm>) {
|
||||
pub async fn start_metrics_service(bind_to: String, dcom: Arc<DaemonComm>, stats_set: StatsSet) {
|
||||
axum::Server::bind(&bind_to.parse().unwrap())
|
||||
.serve(make_routes(dcom).into_make_service())
|
||||
.serve(make_routes(dcom, stats_set).into_make_service())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -266,7 +266,29 @@ stats_proc::stats_struct!((
|
||||
));
|
||||
|
||||
// Statistics kept by the daemon.
//
// `counters` get an `<name>_inc()` method each, `values` get `<name>_set(v)`.
// With `prefix(daemon)` the Prometheus output lines are named
// `daqingest_daemon_<name>`.
stats_proc::stats_struct!((
    stats_struct(
        name(DaemonStats),
        prefix(daemon),
        counters(
            critical_error,
            todo_mark,
            ticker_token_acquire_error,
            ticker_token_release_error,
            handle_timer_tick_count,
            ioc_search_err,
            ioc_search_some,
            ioc_search_none,
            lookupaddr_ok,
            events,
            event_ca_conn,
            ca_conn_status_done,
            ca_conn_status_feedback_timeout,
            ca_conn_status_feedback_recv,
            ca_conn_status_feedback_no_dst,
            ca_echo_timeout_total,
        ),
        values(channel_without_address, channel_with_address),
    ),
    agg(name(DaemonStatsAgg), parent(DaemonStats)),
    diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
));
|
||||
|
||||
@@ -13,7 +13,9 @@ struct FuncCallWithArgs {
|
||||
#[derive(Clone, Debug)]
|
||||
struct StatsStructDef {
|
||||
name: syn::Ident,
|
||||
prefix: Option<syn::Ident>,
|
||||
counters: Vec<syn::Ident>,
|
||||
values: Vec<syn::Ident>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -36,18 +38,22 @@ fn extend_str(mut a: String, x: impl AsRef<str>) -> String {
|
||||
}
|
||||
|
||||
fn stats_struct_impl(st: &StatsStructDef) -> String {
|
||||
use std::fmt::Write;
|
||||
let name = &st.name;
|
||||
let inits: Vec<_> = st
|
||||
let inits1 = st
|
||||
.counters
|
||||
.iter()
|
||||
.map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string()))
|
||||
.collect();
|
||||
.map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string()));
|
||||
let inits2 = st
|
||||
.values
|
||||
.iter()
|
||||
.map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string()));
|
||||
let inits: Vec<_> = inits1.into_iter().chain(inits2).collect();
|
||||
let inits = inits.join(",\n");
|
||||
let incers: String = st
|
||||
.counters
|
||||
.iter()
|
||||
.map(|x| {
|
||||
let nn = x.to_string();
|
||||
.map(|nn| {
|
||||
format!(
|
||||
"
|
||||
pub fn {nn}_inc(&self) {{
|
||||
@@ -62,7 +68,61 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
|
||||
"
|
||||
)
|
||||
})
|
||||
.fold(String::new(), |a, x| format!("{}{}", a, x));
|
||||
.fold(String::new(), |mut a, x| {
|
||||
a.push_str(&x);
|
||||
a
|
||||
});
|
||||
let values = {
|
||||
let mut buf = String::new();
|
||||
for nn in &st.values {
|
||||
write!(
|
||||
buf,
|
||||
"
|
||||
pub fn {nn}_set(&self, v: u64) {{
|
||||
self.{nn}.store(v, Ordering::Release);
|
||||
}}
|
||||
"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
buf
|
||||
};
|
||||
let fn_prometheus = {
|
||||
let mut buf = String::new();
|
||||
for x in &st.counters {
|
||||
let n = x.to_string();
|
||||
let pre = if let Some(x) = &st.prefix {
|
||||
format!("_{}", x)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
buf.push_str(&format!(
|
||||
"ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
|
||||
pre, n, n
|
||||
));
|
||||
}
|
||||
for x in &st.values {
|
||||
let n = x.to_string();
|
||||
let nn = if let Some(pre) = &st.prefix {
|
||||
format!("{pre}_{n}")
|
||||
} else {
|
||||
n.to_string()
|
||||
};
|
||||
buf.push_str(&format!(
|
||||
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
|
||||
nn, n
|
||||
));
|
||||
}
|
||||
format!(
|
||||
"
|
||||
pub fn prometheus(&self) -> String {{
|
||||
let mut ret = String::new();
|
||||
{buf}
|
||||
ret
|
||||
}}
|
||||
"
|
||||
)
|
||||
};
|
||||
format!(
|
||||
"
|
||||
impl {name} {{
|
||||
@@ -74,6 +134,10 @@ impl {name} {{
|
||||
}}
|
||||
|
||||
{incers}
|
||||
|
||||
{values}
|
||||
|
||||
{fn_prometheus}
|
||||
}}
|
||||
"
|
||||
)
|
||||
@@ -86,11 +150,17 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String {
|
||||
.iter()
|
||||
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
|
||||
.fold(String::new(), extend_str);
|
||||
let values_decl = st
|
||||
.values
|
||||
.iter()
|
||||
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
|
||||
.fold(String::new(), extend_str);
|
||||
let structt = format!(
|
||||
"
|
||||
pub struct {name} {{
|
||||
pub ts_create: Instant,
|
||||
{counters_decl}
|
||||
{values_decl}
|
||||
}}
|
||||
|
||||
"
|
||||
@@ -350,27 +420,40 @@ impl StatsStructDef {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
name: syn::parse_str("__empty").unwrap(),
|
||||
counters: vec![],
|
||||
prefix: syn::parse_str("__empty").unwrap(),
|
||||
counters: Vec::new(),
|
||||
values: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_args(inp: PunctExpr) -> syn::Result<Self> {
|
||||
let mut name = None;
|
||||
let mut prefix = None;
|
||||
let mut counters = None;
|
||||
let mut values = None;
|
||||
for k in inp {
|
||||
let fa = FuncCallWithArgs::from_expr(k)?;
|
||||
if fa.name == "name" {
|
||||
let ident = ident_from_expr(fa.args[0].clone())?;
|
||||
name = Some(ident);
|
||||
}
|
||||
if fa.name == "counters" {
|
||||
} else if fa.name == "prefix" {
|
||||
let ident = ident_from_expr(fa.args[0].clone())?;
|
||||
prefix = Some(ident);
|
||||
} else if fa.name == "counters" {
|
||||
let idents = idents_from_exprs(fa.args)?;
|
||||
counters = Some(idents);
|
||||
} else if fa.name == "values" {
|
||||
let idents = idents_from_exprs(fa.args)?;
|
||||
values = Some(idents);
|
||||
} else {
|
||||
panic!("fa.name: {:?}", fa.name);
|
||||
}
|
||||
}
|
||||
let ret = StatsStructDef {
|
||||
name: name.expect("Expect name for StatsStructDef"),
|
||||
counters: counters.unwrap_or(vec![]),
|
||||
prefix,
|
||||
counters: counters.unwrap_or(Vec::new()),
|
||||
values: values.unwrap_or(Vec::new()),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -476,7 +559,9 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream {
|
||||
for j in &def.agg_defs {
|
||||
let h = StatsStructDef {
|
||||
name: j.name.clone(),
|
||||
prefix: None,
|
||||
counters: j.stats.counters.clone(),
|
||||
values: Vec::new(),
|
||||
};
|
||||
def.stats_struct_defs.push(h);
|
||||
}
|
||||
@@ -505,7 +590,11 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream {
|
||||
// TODO currently, "j.stats" describes the input to the "agg", so that contains the wrong name.
|
||||
let p = StatsStructDef {
|
||||
name: k.input.clone(),
|
||||
// TODO refactor
|
||||
prefix: None,
|
||||
counters: j.stats.counters.clone(),
|
||||
// TODO compute values
|
||||
values: Vec::new(),
|
||||
};
|
||||
let s = diff_decl_impl(k, &p);
|
||||
code.push_str(&s);
|
||||
|
||||
Reference in New Issue
Block a user