Remove unused

This commit is contained in:
Dominik Werder
2023-09-11 07:18:51 +02:00
parent 8bbd6c37d1
commit 9d329c26ad
10 changed files with 266 additions and 206 deletions
+15 -26
View File
@@ -9,7 +9,6 @@ use log::*;
use netfetch::ca::connset::CaConnSet;
use netfetch::ca::connset::CaConnSetCtrl;
use netfetch::ca::connset::CaConnSetItem;
use netfetch::ca::IngestCommons;
use netfetch::conf::CaIngestOpts;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
@@ -17,6 +16,7 @@ use netfetch::metrics::ExtraInsertsConf;
use netfetch::metrics::StatsSet;
use netpod::Database;
use netpod::ScyllaConfig;
use scywr::insertworker::InsertWorkerOpts;
use scywr::insertworker::Ttls;
use scywr::iteminsertqueue as scywriiq;
use scywr::store::DataStore;
@@ -78,13 +78,14 @@ pub struct Daemon {
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
ingest_commons: Arc<IngestCommons>,
caconn_last_channel_check: Instant,
stats: Arc<DaemonStats>,
shutting_down: bool,
insert_rx_weak: WeakReceiver<QueryItem>,
connset_ctrl: CaConnSetCtrl,
connset_status_last: Instant,
// TODO should be a stats object?
insert_workers_running: AtomicU64,
}
impl Daemon {
@@ -137,25 +138,17 @@ impl Daemon {
}
});
let ingest_commons = IngestCommons {
pgconf: Arc::new(opts.pgconf.clone()),
backend: opts.backend().into(),
local_epics_hostname: opts.local_epics_hostname.clone(),
data_store: datastore.clone(),
insert_ivl_min: Arc::new(AtomicU64::new(0)),
extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
store_workers_rate: Arc::new(AtomicU64::new(20000)),
insert_frac: Arc::new(AtomicU64::new(1000)),
insert_workers_running: Arc::new(AtomicU64::new(0)),
};
let ingest_commons = Arc::new(ingest_commons);
let use_rate_limit_queue = false;
// TODO use a new stats type:
let store_stats = Arc::new(stats::CaConnStats::new());
let ttls = opts.ttls.clone();
let insert_worker_opts = Arc::new(ingest_commons.as_ref().into());
let insert_worker_opts = InsertWorkerOpts {
store_workers_rate: Arc::new(AtomicU64::new(20000000)),
insert_workers_running: Arc::new(AtomicU64::new(0)),
insert_frac: Arc::new(AtomicU64::new(1000)),
};
let insert_worker_opts = Arc::new(insert_worker_opts);
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
opts.scyconf.clone(),
opts.insert_scylla_sessions,
@@ -214,13 +207,13 @@ impl Daemon {
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jh,
ingest_commons,
caconn_last_channel_check: Instant::now(),
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak: query_item_rx.downgrade(),
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
insert_workers_running: AtomicU64::new(0),
};
Ok(ret)
}
@@ -239,10 +232,7 @@ impl Daemon {
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
if self.shutting_down {
let nworkers = self
.ingest_commons
.insert_workers_running
.load(atomic::Ordering::Acquire);
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
let nitems = self
.insert_rx_weak
.upgrade()
@@ -253,7 +243,7 @@ impl Daemon {
std::process::exit(0);
}
}
self.stats.handle_timer_tick_count_inc();
self.stats.handle_timer_tick_count.inc();
let ts1 = Instant::now();
let tsnow = SystemTime::now();
if SIGINT.load(atomic::Ordering::Acquire) == 1 {
@@ -406,7 +396,7 @@ impl Daemon {
async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> {
use DaemonEvent::*;
self.stats.events_inc();
self.stats.events.inc();
let ts1 = Instant::now();
let item_summary = item.summary();
let ret = match item {
@@ -416,7 +406,7 @@ impl Daemon {
match tx.send(i.wrapping_add(1)).await {
Ok(_) => {}
Err(_) => {
self.stats.ticker_token_release_error_inc();
self.stats.ticker_token_release_error.inc();
error!("can not send ticker token");
return Err(Error::with_msg_no_trace("can not send ticker token"));
}
@@ -462,7 +452,7 @@ impl Daemon {
match ticker_inp_rx.recv().await {
Ok(_) => {}
Err(_) => {
stats.ticker_token_acquire_error_inc();
stats.ticker_token_acquire_error.inc();
break;
}
}
@@ -494,7 +484,6 @@ impl Daemon {
}
}
}
warn!("TODO should not have to close the channel");
warn!("TODO wait for insert workers");
while let Some(jh) = self.insert_workers_jh.pop() {
match jh.await.map_err(Error::from_string) {
-22
View File
@@ -32,28 +32,6 @@ lazy_static::lazy_static! {
pub static ref METRICS: Mutex<Option<CaConnStatsAgg>> = Mutex::new(None);
}
pub struct IngestCommons {
pub pgconf: Arc<Database>,
pub backend: String,
pub local_epics_hostname: String,
pub data_store: Arc<DataStore>,
pub insert_ivl_min: Arc<AtomicU64>,
pub extra_inserts_conf: TokMx<ExtraInsertsConf>,
pub insert_frac: Arc<AtomicU64>,
pub store_workers_rate: Arc<AtomicU64>,
pub insert_workers_running: Arc<AtomicU64>,
}
impl From<&IngestCommons> for InsertWorkerOpts {
fn from(val: &IngestCommons) -> Self {
Self {
store_workers_rate: val.store_workers_rate.clone(),
insert_workers_running: val.insert_workers_running.clone(),
insert_frac: val.insert_frac.clone(),
}
}
}
pub trait SlowWarnable {
fn slow_warn(self, ms: u64) -> SlowWarn<Pin<Box<Self>>>
where
+21 -79
View File
@@ -415,17 +415,6 @@ impl CanSendChannelInfoResult for SendSeriesLookup {
}
}
struct ChannelOpsResources<'a> {
channel_set_ops: &'a StdMutex<BTreeMap<String, ChannelSetOp>>,
channels: &'a mut BTreeMap<Cid, ChannelState>,
cid_by_name: &'a mut BTreeMap<String, Cid>,
name_by_cid: &'a mut BTreeMap<Cid, String>,
cid_store: &'a mut CidStore,
init_state_count: &'a mut u64,
channel_set_ops_flag: &'a AtomicUsize,
time_binners: &'a mut BTreeMap<Cid, ConnTimeBin>,
}
pub struct CaConnOpts {
insert_queue_max: usize,
array_truncate: usize,
@@ -558,7 +547,7 @@ impl CaConn {
kind: ConnCommandResultKind::CheckHealth,
};
self.cmd_res_queue.push_back(res);
//self.stats.caconn_command_can_not_reply_inc();
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_find_channel(&self, pattern: &str) {
@@ -608,13 +597,13 @@ impl CaConn {
fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) {
self.channel_add(name, cssid);
// TODO return the result
//self.stats.caconn_command_can_not_reply_inc();
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_channel_remove(&mut self, name: String) {
self.channel_remove(name);
// TODO return the result
//self.stats.caconn_command_can_not_reply_inc();
//self.stats.caconn_command_can_not_reply.inc();
}
fn cmd_shutdown(&mut self) {
@@ -679,7 +668,7 @@ impl CaConn {
fn handle_conn_command(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
self.stats.caconn_loop3_count_inc();
self.stats.caconn_loop3_count.inc();
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace!("handle_conn_command received a command {}", self.remote_addr_dbg);
@@ -891,15 +880,9 @@ impl CaConn {
_ => {}
}
}
self.stats
.channel_all_count
.store(self.channels.len() as _, Ordering::Release);
self.stats
.channel_alive_count
.store(alive_count as _, Ordering::Release);
self.stats
.channel_not_alive_count
.store(not_alive_count as _, Ordering::Release);
self.stats.channel_all_count.__set(self.channels.len() as _);
self.stats.channel_alive_count.__set(alive_count as _);
self.stats.channel_not_alive_count.__set(not_alive_count as _);
Ok(())
}
@@ -954,7 +937,7 @@ impl CaConn {
series: SeriesId,
) -> Result<(), Error> {
let tsnow = Instant::now();
self.stats.get_series_id_ok_inc();
self.stats.get_series_id_ok.inc();
if series.id() == 0 {
warn!("Weird series id: {series:?}");
}
@@ -1058,7 +1041,7 @@ impl CaConn {
ts_msp_grid,
};
item_queue.push_back(QueryItem::Insert(item));
stats.insert_item_create_inc();
stats.insert_item_create.inc();
Ok(())
}
@@ -1172,15 +1155,15 @@ impl CaConn {
let ts = ev.value.ts.map_or(0, |x| x.get());
let ts_diff = ts.abs_diff(ts_local);
if ts_diff > SEC * 300 {
self.stats.ca_ts_off_4_inc();
self.stats.ca_ts_off_4.inc();
//warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC);
// TODO mute this channel for some time, discard the event.
} else if ts_diff > SEC * 120 {
self.stats.ca_ts_off_3_inc();
self.stats.ca_ts_off_3.inc();
} else if ts_diff > SEC * 20 {
self.stats.ca_ts_off_2_inc();
self.stats.ca_ts_off_2.inc();
} else if ts_diff > SEC * 3 {
self.stats.ca_ts_off_1_inc();
self.stats.ca_ts_off_1.inc();
}
if tsnow >= st.insert_next_earliest {
//let channel_state = self.channels.get_mut(&cid).unwrap();
@@ -1238,7 +1221,7 @@ impl CaConn {
extra_inserts_conf,
)?;
} else {
self.stats.channel_fast_item_drop_inc();
self.stats.channel_fast_item_drop.inc();
if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(10000) {
st.insert_recv_ivl_last = tsnow;
let ema = st.insert_item_ivl_ema.ema();
@@ -1369,7 +1352,7 @@ impl CaConn {
let ts2 = Instant::now();
self.stats
.time_check_channels_state_init
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::Release);
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
@@ -1456,12 +1439,12 @@ impl CaConn {
}
CaMsgTy::EventAddRes(k) => {
trace!("got EventAddRes: {k:?}");
self.stats.caconn_recv_data_inc();
self.stats.caconn_recv_data.inc();
let res = Self::handle_event_add_res(self, k, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let _ = ts1;
res?
@@ -1638,7 +1621,7 @@ impl CaConn {
fn loop_inner(&mut self, cx: &mut Context) -> Result<Option<Poll<()>>, Error> {
use Poll::*;
loop {
self.stats.caconn_loop2_count_inc();
self.stats.caconn_loop2_count.inc();
if self.is_shutdown() {
break Ok(None);
}
@@ -1653,47 +1636,6 @@ impl CaConn {
}
}
fn apply_channel_ops_with_res(res: ChannelOpsResources) {
let mut g = res.channel_set_ops.lock().unwrap();
let map = std::mem::replace(&mut *g, BTreeMap::new());
for (ch, op) in map {
match op {
ChannelSetOp::Add(cssid) => Self::channel_add_expl(
ch,
cssid,
res.channels,
res.cid_by_name,
res.name_by_cid,
res.cid_store,
res.init_state_count,
),
ChannelSetOp::Remove => Self::channel_remove_expl(
ch,
res.channels,
res.cid_by_name,
res.name_by_cid,
res.cid_store,
res.time_binners,
),
}
}
res.channel_set_ops_flag.store(0, atomic::Ordering::Release);
}
fn apply_channel_ops(&mut self) {
let res = ChannelOpsResources {
channel_set_ops: err::todoval(),
channels: &mut self.channels,
cid_by_name: &mut self.cid_by_name,
name_by_cid: &mut self.name_by_cid,
cid_store: &mut self.cid_store,
init_state_count: &mut self.init_state_count,
channel_set_ops_flag: err::todoval(),
time_binners: &mut self.time_binners,
};
Self::apply_channel_ops_with_res(res)
}
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
match self.ticker.poll_unpin(cx) {
@@ -1757,7 +1699,7 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.stats.caconn_poll_count_inc();
self.stats.caconn_poll_count.inc();
loop {
let mut have_pending = false;
break if let CaConnState::EndOfStream = self.state {
@@ -1798,8 +1740,8 @@ impl Stream for CaConn {
} {
Ready(Some(item))
} else {
// Ready(_) => self.stats.conn_stream_ready_inc(),
// Pending => self.stats.conn_stream_pending_inc(),
// Ready(_) => self.stats.conn_stream_ready.inc(),
// Pending => self.stats.conn_stream_pending.inc(),
let _item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
+13 -13
View File
@@ -520,20 +520,20 @@ impl CaConnSet {
if let Some(e) = self.ca_conn_ress.remove(&addr) {
match e.jh.await {
Ok(Ok(())) => {
self.stats.ca_conn_task_join_done_ok_inc();
self.stats.ca_conn_task_join_done_ok.inc();
debug!("CaConn {addr} finished well");
}
Ok(Err(e)) => {
self.stats.ca_conn_task_join_done_err_inc();
self.stats.ca_conn_task_join_done_err.inc();
error!("CaConn {addr} task error: {e}");
}
Err(e) => {
self.stats.ca_conn_task_join_err_inc();
self.stats.ca_conn_task_join_err.inc();
error!("CaConn {addr} join error: {e}");
}
}
} else {
self.stats.ca_conn_task_eos_non_exist_inc();
self.stats.ca_conn_task_eos_non_exist.inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
Ok(())
@@ -581,7 +581,7 @@ impl CaConnSet {
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
stats.conn_item_count_inc();
stats.conn_item_count.inc();
conn_item_tx
.send(CaConnSetEvent::CaConnEvent((SocketAddr::V4(addr), item)))
.await?;
@@ -721,7 +721,7 @@ impl CaConnSet {
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO Fresh timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout_inc();
// self.stats.ca_conn_status_feedback_timeout.inc();
// TODO send shutdown to this CaConn, check that we've received
// a 'shutdown' state from it. (see below)
*v = CaConnStateValue::Shutdown { since: tsnow };
@@ -732,14 +732,14 @@ impl CaConnSet {
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO HadFeedback timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout_inc();
// self.stats.ca_conn_status_feedback_timeout.inc();
*v = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) {
// TODO collect in metrics as severe error, this would be a bug.
// self.stats.critical_error_inc();
// self.stats.critical_error.inc();
error!("Shutdown of CaConn failed for {addr}");
}
}
@@ -908,11 +908,11 @@ impl CaConnSet {
}
}
use atomic::Ordering::Release;
self.stats.channel_unknown_address.store(unknown_address, Release);
self.stats.channel_search_pending.store(search_pending, Release);
self.stats.channel_no_address.store(no_address, Release);
self.stats.channel_unassigned.store(unassigned, Release);
self.stats.channel_assigned.store(assigned, Release);
self.stats.channel_unknown_address.__set(unknown_address);
self.stats.channel_search_pending.__set(search_pending);
self.stats.channel_no_address.__set(no_address);
self.stats.channel_unassigned.__set(unassigned);
self.stats.channel_assigned.__set(assigned);
(search_pending,)
}
}
+13 -5
View File
@@ -1,11 +1,12 @@
use crate::ca::IngestCommons;
use crate::ca::METRICS;
use crate::daemon_common::DaemonEvent;
use async_channel::Sender;
use async_channel::WeakSender;
use axum::extract::Query;
use err::Error;
use http::Request;
use log::*;
use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use stats::CaConnStats;
@@ -251,7 +252,7 @@ pub async fn start_metrics_service(bind_to: String, dcom: Arc<DaemonComm>, stats
}
pub async fn metrics_agg_task(
ingest_commons: Arc<IngestCommons>,
query_item_chn: WeakSender<QueryItem>,
local_stats: Arc<CaConnStats>,
store_stats: Arc<CaConnStats>,
) -> Result<(), Error> {
@@ -262,6 +263,14 @@ pub async fn metrics_agg_task(
agg.push(&local_stats);
agg.push(&store_stats);
trace!("TODO metrics_agg_task");
// TODO when a CaConn is closed, I'll lose the so far collected counts, which creates a jump
// in the metrics.
// To make this sound:
// Let CaConn keep a stats and just count.
// At the tick, create a snapshot: all atomics are copied after each other.
// Diff this new snapshot with an older snapshot and send that.
// Note: some stats are counters, but some are current values.
// e.g. the number of active channels should go down when a CaConn stops.
#[cfg(DISABLED)]
{
let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await;
@@ -271,9 +280,8 @@ pub async fn metrics_agg_task(
}
{
warn!("TODO provide metrics with a weak ref to the query_item_channel");
let nitems = 0;
// let nitems = weak.upgrade()..len();
agg.store_worker_recv_queue_len.store(nitems, Ordering::Release);
let nitems = query_item_chn.upgrade().map_or(0, |x| x.len());
agg.store_worker_recv_queue_len.__set(nitems as u64);
}
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
+16 -16
View File
@@ -24,22 +24,22 @@ fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::iteminsertqueue::E
use crate::iteminsertqueue::Error;
match err {
Error::DbOverload => {
stats.store_worker_insert_overload_inc();
stats.store_worker_insert_overload.inc();
}
Error::DbTimeout => {
stats.store_worker_insert_timeout_inc();
stats.store_worker_insert_timeout.inc();
}
Error::DbUnavailable => {
stats.store_worker_insert_unavailable_inc();
stats.store_worker_insert_unavailable.inc();
}
Error::DbError(e) => {
if false {
warn!("db error {e}");
}
stats.store_worker_insert_error_inc();
stats.store_worker_insert_error.inc();
}
Error::QueryError(_) => {
stats.store_worker_insert_error_inc();
stats.store_worker_insert_error.inc();
}
}
}
@@ -103,7 +103,7 @@ async fn rate_limiter_worker(
let ivl2 = Duration::from_nanos(ema2.ema() as u64);
if allowed_to_drop && ivl2 < dt_min {
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
stats.store_worker_ratelimit_drop_inc();
stats.store_worker_ratelimit_drop.inc();
} else {
if tx.send(item).await.is_err() {
break;
@@ -113,7 +113,7 @@ async fn rate_limiter_worker(
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ivl_ema.update(dt_ns.min(MS * 100) as f32);
ts_forward_last = tsnow;
stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
stats.inter_ivl_ema.set(ivl_ema.ema() as u64);
}
}
}
@@ -146,7 +146,7 @@ async fn worker(
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.store_worker_item_recv_inc();
stats.store_worker_item_recv.inc();
item
} else {
break;
@@ -155,7 +155,7 @@ async fn worker(
QueryItem::ConnectionStatus(item) => {
match insert_connection_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.connection_status_insert_done_inc();
stats.connection_status_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -167,7 +167,7 @@ async fn worker(
QueryItem::ChannelStatus(item) => {
match insert_channel_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.channel_status_insert_done_inc();
stats.channel_status_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -181,7 +181,7 @@ async fn worker(
if i1 % 1000 < insert_frac {
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_insert_done_inc();
stats.store_worker_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -190,7 +190,7 @@ async fn worker(
}
}
} else {
stats.store_worker_fraction_drop_inc();
stats.store_worker_fraction_drop.inc();
}
i1 += 1;
}
@@ -206,7 +206,7 @@ async fn worker(
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
match qres {
Ok(_) => {
stats.mute_insert_done_inc();
stats.mute_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -230,7 +230,7 @@ async fn worker(
.await;
match qres {
Ok(_) => {
stats.ivl_insert_done_inc();
stats.ivl_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -252,7 +252,7 @@ async fn worker(
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
match qres {
Ok(_) => {
stats.channel_info_insert_done_inc();
stats.channel_info_insert_done.inc();
backoff = backoff_0;
}
Err(e) => {
@@ -281,7 +281,7 @@ async fn worker(
.await;
match qres {
Ok(_) => {
stats.store_worker_insert_binned_done_inc();
stats.store_worker_insert_binned_done.inc();
backoff = backoff_0;
}
Err(e) => {
+3 -3
View File
@@ -388,7 +388,7 @@ pub async fn insert_item(
if item.msp_bump {
let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32);
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
stats.inserts_msp_inc();
stats.inserts_msp.inc();
}
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
@@ -403,7 +403,7 @@ pub async fn insert_item(
.scy
.execute(&data_store.qu_insert_series_by_ts_msp, params)
.await?;
stats.inserts_msp_grid_inc();
stats.inserts_msp_grid.inc();
}
use DataValue::*;
match item.val {
@@ -446,7 +446,7 @@ pub async fn insert_item(
}
}
}
stats.inserts_val_inc();
stats.inserts_val.inc();
Ok(())
}
+25 -2
View File
@@ -207,6 +207,7 @@ impl IntervalEma {
}
}
// #[cfg(DISABLED)]
stats_proc::stats_struct!((
stats_struct(
name(CaConnSetStats),
@@ -224,9 +225,10 @@ stats_proc::stats_struct!((
),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
// diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
));
// #[cfg(DISABLED)]
stats_proc::stats_struct!((
stats_struct(
name(CaConnStats),
@@ -280,13 +282,14 @@ stats_proc::stats_struct!((
ca_ts_off_2,
ca_ts_off_3,
ca_ts_off_4,
inter_ivl_ema,
),
values(inter_ivl_ema)
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
));
// #[cfg(DISABLED)]
stats_proc::stats_struct!((
stats_struct(
name(DaemonStats),
@@ -320,3 +323,23 @@ stats_proc::stats_struct!((
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
));
stats_proc::stats_struct!((
stats_struct(name(TestStats0), counters(count0,), values(val0),),
diff(name(TestStats0Diff), input(TestStats0)),
agg(name(TestStats0Agg), parent(TestStats0)),
diff(name(TestStats0AggDiff), input(TestStats0Agg)),
));
#[test]
fn test0_diff() {
let stats_a = TestStats0::new();
stats_a.count0().inc();
stats_a.val0().set(43);
let stats_b = stats_a.snapshot();
stats_b.count0().inc();
stats_b.count0().inc();
stats_b.count0().inc();
let diff = TestStats0Diff::diff_from(&stats_a, &stats_b);
assert_eq!(diff.count0.load(), 3);
}
+62 -37
View File
@@ -1,7 +1,8 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::parse::ParseStream;
use syn::{parse_macro_input, Ident};
use syn::parse_macro_input;
use syn::Ident;
type PunctExpr = syn::punctuated::Punctuated<syn::Expr, syn::token::Comma>;
@@ -43,11 +44,11 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
let inits1 = st
.counters
.iter()
.map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string()));
.map(|x| format!("{:12}{}: stats_types::Counter::new()", "", x.to_string()));
let inits2 = st
.values
.iter()
.map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string()));
.map(|x| format!("{:12}{}: stats_types::Value::new()", "", x.to_string()));
let inits: Vec<_> = inits1.into_iter().chain(inits2).collect();
let inits = inits.join(",\n");
let incers: String = st
@@ -56,15 +57,12 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
.map(|nn| {
format!(
"
pub fn {nn}_inc(&self) {{
self.{nn}.fetch_add(1, Ordering::AcqRel);
}}
pub fn {nn}_add(&self, v: u64) {{
self.{nn}.fetch_add(v, Ordering::AcqRel);
}}
pub fn {nn}_dur(&self, v: Duration) {{
self.{nn}.fetch_add((v * 1000000).as_secs(), Ordering::AcqRel);
pub fn {nn}(&self) -> &stats_types::Counter {{
&self.{nn}
}}
//pub fn {nn}_dur(&self, v: Duration) {{
// self.{nn}.fetch_add((v * 1000000).as_secs(), Ordering::AcqRel);
//}}
"
)
})
@@ -78,8 +76,8 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
write!(
buf,
"
pub fn {nn}_set(&self, v: u64) {{
self.{nn}.store(v, Ordering::Release);
pub fn {nn}(&self) -> &stats_types::Value {{
&self.{nn}
}}
"
)
@@ -97,7 +95,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
String::new()
};
buf.push_str(&format!(
"ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
"ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load()));\n",
pre, n, n
));
}
@@ -109,7 +107,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
n.to_string()
};
buf.push_str(&format!(
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n",
nn, n
));
}
@@ -123,12 +121,35 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
"
)
};
let fn_snapshot = {
let mut init_counters = String::new();
for x in &st.counters {
let n = x.to_string();
init_counters.push_str(&format!("ret.{}.__set(self.{}.load());\n", n, n));
}
let mut init_values = String::new();
for x in &st.values {
let n = x.to_string();
init_values.push_str(&format!("ret.{}.set(self.{}.load());\n", n, n));
}
format!(
"
pub fn snapshot(&self) -> Self {{
let ret = Self::new();
{init_counters}
{init_values}
ret
}}
"
)
};
format!(
"
impl {name} {{
pub fn new() -> Self {{
Self {{
ts_create: Instant::now(),
dropped: stats_types::Value::new(),
{inits}
}}
}}
@@ -138,8 +159,17 @@ impl {name} {{
{values}
{fn_prometheus}
{fn_snapshot}
}}
"
impl stats_types::DropMark for {name} {{
fn field(&self) -> &stats_types::Value {{
&self.dropped
}}
}}
"
)
}
@@ -148,17 +178,18 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String {
let counters_decl = st
.counters
.iter()
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
.map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string()))
.fold(String::new(), extend_str);
let values_decl = st
.values
.iter()
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
.map(|x| format!("{:4}pub {}: stats_types::Value,\n", "", x.to_string()))
.fold(String::new(), extend_str);
let structt = format!(
"
pub struct {name} {{
pub ts_create: Instant,
dropped: stats_types::Value,
{counters_decl}
{values_decl}
}}
@@ -175,7 +206,7 @@ fn agg_decl_impl(st: &StatsStructDef, ag: &AggStructDef) -> String {
let counters_decl = st
.counters
.iter()
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
.map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string()))
.fold(String::new(), extend_str);
let mut code = String::new();
let s = format!(
@@ -183,7 +214,7 @@ fn agg_decl_impl(st: &StatsStructDef, ag: &AggStructDef) -> String {
// Agg decl
pub struct {name} {{
pub ts_create: Instant,
pub aggcount: AtomicU64,
pub aggcount: stats_types::Counter,
{counters_decl}
}}
"
@@ -194,7 +225,7 @@ pub struct {name} {{
.iter()
.map(|x| {
let n = x.to_string();
format!("{:12}{}: AtomicU64::new(self.{}.load(Ordering::Acquire)),\n", "", n, n)
format!("{:12}{}: stats_types::Counter::init(self.{}.load()),\n", "", n, n)
})
.fold(String::new(), extend_str);
let s = format!(
@@ -203,7 +234,7 @@ impl Clone for {name} {{
fn clone(&self) -> Self {{
Self {{
ts_create: self.ts_create.clone(),
aggcount: AtomicU64::new(self.aggcount.load(Ordering::Acquire)),
aggcount: stats_types::Counter::init(self.aggcount.load()),
{clone_counters}
}}
}}
@@ -214,7 +245,7 @@ impl Clone for {name} {{
let inits = st
.counters
.iter()
.map(|x| format!("{:12}{}: AtomicU64::new(0),\n", "", x.to_string()))
.map(|x| format!("{:12}{}: stats_types::Counter::new(),\n", "", x.to_string()))
.fold(String::new(), extend_str);
let s = format!(
"
@@ -223,7 +254,7 @@ impl {name} {{
pub fn new() -> Self {{
Self {{
ts_create: Instant::now(),
aggcount: AtomicU64::new(0),
aggcount: stats_types::Counter::new(),
{inits}
}}
}}
@@ -233,18 +264,12 @@ impl {name} {{
let counters_add = st
.counters
.iter()
.map(|x| {
format!(
"self.{}.fetch_add(inp.{}.load(Ordering::Acquire), Ordering::AcqRel);\n",
x.to_string(),
x.to_string()
)
})
.map(|x| format!("self.{}.add(inp.{}.load());\n", x.to_string(), x.to_string()))
.fold(String::new(), extend_str);
let s = format!(
"
pub fn push(&self, inp: &{name_inp}) {{
self.aggcount.fetch_add(1, Ordering::AcqRel);
self.aggcount.inc();
{counters_add}
}}
"
@@ -255,7 +280,7 @@ impl {name} {{
for x in &st.counters {
let n = x.to_string();
buf.push_str(&format!(
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n",
n, n
));
}
@@ -263,7 +288,7 @@ impl {name} {{
"
pub fn prometheus(&self) -> String {{
let mut ret = String::new();
ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire)));
ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load()));
{buf}
ret
}}
@@ -287,7 +312,7 @@ fn diff_decl_impl(st: &DiffStructDef, inp: &StatsStructDef) -> String {
let decl = inp
.counters
.iter()
.map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string()))
.map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string()))
.fold(String::new(), extend_str);
let mut code = String::new();
let s = format!(
@@ -309,7 +334,7 @@ pub struct {name} {{
.map(|x| {
let n = x.to_string();
format!(
"{:12}let {} = AtomicU64::new(b.{}.load(Ordering::Acquire) - a.{}.load(Ordering::Acquire));\n",
"{:12}let {} = stats_types::Counter::init(b.{}.load() - a.{}.load());\n",
"", n, n, n
)
})
@@ -339,7 +364,7 @@ pub struct {name} {{
let mut b = String::new();
for h in &inp.counters {
a.push_str(&format!("{} {{}} ", h.to_string()));
b.push_str(&format!("self.{}.load(Ordering::Acquire), ", h.to_string()));
b.push_str(&format!("self.{}.load(), ", h.to_string()));
}
let s = format!(
"
+98 -3
View File
@@ -1,15 +1,110 @@
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::AcqRel;
use std::sync::atomic::Ordering::Acquire;
use std::sync::atomic::Ordering::Release;
#[derive(Debug)]
pub struct Counter {
pub struct CounterDesc {
pub name: String,
}
#[derive(Debug)]
pub struct StatsStruct {
pub name: String,
pub counters: Vec<Counter>,
pub counters: Vec<CounterDesc>,
}
#[derive(Debug)]
pub struct StatsStructDef {
pub name: String,
pub counters: Vec<Counter>,
pub counters: Vec<CounterDesc>,
}
#[derive(Debug)]
pub struct Counter(AtomicU64);
impl Counter {
pub fn new() -> Self {
Counter(AtomicU64::new(0))
}
pub fn init(x: u64) -> Self {
Counter(AtomicU64::new(x))
}
pub fn inc(&self) {
self.0.fetch_add(1, AcqRel);
}
pub fn add(&self, x: u64) {
self.0.fetch_add(x, AcqRel);
}
pub fn load(&self) -> u64 {
self.0.load(Acquire)
}
pub fn __set(&self, x: u64) {
self.0.store(x, Release);
}
}
#[derive(Debug)]
pub struct Value(AtomicU64);
impl Value {
pub fn new() -> Self {
Value(AtomicU64::new(0))
}
pub fn init(x: u64) -> Self {
Value(AtomicU64::new(x))
}
pub fn set(&self, x: u64) {
self.0.store(x, Release);
}
pub fn load(&self) -> u64 {
self.0.load(Acquire)
}
}
pub trait DropMark {
fn field(&self) -> &Value;
}
pub struct DropGuard<'a> {
mark: &'a Value,
}
impl<'a> Drop for DropGuard<'a> {
fn drop(&mut self) {
self.mark.set(1);
}
}
#[allow(unused)]
struct StatsAInner {
count0: Counter,
val0: Value,
done: Value,
}
#[allow(unused)]
struct StatsA {
inner: std::sync::Arc<StatsAInner>,
}
impl Drop for StatsA {
fn drop(&mut self) {
self.inner.done.set(1);
}
}
#[allow(unused)]
struct StatsAReader {
inner: std::sync::Arc<StatsAInner>,
}
impl StatsAReader {}