Check senders

This commit is contained in:
Dominik Werder
2023-01-25 15:47:57 +01:00
parent 2171bbabd7
commit 0b39f92575
8 changed files with 223 additions and 131 deletions

View File

@@ -393,7 +393,6 @@ struct ChannelOpsResources<'a> {
pub struct CaConn {
state: CaConnState,
shutdown: bool,
ticker: Pin<Box<tokio::time::Sleep>>,
proto: Option<CaProto>,
cid_store: CidStore,
@@ -445,7 +444,6 @@ impl CaConn {
let (cq_tx, cq_rx) = async_channel::bounded(32);
Self {
state: CaConnState::Unconnected,
shutdown: false,
ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
proto: None,
cid_store: CidStore::new(),
@@ -493,34 +491,20 @@ impl CaConn {
self.conn_command_tx.clone()
}
fn trigger_shutdown(&mut self) {
self.shutdown = true;
for (_k, v) in self.channels.iter_mut() {
match v {
ChannelState::Init => {
*v = ChannelState::Ended;
}
ChannelState::Creating { .. } => {
*v = ChannelState::Ended;
}
ChannelState::Created(st) => {
if let Some(series) = &st.series {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Closed,
});
info!("emit status item {item:?}");
self.insert_item_queue.push_back(item);
}
*v = ChannelState::Ended;
}
ChannelState::Error(_) => {}
ChannelState::Ended => {}
}
/// Returns `true` when the connection state is `CaConnState::Shutdown`.
fn is_shutdown(&self) -> bool {
    matches!(self.state, CaConnState::Shutdown)
}
/// Puts the connection into the shutdown state.
///
/// Sets `state` to `CaConnState::Shutdown`, clears `proto`, and then runs
/// `channel_state_on_shutdown` to update the per-channel states.
fn trigger_shutdown(&mut self) {
    self.state = CaConnState::Shutdown;
    self.proto = None;
    // `channel_state_on_shutdown` returns `()`, so there is nothing to bind.
    self.channel_state_on_shutdown();
}
fn cmd_check_health(&mut self) {
match self.check_channels_alive() {
Ok(_) => {}
@@ -596,10 +580,6 @@ impl CaConn {
fn cmd_shutdown(&mut self) {
self.trigger_shutdown();
let res = self.before_reset_of_channel_state();
self.state = CaConnState::Shutdown;
self.proto = None;
// TODO return the result
}
fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) {
@@ -737,12 +717,15 @@ impl CaConn {
self.conn_backoff = self.conn_backoff_beg;
}
fn before_reset_of_channel_state(&mut self) -> Result<(), Error> {
fn before_reset_of_channel_state(&mut self) {
trace!("before_reset_of_channel_state channels {}", self.channels.len());
let mut created = 0;
let mut warn_max = 0;
for (_cid, chst) in &self.channels {
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init => {}
ChannelState::Creating { .. } => {
*chst = ChannelState::Init;
}
ChannelState::Created(st) => {
if let Some(series) = &st.series {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
@@ -750,9 +733,6 @@ impl CaConn {
series: series.clone(),
status: ChannelStatus::Closed,
});
if created < 10 {
trace!("store {:?}", item);
}
self.insert_item_queue.push_back(item);
} else {
if warn_max < 10 {
@@ -760,12 +740,51 @@ impl CaConn {
warn_max += 1;
}
}
created += 1;
*chst = ChannelState::Init;
}
ChannelState::Error(_) => {
*chst = ChannelState::Init;
}
ChannelState::Ended => {
*chst = ChannelState::Init;
}
_ => (),
}
}
Ok(())
}
fn channel_state_on_shutdown(&mut self) {
trace!("channel_state_on_shutdown channels {}", self.channels.len());
let mut warn_max = 0;
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init => {
*chst = ChannelState::Ended;
}
ChannelState::Creating { .. } => {
*chst = ChannelState::Ended;
}
ChannelState::Created(st) => {
if let Some(series) = &st.series {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Closed,
});
self.insert_item_queue.push_back(item);
} else {
if warn_max < 10 {
debug!("no series for cid {:?}", st.cid);
warn_max += 1;
}
}
*chst = ChannelState::Ended;
}
ChannelState::Error(_) => {
*chst = ChannelState::Ended;
}
ChannelState::Ended => {}
}
}
}
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
@@ -1491,26 +1510,13 @@ impl CaConn {
}
Ready(Some(Err(e))) => {
error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg);
// TODO unify this handling with the block below
let reset_res = self.before_reset_of_channel_state();
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
self.proto = None;
if let Err(e) = reset_res {
error!("can not destruct channel state before reset {e:?}");
}
self.trigger_shutdown();
Ready(Some(Err(e)))
}
Ready(None) => {
warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
let reset_res = self.before_reset_of_channel_state();
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
self.proto = None;
if let Err(e) = reset_res {
error!("can not destruct channel state before reset {e:?}");
Ready(Some(Err(e)))
} else {
Ready(None)
}
self.trigger_shutdown();
Ready(None)
}
Pending => Pending,
};
@@ -1649,7 +1655,7 @@ impl CaConn {
if self.insert_item_queue.len() >= self.insert_queue_max {
break None;
}
if self.shutdown {
if self.is_shutdown() {
break None;
}
}
@@ -1722,7 +1728,7 @@ impl Stream for CaConn {
i1 += 1;
self.stats.caconn_loop1_count_inc();
loop {
if self.shutdown {
if self.is_shutdown() {
break;
}
break match self.handle_conn_command(cx) {
@@ -1744,7 +1750,7 @@ impl Stream for CaConn {
Ready(_) => {}
Pending => break Pending,
}
if self.shutdown {
if self.is_shutdown() {
if self.insert_item_queue.len() == 0 {
trace!("no more items to flush");
if i1 >= 10 {
@@ -1757,7 +1763,7 @@ impl Stream for CaConn {
if self.insert_item_queue.len() >= self.insert_queue_max {
break Pending;
}
if !self.shutdown {
if !self.is_shutdown() {
if let Some(v) = self.loop_inner(cx) {
if i1 >= 10 {
break v;
@@ -1769,7 +1775,7 @@ impl Stream for CaConn {
Ready(_) => self.stats.conn_stream_ready_inc(),
Pending => self.stats.conn_stream_pending_inc(),
}
if self.shutdown && self.insert_item_queue.len() == 0 {
if self.is_shutdown() && self.insert_item_queue.len() == 0 {
Ready(None)
} else {
match ret {

View File

@@ -266,7 +266,9 @@ impl CaConnSet {
local_epics_hostname,
array_truncate,
insert_queue_max,
insert_item_queue.sender(),
insert_item_queue
.sender()
.ok_or_else(|| Error::with_msg_no_trace("can not derive sender"))?,
data_store.clone(),
Vec::new(),
)?;
@@ -302,7 +304,8 @@ impl CaConnSet {
ca_conn_ress: &TokMx<BTreeMap<SocketAddrV4, CaConnRess>>,
addr: SocketAddrV4,
) -> Result<bool, Error> {
warn!("Lock for conn_remove");
// TODO make this lock-free.
//warn!("Lock for conn_remove");
if let Some(_caconn) = ca_conn_ress.lock().await.remove(&addr) {
Ok(true)
} else {

View File

@@ -27,6 +27,7 @@ pub enum DaemonEvent {
ChannelRemove(Channel),
SearchDone(Result<VecDeque<FindIocRes>, Error>),
CaConnEvent(SocketAddrV4, CaConnEvent),
Shutdown,
}
impl DaemonEvent {
@@ -47,6 +48,7 @@ impl DaemonEvent {
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
}
Shutdown => format!("Shutdown"),
}
}
}

View File

@@ -61,11 +61,17 @@ pub async fn spawn_scylla_insert_workers(
use_rate_limit_queue: bool,
ttls: Ttls,
) -> Result<Vec<JoinHandle<()>>, Error> {
let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000));
let (q2_tx, q2_rx) = async_channel::bounded(
insert_item_queue
.receiver()
.map_or(20000, |x| x.capacity().unwrap_or(20000)),
);
{
let ingest_commons = ingest_commons.clone();
let stats = store_stats.clone();
let recv = insert_item_queue.receiver();
let recv = insert_item_queue
.receiver()
.ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?;
let store_stats = store_stats.clone();
let fut = async move {
if !use_rate_limit_queue {
@@ -128,7 +134,9 @@ pub async fn spawn_scylla_insert_workers(
let recv = if use_rate_limit_queue {
q2_rx.clone()
} else {
insert_item_queue.receiver()
insert_item_queue
.receiver()
.ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))?
};
let ingest_commons = ingest_commons.clone();
let fut = async move {
@@ -259,7 +267,7 @@ pub async fn spawn_scylla_insert_workers(
}
}
}
trace!("insert worker has no more messages");
info!("insert worker {i1} has no more messages");
};
let jh = tokio::spawn(fut);
jhs.push(jh);

View File

@@ -267,7 +267,10 @@ pub async fn metrics_agg_task(
}
}
{
let val = ingest_commons.insert_item_queue.receiver().len() as u64;
let val = ingest_commons
.insert_item_queue
.receiver()
.map_or(0, |x| x.len() as u64);
agg.store_worker_recv_queue_len.store(val, Ordering::Release);
}
let mut m = METRICS.lock().unwrap();

View File

@@ -251,35 +251,39 @@ impl CommonInsertItemQueueSender {
}
pub struct CommonInsertItemQueue {
sender: async_channel::Sender<QueryItem>,
sender: std::sync::Mutex<Option<async_channel::Sender<QueryItem>>>,
recv: async_channel::Receiver<QueryItem>,
}
impl CommonInsertItemQueue {
pub fn new(cap: usize) -> Self {
let (tx, rx) = async_channel::bounded(cap);
Self { sender: tx, recv: rx }
}
pub fn sender(&self) -> CommonInsertItemQueueSender {
CommonInsertItemQueueSender {
sender: self.sender.clone(),
Self {
sender: std::sync::Mutex::new(Some(tx)),
recv: rx,
}
}
pub fn sender_raw(&self) -> async_channel::Sender<QueryItem> {
self.sender.clone()
/// Returns a new `CommonInsertItemQueueSender` wrapping a clone of the stored
/// channel sender, or `None` if no sender is currently stored.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn sender(&self) -> Option<CommonInsertItemQueueSender> {
    self.sender
        .lock()
        .unwrap()
        .as_ref()
        .map(|sender| CommonInsertItemQueueSender { sender: sender.clone() })
}
pub fn receiver(&self) -> async_channel::Receiver<QueryItem> {
self.recv.clone()
/// Returns a clone of the queue's receiver.
///
/// The return type is `Option`, but this implementation always yields `Some`.
pub fn receiver(&self) -> Option<async_channel::Receiver<QueryItem>> {
    Some(self.recv.clone())
}
pub fn sender_count(&self) -> usize {
self.sender.sender_count()
/// Returns the sender count reported by the stored sender, or `None` if no
/// sender is currently stored.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn sender_count(&self) -> Option<usize> {
    let guard = self.sender.lock().unwrap();
    guard.as_ref().map(|tx| tx.sender_count())
}
pub fn sender_count2(&self) -> usize {
/// Returns the sender count as reported by the receiver half (`recv`).
///
/// Unlike `sender_count`, this does not touch the mutex-guarded sender.
pub fn sender_count_2(&self) -> usize {
self.recv.sender_count()
}
@@ -288,8 +292,10 @@ impl CommonInsertItemQueue {
}
pub fn close(&self) {
self.sender.close();
self.sender.lock().unwrap().as_ref().map(|x| x.close());
}
/// Currently a no-op: the body is empty and the stored sender is left in place.
// NOTE(review): presumably intended to take the sender out of the
// `Mutex<Option<..>>` so the queue's own sender handle is released — confirm.
pub fn drop_sender(&self) {}
}
struct InsParCom {