From 28954e5c0da5010b354431fde001dd4a9e2ffffd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 26 Jan 2023 06:44:38 +0100 Subject: [PATCH] Add reason to channel close --- daqingest/src/daemon.rs | 13 +++++++- netfetch/src/ca.rs | 2 ++ netfetch/src/ca/conn.rs | 65 ++++++++++-------------------------- netfetch/src/insertworker.rs | 14 +++++--- netfetch/src/store.rs | 61 +++++++++++++++++++++++++++++---- 5 files changed, 95 insertions(+), 60 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index c875637..d40167a 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -289,6 +289,7 @@ impl Daemon { .sender() .ok_or_else(|| Error::with_msg_no_trace("can not derive sender for insert queue adapter"))?; let insert_queue_counter = insert_queue_counter.clone(); + let common_insert_item_queue_2 = common_insert_item_queue_2.clone(); async move { let mut printed_last = Instant::now(); let mut histo = BTreeMap::new(); @@ -344,6 +345,7 @@ impl Daemon { } } info!("insert queue adapter ended"); + common_insert_item_queue_2.drop_sender(); } }); } @@ -359,6 +361,7 @@ impl Daemon { store_workers_rate: AtomicU64::new(20000), insert_frac: AtomicU64::new(1000), ca_conn_set: CaConnSet::new(channel_info_query_tx), + insert_workers_running: atomic::AtomicUsize::new(0), }; let ingest_commons = Arc::new(ingest_commons); @@ -930,7 +933,15 @@ impl Daemon { if self.shutting_down { let sa1 = self.ingest_commons.insert_item_queue.sender_count(); let sa2 = self.ingest_commons.insert_item_queue.sender_count_2(); - info!("qu senders A {:?} {:?}", sa1, sa2); + let nworkers = self + .ingest_commons + .insert_workers_running + .load(atomic::Ordering::Acquire); + info!("qu senders A {:?} {:?} nworkers {}", sa1, sa2, nworkers); + if nworkers == 0 { + info!("goodbye"); + std::process::exit(0); + } } self.stats.handle_timer_tick_count_inc(); let ts1 = Instant::now(); diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 4a24af9..7e7a4ab 100644 --- 
a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -19,6 +19,7 @@ use netpod::Database; use stats::CaConnStatsAgg; use std::net::SocketAddrV4; use std::pin::Pin; +use std::sync::atomic; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::sync::Mutex; @@ -44,6 +45,7 @@ pub struct IngestCommons { pub insert_frac: AtomicU64, pub store_workers_rate: AtomicU64, pub ca_conn_set: CaConnSet, + pub insert_workers_running: atomic::AtomicUsize, } pub trait SlowWarnable { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index eb1ce2b..5bec4c8 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -13,11 +13,15 @@ use crate::series::Existence; use crate::series::SeriesId; use crate::store::ChannelInfoItem; use crate::store::ChannelStatus; +use crate::store::ChannelStatusClosedReason; use crate::store::ChannelStatusItem; use crate::store::CommonInsertItemQueueSender; use crate::store::ConnectionStatus; use crate::store::ConnectionStatusItem; -use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem}; +use crate::store::InsertItem; +use crate::store::IvlItem; +use crate::store::MuteItem; +use crate::store::QueryItem; use async_channel::Sender; use err::Error; use futures_util::stream::FuturesUnordered; @@ -499,10 +503,10 @@ impl CaConn { } } - fn trigger_shutdown(&mut self) { + fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { self.state = CaConnState::Shutdown; self.proto = None; - let res = self.channel_state_on_shutdown(); + self.channel_state_on_shutdown(channel_reason); } fn cmd_check_health(&mut self) { @@ -510,7 +514,7 @@ impl CaConn { Ok(_) => {} Err(e) => { error!("{e}"); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); } } // TODO return the result @@ -579,7 +583,7 @@ impl CaConn { } fn cmd_shutdown(&mut self) { - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::IngestExit); } fn cmd_extra_inserts_conf(&mut 
self, extra_inserts_conf: ExtraInsertsConf) { @@ -717,42 +721,7 @@ impl CaConn { self.conn_backoff = self.conn_backoff_beg; } - fn before_reset_of_channel_state(&mut self) { - trace!("before_reset_of_channel_state channels {}", self.channels.len()); - let mut warn_max = 0; - for (_cid, chst) in &mut self.channels { - match chst { - ChannelState::Init => {} - ChannelState::Creating { .. } => { - *chst = ChannelState::Init; - } - ChannelState::Created(st) => { - if let Some(series) = &st.series { - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - series: series.clone(), - status: ChannelStatus::Closed, - }); - self.insert_item_queue.push_back(item); - } else { - if warn_max < 10 { - debug!("no series for cid {:?}", st.cid); - warn_max += 1; - } - } - *chst = ChannelState::Init; - } - ChannelState::Error(_) => { - *chst = ChannelState::Init; - } - ChannelState::Ended => { - *chst = ChannelState::Init; - } - } - } - } - - fn channel_state_on_shutdown(&mut self) { + fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { trace!("channel_state_on_shutdown channels {}", self.channels.len()); let mut warn_max = 0; for (_cid, chst) in &mut self.channels { @@ -768,7 +737,7 @@ impl CaConn { let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: SystemTime::now(), series: series.clone(), - status: ChannelStatus::Closed, + status: ChannelStatus::Closed(channel_reason.clone()), }); self.insert_item_queue.push_back(item); } else { @@ -841,7 +810,7 @@ impl CaConn { value: CaConnEventValue::EchoTimeout, }; self.ca_conn_event_out_queue.push_back(item); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout); } } else { self.ioc_ping_start = Some(Instant::now()); @@ -851,7 +820,7 @@ impl CaConn { proto.push_out(msg); } else { warn!("can not push echo, no proto"); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol); } } } @@ -1510,12 +1479,12 @@ 
impl CaConn { } Ready(Some(Err(e))) => { error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::ProtocolError); Ready(Some(Err(e))) } Ready(None) => { warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::ProtocolDone); Ready(None) } Pending => Pending, @@ -1735,12 +1704,12 @@ impl Stream for CaConn { Ready(Some(Ok(_))) => {} Ready(Some(Err(e))) => { error!("{e}"); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); break; } Ready(None) => { warn!("command input queue closed, do shutdown"); - self.trigger_shutdown(); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); break; } Pending => break, diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 2a67d5a..4e79fb2 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -6,7 +6,7 @@ use err::Error; use log::*; use netpod::timeunits::{MS, SEC}; use netpod::ScyllaConfig; -use std::sync::atomic::Ordering; +use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio_postgres::Client as PgClient; @@ -128,8 +128,8 @@ pub async fn spawn_scylla_insert_workers( let data_store = Arc::new(DataStore::new(&scyconf).await?); data_stores.push(data_store); } - for i1 in 0..insert_worker_count { - let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone(); + for worker_ix in 0..insert_worker_count { + let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); let stats = store_stats.clone(); let recv = if use_rate_limit_queue { q2_rx.clone() @@ -140,6 +140,9 @@ pub async fn spawn_scylla_insert_workers( }; let ingest_commons = ingest_commons.clone(); let fut = async move { + ingest_commons + .insert_workers_running + .fetch_add(1, 
atomic::Ordering::AcqRel); let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone(); let mut i1 = 0; @@ -267,7 +270,10 @@ pub async fn spawn_scylla_insert_workers( } } } - info!("insert worker {i1} has no more messages"); + ingest_commons + .insert_workers_running + .fetch_sub(1, atomic::Ordering::AcqRel); + info!("insert worker {worker_ix} has no more messages"); }; let jh = tokio::spawn(fut); jhs.push(jh); diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 7d0aadf..80bd062 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -161,22 +161,67 @@ pub struct ConnectionStatusItem { pub status: ConnectionStatus, } +#[derive(Debug, Clone)] +pub enum ChannelStatusClosedReason { + IngestExit, + ChannelRemove, + ProtocolError, + FrequencyQuota, + BandwidthQuota, + InternalError, + IocTimeout, + NoProtocol, + ProtocolDone, +} + #[derive(Debug)] pub enum ChannelStatus { Opened, - Closed, - ClosedUnexpected, + Closed(ChannelStatusClosedReason), } impl ChannelStatus { - pub fn kind(&self) -> u32 { + pub fn to_kind(&self) -> u32 { use ChannelStatus::*; + use ChannelStatusClosedReason::*; match self { Opened => 1, - Closed => 2, - ClosedUnexpected => 3, + Closed(x) => match x { + IngestExit => 2, + ChannelRemove => 3, + ProtocolError => 4, + FrequencyQuota => 5, + BandwidthQuota => 6, + InternalError => 7, + IocTimeout => 8, + NoProtocol => 9, + ProtocolDone => 10, + }, } } + + pub fn from_kind(kind: u32) -> Result<Self, err::Error> { + use ChannelStatus::*; + use ChannelStatusClosedReason::*; + let ret = match kind { + 1 => Opened, + 2 => Closed(IngestExit), + 3 => Closed(ChannelRemove), + 4 => Closed(ProtocolError), + 5 => Closed(FrequencyQuota), + 6 => Closed(BandwidthQuota), + 7 => Closed(InternalError), + 8 => Closed(IocTimeout), + 9 => Closed(NoProtocol), + 10 => Closed(ProtocolDone), + _ => { + return Err(err::Error::with_msg_no_trace(format!( + "unknown ChannelStatus kind {kind}" + ))); + } + }; + Ok(ret) + } } #[derive(Debug)] @@ -295,7 
+340,9 @@ impl CommonInsertItemQueue { self.sender.lock().unwrap().as_ref().map(|x| x.close()); } - pub fn drop_sender(&self) {} + pub fn drop_sender(&self) { + self.sender.lock().unwrap().take(); + } } struct InsParCom { @@ -463,7 +510,7 @@ pub async fn insert_channel_status( let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; - let kind = item.status.kind(); + let kind = item.status.to_kind(); let series = item.series.id(); let params = ( series as i64,