Refactor channel send

This commit is contained in:
Dominik Werder
2024-09-30 13:22:30 +02:00
parent f09ee15b48
commit d4b8beaa82

View File

@@ -948,7 +948,7 @@ impl CaConnSet {
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> {
info!("handle_ca_conn_eos {addr} {reason:?}");
debug!("handle_ca_conn_eos {addr} {reason:?}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
self.stats.ca_conn_eos_ok().inc();
self.await_ca_conn_jhs.push_back((addr, e.jh));
@@ -1038,27 +1038,6 @@ impl CaConnSet {
Ok(())
}
fn remove_channel_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("TODO remove_channel_status_for_addr");
if true {
let e = Error::with_msg_no_trace("TODO remove_channel_status_for_addr");
return Err(e);
}
for (_, v) in self.channel_states.iter_mut() {
match &mut v.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner {
WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {}
_ => {}
},
_ => {}
},
ChannelStateValue::ToRemove { .. } => {}
}
}
Ok(())
}
fn ready_for_end_of_stream(&self) -> bool {
if !self.shutdown_stopping {
false
@@ -1173,61 +1152,6 @@ impl CaConnSet {
}
}
#[allow(unused)]
async fn __enqueue_command_to_all<F>(&self, cmdgen: F) -> Result<Vec<CmdId>, Error>
where
F: Fn() -> ConnCommand,
{
let mut senders: Vec<(SocketAddrV4, Sender<ConnCommand>)> = err::todoval();
let mut cmdids = Vec::new();
for (addr, sender) in senders {
let cmd = cmdgen();
let cmdid = cmd.id();
match sender.send(cmd).await {
Ok(()) => {
cmdids.push(CmdId(addr, cmdid));
}
Err(e) => {
error!("enqueue_command_to_all can not send command {e:?} {:?}", e.0);
}
}
}
Ok(cmdids)
}
#[allow(unused)]
async fn __send_command_to_addr_disabled<F, R>(&self, addr: &SocketAddrV4, cmdgen: F) -> Result<R, Error>
where
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
{
let tx: Sender<ConnCommand> = err::todoval();
let (cmd, rx) = cmdgen();
tx.send(cmd).await.err_conv()?;
let ret = rx.recv().await.err_conv()?;
Ok(ret)
}
#[allow(unused)]
async fn __send_command_inner_disabled<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec<async_channel::Receiver<R>>
where
IT: Iterator<Item = (&'a SocketAddrV4, &'a async_channel::Sender<ConnCommand>)>,
F: Fn() -> (ConnCommand, async_channel::Receiver<R>),
{
let mut rxs = Vec::new();
for (_, tx) in it {
let (cmd, rx) = cmdgen();
match tx.send(cmd).await {
Ok(()) => {
rxs.push(rx);
}
Err(e) => {
error!("can not send command {e:?}");
}
}
}
rxs
}
async fn wait_stopped(&self) -> Result<(), Error> {
warn!("Lock for wait_stopped");
// let mut g = self.ca_conn_ress.lock().await;
@@ -1594,6 +1518,96 @@ impl CaConnSet {
}
}
struct PendingProgress {
pending: bool,
progress: bool,
}
impl PendingProgress {
fn new() -> Self {
Self {
pending: false,
progress: false,
}
}
fn mark_pending(&mut self) {
self.pending = true;
}
fn mark_progress(&mut self) {
self.progress = true;
}
fn pending(&self) -> bool {
self.pending
}
fn progress(&self) -> bool {
self.progress
}
}
fn merge_pending_progress<E>(res: Option<Poll<Result<(), E>>>, penpro: &mut PendingProgress) -> Result<(), E>
where
E: std::error::Error,
{
use Poll::*;
match res {
Some(x) => match x {
Ready(x) => match x {
Ok(()) => {
penpro.mark_progress();
Ok(())
}
Err(e) => {
penpro.mark_progress();
Err(e)
}
},
Pending => {
penpro.mark_pending();
Ok(())
}
},
None => Ok(()),
}
}
fn sender_polling_send<T, F>(
qu: &mut VecDeque<T>,
mut sender: Pin<&mut SenderPolling<T>>,
cx: &mut Context,
on_send_ok: F,
) -> Option<Poll<Result<(), Error>>>
where
T: Unpin,
F: FnOnce(),
{
use Poll::*;
if sender.is_idle() {
if let Some(item) = qu.pop_front() {
sender.as_mut().send_pin(item);
}
}
if sender.is_sending() {
match sender.poll_unpin(cx) {
Ready(Ok(())) => {
on_send_ok();
Some(Ready(Ok(())))
}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
Some(Ready(Err(e)))
}
Pending => Some(Pending),
}
} else {
None
}
}
impl Stream for CaConnSet {
type Item = CaConnSetItem;
@@ -1625,8 +1639,7 @@ impl Stream for CaConnSet {
.set(self.find_ioc_query_sender.len().unwrap_or(0) as _);
self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _);
let mut have_pending = false;
let mut have_progress = false;
let mut penpro = PendingProgress::new();
if let Err(e) = self.try_push_ca_conn_cmds(cx) {
break Ready(Some(CaConnSetItem::Error(e)));
@@ -1639,16 +1652,15 @@ impl Stream for CaConnSet {
match self.ticker.poll_unpin(cx) {
Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) {
Ok(()) => {
have_progress = true;
penpro.mark_progress();
}
Err(e) => {
have_progress = true;
error!("ticker {e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
},
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
@@ -1672,83 +1684,49 @@ impl Stream for CaConnSet {
error!("CaConn {addr} join error: {e} left {left}");
}
}
have_progress = true;
penpro.mark_progress();
}
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
}
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.storage_insert_sender.as_mut().send_pin(item);
{
let this = self.as_mut().get_mut();
let qu = &mut this.storage_insert_queue;
let tx = this.storage_insert_sender.as_mut();
let counter = this.stats.storage_insert_queue_send();
let x = sender_polling_send(qu, tx, cx, || {
counter.inc();
});
if let Err(e) = merge_pending_progress(x, &mut penpro) {
break Ready(Some(CaConnSetItem::Error(e)));
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.storage_insert_queue_send().inc();
have_progress = true;
}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
{
let this = self.as_mut().get_mut();
let qu = &mut this.find_ioc_query_queue;
let tx = this.find_ioc_query_sender.as_mut();
let x = sender_polling_send(qu, tx, cx, || ());
if let Err(e) = merge_pending_progress(x, &mut penpro) {
break Ready(Some(CaConnSetItem::Error(e)));
}
}
if self.find_ioc_query_sender.is_idle() {
if let Some(item) = self.find_ioc_query_queue.pop_front() {
self.find_ioc_query_sender.as_mut().send_pin(item);
}
}
if self.find_ioc_query_sender.is_sending() {
match self.find_ioc_query_sender.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
}
}
if self.channel_info_query_sender.is_idle() {
// if self.channel_info_query_sender.len().unwrap_or(0) <= 10 {}
if let Some(item) = self.channel_info_query_queue.pop_front() {
self.channel_info_query_sender.as_mut().send_pin(item);
}
}
if self.channel_info_query_sender.is_sending() {
match self.channel_info_query_sender.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(_)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
{
let this = self.as_mut().get_mut();
let qu = &mut this.channel_info_query_queue;
let tx = this.channel_info_query_sender.as_mut();
let x = sender_polling_send(qu, tx, cx, || ());
if let Err(e) = merge_pending_progress(x, &mut penpro) {
break Ready(Some(CaConnSetItem::Error(e)));
}
}
match self.find_ioc_res_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_ioc_query_result(x) {
Ok(()) => {
have_progress = true;
penpro.mark_progress();
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
@@ -1756,40 +1734,40 @@ impl Stream for CaConnSet {
// TODO trigger shutdown because of error
}
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
match self.ca_conn_res_rx.as_mut().poll_next(cx) {
Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) {
Ok(()) => {
have_progress = true;
penpro.mark_progress();
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => {}
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
match self.channel_info_res_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_series_lookup_result(x) {
Ok(()) => {
have_progress = true;
penpro.mark_progress();
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => {}
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
match self.connset_inp_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_event(x) {
Ok(()) => {
have_progress = true;
penpro.mark_progress();
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
@@ -1797,24 +1775,24 @@ impl Stream for CaConnSet {
warn!("connset_inp_rx broken?")
}
Pending => {
have_pending = true;
penpro.mark_pending();
}
}
break if self.ready_for_end_of_stream() {
self.stats.ready_for_end_of_stream().inc();
if have_progress {
if penpro.progress() {
self.stats.ready_for_end_of_stream_with_progress().inc();
continue;
} else {
Ready(None)
}
} else {
if have_progress {
if penpro.progress() {
self.stats.poll_reloop().inc();
continue;
} else {
if have_pending {
if penpro.pending() {
self.stats.poll_pending().inc();
Pending
} else {