Join http api on shutdown
This commit is contained in:
@@ -15,6 +15,7 @@ use netfetch::daemon_common::Channel;
|
||||
use netfetch::daemon_common::DaemonEvent;
|
||||
use netfetch::metrics::ExtraInsertsConf;
|
||||
use netfetch::metrics::StatsSet;
|
||||
use netfetch::throttletrace::ThrottleTrace;
|
||||
use netpod::Database;
|
||||
use netpod::ScyllaConfig;
|
||||
use scywr::insertworker::InsertWorkerOpts;
|
||||
@@ -592,16 +593,19 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
let tx = daemon.tx.clone();
|
||||
let daemon_stats = daemon.stats().clone();
|
||||
|
||||
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
|
||||
|
||||
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
|
||||
let metrics_jh = {
|
||||
let stats_set = StatsSet::new(daemon_stats);
|
||||
let fut = netfetch::metrics::start_metrics_service(opts.api_bind(), dcom, stats_set);
|
||||
let fut = netfetch::metrics::metrics_service(opts.api_bind(), dcom, stats_set, metrics_shutdown_rx);
|
||||
tokio::task::spawn(fut)
|
||||
};
|
||||
|
||||
let daemon_jh = taskrun::spawn(daemon.daemon());
|
||||
|
||||
debug!("will configure {} channels", channels.len());
|
||||
let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000));
|
||||
let mut i = 0;
|
||||
for s in &channels {
|
||||
let ch = Channel::new(s.into());
|
||||
@@ -609,15 +613,17 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
Ok(()) => {}
|
||||
Err(_) => break,
|
||||
}
|
||||
thr_msg.trigger_fmt("sent ChannelAdd", &[&i as &_]);
|
||||
i += 1;
|
||||
if i % 100 == 0 {
|
||||
debug!("sent {} ChannelAdd", i);
|
||||
}
|
||||
// if i % 100 == 0 {
|
||||
// debug!("sent {} ChannelAdd", i);
|
||||
// }
|
||||
}
|
||||
debug!("{} configured channels applied", channels.len());
|
||||
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
|
||||
if false {
|
||||
metrics_jh.await.unwrap();
|
||||
}
|
||||
info!("Daemon joined.");
|
||||
metrics_shutdown_tx.send(1).await?;
|
||||
metrics_jh.await.unwrap();
|
||||
info!("Metrics joined.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::proto;
|
||||
use super::ExtraInsertsConf;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::throttletrace::ThrottleTrace;
|
||||
use crate::timebin::ConnTimeBin;
|
||||
use async_channel::Sender;
|
||||
use core::fmt;
|
||||
@@ -477,6 +478,7 @@ pub struct CaConn {
|
||||
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
|
||||
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
|
||||
time_binners: BTreeMap<Cid, ConnTimeBin>,
|
||||
thr_msg_poll: ThrottleTrace,
|
||||
}
|
||||
|
||||
impl Drop for CaConn {
|
||||
@@ -525,6 +527,7 @@ impl CaConn {
|
||||
channel_info_query_queue: VecDeque::new(),
|
||||
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
|
||||
time_binners: BTreeMap::new(),
|
||||
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1708,7 +1711,7 @@ impl CaConn {
|
||||
}
|
||||
|
||||
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
|
||||
debug!("tick CaConn {}", self.remote_addr_dbg);
|
||||
// debug!("tick CaConn {}", self.remote_addr_dbg);
|
||||
let this = self.get_mut();
|
||||
if false {
|
||||
for (_, tb) in this.time_binners.iter_mut() {
|
||||
@@ -1759,6 +1762,7 @@ impl Stream for CaConn {
|
||||
self.stats.caconn_poll_count.inc();
|
||||
let poll_ts1 = Instant::now();
|
||||
let ret = loop {
|
||||
self.thr_msg_poll.trigger("CaConn::poll_next");
|
||||
break if let CaConnState::EndOfStream = self.state {
|
||||
Ready(None)
|
||||
} else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
|
||||
@@ -1819,7 +1823,7 @@ impl Stream for CaConn {
|
||||
warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
|
||||
} else if dt > Duration::from_millis(40) {
|
||||
info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
|
||||
} else if dt > Duration::from_millis(5) {
|
||||
} else if false && dt > Duration::from_millis(5) {
|
||||
debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
|
||||
}
|
||||
ret
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use super::connset_input_merge::InputMerge;
|
||||
use super::findioc::FindIocRes;
|
||||
use super::statemap;
|
||||
use super::statemap::ChannelState;
|
||||
@@ -16,6 +15,7 @@ use crate::errconv::ErrConv;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::rt::TokMx;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::throttletrace::ThrottleTrace;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use atomic::AtomicUsize;
|
||||
@@ -260,6 +260,8 @@ pub struct CaConnSet {
|
||||
stats: CaConnSetStats,
|
||||
ioc_finder_jh: JoinHandle<Result<(), Error>>,
|
||||
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
|
||||
thr_msg_poll_1: ThrottleTrace,
|
||||
thr_msg_storage_len: ThrottleTrace,
|
||||
}
|
||||
|
||||
impl CaConnSet {
|
||||
@@ -304,6 +306,8 @@ impl CaConnSet {
|
||||
// connset_out_sender: SenderPolling::new(connset_out_tx),
|
||||
ioc_finder_jh,
|
||||
await_ca_conn_jhs: VecDeque::new(),
|
||||
thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)),
|
||||
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
|
||||
};
|
||||
// TODO await on jh
|
||||
let jh = tokio::spawn(CaConnSet::run(connset));
|
||||
@@ -555,6 +559,8 @@ impl CaConnSet {
|
||||
if self.shutdown_stopping {
|
||||
return Ok(());
|
||||
}
|
||||
self.thr_msg_storage_len
|
||||
.trigger_fmt("msg", &[&self.storage_insert_sender.len()]);
|
||||
debug!("TODO handle_check_health");
|
||||
let ts2 = Instant::now();
|
||||
let item = CaConnSetItem::Healthy(ts1, ts2);
|
||||
@@ -636,7 +642,7 @@ impl CaConnSet {
|
||||
tx: Sender<(SocketAddr, CaConnEvent)>,
|
||||
addr: SocketAddr,
|
||||
) -> Result<(), Error> {
|
||||
debug!("ca_conn_consumer begin {}", addr);
|
||||
trace2!("ca_conn_consumer begin {}", addr);
|
||||
let stats = conn.stats();
|
||||
let mut conn = conn;
|
||||
let mut ret = Ok(());
|
||||
@@ -652,7 +658,7 @@ impl CaConnSet {
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("ca_conn_consumer ended {}", addr);
|
||||
trace2!("ca_conn_consumer ended {}", addr);
|
||||
tx.send((
|
||||
addr,
|
||||
CaConnEvent {
|
||||
@@ -661,7 +667,7 @@ impl CaConnSet {
|
||||
},
|
||||
))
|
||||
.await?;
|
||||
debug!("ca_conn_consumer signaled {}", addr);
|
||||
trace!("ca_conn_consumer signaled {}", addr);
|
||||
ret
|
||||
}
|
||||
|
||||
@@ -980,8 +986,9 @@ impl Stream for CaConnSet {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
debug!("CaConnSet::poll");
|
||||
loop {
|
||||
self.thr_msg_poll_1.trigger("CaConnSet");
|
||||
|
||||
let mut have_pending = false;
|
||||
|
||||
if let Some(item) = self.connset_out_queue.pop_front() {
|
||||
|
||||
@@ -12,4 +12,5 @@ pub mod rt;
|
||||
pub mod senderpolling;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod throttletrace;
|
||||
pub mod timebin;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::ca::METRICS;
|
||||
use crate::daemon_common::DaemonEvent;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use async_channel::WeakSender;
|
||||
use axum::extract::Query;
|
||||
@@ -244,9 +245,19 @@ fn make_routes(dcom: Arc<DaemonComm>, stats_set: StatsSet) -> axum::Router {
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, dcom: Arc<DaemonComm>, stats_set: StatsSet) {
|
||||
axum::Server::bind(&bind_to.parse().unwrap())
|
||||
.serve(make_routes(dcom, stats_set).into_make_service())
|
||||
pub async fn metrics_service(
|
||||
bind_to: String,
|
||||
dcom: Arc<DaemonComm>,
|
||||
stats_set: StatsSet,
|
||||
shutdown_signal: Receiver<u32>,
|
||||
) {
|
||||
let addr = bind_to.parse().unwrap();
|
||||
let router = make_routes(dcom, stats_set).into_make_service();
|
||||
axum::Server::bind(&addr)
|
||||
.serve(router)
|
||||
.with_graceful_shutdown(async move {
|
||||
let _ = shutdown_signal.recv().await;
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -75,6 +75,10 @@ impl<T> SenderPolling<T> {
|
||||
self.sender = None;
|
||||
self.fut = None;
|
||||
}
|
||||
|
||||
pub fn len(&self) -> Option<usize> {
|
||||
self.sender.as_ref().map(|x| x.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for SenderPolling<T> {
|
||||
|
||||
44
netfetch/src/throttletrace.rs
Normal file
44
netfetch/src/throttletrace.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use core::fmt;
|
||||
use log::*;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
pub struct ThrottleTrace {
|
||||
ivl: Duration,
|
||||
next: Instant,
|
||||
count: u64,
|
||||
}
|
||||
|
||||
impl ThrottleTrace {
|
||||
pub fn new(ivl: Duration) -> Self {
|
||||
Self {
|
||||
ivl,
|
||||
next: Instant::now(),
|
||||
count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trigger(&mut self, msg: &str) {
|
||||
self.count += 1;
|
||||
let tsnow = Instant::now();
|
||||
if self.next <= tsnow {
|
||||
self.next = tsnow + self.ivl;
|
||||
debug!("{} (count {})", msg, self.count);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trigger_fmt(&mut self, msg: &str, params: &[&dyn fmt::Debug]) {
|
||||
self.count += 1;
|
||||
let tsnow = Instant::now();
|
||||
if self.next <= tsnow {
|
||||
self.next = tsnow + self.ivl;
|
||||
if params.len() == 1 {
|
||||
debug!("{} {:?} (count {})", msg, params[0], self.count);
|
||||
} else if params.len() == 2 {
|
||||
debug!("{} {:?} {:?} (count {})", msg, params[0], params[1], self.count);
|
||||
} else {
|
||||
debug!("{} (count {})", msg, self.count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user