diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 740444d..a788b1a 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.3-aa.1" +version = "0.2.3" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 0d2c9c9..bb2e344 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -356,6 +356,7 @@ struct Monitoring2PassiveState { // Holds instant when we entered this state. A receive of an event is considered a re-enter of the state, // so the instant gets updated. Used for timeout check. tsbeg: Instant, + ts_silence_read_next: Instant, } #[derive(Debug, Clone)] @@ -1111,6 +1112,10 @@ impl CaConn { Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64) } + fn silence_read_next_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration { + Duration::from_millis(1000 * 300 + (rng.next_u32() & 0x3fff) as u64) + } + fn new_self_ticker() -> Pin> { Box::pin(tokio::time::sleep(Duration::from_millis(1500))) } @@ -1674,7 +1679,10 @@ impl CaConn { st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow, subid: st2.subid, - mon2state: Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }), + mon2state: Monitoring2State::Passive(Monitoring2PassiveState { + tsbeg: tsnow, + ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), + }), }); let crst = &mut st.channel; let writer = &mut st.writer; @@ -1899,7 +1907,10 @@ impl CaConn { self.stats.recv_read_notify_state_read_pending.inc(); } self.read_ioids.remove(&ioid); - st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); + st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { + tsbeg: tsnow, + ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), + }); { let item = ChannelStatusItem { ts: self.tmp_ts_poll, @@ -1910,7 +1921,13 @@ impl CaConn { } let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; + // TODO check ADEL to see if monitor should have fired. + // But there is still a small chance that the monitor will just received slightly later. + // More involved check would be to raise a flag, wait for the expected monitor for some + // timeout, and if we get nothing error out. + if false { + Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; + } } }, ReadingState::StopMonitoringForPolling(..) => { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0e941e5..4a5ae7c 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1559,7 +1559,9 @@ impl CaConnSet { } Ready(Err(e)) => match e { scywr::senderpolling::Error::NoSendInProgress => { - let e = Error::with_msg_no_trace(format!("try_push_ca_conn_cmds E-A {addr} {e}")); + let e = Error::with_msg_no_trace(format!( + "try_push_ca_conn_cmds E-A {addr} NoSendInProgress" + )); error!("{e}"); return Err(e); } diff --git a/scywr/src/senderpolling.rs b/scywr/src/senderpolling.rs index 9312d82..920eece 100644 --- a/scywr/src/senderpolling.rs +++ b/scywr/src/senderpolling.rs @@ -1,7 +1,6 @@ use async_channel::Send; use async_channel::SendError; use async_channel::Sender; -use err::thiserror; use futures_util::Future; use pin_project::pin_project; use std::marker::PhantomPinned; @@ -9,13 +8,9 @@ use std::pin::Pin; use std::ptr::NonNull; use std::task::Context; use std::task::Poll; -use thiserror::Error; -#[derive(Debug, Error)] -#[cstm(name = "SenderPolling")] pub enum Error { NoSendInProgress, - #[error("Closed")] Closed(T), }