diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 8536aa6..a1c3730 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -37,7 +37,6 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::task::JoinHandle; -const CHECK_HEALTH_IVL: Duration = Duration::from_millis(2000); const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); @@ -77,7 +76,6 @@ pub struct Daemon { connset_status_last: Instant, // TODO should be a stats object? insert_workers_running: AtomicU64, - connset_health_lat_ema: f32, metrics_shutdown_tx: Sender, metrics_shutdown_rx: Receiver, metrics_jh: Option>>, @@ -412,7 +410,6 @@ impl Daemon { connset_ctrl: conn_set_ctrl, connset_status_last: Instant::now(), insert_workers_running: AtomicU64::new(0), - connset_health_lat_ema: 0., metrics_shutdown_tx, metrics_shutdown_rx, metrics_jh: None, diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index a958222..741563a 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2,7 +2,6 @@ mod enumfetch; use crate::conf::ChannelConfig; use crate::metrics::status::StorageUsage; -use crate::metrics::types::CaConnMetrics; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs index f0520b9..12a3a83 100644 --- a/netfetch/src/metrics/types.rs +++ b/netfetch/src/metrics/types.rs @@ -1,64 +1,6 @@ use scywr::insertqueues::InsertQueuesTx; use serde::Serialize; -#[derive(Debug, Serialize)] -pub struct CaConnMetrics { - // a value - pub ca_conn_event_out_queue_len: usize, - // a term of a running counter sum - pub ca_msg_recv_cnt: u64, -} - -#[derive(Debug, Serialize)] -pub struct CaConnMetricsAgg { - // derived from values - pub ca_conn_event_out_queue_len_max: usize, - // derived from counter terms - pub ca_msg_recv_cnt_all: u64, -} - -impl CaConnMetricsAgg { - pub fn new() -> Self { - Self { - ca_conn_event_out_queue_len_max: 0, - ca_msg_recv_cnt_all: 0, - } - } - - pub fn ingest(&mut self, inp: CaConnMetrics) { - self.ca_conn_event_out_queue_len_max = self - .ca_conn_event_out_queue_len_max - .max(inp.ca_conn_event_out_queue_len); - self.ca_msg_recv_cnt_all += inp.ca_msg_recv_cnt; - } -} - -#[derive(Debug, Serialize)] -pub struct CaConnMetricsAggAgg { - // derived from values - pub ca_conn_event_out_queue_len_max: usize, - // derived from counter terms - pub ca_msg_recv_cnt_all: u64, -} - -impl CaConnMetricsAggAgg { - pub fn new() -> Self { - Self { - ca_conn_event_out_queue_len_max: 0, - ca_msg_recv_cnt_all: 0, - } - } - - pub fn ingest(&mut self, inp: CaConnMetricsAgg) { - // take again the max of the maxs - self.ca_conn_event_out_queue_len_max = self - .ca_conn_event_out_queue_len_max - .max(inp.ca_conn_event_out_queue_len_max); - // sum up again to a total - self.ca_msg_recv_cnt_all += inp.ca_msg_recv_cnt_all; - } -} - #[derive(Debug, Serialize)] pub struct CaConnSetMetrics { pub ca_conn_agg: CaConnMetricsAgg, diff --git a/scywr/src/fut.rs b/scywr/src/fut.rs index 2fcbffe..4f77adf 100644 --- a/scywr/src/fut.rs +++ b/scywr/src/fut.rs @@ -2,10 +2,10 @@ use crate::access::Error; use crate::session::ScySession; use futures_util::Future; use futures_util::FutureExt; +use scylla::QueryResult; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; -use scylla::QueryResult; use std::pin::Pin; use std::task::Context; use std::task::Poll; diff --git a/scywr/src/senderpolling.rs b/scywr/src/senderpolling.rs index d8ff42f..c639e68 100644 --- a/scywr/src/senderpolling.rs +++ b/scywr/src/senderpolling.rs @@ -125,15 +125,19 @@ impl SenderPolling { } unsafe fn reset_fut(futopt: Pin<&mut Option>>) { - let y = futopt.get_unchecked_mut(); - let z = y.as_mut().unwrap_unchecked(); - std::ptr::drop_in_place(z); - std::ptr::write(y, None); + unsafe { + let y = futopt.get_unchecked_mut(); + let z = y.as_mut().unwrap_unchecked(); + std::ptr::drop_in_place(z); + std::ptr::write(y, None); + } } #[allow(unused)] unsafe fn reset_fut_old(futopt: Pin<&mut Option>>) { - *futopt.get_unchecked_mut() = None; + unsafe { + *futopt.get_unchecked_mut() = None; + } } }