WIP checks

This commit is contained in:
Dominik Werder
2023-10-30 13:52:41 +01:00
parent a7b76c9868
commit f7db475b30
6 changed files with 77 additions and 64 deletions

View File

@@ -12,6 +12,7 @@ serde_json = "1.0"
bytes = "1.4.0"
md-5 = "0.10.5"
hex = "0.4.3"
pin-project = "1"
log = { path = "../log" }
series = { path = "../series" }
stats = { path = "../stats" }

View File

@@ -9,7 +9,6 @@ use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::SEC;
use serde_json::Value as JsVal;
@@ -116,8 +115,8 @@ pub struct Zmtp {
peer_ver: (u8, u8),
frames: Vec<ZmtpFrame>,
inp_eof: bool,
data_tx: Sender<u32>,
data_rx: Receiver<u32>,
data_tx: Pin<Box<Sender<u32>>>,
data_rx: Pin<Box<Receiver<u32>>>,
input_state: Vec<InpState>,
input_state_ix: usize,
conn_state_log: Vec<ConnState>,
@@ -143,8 +142,8 @@ impl Zmtp {
peer_ver: (0, 0),
frames: Vec::new(),
inp_eof: false,
data_tx: tx,
data_rx: rx,
data_tx: Box::pin(tx),
data_rx: Box::pin(rx),
input_state: vec![0; 64].iter().map(|_| InpState::default()).collect(),
input_state_ix: 0,
conn_state_log: vec![0; 64].iter().map(|_| ConnState::InitSend).collect(),
@@ -153,7 +152,7 @@ impl Zmtp {
}
pub fn out_channel(&self) -> Sender<u32> {
self.data_tx.clone()
self.data_tx.as_ref().get_ref().clone()
}
fn inpbuf_conn(&mut self, need_min: usize) -> Result<(&mut TcpStream, ReadBuf), Error> {
@@ -162,10 +161,6 @@ impl Zmtp {
Ok((&mut self.conn, buf))
}
fn outbuf_conn(&mut self) -> (&mut TcpStream, &[u8]) {
(&mut self.conn, self.outbuf.data())
}
#[allow(unused)]
#[inline(always)]
fn record_input_state(&mut self) {}
@@ -214,8 +209,8 @@ impl Zmtp {
let mut item_count = 0;
// TODO should I better keep one serialized item in Self so that I know how much space it needs?
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wcap() >= self.outbuf.cap() / 2 {
let data_rx = std::pin::pin!(self.data_rx);
match data_rx.poll_next(cx) {
let rx = self.data_rx.as_mut();
match rx.poll_next(cx) {
Ready(Some(_item)) => {
// TODO item should be something that we can convert into a zmtp message.
Int::Empty
@@ -230,7 +225,10 @@ impl Zmtp {
let write: Int<Result<(), Error>> = if item_count > 0 {
Int::NoWork
} else if self.outbuf.len() > 0 {
let (w, b) = self.outbuf_conn();
fn connout(this: &mut Zmtp) -> (&mut TcpStream, &[u8]) {
(&mut this.conn, this.outbuf.data())
}
let (w, b) = connout(&mut self);
pin_mut!(w);
match w.poll_write(cx, b) {
Ready(k) => match k {

View File

@@ -509,8 +509,8 @@ pub struct CaConn {
local_epics_hostname: String,
stats: Arc<CaConnStats>,
insert_ivl_min_mus: u64,
conn_command_tx: Sender<ConnCommand>,
conn_command_rx: Receiver<ConnCommand>,
conn_command_tx: Pin<Box<Sender<ConnCommand>>>,
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
conn_backoff: f32,
conn_backoff_beg: f32,
inserts_counter: u64,
@@ -518,10 +518,10 @@ pub struct CaConn {
ioc_ping_last: Instant,
ioc_ping_next: Instant,
ioc_ping_start: Option<Instant>,
storage_insert_sender: SenderPolling<QueryItem>,
storage_insert_sender: Pin<Box<SenderPolling<QueryItem>>>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
channel_info_query_sending: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
@@ -567,8 +567,8 @@ impl CaConn {
local_epics_hostname,
stats,
insert_ivl_min_mus: 1000 * 6,
conn_command_tx: cq_tx,
conn_command_rx: cq_rx,
conn_command_tx: Box::pin(cq_tx),
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
conn_backoff_beg: 0.02,
inserts_counter: 0,
@@ -576,10 +576,10 @@ impl CaConn {
ioc_ping_last: Instant::now(),
ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng),
ioc_ping_start: None,
storage_insert_sender: SenderPolling::new(storage_insert_tx),
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
ca_conn_event_out_queue: VecDeque::new(),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)),
time_binners: BTreeMap::new(),
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
ca_proto_stats,
@@ -596,8 +596,8 @@ impl CaConn {
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
}
pub fn conn_command_tx(&self) -> async_channel::Sender<ConnCommand> {
self.conn_command_tx.clone()
pub fn conn_command_tx(&self) -> Sender<ConnCommand> {
self.conn_command_tx.as_ref().get_ref().clone()
}
fn is_shutdown(&self) -> bool {
@@ -794,7 +794,8 @@ impl CaConn {
if self.is_shutdown() {
Ok(Ready(None))
} else {
match pin!(self.conn_command_rx).poll_next(cx) {
let rx = self.conn_command_rx.as_mut();
match rx.poll_next(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
@@ -1570,7 +1571,7 @@ impl CaConn {
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
let tx = SendSeriesLookup {
tx: self.conn_command_tx.clone(),
tx: self.conn_command_tx(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
@@ -1934,7 +1935,7 @@ impl CaConn {
let sd = &mut self.storage_insert_sender;
if sd.is_idle() {
if let Some(item) = self.insert_item_queue.pop_front() {
self.storage_insert_sender.send(item);
self.storage_insert_sender.as_mut().send_pin(item);
}
}
if self.storage_insert_sender.is_sending() {
@@ -1959,12 +1960,12 @@ impl CaConn {
if self.is_shutdown() {
Ok(Ready(None))
} else {
let sd = &mut self.channel_info_query_sending;
let sd = self.channel_info_query_sending.as_mut();
if sd.is_idle() {
if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
let sd = &mut self.channel_info_query_sending;
sd.send(item);
let sd = self.channel_info_query_sending.as_mut();
sd.send_pin(item);
}
}
let sd = &mut self.channel_info_query_sending;

View File

@@ -331,7 +331,6 @@ impl CanSendChannelInfoResult for SeriesLookupSender {
}
}
#[pin_project::pin_project]
pub struct CaConnSet {
backend: String,
local_epics_hostname: String,
@@ -339,16 +338,16 @@ pub struct CaConnSet {
channel_states: ChannelStateMap,
connset_inp_rx: Pin<Box<Receiver<CaConnSetEvent>>>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sender: SenderPolling<ChannelInfoQuery>,
channel_info_query_sender: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
channel_info_query_tx: Option<Sender<ChannelInfoQuery>>,
channel_info_res_tx: Pin<Box<Sender<Result<ChannelInfoResult, Error>>>>,
channel_info_res_rx: Pin<Box<Receiver<Result<ChannelInfoResult, Error>>>>,
find_ioc_query_queue: VecDeque<IocAddrQuery>,
find_ioc_query_sender: SenderPolling<IocAddrQuery>,
find_ioc_query_sender: Pin<Box<SenderPolling<IocAddrQuery>>>,
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
storage_insert_tx: Pin<Box<Sender<QueryItem>>>,
storage_insert_queue: VecDeque<QueryItem>,
storage_insert_sender: SenderPolling<QueryItem>,
storage_insert_sender: Pin<Box<SenderPolling<QueryItem>>>,
ca_conn_res_tx: Pin<Box<Sender<(SocketAddr, CaConnEvent)>>>,
ca_conn_res_rx: Pin<Box<Receiver<(SocketAddr, CaConnEvent)>>>,
connset_out_queue: VecDeque<CaConnSetItem>,
@@ -400,16 +399,16 @@ impl CaConnSet {
channel_states: ChannelStateMap::new(),
connset_inp_rx: Box::pin(connset_inp_rx),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()),
channel_info_query_sender: Box::pin(SenderPolling::new(channel_info_query_tx.clone())),
channel_info_query_tx: Some(channel_info_query_tx),
channel_info_res_tx: Box::pin(channel_info_res_tx),
channel_info_res_rx: Box::pin(channel_info_res_rx),
find_ioc_query_queue: VecDeque::new(),
find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx),
find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)),
find_ioc_res_rx: Box::pin(find_ioc_res_rx),
storage_insert_tx: Box::pin(storage_insert_tx.clone()),
storage_insert_queue: VecDeque::new(),
storage_insert_sender: SenderPolling::new(storage_insert_tx),
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
ca_conn_res_tx: Box::pin(ca_conn_res_tx),
ca_conn_res_rx: Box::pin(ca_conn_res_rx),
shutdown_stopping: false,
@@ -459,7 +458,7 @@ impl CaConnSet {
// );
debug!("CaConnSet EndOfStream");
debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
this.find_ioc_query_sender.drop();
this.find_ioc_query_sender.as_mut().drop();
debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
this.ioc_finder_jh
.await
@@ -801,9 +800,9 @@ impl CaConnSet {
debug!("handle_shutdown");
self.shutdown_stopping = true;
self.find_ioc_res_rx.close();
self.channel_info_query_sender.drop();
self.channel_info_query_sender.as_mut().drop();
self.channel_info_query_tx = None;
self.find_ioc_query_sender.drop();
self.find_ioc_query_sender.as_mut().drop();
for (_addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
// TODO not the nicest
@@ -1519,7 +1518,7 @@ impl Stream for CaConnSet {
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.stats.logic_error().inc();
self.storage_insert_sender.send(item);
self.storage_insert_sender.as_mut().send_pin(item);
}
}
if self.storage_insert_sender.is_sending() {
@@ -1540,7 +1539,7 @@ impl Stream for CaConnSet {
if self.find_ioc_query_sender.is_idle() {
if let Some(item) = self.find_ioc_query_queue.pop_front() {
self.find_ioc_query_sender.send(item);
self.find_ioc_query_sender.as_mut().send_pin(item);
}
}
if self.find_ioc_query_sender.is_sending() {
@@ -1561,7 +1560,7 @@ impl Stream for CaConnSet {
if self.channel_info_query_sender.is_idle() {
if let Some(item) = self.channel_info_query_queue.pop_front() {
self.channel_info_query_sender.send(item);
self.channel_info_query_sender.as_mut().send_pin(item);
}
}
if self.channel_info_query_sender.is_sending() {
@@ -1580,7 +1579,7 @@ impl Stream for CaConnSet {
}
}
match pin!(self.find_ioc_res_rx).poll_next(cx) {
match self.find_ioc_res_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_ioc_query_result(x) {
Ok(()) => {
have_progress = true;
@@ -1593,7 +1592,7 @@ impl Stream for CaConnSet {
}
}
match pin!(self.ca_conn_res_rx).poll_next(cx) {
match self.ca_conn_res_rx.as_mut().poll_next(cx) {
Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) {
Ok(()) => {
have_progress = true;
@@ -1606,7 +1605,7 @@ impl Stream for CaConnSet {
}
}
match pin!(self.channel_info_res_rx).poll_next(cx) {
match self.channel_info_res_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_series_lookup_result(x) {
Ok(()) => {
have_progress = true;
@@ -1619,7 +1618,7 @@ impl Stream for CaConnSet {
}
}
match pin!(self.connset_inp_rx).poll_next(cx) {
match self.connset_inp_rx.as_mut().poll_next(cx) {
Ready(Some(x)) => match self.handle_event(x) {
Ok(()) => {
have_progress = true;

View File

@@ -4,15 +4,21 @@ use crate::ca::connset::ConnSetCmd;
use async_channel::Receiver;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::Stream;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::pin;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[pin_project]
pub struct InputMerge {
#[pin]
inp1: Option<Receiver<CaConnSetEvent>>,
#[pin]
inp2: Option<Receiver<VecDeque<FindIocRes>>>,
#[pin]
inp3: Option<Receiver<Result<ChannelInfoResult, Error>>>,
}
@@ -36,17 +42,18 @@ impl InputMerge {
}
}
impl futures_util::Stream for InputMerge {
impl Stream for InputMerge {
type Item = CaConnSetEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let selfp = self.project();
let ret = {
if let Some(inp) = &mut self.inp3 {
match pin!(*inp).poll_next(cx) {
if let Some(inp) = selfp.inp3.as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
self.inp2 = None;
// self.inp3 = None;
None
}
Pending => None,
@@ -58,11 +65,11 @@ impl futures_util::Stream for InputMerge {
let ret = if let Some(x) = ret {
Some(x)
} else {
if let Some(inp) = &mut self.inp2 {
match pin!(*inp).poll_next(cx) {
if let Some(inp) = selfp.inp2.as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
self.inp2 = None;
// self.inp2 = None;
None
}
Pending => None,
@@ -74,11 +81,11 @@ impl futures_util::Stream for InputMerge {
if let Some(x) = ret {
Ready(Some(x))
} else {
if let Some(inp) = &mut self.inp1 {
match pin!(*inp).poll_next(cx) {
if let Some(inp) = selfp.inp1.as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Ready(Some(x)),
Ready(None) => {
self.inp1 = None;
// self.inp1 = None;
Ready(None)
}
Pending => Pending,

View File

@@ -71,9 +71,12 @@ impl<T> SenderPolling<T> {
}
}
pub fn drop(&mut self) {
self.fut = None;
self.sender = None;
pub fn drop(self: Pin<&mut Self>) {
unsafe {
let this = self.get_unchecked_mut();
this.fut = None;
this.sender = None;
}
}
pub fn len(&self) -> Option<usize> {
@@ -87,17 +90,21 @@ where
{
type Output = Result<(), Error<T>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let this = self.project();
match this.fut.as_pin_mut() {
let mut this = self.project();
match this.fut.as_mut().as_pin_mut() {
Some(fut) => match fut.poll(cx) {
Ready(Ok(())) => {
self.fut = None;
unsafe {
*this.fut.get_unchecked_mut() = None;
}
Ready(Ok(()))
}
Ready(Err(e)) => {
self.fut = None;
unsafe {
*this.fut.get_unchecked_mut() = None;
}
Ready(Err(Error::Closed(e.0)))
}
Pending => Pending,