Do not write on silent monitor check

This commit is contained in:
Dominik Werder
2024-08-28 17:12:04 +02:00
parent 4550d26f37
commit b6d8f8830a
4 changed files with 24 additions and 10 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.3-aa.1"
version = "0.2.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -356,6 +356,7 @@ struct Monitoring2PassiveState {
// Holds instant when we entered this state. A receive of an event is considered a re-enter of the state,
// so the instant gets updated. Used for timeout check.
tsbeg: Instant,
ts_silence_read_next: Instant,
}
#[derive(Debug, Clone)]
@@ -1111,6 +1112,10 @@ impl CaConn {
Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64)
}
fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64)
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
Box::pin(tokio::time::sleep(Duration::from_millis(1500)))
}
@@ -1674,7 +1679,10 @@ impl CaConn {
st.reading = ReadingState::Monitoring(MonitoringState {
tsbeg: tsnow,
subid: st2.subid,
mon2state: Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }),
mon2state: Monitoring2State::Passive(Monitoring2PassiveState {
tsbeg: tsnow,
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
}),
});
let crst = &mut st.channel;
let writer = &mut st.writer;
@@ -1899,7 +1907,10 @@ impl CaConn {
self.stats.recv_read_notify_state_read_pending.inc();
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState {
tsbeg: tsnow,
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
});
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
@@ -1910,7 +1921,13 @@ impl CaConn {
}
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
// TODO check ADEL to see if monitor should have fired.
// But there is still a small chance that the monitor will just received slightly later.
// More involved check would be to raise a flag, wait for the expected monitor for some
// timeout, and if we get nothing error out.
if false {
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
}
}
},
ReadingState::StopMonitoringForPolling(..) => {

View File

@@ -1559,7 +1559,9 @@ impl CaConnSet {
}
Ready(Err(e)) => match e {
scywr::senderpolling::Error::NoSendInProgress => {
let e = Error::with_msg_no_trace(format!("try_push_ca_conn_cmds E-A {addr} {e}"));
let e = Error::with_msg_no_trace(format!(
"try_push_ca_conn_cmds E-A {addr} NoSendInProgress"
));
error!("{e}");
return Err(e);
}

View File

@@ -1,7 +1,6 @@
use async_channel::Send;
use async_channel::SendError;
use async_channel::Sender;
use err::thiserror;
use futures_util::Future;
use pin_project::pin_project;
use std::marker::PhantomPinned;
@@ -9,13 +8,9 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::task::Context;
use std::task::Poll;
use thiserror::Error;
#[derive(Debug, Error)]
#[cstm(name = "SenderPolling")]
pub enum Error<T> {
NoSendInProgress,
#[error("Closed")]
Closed(T),
}