Factor out unsafe code
This commit is contained in:
@@ -6,6 +6,7 @@ use super::proto::CaProto;
|
||||
use super::ExtraInsertsConf;
|
||||
use crate::ca::proto::CreateChan;
|
||||
use crate::ca::proto::EventAdd;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use crate::timebin::ConnTimeBin;
|
||||
use async_channel::Sender;
|
||||
use dbpg::seriesbychannel::ChannelInfoQuery;
|
||||
@@ -420,6 +421,7 @@ struct ChannelOpsResources<'a> {
|
||||
}
|
||||
|
||||
pub struct CaConn {
|
||||
backend: String,
|
||||
state: CaConnState,
|
||||
ticker: Pin<Box<tokio::time::Sleep>>,
|
||||
proto: Option<CaProto>,
|
||||
@@ -431,9 +433,7 @@ pub struct CaConn {
|
||||
cid_by_subid: BTreeMap<u32, Cid>,
|
||||
name_by_cid: BTreeMap<Cid, String>,
|
||||
insert_item_queue: VecDeque<QueryItem>,
|
||||
insert_item_sender: CommonInsertItemQueueSender,
|
||||
insert_item_send_fut: Option<async_channel::Send<'static, QueryItem>>,
|
||||
backend: String,
|
||||
sender_polling: SenderPolling<QueryItem>,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
array_truncate: usize,
|
||||
@@ -473,6 +473,7 @@ impl CaConn {
|
||||
) -> Self {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
Self {
|
||||
backend,
|
||||
state: CaConnState::Unconnected,
|
||||
ticker: Self::new_self_ticker(),
|
||||
proto: None,
|
||||
@@ -484,9 +485,7 @@ impl CaConn {
|
||||
cid_by_subid: BTreeMap::new(),
|
||||
name_by_cid: BTreeMap::new(),
|
||||
insert_item_queue: VecDeque::new(),
|
||||
insert_item_sender,
|
||||
insert_item_send_fut: None,
|
||||
backend,
|
||||
sender_polling: SenderPolling::new(insert_item_sender.inner().clone()),
|
||||
remote_addr_dbg,
|
||||
local_epics_hostname,
|
||||
array_truncate,
|
||||
@@ -801,40 +800,26 @@ impl CaConn {
|
||||
use Poll::*;
|
||||
loop {
|
||||
self.stats.caconn_loop4_count_inc();
|
||||
match self.insert_item_send_fut.as_mut() {
|
||||
Some(fut) => match fut.poll_unpin(cx) {
|
||||
if self.sender_polling.is_sending() {
|
||||
match self.sender_polling.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
self.stats.inserts_queue_push_inc();
|
||||
self.insert_item_send_fut = None;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.insert_item_send_fut = None;
|
||||
error!("handle_insert_futs can not send item {e}");
|
||||
break Ready(Err(Error::with_msg_no_trace(format!("can not send the item"))));
|
||||
}
|
||||
Pending => {
|
||||
if false {
|
||||
// TODO test this case.
|
||||
self.stats.inserts_queue_drop_inc();
|
||||
self.insert_item_send_fut = None;
|
||||
} else {
|
||||
// Wait until global queue is ready (peer will see network pressure)
|
||||
break Pending;
|
||||
use crate::senderpolling::Error::*;
|
||||
match e {
|
||||
NoSendInProgress => break Ready(Err(Error::with_msg_no_trace("no send in progress"))),
|
||||
Closed(_item) => break Ready(Err(Error::with_msg_no_trace("insert channel closed"))),
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {}
|
||||
}
|
||||
if let Some(item) = self.insert_item_queue.pop_front() {
|
||||
self.stats.inserts_queue_pop_for_global_inc();
|
||||
let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) };
|
||||
if sender.is_full() {
|
||||
self.stats.inserts_queue_drop_inc();
|
||||
} else {
|
||||
self.insert_item_send_fut = Some(sender.send(item));
|
||||
Pending => break Pending,
|
||||
}
|
||||
} else {
|
||||
break Ready(Ok(()));
|
||||
if let Some(item) = self.insert_item_queue.pop_front() {
|
||||
self.sender_polling.send2(item);
|
||||
} else {
|
||||
break Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ pub mod metrics;
|
||||
pub mod netbuf;
|
||||
pub mod patchcollect;
|
||||
pub mod rt;
|
||||
pub mod senderpolling;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod timebin;
|
||||
|
||||
86
netfetch/src/senderpolling.rs
Normal file
86
netfetch/src/senderpolling.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
use async_channel::Send;
|
||||
use async_channel::Sender;
|
||||
use err::thiserror;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use pin_project::pin_project;
|
||||
use std::marker::PhantomPinned;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::NonNull;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error<T> {
|
||||
NoSendInProgress,
|
||||
Closed(T),
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct SenderPolling<T>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
sender: Sender<T>,
|
||||
sender_ptr: NonNull<Sender<T>>,
|
||||
fut: Option<Send<'static, T>>,
|
||||
_pin: PhantomPinned,
|
||||
}
|
||||
|
||||
unsafe impl<T> core::marker::Send for SenderPolling<T> where T: core::marker::Send {}
|
||||
|
||||
impl<T> SenderPolling<T> {
|
||||
pub fn new(sender: Sender<T>) -> Self {
|
||||
let mut ret = Self {
|
||||
sender,
|
||||
sender_ptr: NonNull::dangling(),
|
||||
fut: None,
|
||||
_pin: PhantomPinned,
|
||||
};
|
||||
ret.sender_ptr = NonNull::from(&ret.sender);
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn is_sending(&self) -> bool {
|
||||
self.fut.is_some()
|
||||
}
|
||||
|
||||
pub fn send(self: Pin<&mut Self>, item: T) {
|
||||
let (tx, fut) = unsafe {
|
||||
let x = Pin::get_unchecked_mut(self);
|
||||
(x.sender_ptr.as_mut(), &mut x.fut)
|
||||
};
|
||||
let s = tx.send(item);
|
||||
*fut = Some(s);
|
||||
}
|
||||
|
||||
pub fn send2(&mut self, item: T) {
|
||||
let sender = unsafe { self.sender_ptr.as_mut() };
|
||||
let s = sender.send(item);
|
||||
self.fut = Some(s);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for SenderPolling<T> {
|
||||
type Output = Result<(), Error<T>>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
use Poll::*;
|
||||
let this = self.project();
|
||||
match this.fut {
|
||||
Some(fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
*this.fut = None;
|
||||
Ready(Ok(()))
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
*this.fut = None;
|
||||
Ready(Err(Error::Closed(e.0)))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
None => Ready(Err(Error::NoSendInProgress)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user