From 298e9b4faa4ec8c77b1575e76f70a96850df54bc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 13 Nov 2023 16:26:34 +0100 Subject: [PATCH] WIP --- netfetch/src/ca/conn.rs | 47 +++++++++++-------------- netfetch/src/ca/connset.rs | 64 +++++++++++++++++++---------------- netfetch/src/metrics.rs | 11 ++++++ netfetch/src/senderpolling.rs | 28 +++++++++++++++ stats/src/stats.rs | 1 + 5 files changed, 94 insertions(+), 57 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 9548944..d39a05e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -664,6 +664,7 @@ impl CaConn { } fn cmd_check_health(&mut self) { + debug!("cmd_check_health"); match self.check_channels_alive() { Ok(_) => {} Err(e) => { @@ -856,38 +857,23 @@ impl CaConn { self.stats.clone() } - fn channel_add_expl( - channel: String, - cssid: ChannelStatusSeriesId, - channels: &mut BTreeMap, - cid_by_name: &mut BTreeMap, - name_by_cid: &mut BTreeMap, - cid_store: &mut CidStore, - init_state_count: &mut u64, - ) { - if cid_by_name.contains_key(&channel) { + pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) { + if self.cid_by_name.contains_key(&channel) { return; } - let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store); - if channels.contains_key(&cid) { - error!("logic error"); - } else { - channels.insert(cid, ChannelState::Init(cssid)); - // TODO do not count, use separate queue for those channels. - *init_state_count += 1; - } - } - - pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) { - Self::channel_add_expl( - channel, - cssid, - &mut self.channels, + let cid = Self::cid_by_name_expl( + &channel, &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store, - &mut self.init_state_count, - ) + ); + if self.channels.contains_key(&cid) { + error!("logic error"); + } else { + self.channels.insert(cid, ChannelState::Init(cssid)); + // TODO do not count, use separate queue for those channels. + self.init_state_count += 1; + } } pub fn channel_remove(&mut self, channel: String) { @@ -1547,7 +1533,12 @@ impl CaConn { .add((ts2.duration_since(ts1) * MS as u32).as_secs()); ts1 = ts2; let tsnow = Instant::now(); - let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) { + let proto = if let Some(x) = self.proto.as_mut() { + x + } else { + return Ready(Some(Err(Error::with_msg_no_trace("handle_peer_ready but no proto")))); + }; + let res = match proto.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { match k { CaItem::Msg(camsg) => { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 6ed66b5..45bba2d 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -117,7 +117,7 @@ pub struct CmdId(SocketAddrV4, usize); pub struct CaConnRes { state: CaConnState, - sender: Sender, + sender: Pin>>, stats: Arc, cmd_queue: VecDeque, // TODO await on jh @@ -364,7 +364,6 @@ pub struct CaConnSet { did_connset_out_queue: bool, ca_proto_stats: Arc, rogue_channel_count: u64, - have_conn_command: bool, connect_fail_count: usize, } @@ -426,7 +425,6 @@ impl CaConnSet { did_connset_out_queue: false, ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, - have_conn_command: false, connect_fail_count: 0, }; // TODO await on jh @@ -637,7 +635,6 @@ impl CaConnSet { let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap(); let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid); conn_ress.cmd_queue.push_back(cmd); - self.have_conn_command = true; } } } @@ -746,6 +743,7 @@ impl CaConnSet { } fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> { + debug!("handle_check_health"); if self.shutdown_stopping { return Ok(()); } @@ -764,8 +762,12 @@ impl CaConnSet { for (_, res) in self.ca_conn_ress.iter_mut() { let item = ConnCommand::check_health(); res.cmd_queue.push_back(item); + debug!( + "handle_check_health pushed check command {:?} {:?}", + res.cmd_queue.len(), + res.sender.len() + ); } - self.have_conn_command = true; let ts2 = Instant::now(); let item = CaConnSetItem::Healthy(ts1, ts2); @@ -806,8 +808,8 @@ impl CaConnSet { for (_addr, res) in self.ca_conn_ress.iter() { let item = ConnCommand::shutdown(); // TODO not the nicest - let tx = res.sender.clone(); - tokio::spawn(async move { tx.send(item).await }); + let mut tx = res.sender.clone(); + tokio::spawn(async move { tx.as_mut().send_async_pin(item).await }); } Ok(()) } @@ -820,6 +822,7 @@ impl CaConnSet { } fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> { + debug!("apply_ca_conn_health_update {addr}"); let tsnow = SystemTime::now(); self.rogue_channel_count = 0; for (k, v) in res.channel_statuses { @@ -979,7 +982,7 @@ impl CaConnSet { let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone())); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), - sender: conn_tx, + sender: Box::pin(conn_tx.into()), stats: conn_stats, cmd_queue: VecDeque::new(), jh, @@ -1340,7 +1343,6 @@ impl CaConnSet { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { let cmd = ConnCommand::channel_remove(ch.id().into()); g.cmd_queue.push_back(cmd); - self.have_conn_command = true; } let cmd = ChannelRemove { name: ch.id().into() }; self.handle_remove_channel(cmd)?; @@ -1421,27 +1423,31 @@ impl CaConnSet { (search_pending, assigned_without_health_update) } - fn try_push_ca_conn_cmds(&mut self) { - if self.have_conn_command { - self.have_conn_command = false; - for (_, v) in self.ca_conn_ress.iter_mut() { + fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) { + use Poll::*; + for (_, v) in self.ca_conn_ress.iter_mut() { + 'level2: loop { + let tx = &mut v.sender; + if v.cmd_queue.len() != 0 || tx.is_sending() { + debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len()); + } loop { - break if let Some(item) = v.cmd_queue.pop_front() { - match v.sender.try_send(item) { - Ok(()) => continue, - Err(e) => match e { - async_channel::TrySendError::Full(e) => { - self.stats.try_push_ca_conn_cmds_full.inc(); - v.cmd_queue.push_front(e); - self.have_conn_command = true; - } - async_channel::TrySendError::Closed(_) => { - // TODO - self.stats.try_push_ca_conn_cmds_closed.inc(); - self.have_conn_command = true; - } - }, + break if tx.is_sending() { + match tx.poll_unpin(cx) { + Ready(Ok(())) => { + self.stats.try_push_ca_conn_cmds_sent.inc(); + continue; + } + Ready(Err(e)) => { + error!("try_push_ca_conn_cmds {e}"); + } + Pending => { + break 'level2; + } } + } else if let Some(item) = v.cmd_queue.pop_front() { + tx.as_mut().send_pin(item); + continue; }; } } @@ -1479,7 +1485,7 @@ impl Stream for CaConnSet { let mut have_pending = false; let mut have_progress = false; - self.try_push_ca_conn_cmds(); + self.try_push_ca_conn_cmds(cx); if self.did_connset_out_queue { self.did_connset_out_queue = false; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index cb510d7..4a53d6d 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -216,6 +216,17 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st } }), ) + .route( + "/metricbeat", + get({ + // + || async move { + axum::Json(serde_json::json!({ + "v1": 42_u32, + })) + } + }), + ) .route( "/daqingest/find/channel", get({ diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 9168f89..0ce781e 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -1,4 +1,5 @@ use async_channel::Send; +use async_channel::SendError; use async_channel::Sender; use err::thiserror; use futures_util::Future; @@ -82,6 +83,20 @@ impl SenderPolling { pub fn len(&self) -> Option { self.sender.as_ref().map(|x| x.len()) } + + pub async fn send_async_pin(self: Pin<&mut Self>, item: T) -> Result<(), SendError> { + unsafe { Pin::get_unchecked_mut(self) }.send_async(item).await + } + + pub async fn send_async(&mut self, item: T) -> Result<(), SendError> { + if self.is_sending() { + let fut = self.fut.take().unwrap(); + if let Err(e) = fut.await { + return Err(e); + } + } + self.sender.as_ref().unwrap().send(item).await + } } impl Future for SenderPolling @@ -113,3 +128,16 @@ where } } } + +impl Clone for SenderPolling { + fn clone(&self) -> Self { + let sender = self.sender.as_ref().unwrap().as_ref().clone(); + SenderPolling::new(sender) + } +} + +impl From> for SenderPolling { + fn from(value: Sender) -> Self { + SenderPolling::new(value) + } +} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 36bc7d0..26d4635 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -270,6 +270,7 @@ stats_proc::stats_struct!(( ca_conn_eos_ok, ca_conn_eos_unexpected, response_tx_fail, + try_push_ca_conn_cmds_sent, try_push_ca_conn_cmds_full, try_push_ca_conn_cmds_closed, logic_error,