Send channel status in conn set

This commit is contained in:
Dominik Werder
2024-08-26 17:28:10 +02:00
parent a715f7f9c7
commit 4550d26f37
7 changed files with 92 additions and 100 deletions

View File

@@ -23,6 +23,8 @@ use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use log::*;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::timeunits::*;
use netpod::trigger;
use netpod::ttl::RetentionTime;
@@ -45,8 +47,6 @@ use scywr::iteminsertqueue as scywriiq;
use scywr::senderpolling::SenderPolling;
use scywriiq::Accounting;
use scywriiq::AccountingRecv;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusClosedReason;
use scywriiq::ChannelStatusItem;
use scywriiq::ConnectionStatus;
use scywriiq::ConnectionStatusItem;
@@ -90,6 +90,7 @@ const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000);
const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000);
const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(120000);
#[allow(unused)]
macro_rules! trace2 {
@@ -472,6 +473,7 @@ struct CreatedState {
name: String,
enum_str_table: Option<Vec<String>>,
status_emit_count: u64,
ts_recv_value_status_emit_next: Instant,
}
impl CreatedState {
@@ -510,6 +512,7 @@ impl CreatedState {
name: String::new(),
enum_str_table: None,
status_emit_count: 0,
ts_recv_value_status_emit_next: Instant::now(),
}
}
@@ -1599,8 +1602,8 @@ impl CaConn {
return Ok(());
};
let dbg_chn = dbg_chn_cid(cid, self);
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
&mut x.state
let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(&cid) {
(&mut x.state, &mut x.wrst)
} else {
// TODO return better as error and let caller decide (with more structured errors)
warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
@@ -1681,6 +1684,7 @@ impl CaConn {
Self::event_add_ingest(
ev.payload_len,
ev.value,
ch_wrst,
crst,
writer,
binwriter,
@@ -1709,6 +1713,7 @@ impl CaConn {
Self::event_add_ingest(
ev.payload_len,
ev.value,
ch_wrst,
crst,
writer,
binwriter,
@@ -1869,7 +1874,7 @@ impl CaConn {
st2.tick = PollTickState::Idle(tsnow);
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(_) => {
@@ -1905,7 +1910,7 @@ impl CaConn {
}
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::StopMonitoringForPolling(..) => {
@@ -1928,6 +1933,7 @@ impl CaConn {
fn read_notify_res_for_write(
ev: proto::ReadNotifyRes,
wrst: &mut WriterStatus,
st: &mut WritableState,
iqdqs: &mut InsertDeques,
stnow: SystemTime,
@@ -1940,6 +1946,7 @@ impl CaConn {
Self::event_add_ingest(
ev.payload_len,
ev.value,
wrst,
crst,
writer,
binwriter,
@@ -1954,6 +1961,7 @@ impl CaConn {
fn event_add_ingest(
payload_len: u32,
value: CaEventValue,
wrst: &mut WriterStatus,
crst: &mut CreatedState,
writer: &mut CaRtWriter,
binwriter: &mut BinWriter,
@@ -1983,6 +1991,18 @@ impl CaConn {
crst.recv_bytes += payload_len as u64;
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
if crst.ts_recv_value_status_emit_next <= tsnow {
crst.ts_recv_value_status_emit_next = tsnow + READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN;
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
let deque = &mut iqdqs.st_rf3_qu;
if wrst.emit_channel_status_item(item, deque).is_err() {
stats.logic_error().inc();
}
}
let ts_local = TsNano::from_system_time(stnow);
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
@@ -2568,6 +2588,7 @@ impl CaConn {
name: conf.conf.name().into(),
enum_str_table: None,
status_emit_count: 0,
ts_recv_value_status_emit_next: Instant::now(),
};
if dbg_chn_name(created_state.name()) {
info!(

View File

@@ -63,6 +63,7 @@ use std::pin::Pin;
use netpod::trigger;
use netpod::OnDrop;
use netpod::TsNano;
use scywr::insertqueues::InsertQueuesTx;
use series::SeriesId;
use std::sync::Arc;
@@ -625,12 +626,25 @@ impl CaConnSet {
self.cssid_latency_max = dt + Duration::from_millis(2000);
debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd);
}
let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
SeriesId::new(cmd.cssid.id()),
serieswriter::fixgridwriter::CHANNEL_STATUS_GRID,
);
{
let status = netpod::channelstatus::ChannelStatus::HaveStatusId;
let stnow = SystemTime::now();
let ts = TsNano::from_system_time(stnow);
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let mut deque = VecDeque::new();
writer_status
.write(item, state, ts_net, ts, &mut deque)
.map_err(Error::from_string)?;
self.storage_insert_queue.push_back(deque);
}
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
cssid: cmd.cssid,
addr_find_backoff: 0,
@@ -679,12 +693,25 @@ impl CaConnSet {
trace!("handle_add_channel_with_addr INNER {cmd:?}");
self.stats.handle_add_channel_with_addr().inc();
let tsnow = SystemTime::now();
let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
SeriesId::new(cmd.cssid.id()),
serieswriter::fixgridwriter::CHANNEL_STATUS_GRID,
);
{
let status = netpod::channelstatus::ChannelStatus::HaveAddress;
let stnow = SystemTime::now();
let ts = TsNano::from_system_time(stnow);
let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
let state = &mut writer_status_state;
let ts_net = Instant::now();
let mut deque = VecDeque::new();
writer_status
.write(item, state, ts_net, ts, &mut deque)
.map_err(Error::from_string)?;
self.storage_insert_queue.push_back(deque);
}
*st3 = WithStatusSeriesIdState {
cssid: cmd.cssid.clone(),
addr_find_backoff: 0,
@@ -1657,13 +1684,13 @@ impl Stream for CaConnSet {
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.stats.logic_error().inc();
self.storage_insert_sender.as_mut().send_pin(item);
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.storage_insert_queue_send().inc();
have_progress = true;
}
Ready(Err(_)) => {

View File

@@ -638,7 +638,12 @@ impl CaMsgTy {
AccessRightsRes(_) => {}
EventAdd(_) => {
// Using flags DBE_ARCHIVE, DBE_ALARM, DBE_PROPERTY.
let flags = 0b1110;
let dbe_value = 0x01;
let dbe_log = 0x02;
let dbe_alarm = 0x04;
let dbe_property = 0x08;
let _ = dbe_value | dbe_property;
let flags = dbe_log | dbe_alarm;
buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, flags, 0, 0]);
}
EventAddRes(_) => {}

View File

@@ -6,6 +6,7 @@ pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;
pub mod polltimer;
pub mod ratelimit;
pub mod rt;
#[cfg(test)]
pub mod test;

23
netfetch/src/ratelimit.rs Normal file
View File

@@ -0,0 +1,23 @@
use std::time::Duration;
use std::time::Instant;
pub struct RateLimit {
last: Instant,
dtmin: Duration,
}
impl RateLimit {
pub fn new(dtmin: Duration) -> Self {
let last = Instant::now().checked_sub(2 * dtmin).unwrap();
Self { last, dtmin }
}
pub fn trigger(&mut self, tsnow: Instant) -> bool {
if self.last + self.dtmin <= tsnow {
self.last = tsnow;
true
} else {
false
}
}
}

View File

@@ -7,6 +7,8 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::DtNano;
use netpod::Shape;
use netpod::TsMs;
@@ -408,94 +410,6 @@ pub struct ConnectionStatusItem {
pub status: ConnectionStatus,
}
#[derive(Debug, Clone)]
pub enum ChannelStatusClosedReason {
ShutdownCommand,
ChannelRemove,
ProtocolError,
FrequencyQuota,
BandwidthQuota,
InternalError,
IocTimeout,
NoProtocol,
ProtocolDone,
ConnectFail,
IoError,
}
#[derive(Debug, Clone)]
pub enum ChannelStatus {
AssignedToAddress,
Opened,
Closed(ChannelStatusClosedReason),
Pong,
MonitoringSilenceReadStart,
MonitoringSilenceReadTimeout,
MonitoringSilenceReadUnchanged,
}
impl ChannelStatus {
pub fn to_kind(&self) -> u32 {
use ChannelStatus::*;
use ChannelStatusClosedReason::*;
match self {
AssignedToAddress => 24,
Opened => 1,
Closed(x) => match x {
ShutdownCommand => 2,
ChannelRemove => 3,
ProtocolError => 4,
FrequencyQuota => 5,
BandwidthQuota => 6,
InternalError => 7,
IocTimeout => 8,
NoProtocol => 9,
ProtocolDone => 10,
ConnectFail => 11,
IoError => 12,
},
Pong => 25,
MonitoringSilenceReadStart => 26,
MonitoringSilenceReadTimeout => 27,
MonitoringSilenceReadUnchanged => 28,
}
}
pub fn from_kind(kind: u32) -> Result<Self, err::Error> {
use ChannelStatus::*;
use ChannelStatusClosedReason::*;
let ret = match kind {
1 => Opened,
2 => Closed(ShutdownCommand),
3 => Closed(ChannelRemove),
4 => Closed(ProtocolError),
5 => Closed(FrequencyQuota),
6 => Closed(BandwidthQuota),
7 => Closed(InternalError),
8 => Closed(IocTimeout),
9 => Closed(NoProtocol),
10 => Closed(ProtocolDone),
11 => Closed(ConnectFail),
12 => Closed(IoError),
24 => AssignedToAddress,
25 => Pong,
26 => MonitoringSilenceReadStart,
27 => MonitoringSilenceReadTimeout,
28 => MonitoringSilenceReadUnchanged,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ChannelStatus kind {kind}"
)));
}
};
Ok(ret)
}
pub fn to_u64(&self) -> u64 {
self.to_kind() as u64
}
}
#[derive(Debug, Clone)]
pub enum ShutdownReason {
ConnectRefused,

View File

@@ -424,6 +424,7 @@ stats_proc::stats_struct!((
handle_add_channel_with_addr,
create_ca_conn,
command_reply_fail,
storage_insert_queue_send,
),
values(
storage_insert_queue_len,