From 93da364fd5da12100016dfb9d76ec9138e5cc15b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 4 Sep 2023 11:31:28 +0200 Subject: [PATCH] Factor out unsafe code --- netfetch/src/ca/conn.rs | 49 +++++++------------- netfetch/src/lib.rs | 1 + netfetch/src/senderpolling.rs | 86 +++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 32 deletions(-) create mode 100644 netfetch/src/senderpolling.rs diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 02dee76..69a5c22 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -6,6 +6,7 @@ use super::proto::CaProto; use super::ExtraInsertsConf; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; +use crate::senderpolling::SenderPolling; use crate::timebin::ConnTimeBin; use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; @@ -420,6 +421,7 @@ struct ChannelOpsResources<'a> { } pub struct CaConn { + backend: String, state: CaConnState, ticker: Pin>, proto: Option, @@ -431,9 +433,7 @@ pub struct CaConn { cid_by_subid: BTreeMap, name_by_cid: BTreeMap, insert_item_queue: VecDeque, - insert_item_sender: CommonInsertItemQueueSender, - insert_item_send_fut: Option>, - backend: String, + sender_polling: SenderPolling, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, array_truncate: usize, @@ -473,6 +473,7 @@ impl CaConn { ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); Self { + backend, state: CaConnState::Unconnected, ticker: Self::new_self_ticker(), proto: None, @@ -484,9 +485,7 @@ impl CaConn { cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), insert_item_queue: VecDeque::new(), - insert_item_sender, - insert_item_send_fut: None, - backend, + sender_polling: SenderPolling::new(insert_item_sender.inner().clone()), remote_addr_dbg, local_epics_hostname, array_truncate, @@ -801,40 +800,26 @@ impl CaConn { use Poll::*; loop { self.stats.caconn_loop4_count_inc(); - match self.insert_item_send_fut.as_mut() { - Some(fut) => match fut.poll_unpin(cx) { + if self.sender_polling.is_sending() { + match self.sender_polling.poll_unpin(cx) { Ready(Ok(())) => { self.stats.inserts_queue_push_inc(); - self.insert_item_send_fut = None; } Ready(Err(e)) => { - self.insert_item_send_fut = None; - error!("handle_insert_futs can not send item {e}"); - break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))); - } - Pending => { - if false { - // TODO test this case. - self.stats.inserts_queue_drop_inc(); - self.insert_item_send_fut = None; - } else { - // Wait until global queue is ready (peer will see network pressure) - break Pending; + use crate::senderpolling::Error::*; + match e { + NoSendInProgress => break Ready(Err(Error::with_msg_no_trace("no send in progress"))), + Closed(_item) => break Ready(Err(Error::with_msg_no_trace("insert channel closed"))), } } - }, - None => {} - } - if let Some(item) = self.insert_item_queue.pop_front() { - self.stats.inserts_queue_pop_for_global_inc(); - let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) }; - if sender.is_full() { - self.stats.inserts_queue_drop_inc(); - } else { - self.insert_item_send_fut = Some(sender.send(item)); + Pending => break Pending, } } else { - break Ready(Ok(())); + if let Some(item) = self.insert_item_queue.pop_front() { + self.sender_polling.send2(item); + } else { + break Ready(Ok(())); + } } } } diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 35a343f..5786bc7 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -8,6 +8,7 @@ pub mod metrics; pub mod netbuf; pub mod patchcollect; pub mod rt; +pub mod senderpolling; #[cfg(test)] pub mod test; pub mod timebin; diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs new file mode 100644 index 0000000..cb45aba --- /dev/null +++ b/netfetch/src/senderpolling.rs @@ -0,0 +1,86 @@ +use async_channel::Send; +use async_channel::Sender; +use err::thiserror; +use futures_util::Future; +use futures_util::FutureExt; +use pin_project::pin_project; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::task::Context; +use std::task::Poll; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + NoSendInProgress, + Closed(T), +} + +#[pin_project] +pub struct SenderPolling +where + T: 'static, +{ + sender: Sender, + sender_ptr: NonNull>, + fut: Option>, + _pin: PhantomPinned, +} + +unsafe impl core::marker::Send for SenderPolling where T: core::marker::Send {} + +impl SenderPolling { + pub fn new(sender: Sender) -> Self { + let mut ret = Self { + sender, + sender_ptr: NonNull::dangling(), + fut: None, + _pin: PhantomPinned, + }; + ret.sender_ptr = NonNull::from(&ret.sender); + ret + } + + pub fn is_sending(&self) -> bool { + self.fut.is_some() + } + + pub fn send(self: Pin<&mut Self>, item: T) { + let (tx, fut) = unsafe { + let x = Pin::get_unchecked_mut(self); + (x.sender_ptr.as_mut(), &mut x.fut) + }; + let s = tx.send(item); + *fut = Some(s); + } + + pub fn send2(&mut self, item: T) { + let sender = unsafe { self.sender_ptr.as_mut() }; + let s = sender.send(item); + self.fut = Some(s); + } +} + +impl Future for SenderPolling { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + let this = self.project(); + match this.fut { + Some(fut) => match fut.poll_unpin(cx) { + Ready(Ok(())) => { + *this.fut = None; + Ready(Ok(())) + } + Ready(Err(e)) => { + *this.fut = None; + Ready(Err(Error::Closed(e.0))) + } + Pending => Pending, + }, + None => Ready(Err(Error::NoSendInProgress)), + } + } +}