Add reason to channel close
This commit is contained in:
@@ -19,6 +19,7 @@ use netpod::Database;
|
||||
use stats::CaConnStatsAgg;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
@@ -44,6 +45,7 @@ pub struct IngestCommons {
|
||||
pub insert_frac: AtomicU64,
|
||||
pub store_workers_rate: AtomicU64,
|
||||
pub ca_conn_set: CaConnSet,
|
||||
pub insert_workers_running: atomic::AtomicUsize,
|
||||
}
|
||||
|
||||
pub trait SlowWarnable {
|
||||
|
||||
@@ -13,11 +13,15 @@ use crate::series::Existence;
|
||||
use crate::series::SeriesId;
|
||||
use crate::store::ChannelInfoItem;
|
||||
use crate::store::ChannelStatus;
|
||||
use crate::store::ChannelStatusClosedReason;
|
||||
use crate::store::ChannelStatusItem;
|
||||
use crate::store::CommonInsertItemQueueSender;
|
||||
use crate::store::ConnectionStatus;
|
||||
use crate::store::ConnectionStatusItem;
|
||||
use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem};
|
||||
use crate::store::InsertItem;
|
||||
use crate::store::IvlItem;
|
||||
use crate::store::MuteItem;
|
||||
use crate::store::QueryItem;
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
@@ -499,10 +503,10 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn trigger_shutdown(&mut self) {
|
||||
fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
|
||||
self.state = CaConnState::Shutdown;
|
||||
self.proto = None;
|
||||
let res = self.channel_state_on_shutdown();
|
||||
self.channel_state_on_shutdown(channel_reason);
|
||||
}
|
||||
|
||||
fn cmd_check_health(&mut self) {
|
||||
@@ -510,7 +514,7 @@ impl CaConn {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
|
||||
}
|
||||
}
|
||||
// TODO return the result
|
||||
@@ -579,7 +583,7 @@ impl CaConn {
|
||||
}
|
||||
|
||||
fn cmd_shutdown(&mut self) {
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::IngestExit);
|
||||
}
|
||||
|
||||
fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) {
|
||||
@@ -717,42 +721,7 @@ impl CaConn {
|
||||
self.conn_backoff = self.conn_backoff_beg;
|
||||
}
|
||||
|
||||
fn before_reset_of_channel_state(&mut self) {
|
||||
trace!("before_reset_of_channel_state channels {}", self.channels.len());
|
||||
let mut warn_max = 0;
|
||||
for (_cid, chst) in &mut self.channels {
|
||||
match chst {
|
||||
ChannelState::Init => {}
|
||||
ChannelState::Creating { .. } => {
|
||||
*chst = ChannelState::Init;
|
||||
}
|
||||
ChannelState::Created(st) => {
|
||||
if let Some(series) = &st.series {
|
||||
let item = QueryItem::ChannelStatus(ChannelStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
series: series.clone(),
|
||||
status: ChannelStatus::Closed,
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
} else {
|
||||
if warn_max < 10 {
|
||||
debug!("no series for cid {:?}", st.cid);
|
||||
warn_max += 1;
|
||||
}
|
||||
}
|
||||
*chst = ChannelState::Init;
|
||||
}
|
||||
ChannelState::Error(_) => {
|
||||
*chst = ChannelState::Init;
|
||||
}
|
||||
ChannelState::Ended => {
|
||||
*chst = ChannelState::Init;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn channel_state_on_shutdown(&mut self) {
|
||||
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
|
||||
trace!("channel_state_on_shutdown channels {}", self.channels.len());
|
||||
let mut warn_max = 0;
|
||||
for (_cid, chst) in &mut self.channels {
|
||||
@@ -768,7 +737,7 @@ impl CaConn {
|
||||
let item = QueryItem::ChannelStatus(ChannelStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
series: series.clone(),
|
||||
status: ChannelStatus::Closed,
|
||||
status: ChannelStatus::Closed(channel_reason.clone()),
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
} else {
|
||||
@@ -841,7 +810,7 @@ impl CaConn {
|
||||
value: CaConnEventValue::EchoTimeout,
|
||||
};
|
||||
self.ca_conn_event_out_queue.push_back(item);
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
|
||||
}
|
||||
} else {
|
||||
self.ioc_ping_start = Some(Instant::now());
|
||||
@@ -851,7 +820,7 @@ impl CaConn {
|
||||
proto.push_out(msg);
|
||||
} else {
|
||||
warn!("can not push echo, no proto");
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1510,12 +1479,12 @@ impl CaConn {
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg);
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::ProtocolError);
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::ProtocolDone);
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
@@ -1735,12 +1704,12 @@ impl Stream for CaConn {
|
||||
Ready(Some(Ok(_))) => {}
|
||||
Ready(Some(Err(e))) => {
|
||||
error!("{e}");
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
|
||||
break;
|
||||
}
|
||||
Ready(None) => {
|
||||
warn!("command input queue closed, do shutdown");
|
||||
self.trigger_shutdown();
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
|
||||
break;
|
||||
}
|
||||
Pending => break,
|
||||
|
||||
@@ -6,7 +6,7 @@ use err::Error;
|
||||
use log::*;
|
||||
use netpod::timeunits::{MS, SEC};
|
||||
use netpod::ScyllaConfig;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_postgres::Client as PgClient;
|
||||
@@ -128,8 +128,8 @@ pub async fn spawn_scylla_insert_workers(
|
||||
let data_store = Arc::new(DataStore::new(&scyconf).await?);
|
||||
data_stores.push(data_store);
|
||||
}
|
||||
for i1 in 0..insert_worker_count {
|
||||
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
|
||||
for worker_ix in 0..insert_worker_count {
|
||||
let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
|
||||
let stats = store_stats.clone();
|
||||
let recv = if use_rate_limit_queue {
|
||||
q2_rx.clone()
|
||||
@@ -140,6 +140,9 @@ pub async fn spawn_scylla_insert_workers(
|
||||
};
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let fut = async move {
|
||||
ingest_commons
|
||||
.insert_workers_running
|
||||
.fetch_add(1, atomic::Ordering::AcqRel);
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
let mut backoff = backoff_0.clone();
|
||||
let mut i1 = 0;
|
||||
@@ -267,7 +270,10 @@ pub async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("insert worker {i1} has no more messages");
|
||||
ingest_commons
|
||||
.insert_workers_running
|
||||
.fetch_sub(1, atomic::Ordering::AcqRel);
|
||||
info!("insert worker {worker_ix} has no more messages");
|
||||
};
|
||||
let jh = tokio::spawn(fut);
|
||||
jhs.push(jh);
|
||||
|
||||
@@ -161,22 +161,67 @@ pub struct ConnectionStatusItem {
|
||||
pub status: ConnectionStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChannelStatusClosedReason {
|
||||
IngestExit,
|
||||
ChannelRemove,
|
||||
ProtocolError,
|
||||
FrequencyQuota,
|
||||
BandwidthQuota,
|
||||
InternalError,
|
||||
IocTimeout,
|
||||
NoProtocol,
|
||||
ProtocolDone,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ChannelStatus {
|
||||
Opened,
|
||||
Closed,
|
||||
ClosedUnexpected,
|
||||
Closed(ChannelStatusClosedReason),
|
||||
}
|
||||
|
||||
impl ChannelStatus {
|
||||
pub fn kind(&self) -> u32 {
|
||||
pub fn to_kind(&self) -> u32 {
|
||||
use ChannelStatus::*;
|
||||
use ChannelStatusClosedReason::*;
|
||||
match self {
|
||||
Opened => 1,
|
||||
Closed => 2,
|
||||
ClosedUnexpected => 3,
|
||||
Closed(x) => match x {
|
||||
IngestExit => 2,
|
||||
ChannelRemove => 3,
|
||||
ProtocolError => 4,
|
||||
FrequencyQuota => 5,
|
||||
BandwidthQuota => 6,
|
||||
InternalError => 7,
|
||||
IocTimeout => 8,
|
||||
NoProtocol => 9,
|
||||
ProtocolDone => 10,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_kind(kind: u32) -> Result<Self, err::Error> {
|
||||
use ChannelStatus::*;
|
||||
use ChannelStatusClosedReason::*;
|
||||
let ret = match kind {
|
||||
1 => Opened,
|
||||
2 => Closed(IngestExit),
|
||||
3 => Closed(ChannelRemove),
|
||||
4 => Closed(ProtocolError),
|
||||
5 => Closed(FrequencyQuota),
|
||||
6 => Closed(BandwidthQuota),
|
||||
7 => Closed(InternalError),
|
||||
8 => Closed(IocTimeout),
|
||||
9 => Closed(NoProtocol),
|
||||
10 => Closed(ProtocolDone),
|
||||
_ => {
|
||||
return Err(err::Error::with_msg_no_trace(format!(
|
||||
"unknown ChannelStatus kind {kind}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -295,7 +340,9 @@ impl CommonInsertItemQueue {
|
||||
self.sender.lock().unwrap().as_ref().map(|x| x.close());
|
||||
}
|
||||
|
||||
pub fn drop_sender(&self) {}
|
||||
pub fn drop_sender(&self) {
|
||||
self.sender.lock().unwrap().take();
|
||||
}
|
||||
}
|
||||
|
||||
struct InsParCom {
|
||||
@@ -463,7 +510,7 @@ pub async fn insert_channel_status(
|
||||
let ts = secs + nanos;
|
||||
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let kind = item.status.kind();
|
||||
let kind = item.status.to_kind();
|
||||
let series = item.series.id();
|
||||
let params = (
|
||||
series as i64,
|
||||
|
||||
Reference in New Issue
Block a user