diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 250f26a..5c0512f 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -948,7 +948,7 @@ impl CaConnSet { } fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> { - info!("handle_ca_conn_eos {addr} {reason:?}"); + debug!("handle_ca_conn_eos {addr} {reason:?}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { self.stats.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); @@ -1038,27 +1038,6 @@ impl CaConnSet { Ok(()) } - fn remove_channel_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { - debug!("TODO remove_channel_status_for_addr"); - if true { - let e = Error::with_msg_no_trace("TODO remove_channel_status_for_addr"); - return Err(e); - } - for (_, v) in self.channel_states.iter_mut() { - match &mut v.value { - ChannelStateValue::Active(st2) => match st2 { - ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { - WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {} - _ => {} - }, - _ => {} - }, - ChannelStateValue::ToRemove { .. } => {} - } - } - Ok(()) - } - fn ready_for_end_of_stream(&self) -> bool { if !self.shutdown_stopping { false @@ -1173,61 +1152,6 @@ impl CaConnSet { } } - #[allow(unused)] - async fn __enqueue_command_to_all(&self, cmdgen: F) -> Result, Error> - where - F: Fn() -> ConnCommand, - { - let mut senders: Vec<(SocketAddrV4, Sender)> = err::todoval(); - let mut cmdids = Vec::new(); - for (addr, sender) in senders { - let cmd = cmdgen(); - let cmdid = cmd.id(); - match sender.send(cmd).await { - Ok(()) => { - cmdids.push(CmdId(addr, cmdid)); - } - Err(e) => { - error!("enqueue_command_to_all can not send command {e:?} {:?}", e.0); - } - } - } - Ok(cmdids) - } - - #[allow(unused)] - async fn __send_command_to_addr_disabled(&self, addr: &SocketAddrV4, cmdgen: F) -> Result - where - F: Fn() -> (ConnCommand, async_channel::Receiver), - { - let tx: Sender = err::todoval(); - let (cmd, rx) = cmdgen(); - tx.send(cmd).await.err_conv()?; - let ret = rx.recv().await.err_conv()?; - Ok(ret) - } - - #[allow(unused)] - async fn __send_command_inner_disabled<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> - where - IT: Iterator)>, - F: Fn() -> (ConnCommand, async_channel::Receiver), - { - let mut rxs = Vec::new(); - for (_, tx) in it { - let (cmd, rx) = cmdgen(); - match tx.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } - } - } - rxs - } - async fn wait_stopped(&self) -> Result<(), Error> { warn!("Lock for wait_stopped"); // let mut g = self.ca_conn_ress.lock().await; @@ -1594,6 +1518,96 @@ impl CaConnSet { } } +struct PendingProgress { + pending: bool, + progress: bool, +} + +impl PendingProgress { + fn new() -> Self { + Self { + pending: false, + progress: false, + } + } + + fn mark_pending(&mut self) { + self.pending = true; + } + + fn mark_progress(&mut self) { + self.progress = true; + } + + fn pending(&self) -> bool { + self.pending + } + + fn progress(&self) -> bool { + self.progress + } +} + +fn merge_pending_progress(res: Option>>, penpro: &mut PendingProgress) -> Result<(), E> +where + E: std::error::Error, +{ + use Poll::*; + match res { + Some(x) => match x { + Ready(x) => match x { + Ok(()) => { + penpro.mark_progress(); + Ok(()) + } + Err(e) => { + penpro.mark_progress(); + Err(e) + } + }, + Pending => { + penpro.mark_pending(); + Ok(()) + } + }, + None => Ok(()), + } +} + +fn sender_polling_send( + qu: &mut VecDeque, + mut sender: Pin<&mut SenderPolling>, + cx: &mut Context, + on_send_ok: F, +) -> Option>> +where + T: Unpin, + F: FnOnce(), +{ + use Poll::*; + if sender.is_idle() { + if let Some(item) = qu.pop_front() { + sender.as_mut().send_pin(item); + } + } + if sender.is_sending() { + match sender.poll_unpin(cx) { + Ready(Ok(())) => { + on_send_ok(); + Some(Ready(Ok(()))) + } + Ready(Err(_)) => { + let e = Error::with_msg_no_trace("can not send into channel"); + error!("{e}"); + Some(Ready(Err(e))) + } + Pending => Some(Pending), + } + } else { + None + } +} + impl Stream for CaConnSet { type Item = CaConnSetItem; @@ -1625,8 +1639,7 @@ impl Stream for CaConnSet { .set(self.find_ioc_query_sender.len().unwrap_or(0) as _); self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _); - let mut have_pending = false; - let mut have_progress = false; + let mut penpro = PendingProgress::new(); if let Err(e) = self.try_push_ca_conn_cmds(cx) { break Ready(Some(CaConnSetItem::Error(e))); @@ -1639,16 +1652,15 @@ impl Stream for CaConnSet { match self.ticker.poll_unpin(cx) { Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) { Ok(()) => { - have_progress = true; + penpro.mark_progress(); } Err(e) => { - have_progress = true; error!("ticker {e}"); break Ready(Some(CaConnSetItem::Error(e))); } }, Pending => { - have_pending = true; + penpro.mark_pending(); } } @@ -1672,83 +1684,49 @@ impl Stream for CaConnSet { error!("CaConn {addr} join error: {e} left {left}"); } } - have_progress = true; + penpro.mark_progress(); } Pending => { - have_pending = true; + penpro.mark_pending(); } } } - if self.storage_insert_sender.is_idle() { - if let Some(item) = self.storage_insert_queue.pop_front() { - self.storage_insert_sender.as_mut().send_pin(item); + { + let this = self.as_mut().get_mut(); + let qu = &mut this.storage_insert_queue; + let tx = this.storage_insert_sender.as_mut(); + let counter = this.stats.storage_insert_queue_send(); + let x = sender_polling_send(qu, tx, cx, || { + counter.inc(); + }); + if let Err(e) = merge_pending_progress(x, &mut penpro) { + break Ready(Some(CaConnSetItem::Error(e))); } } - if self.storage_insert_sender.is_sending() { - match self.storage_insert_sender.poll_unpin(cx) { - Ready(Ok(())) => { - self.stats.storage_insert_queue_send().inc(); - have_progress = true; - } - Ready(Err(_)) => { - let e = Error::with_msg_no_trace("can not send into channel"); - error!("{e}"); - break Ready(Some(CaConnSetItem::Error(e))); - } - Pending => { - have_pending = true; - } + { + let this = self.as_mut().get_mut(); + let qu = &mut this.find_ioc_query_queue; + let tx = this.find_ioc_query_sender.as_mut(); + let x = sender_polling_send(qu, tx, cx, || ()); + if let Err(e) = merge_pending_progress(x, &mut penpro) { + break Ready(Some(CaConnSetItem::Error(e))); } } - - if self.find_ioc_query_sender.is_idle() { - if let Some(item) = self.find_ioc_query_queue.pop_front() { - self.find_ioc_query_sender.as_mut().send_pin(item); - } - } - if self.find_ioc_query_sender.is_sending() { - match self.find_ioc_query_sender.poll_unpin(cx) { - Ready(Ok(())) => { - have_progress = true; - } - Ready(Err(_)) => { - let e = Error::with_msg_no_trace("can not send into channel"); - error!("{e}"); - break Ready(Some(CaConnSetItem::Error(e))); - } - Pending => { - have_pending = true; - } - } - } - - if self.channel_info_query_sender.is_idle() { - // if self.channel_info_query_sender.len().unwrap_or(0) <= 10 {} - if let Some(item) = self.channel_info_query_queue.pop_front() { - self.channel_info_query_sender.as_mut().send_pin(item); - } - } - if self.channel_info_query_sender.is_sending() { - match self.channel_info_query_sender.poll_unpin(cx) { - Ready(Ok(())) => { - have_progress = true; - } - Ready(Err(_)) => { - let e = Error::with_msg_no_trace("can not send into channel"); - error!("{e}"); - break Ready(Some(CaConnSetItem::Error(e))); - } - Pending => { - have_pending = true; - } + { + let this = self.as_mut().get_mut(); + let qu = &mut this.channel_info_query_queue; + let tx = this.channel_info_query_sender.as_mut(); + let x = sender_polling_send(qu, tx, cx, || ()); + if let Err(e) = merge_pending_progress(x, &mut penpro) { + break Ready(Some(CaConnSetItem::Error(e))); } } match self.find_ioc_res_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_ioc_query_result(x) { Ok(()) => { - have_progress = true; + penpro.mark_progress(); } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, @@ -1756,40 +1734,40 @@ impl Stream for CaConnSet { // TODO trigger shutdown because of error } Pending => { - have_pending = true; + penpro.mark_pending(); } } match self.ca_conn_res_rx.as_mut().poll_next(cx) { Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) { Ok(()) => { - have_progress = true; + penpro.mark_progress(); } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => {} Pending => { - have_pending = true; + penpro.mark_pending(); } } match self.channel_info_res_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_series_lookup_result(x) { Ok(()) => { - have_progress = true; + penpro.mark_progress(); } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, Ready(None) => {} Pending => { - have_pending = true; + penpro.mark_pending(); } } match self.connset_inp_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_event(x) { Ok(()) => { - have_progress = true; + penpro.mark_progress(); } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, @@ -1797,24 +1775,24 @@ impl Stream for CaConnSet { warn!("connset_inp_rx broken?") } Pending => { - have_pending = true; + penpro.mark_pending(); } } break if self.ready_for_end_of_stream() { self.stats.ready_for_end_of_stream().inc(); - if have_progress { + if penpro.progress() { self.stats.ready_for_end_of_stream_with_progress().inc(); continue; } else { Ready(None) } } else { - if have_progress { + if penpro.progress() { self.stats.poll_reloop().inc(); continue; } else { - if have_pending { + if penpro.pending() { self.stats.poll_pending().inc(); Pending } else {