From f7db475b3024d05d0e3d2be719de08bbc3a0eae4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 30 Oct 2023 13:52:41 +0100 Subject: [PATCH] WIP checks --- ingest-bsread/Cargo.toml | 1 + ingest-bsread/src/zmtp/zmtpproto.rs | 24 +++++++++---------- netfetch/src/ca/conn.rs | 33 +++++++++++++------------- netfetch/src/ca/connset.rs | 33 +++++++++++++------------- netfetch/src/ca/connset_input_merge.rs | 27 +++++++++++++-------- netfetch/src/senderpolling.rs | 23 +++++++++++------- 6 files changed, 77 insertions(+), 64 deletions(-) diff --git a/ingest-bsread/Cargo.toml b/ingest-bsread/Cargo.toml index f4c6957..2f618b6 100644 --- a/ingest-bsread/Cargo.toml +++ b/ingest-bsread/Cargo.toml @@ -12,6 +12,7 @@ serde_json = "1.0" bytes = "1.4.0" md-5 = "0.10.5" hex = "0.4.3" +pin-project = "1" log = { path = "../log" } series = { path = "../series" } stats = { path = "../stats" } diff --git a/ingest-bsread/src/zmtp/zmtpproto.rs b/ingest-bsread/src/zmtp/zmtpproto.rs index 7b7f329..e50418c 100644 --- a/ingest-bsread/src/zmtp/zmtpproto.rs +++ b/ingest-bsread/src/zmtp/zmtpproto.rs @@ -9,7 +9,6 @@ use err::thiserror; use err::ThisError; use futures_util::pin_mut; use futures_util::Stream; -use futures_util::StreamExt; use netpod::log::*; use netpod::timeunits::SEC; use serde_json::Value as JsVal; @@ -116,8 +115,8 @@ pub struct Zmtp { peer_ver: (u8, u8), frames: Vec, inp_eof: bool, - data_tx: Sender, - data_rx: Receiver, + data_tx: Pin>>, + data_rx: Pin>>, input_state: Vec, input_state_ix: usize, conn_state_log: Vec, @@ -143,8 +142,8 @@ impl Zmtp { peer_ver: (0, 0), frames: Vec::new(), inp_eof: false, - data_tx: tx, - data_rx: rx, + data_tx: Box::pin(tx), + data_rx: Box::pin(rx), input_state: vec![0; 64].iter().map(|_| InpState::default()).collect(), input_state_ix: 0, conn_state_log: vec![0; 64].iter().map(|_| ConnState::InitSend).collect(), @@ -153,7 +152,7 @@ impl Zmtp { } pub fn out_channel(&self) -> Sender { - self.data_tx.clone() + self.data_tx.as_ref().get_ref().clone() } fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> { @@ -162,10 +161,6 @@ impl Zmtp { Ok((&mut self.conn, buf)) } - fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) { - (&mut self.conn, self.outbuf.data()) - } - #[allow(unused)] #[inline(always)] fn record_input_state(&mut self) {} @@ -214,8 +209,8 @@ impl Zmtp { let mut item_count = 0; // TODO should I better keep one serialized item in Self so that I know how much space it needs? let serialized: Int> = if self.out_enable && self.outbuf.wcap() >= self.outbuf.cap() / 2 { - let data_rx = std::pin::pin!(self.data_rx); - match data_rx.poll_next(cx) { + let rx = self.data_rx.as_mut(); + match rx.poll_next(cx) { Ready(Some(_item)) => { // TODO item should be something that we can convert into a zmtp message. Int::Empty @@ -230,7 +225,10 @@ impl Zmtp { let write: Int> = if item_count > 0 { Int::NoWork } else if self.outbuf.len() > 0 { - let (w, b) = self.outbuf_conn(); + fn connout(this: &mut Zmtp) -> (&mut TcpStream, &[u8]) { + (&mut this.conn, this.outbuf.data()) + } + let (w, b) = connout(&mut self); pin_mut!(w); match w.poll_write(cx, b) { Ready(k) => match k { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 8704646..e2cee03 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -509,8 +509,8 @@ pub struct CaConn { local_epics_hostname: String, stats: Arc, insert_ivl_min_mus: u64, - conn_command_tx: Sender, - conn_command_rx: Receiver, + conn_command_tx: Pin>>, + conn_command_rx: Pin>>, conn_backoff: f32, conn_backoff_beg: f32, inserts_counter: u64, @@ -518,10 +518,10 @@ pub struct CaConn { ioc_ping_last: Instant, ioc_ping_next: Instant, ioc_ping_start: Option, - storage_insert_sender: SenderPolling, + storage_insert_sender: Pin>>, ca_conn_event_out_queue: VecDeque, channel_info_query_queue: VecDeque, - channel_info_query_sending: SenderPolling, + channel_info_query_sending: Pin>>, time_binners: BTreeMap, thr_msg_poll: ThrottleTrace, ca_proto_stats: Arc, @@ -567,8 +567,8 @@ impl CaConn { local_epics_hostname, stats, insert_ivl_min_mus: 1000 * 6, - conn_command_tx: cq_tx, - conn_command_rx: cq_rx, + conn_command_tx: Box::pin(cq_tx), + conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, conn_backoff_beg: 0.02, inserts_counter: 0, @@ -576,10 +576,10 @@ impl CaConn { ioc_ping_last: Instant::now(), ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, - storage_insert_sender: SenderPolling::new(storage_insert_tx), + storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), ca_conn_event_out_queue: VecDeque::new(), channel_info_query_queue: VecDeque::new(), - channel_info_query_sending: SenderPolling::new(channel_info_query_tx), + channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)), time_binners: BTreeMap::new(), thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), ca_proto_stats, @@ -596,8 +596,8 @@ impl CaConn { Box::pin(tokio::time::sleep(Duration::from_millis(500))) } - pub fn conn_command_tx(&self) -> async_channel::Sender { - self.conn_command_tx.clone() + pub fn conn_command_tx(&self) -> Sender { + self.conn_command_tx.as_ref().get_ref().clone() } fn is_shutdown(&self) -> bool { @@ -794,7 +794,8 @@ impl CaConn { if self.is_shutdown() { Ok(Ready(None)) } else { - match pin!(self.conn_command_rx).poll_next(cx) { + let rx = self.conn_command_rx.as_mut(); + match rx.poll_next(cx) { Ready(Some(a)) => { trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); match a.kind { @@ -1570,7 +1571,7 @@ impl CaConn { *ch_s = ChannelState::FetchingSeriesId(created_state); // TODO handle error in different way. Should most likely not abort. let tx = SendSeriesLookup { - tx: self.conn_command_tx.clone(), + tx: self.conn_command_tx(), }; let query = ChannelInfoQuery { backend: self.backend.clone(), @@ -1934,7 +1935,7 @@ impl CaConn { let sd = &mut self.storage_insert_sender; if sd.is_idle() { if let Some(item) = self.insert_item_queue.pop_front() { - self.storage_insert_sender.send(item); + self.storage_insert_sender.as_mut().send_pin(item); } } if self.storage_insert_sender.is_sending() { @@ -1959,12 +1960,12 @@ impl CaConn { if self.is_shutdown() { Ok(Ready(None)) } else { - let sd = &mut self.channel_info_query_sending; + let sd = self.channel_info_query_sending.as_mut(); if sd.is_idle() { if let Some(item) = self.channel_info_query_queue.pop_front() { trace3!("send series query {item:?}"); - let sd = &mut self.channel_info_query_sending; - sd.send(item); + let sd = self.channel_info_query_sending.as_mut(); + sd.send_pin(item); } } let sd = &mut self.channel_info_query_sending; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index aced6de..bcd8559 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -331,7 +331,6 @@ impl CanSendChannelInfoResult for SeriesLookupSender { } } -#[pin_project::pin_project] pub struct CaConnSet { backend: String, local_epics_hostname: String, @@ -339,16 +338,16 @@ pub struct CaConnSet { channel_states: ChannelStateMap, connset_inp_rx: Pin>>, channel_info_query_queue: VecDeque, - channel_info_query_sender: SenderPolling, + channel_info_query_sender: Pin>>, channel_info_query_tx: Option>, channel_info_res_tx: Pin>>>, channel_info_res_rx: Pin>>>, find_ioc_query_queue: VecDeque, - find_ioc_query_sender: SenderPolling, + find_ioc_query_sender: Pin>>, find_ioc_res_rx: Pin>>>, storage_insert_tx: Pin>>, storage_insert_queue: VecDeque, - storage_insert_sender: SenderPolling, + storage_insert_sender: Pin>>, ca_conn_res_tx: Pin>>, ca_conn_res_rx: Pin>>, connset_out_queue: VecDeque, @@ -400,16 +399,16 @@ impl CaConnSet { channel_states: ChannelStateMap::new(), connset_inp_rx: Box::pin(connset_inp_rx), channel_info_query_queue: VecDeque::new(), - channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()), + channel_info_query_sender: Box::pin(SenderPolling::new(channel_info_query_tx.clone())), channel_info_query_tx: Some(channel_info_query_tx), channel_info_res_tx: Box::pin(channel_info_res_tx), channel_info_res_rx: Box::pin(channel_info_res_rx), find_ioc_query_queue: VecDeque::new(), - find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx), + find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)), find_ioc_res_rx: Box::pin(find_ioc_res_rx), storage_insert_tx: Box::pin(storage_insert_tx.clone()), storage_insert_queue: VecDeque::new(), - storage_insert_sender: SenderPolling::new(storage_insert_tx), + storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), ca_conn_res_tx: Box::pin(ca_conn_res_tx), ca_conn_res_rx: Box::pin(ca_conn_res_rx), shutdown_stopping: false, @@ -459,7 +458,7 @@ impl CaConnSet { // ); debug!("CaConnSet EndOfStream"); debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); - this.find_ioc_query_sender.drop(); + this.find_ioc_query_sender.as_mut().drop(); debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); this.ioc_finder_jh .await @@ -801,9 +800,9 @@ impl CaConnSet { debug!("handle_shutdown"); self.shutdown_stopping = true; self.find_ioc_res_rx.close(); - self.channel_info_query_sender.drop(); + self.channel_info_query_sender.as_mut().drop(); self.channel_info_query_tx = None; - self.find_ioc_query_sender.drop(); + self.find_ioc_query_sender.as_mut().drop(); for (_addr, res) in self.ca_conn_ress.iter() { let item = ConnCommand::shutdown(); // TODO not the nicest @@ -1519,7 +1518,7 @@ impl Stream for CaConnSet { if self.storage_insert_sender.is_idle() { if let Some(item) = self.storage_insert_queue.pop_front() { self.stats.logic_error().inc(); - self.storage_insert_sender.send(item); + self.storage_insert_sender.as_mut().send_pin(item); } } if self.storage_insert_sender.is_sending() { @@ -1540,7 +1539,7 @@ impl Stream for CaConnSet { if self.find_ioc_query_sender.is_idle() { if let Some(item) = self.find_ioc_query_queue.pop_front() { - self.find_ioc_query_sender.send(item); + self.find_ioc_query_sender.as_mut().send_pin(item); } } if self.find_ioc_query_sender.is_sending() { @@ -1561,7 +1560,7 @@ impl Stream for CaConnSet { if self.channel_info_query_sender.is_idle() { if let Some(item) = self.channel_info_query_queue.pop_front() { - self.channel_info_query_sender.send(item); + self.channel_info_query_sender.as_mut().send_pin(item); } } if self.channel_info_query_sender.is_sending() { @@ -1580,7 +1579,7 @@ impl Stream for CaConnSet { } } - match pin!(self.find_ioc_res_rx).poll_next(cx) { + match self.find_ioc_res_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_ioc_query_result(x) { Ok(()) => { have_progress = true; @@ -1593,7 +1592,7 @@ impl Stream for CaConnSet { } } - match pin!(self.ca_conn_res_rx).poll_next(cx) { + match self.ca_conn_res_rx.as_mut().poll_next(cx) { Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) { Ok(()) => { have_progress = true; @@ -1606,7 +1605,7 @@ impl Stream for CaConnSet { } } - match pin!(self.channel_info_res_rx).poll_next(cx) { + match self.channel_info_res_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_series_lookup_result(x) { Ok(()) => { have_progress = true; @@ -1619,7 +1618,7 @@ impl Stream for CaConnSet { } } - match pin!(self.connset_inp_rx).poll_next(cx) { + match self.connset_inp_rx.as_mut().poll_next(cx) { Ready(Some(x)) => match self.handle_event(x) { Ok(()) => { have_progress = true; diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs index b040d88..92878f9 100644 --- a/netfetch/src/ca/connset_input_merge.rs +++ b/netfetch/src/ca/connset_input_merge.rs @@ -4,15 +4,21 @@ use crate::ca::connset::ConnSetCmd; use async_channel::Receiver; use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; +use futures_util::Stream; +use pin_project::pin_project; use std::collections::VecDeque; use std::pin::pin; use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[pin_project] pub struct InputMerge { + #[pin] inp1: Option>, + #[pin] inp2: Option>>, + #[pin] inp3: Option>>, } @@ -36,17 +42,18 @@ impl InputMerge { } } -impl futures_util::Stream for InputMerge { +impl Stream for InputMerge { type Item = CaConnSetEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let selfp = self.project(); let ret = { - if let Some(inp) = &mut self.inp3 { - match pin!(*inp).poll_next(cx) { + if let Some(inp) = selfp.inp3.as_pin_mut() { + match inp.poll_next(cx) { Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), Ready(None) => { - self.inp2 = None; + // self.inp3 = None; None } Pending => None, @@ -58,11 +65,11 @@ impl futures_util::Stream for InputMerge { let ret = if let Some(x) = ret { Some(x) } else { - if let Some(inp) = &mut self.inp2 { - match pin!(*inp).poll_next(cx) { + if let Some(inp) = selfp.inp2.as_pin_mut() { + match inp.poll_next(cx) { Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), Ready(None) => { - self.inp2 = None; + // self.inp2 = None; None } Pending => None, @@ -74,11 +81,11 @@ impl futures_util::Stream for InputMerge { if let Some(x) = ret { Ready(Some(x)) } else { - if let Some(inp) = &mut self.inp1 { - match pin!(*inp).poll_next(cx) { + if let Some(inp) = selfp.inp1.as_pin_mut() { + match inp.poll_next(cx) { Ready(Some(x)) => Ready(Some(x)), Ready(None) => { - self.inp1 = None; + // self.inp1 = None; Ready(None) } Pending => Pending, diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 4c6079d..9168f89 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -71,9 +71,12 @@ impl SenderPolling { } } - pub fn drop(&mut self) { - self.fut = None; - self.sender = None; + pub fn drop(self: Pin<&mut Self>) { + unsafe { + let this = self.get_unchecked_mut(); + this.fut = None; + this.sender = None; + } } pub fn len(&self) -> Option { @@ -87,17 +90,21 @@ where { type Output = Result<(), Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; - let this = self.project(); - match this.fut.as_pin_mut() { + let mut this = self.project(); + match this.fut.as_mut().as_pin_mut() { Some(fut) => match fut.poll(cx) { Ready(Ok(())) => { - self.fut = None; + unsafe { + *this.fut.get_unchecked_mut() = None; + } Ready(Ok(())) } Ready(Err(e)) => { - self.fut = None; + unsafe { + *this.fut.get_unchecked_mut() = None; + } Ready(Err(Error::Closed(e.0))) } Pending => Pending,