This commit is contained in:
Dominik Werder
2023-09-06 11:59:08 +02:00
parent c85f0ea0f5
commit 84d045f245
5 changed files with 27 additions and 47 deletions

View File

@@ -373,6 +373,7 @@ pub enum CaConnEventValue {
None,
EchoTimeout,
ConnCommandResult(ConnCommandResult),
QueryItem(QueryItem),
EndOfStream,
}
@@ -785,38 +786,6 @@ impl CaConn {
}
}
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
loop {
break {
self.stats.caconn_loop4_count_inc();
if self.sender_polling.is_sending() {
match self.sender_polling.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.inserts_queue_push_inc();
continue;
}
Ready(Err(e)) => {
use crate::senderpolling::Error::*;
match e {
NoSendInProgress => break Ready(Err(Error::with_msg_no_trace("no send in progress"))),
Closed(_item) => break Ready(Err(Error::with_msg_no_trace("insert channel closed"))),
}
}
Pending => Pending,
}
} else {
if let Some(item) = self.insert_item_queue.pop_front() {
self.sender_polling.send2(item);
continue;
} else {
Ready(Ok(()))
}
}
};
}
}
fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg);
@@ -1784,6 +1753,12 @@ impl Stream for CaConn {
})))
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
Ready(Some(Ok(item)))
} else if let Some(item) = self.insert_item_queue.pop_front() {
let ev = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::QueryItem(item),
};
Ready(Some(Ok(ev)))
} else {
let ret = loop {
self.stats.caconn_loop1_count_inc();
@@ -1807,10 +1782,6 @@ impl Stream for CaConn {
}
};
}
match self.handle_insert_futs(cx) {
Ready(_) => {}
Pending => break Pending,
}
if self.is_shutdown() {
if self.outgoing_queues_empty() {
debug!("shut down and all items flushed {}", self.remote_addr_dbg);

View File

@@ -94,6 +94,7 @@ pub struct CaConnSet {
connset_tx: Sender<CaConnSetEvent>,
connset_rx: Receiver<CaConnSetEvent>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
shutdown: bool,
}
@@ -108,6 +109,7 @@ impl CaConnSet {
connset_tx: connset_tx.clone(),
connset_rx,
channel_info_query_tx,
storage_insert_tx,
shutdown: false,
};
// TODO await on jh

View File

@@ -45,6 +45,7 @@ impl DaemonEvent {
None => format!("CaConnEvent/None"),
EchoTimeout => format!("CaConnEvent/EchoTimeout"),
ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"),
QueryItem(_) => format!("CaConnEvent/QueryItem"),
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
}