Dominik Werder
2023-11-13 16:26:34 +01:00
parent 8351dabb72
commit 298e9b4faa
5 changed files with 94 additions and 57 deletions

View File

@@ -664,6 +664,7 @@ impl CaConn {
}
fn cmd_check_health(&mut self) {
debug!("cmd_check_health");
match self.check_channels_alive() {
Ok(_) => {}
Err(e) => {
@@ -856,38 +857,23 @@ impl CaConn {
self.stats.clone()
}
fn channel_add_expl(
channel: String,
cssid: ChannelStatusSeriesId,
channels: &mut BTreeMap<Cid, ChannelState>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
cid_store: &mut CidStore,
init_state_count: &mut u64,
) {
if cid_by_name.contains_key(&channel) {
pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) {
if self.cid_by_name.contains_key(&channel) {
return;
}
let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store);
if channels.contains_key(&cid) {
error!("logic error");
} else {
channels.insert(cid, ChannelState::Init(cssid));
// TODO do not count, use separate queue for those channels.
*init_state_count += 1;
}
}
pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) {
Self::channel_add_expl(
channel,
cssid,
&mut self.channels,
let cid = Self::cid_by_name_expl(
&channel,
&mut self.cid_by_name,
&mut self.name_by_cid,
&mut self.cid_store,
&mut self.init_state_count,
)
);
if self.channels.contains_key(&cid) {
error!("logic error");
} else {
self.channels.insert(cid, ChannelState::Init(cssid));
// TODO do not count, use separate queue for those channels.
self.init_state_count += 1;
}
}
pub fn channel_remove(&mut self, channel: String) {
@@ -1547,7 +1533,12 @@ impl CaConn {
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let tsnow = Instant::now();
let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
let proto = if let Some(x) = self.proto.as_mut() {
x
} else {
return Ready(Some(Err(Error::with_msg_no_trace("handle_peer_ready but no proto"))));
};
let res = match proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(camsg) => {

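The last hunk in this file replaces `self.proto.as_mut().unwrap()` with a guard that turns a missing protocol handle into a terminal error item instead of a panic. Below is a minimal, self-contained sketch of that guard pattern inside a `Stream::poll_next`; `Conn` and the `String` error type are stand-ins for the real `CaConn`, `CaProto` and `Error`, not code from this repo.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::{Stream, StreamExt};

// Stand-in for CaConn: the inner protocol stream is optional because it can
// be torn down while the outer stream is still being polled.
struct Conn<S> {
    proto: Option<S>,
}

impl<S> Stream for Conn<S>
where
    S: Stream<Item = Result<u32, String>> + Unpin,
{
    type Item = Result<u32, String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let this = self.get_mut();
        // Guard instead of unwrap: report the missing proto as an error item.
        let proto = if let Some(x) = this.proto.as_mut() {
            x
        } else {
            return Ready(Some(Err(String::from("poll_next but no proto"))));
        };
        proto.poll_next_unpin(cx)
    }
}
```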
View File

@@ -117,7 +117,7 @@ pub struct CmdId(SocketAddrV4, usize);
pub struct CaConnRes {
state: CaConnState,
sender: Sender<ConnCommand>,
sender: Pin<Box<SenderPolling<ConnCommand>>>,
stats: Arc<CaConnStats>,
cmd_queue: VecDeque<ConnCommand>,
// TODO await on jh
@@ -364,7 +364,6 @@ pub struct CaConnSet {
did_connset_out_queue: bool,
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
have_conn_command: bool,
connect_fail_count: usize,
}
@@ -426,7 +425,6 @@ impl CaConnSet {
did_connset_out_queue: false,
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
have_conn_command: false,
connect_fail_count: 0,
};
// TODO await on jh
@@ -637,7 +635,6 @@ impl CaConnSet {
let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap();
let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid);
conn_ress.cmd_queue.push_back(cmd);
self.have_conn_command = true;
}
}
}
@@ -746,6 +743,7 @@ impl CaConnSet {
}
fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
debug!("handle_check_health");
if self.shutdown_stopping {
return Ok(());
}
@@ -764,8 +762,12 @@ impl CaConnSet {
for (_, res) in self.ca_conn_ress.iter_mut() {
let item = ConnCommand::check_health();
res.cmd_queue.push_back(item);
debug!(
"handle_check_health pushed check command {:?} {:?}",
res.cmd_queue.len(),
res.sender.len()
);
}
self.have_conn_command = true;
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
@@ -806,8 +808,8 @@ impl CaConnSet {
for (_addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
// TODO not the nicest
let tx = res.sender.clone();
tokio::spawn(async move { tx.send(item).await });
let mut tx = res.sender.clone();
tokio::spawn(async move { tx.as_mut().send_async_pin(item).await });
}
Ok(())
}
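In the shutdown path above, the command no longer goes through a plain cloned `Sender::send`; the pinned `SenderPolling` is cloned and the clone is moved into a spawned task that awaits `send_async_pin`. The underlying pattern is clone-then-spawn, so the synchronous caller never waits on channel capacity. A minimal sketch of that pattern, with a plain `async_channel::Sender` standing in for the wrapper and assuming tokio as used elsewhere in this repo:

```rust
use async_channel::bounded;

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded::<&'static str>(1);
    for _ in 0..3 {
        // Clone per task; the clone is moved into the spawned future.
        let tx = tx.clone();
        tokio::spawn(async move {
            // Waits if the channel is full; errors only if the receiver is gone.
            let _ = tx.send("shutdown").await;
        });
    }
    for _ in 0..3 {
        println!("{}", rx.recv().await.unwrap());
    }
}
```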
@@ -820,6 +822,7 @@ impl CaConnSet {
}
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> {
debug!("apply_ca_conn_health_update {addr}");
let tsnow = SystemTime::now();
self.rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
@@ -979,7 +982,7 @@ impl CaConnSet {
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone()));
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: conn_tx,
sender: Box::pin(conn_tx.into()),
stats: conn_stats,
cmd_queue: VecDeque::new(),
jh,
@@ -1340,7 +1343,6 @@ impl CaConnSet {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
let cmd = ConnCommand::channel_remove(ch.id().into());
g.cmd_queue.push_back(cmd);
self.have_conn_command = true;
}
let cmd = ChannelRemove { name: ch.id().into() };
self.handle_remove_channel(cmd)?;
@@ -1421,27 +1423,31 @@ impl CaConnSet {
(search_pending, assigned_without_health_update)
}
fn try_push_ca_conn_cmds(&mut self) {
if self.have_conn_command {
self.have_conn_command = false;
for (_, v) in self.ca_conn_ress.iter_mut() {
fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) {
use Poll::*;
for (_, v) in self.ca_conn_ress.iter_mut() {
'level2: loop {
let tx = &mut v.sender;
if v.cmd_queue.len() != 0 || tx.is_sending() {
debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
}
loop {
break if let Some(item) = v.cmd_queue.pop_front() {
match v.sender.try_send(item) {
Ok(()) => continue,
Err(e) => match e {
async_channel::TrySendError::Full(e) => {
self.stats.try_push_ca_conn_cmds_full.inc();
v.cmd_queue.push_front(e);
self.have_conn_command = true;
}
async_channel::TrySendError::Closed(_) => {
// TODO
self.stats.try_push_ca_conn_cmds_closed.inc();
self.have_conn_command = true;
}
},
break if tx.is_sending() {
match tx.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.try_push_ca_conn_cmds_sent.inc();
continue;
}
Ready(Err(e)) => {
error!("try_push_ca_conn_cmds {e}");
}
Pending => {
break 'level2;
}
}
} else if let Some(item) = v.cmd_queue.pop_front() {
tx.as_mut().send_pin(item);
continue;
};
}
}
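This rewrite of try_push_ca_conn_cmds drops the have_conn_command flag and the try_send/TrySendError handling. Each connection now keeps at most one in-flight send, drives it with the stream's Context, and only pops the next queued command once the previous send has completed; on success the new try_push_ca_conn_cmds_sent counter is bumped. Below is a self-contained sketch of that drain pattern; ConnCmds and its fields are simplified stand-ins for the per-connection state on CaConnRes, not the repo's actual types.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_channel::{SendError, Sender};
use futures_util::FutureExt;

// At most one in-flight send, boxed so the owner stays freely movable.
type SendFut<T> = Pin<Box<dyn std::future::Future<Output = Result<(), SendError<T>>> + Send>>;

// Simplified stand-in for the per-connection command state.
struct ConnCmds<T: 'static> {
    queue: VecDeque<T>,
    tx: Sender<T>,
    in_flight: Option<SendFut<T>>,
}

impl<T: Send + 'static> ConnCmds<T> {
    // Called from the owning stream's poll method with its Context.
    fn drain(&mut self, cx: &mut Context) {
        loop {
            if let Some(fut) = self.in_flight.as_mut() {
                match fut.poll_unpin(cx) {
                    // Send completed; try to start the next one.
                    Poll::Ready(Ok(())) => self.in_flight = None,
                    // Receiver gone; drop the command (the real code counts this).
                    Poll::Ready(Err(_e)) => self.in_flight = None,
                    // Channel full: leave the send parked, resume on the next wakeup.
                    Poll::Pending => break,
                }
            } else if let Some(item) = self.queue.pop_front() {
                let tx = self.tx.clone();
                self.in_flight = Some(Box::pin(async move { tx.send(item).await }));
            } else {
                break;
            }
        }
    }
}
```

The SenderPolling wrapper changed in the last file of this commit plays the role of `tx` plus `in_flight` here, with send_pin starting the send and poll_unpin driving it.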
@@ -1479,7 +1485,7 @@ impl Stream for CaConnSet {
let mut have_pending = false;
let mut have_progress = false;
self.try_push_ca_conn_cmds();
self.try_push_ca_conn_cmds(cx);
if self.did_connset_out_queue {
self.did_connset_out_queue = false;

View File

@@ -216,6 +216,17 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
}
}),
)
.route(
"/metricbeat",
get({
//
|| async move {
axum::Json(serde_json::json!({
"v1": 42_u32,
}))
}
}),
)
.route(
"/daqingest/find/channel",
get({

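The new route above adds a GET /metricbeat endpoint that currently returns a fixed JSON document. A hedged, self-contained sketch of an equivalent route, assuming axum and serde_json as already used by this daemon; metricbeat_routes is an illustrative name, not a function in this repo, and the "v1": 42 payload is the placeholder from the commit itself.

```rust
use axum::routing::get;
use axum::{Json, Router};
use serde_json::json;

// Builds a router exposing GET /metricbeat; in the daemon the route is added
// alongside the existing /daqingest/... routes.
fn metricbeat_routes() -> Router {
    Router::new().route(
        "/metricbeat",
        get(|| async move {
            Json(json!({
                "v1": 42_u32,
            }))
        }),
    )
}
```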
View File

@@ -1,4 +1,5 @@
use async_channel::Send;
use async_channel::SendError;
use async_channel::Sender;
use err::thiserror;
use futures_util::Future;
@@ -82,6 +83,20 @@ impl<T> SenderPolling<T> {
pub fn len(&self) -> Option<usize> {
self.sender.as_ref().map(|x| x.len())
}
pub async fn send_async_pin(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
unsafe { Pin::get_unchecked_mut(self) }.send_async(item).await
}
pub async fn send_async(&mut self, item: T) -> Result<(), SendError<T>> {
if self.is_sending() {
let fut = self.fut.take().unwrap();
if let Err(e) = fut.await {
return Err(e);
}
}
self.sender.as_ref().unwrap().send(item).await
}
}
impl<T> Future for SenderPolling<T>
@@ -113,3 +128,16 @@ where
}
}
}
impl<T> Clone for SenderPolling<T> {
fn clone(&self) -> Self {
let sender = self.sender.as_ref().unwrap().as_ref().clone();
SenderPolling::new(sender)
}
}
impl<T> From<Sender<T>> for SenderPolling<T> {
fn from(value: Sender<T>) -> Self {
SenderPolling::new(value)
}
}

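The send_async and send_async_pin methods added here let async call sites reuse a wrapper that may already have a send in flight from the poll path: any pending send is finished first, then the new item is sent with an ordinary await. A standalone sketch of those semantics, where in_flight and tx stand in for the wrapper's internal fields (this is not the repo's SenderPolling itself):

```rust
use std::pin::Pin;

use async_channel::{SendError, Sender};

type SendFut<T> = Pin<Box<dyn std::future::Future<Output = Result<(), SendError<T>>> + Send>>;

// Same idea as SenderPolling::send_async in this commit: drain any send that
// was started from the poll path, then await the new send normally.
async fn send_async<T: Send + 'static>(
    in_flight: &mut Option<SendFut<T>>,
    tx: &Sender<T>,
    item: T,
) -> Result<(), SendError<T>> {
    if let Some(fut) = in_flight.take() {
        fut.await?;
    }
    tx.send(item).await
}
```

The Clone and From<Sender<T>> impls added in the same hunk are what allow the call sites above to write `Box::pin(conn_tx.into())` when building CaConnRes and to clone the pinned wrapper for the spawned shutdown sends.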
View File

@@ -270,6 +270,7 @@ stats_proc::stats_struct!((
ca_conn_eos_ok,
ca_conn_eos_unexpected,
response_tx_fail,
try_push_ca_conn_cmds_sent,
try_push_ca_conn_cmds_full,
try_push_ca_conn_cmds_closed,
logic_error,