Status event on silent monitored channel

This commit is contained in:
Dominik Werder
2024-08-22 16:33:05 +02:00
parent 95acfd6061
commit a715f7f9c7
9 changed files with 152 additions and 53 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.2"
version = "0.2.3-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -42,17 +42,17 @@ use scywr::insertqueues::InsertDeques;
use scywr::insertqueues::InsertQueuesTx;
use scywr::insertqueues::InsertSenderPolling;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::AccountingRecv;
use scywr::iteminsertqueue::MspItem;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ShutdownReason;
use scywr::senderpolling::SenderPolling;
use scywriiq::Accounting;
use scywriiq::AccountingRecv;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusClosedReason;
use scywriiq::ChannelStatusItem;
use scywriiq::ConnectionStatus;
use scywriiq::ConnectionStatusItem;
use scywriiq::MspItem;
use scywriiq::QueryItem;
use scywriiq::ShutdownReason;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
@@ -695,11 +695,13 @@ impl WriterStatus {
item: ChannelStatusItem,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
let tsev = TsNano::from_system_time(SystemTime::now());
let (ts, val) = item.to_ts_val();
self.writer_status.write(
serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val),
&mut self.writer_status_state,
Instant::now(),
tsev,
deque,
)?;
Ok(())
@@ -1831,8 +1833,8 @@ impl CaConn {
}
} else {
if let Some(cid) = self.read_ioids.get(&ioid) {
let ch_s = if let Some(x) = self.channels.get_mut(cid) {
&mut x.state
let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(cid) {
(&mut x.state, &mut x.wrst)
} else {
warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
return Ok(());
@@ -1893,6 +1895,14 @@ impl CaConn {
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
@@ -1973,14 +1983,14 @@ impl CaConn {
crst.recv_bytes += payload_len as u64;
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
let ts_local = {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(ts_local);
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
let ts_local = TsNano::from_system_time(stnow);
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(ts_local.ns());
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
}
{
let evts = ts_local;
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
@@ -1988,7 +1998,7 @@ impl CaConn {
// let ts_local = TsNano::from_ns(ts_local);
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?;
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, evts, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
if wres.st.accept {
crst.dw_st_last = stnow;
@@ -2186,6 +2196,14 @@ impl CaConn {
self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg);
st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow);
self.stats.caget_issued().inc();
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadStart,
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
}
}
Monitoring2State::ReadPending(ioid, since) => {
@@ -2200,6 +2218,14 @@ impl CaConn {
name,
ioid
);
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::MonitoringSilenceReadTimeout,
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
if false {
// Here we try to close the channel at hand.
@@ -3392,6 +3418,7 @@ impl EmittableType for CaWriterValue {
fn into_query_item(
mut self,
ts_net: Instant,
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let mut items = serieswriter::writer::SmallVec::new();
@@ -3409,10 +3436,7 @@ impl EmittableType for CaWriterValue {
},
None => true,
};
let ts = TsNano::from_ns(self.0.ts().unwrap());
if let Some(ts) = self.0.ts() {
state.last_accepted_ts = TsNano::from_ns(ts);
}
let ts = tsev;
state.last_accepted_val = Some(self.clone());
let byte_size = self.byte_size();
if diff_data {

View File

@@ -1384,15 +1384,16 @@ impl CaConnSet {
MaybeWrongAddressState::new(stnow, st3.addr_find_backoff),
);
let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
let (ts, val) = item.to_ts_val();
let (tsev, val) = item.to_ts_val();
let deque = &mut item_deque;
st3.writer_status
.as_mut()
.unwrap()
.write(
serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val),
serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val),
st3.writer_status_state.as_mut().unwrap(),
tsnow,
tsev,
deque,
)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;

View File

@@ -90,6 +90,7 @@ impl EmittableType for WritableType {
fn into_query_item(
self,
ts_net: Instant,
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
todo!()
@@ -317,13 +318,16 @@ where
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
warn!("TODO require timestamp in input format");
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = ();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), &mut emit_state, tsnow, deque)?;
writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}
@@ -345,13 +349,16 @@ where
.map_err(|_| Error::Decode)?;
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
warn!("TODO require timestamp in input format");
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = ();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(WritableType(val), &mut emit_state, tsnow, deque)?;
writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
}

View File

@@ -429,6 +429,9 @@ pub enum ChannelStatus {
Opened,
Closed(ChannelStatusClosedReason),
Pong,
MonitoringSilenceReadStart,
MonitoringSilenceReadTimeout,
MonitoringSilenceReadUnchanged,
}
impl ChannelStatus {
@@ -452,6 +455,9 @@ impl ChannelStatus {
IoError => 12,
},
Pong => 25,
MonitoringSilenceReadStart => 26,
MonitoringSilenceReadTimeout => 27,
MonitoringSilenceReadUnchanged => 28,
}
}
@@ -473,6 +479,9 @@ impl ChannelStatus {
12 => Closed(IoError),
24 => AssignedToAddress,
25 => Pong,
26 => MonitoringSilenceReadStart,
27 => MonitoringSilenceReadTimeout,
28 => MonitoringSilenceReadUnchanged,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ChannelStatus kind {kind}"

View File

@@ -43,10 +43,11 @@ impl EmittableType for ChannelStatusWriteValue {
fn into_query_item(
self,
ts_net: Instant,
tsev: TsNano,
state: &mut <Self as EmittableType>::State,
) -> serieswriter::writer::EmitRes {
let mut items = serieswriter::writer::SmallVec::new();
let ts = self.ts();
let ts = tsev;
state.last_accepted_ts = ts;
state.last_accepted_val = Some(self.1);
let byte_size = self.byte_size();

View File

@@ -15,8 +15,8 @@ use std::time::Instant;
#[allow(unused)]
macro_rules! trace_rt_decision {
($($arg:tt)*) => {
if true {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
@@ -46,6 +46,7 @@ where
last_insert_val: Option<ET>,
dbgname: String,
writer: SeriesWriter<ET>,
do_trace_detail: bool,
_t1: PhantomData<ET>,
}
@@ -68,30 +69,55 @@ where
last_insert_val: None,
dbgname,
writer,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
_t1: PhantomData,
};
if ret.do_trace_detail {
debug!("debug test for detail series");
trace!("trace test for detail series");
}
Ok(ret)
}
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<WriteRes, Error> {
pub fn write(
&mut self,
item: ET,
ts_net: Instant,
tsev: TsNano,
deque: &mut VecDeque<QueryItem>,
) -> Result<WriteRes, Error> {
// Decide whether we want to write.
// TODO catch already in CaConn the cases when the IOC-timestamp did not change.
let det = self.do_trace_detail;
let tsl = self.last_insert_ts.clone();
let dbgname = &self.dbgname;
let sid = &self.series;
let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64;
let ts = tsev;
if false {
trace_rt_decision!(
det,
"{dbgname} {sid} min_quiet {min_quiet:?} ts1 {ts1:?} ts2 {ts2:?} item {item:?}",
ts1 = ts.ms(),
ts2 = tsl.ms(),
item = item,
);
}
let do_write = {
let ts = item.ts();
if ts == tsl {
trace_rt_decision!("{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}");
trace_rt_decision!(det, "{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}");
false
} else if ts < tsl {
trace_rt_decision!("{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}");
trace_rt_decision!(
det,
"{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}",
);
false
} else if ts.ms() < tsl.ms() + 1000 * self.min_quiet.as_secs() {
trace_rt_decision!("{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
} else if ts.ms() < tsl.ms() + min_quiet {
trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
false
} else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
trace_rt_decision!("{dbgname} {sid} ignore, because store rate cap");
trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap");
false
} else if self
.last_insert_val
@@ -99,15 +125,19 @@ where
.map(|k| !item.has_change(k))
.unwrap_or(false)
{
trace_rt_decision!("{dbgname} {sid} ignore, because value did not change");
trace_rt_decision!(det, "{dbgname} {sid} ignore, because value did not change");
false
} else {
trace_rt_decision!("{dbgname} {sid} accept");
trace_rt_decision!(det, "{dbgname} {sid} accept");
if true {
self.last_insert_val = Some(item.clone());
}
self.last_insert_ts = ts.clone();
true
}
};
if do_write {
let res = self.writer.write(item, &mut self.emit_state, ts_net, deque)?;
let res = self.writer.write(item, &mut self.emit_state, ts_net, ts, deque)?;
let ret = WriteRes {
accept: true,
bytes: res.bytes,

View File

@@ -5,6 +5,7 @@ use err::ThisError;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use series::SeriesId;
@@ -13,9 +14,9 @@ use std::time::Duration;
use std::time::Instant;
#[allow(unused)]
macro_rules! trace_ {
($($arg:tt)*) => {
if false {
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
@@ -76,6 +77,7 @@ where
state_mt: State<ET>,
state_lt: State<ET>,
min_quiets: MinQuiets,
do_trace_detail: bool,
}
impl<ET> RtWriter<ET>
@@ -110,6 +112,7 @@ where
state_mt,
state_lt,
min_quiets,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
};
Ok(ret)
}
@@ -130,14 +133,21 @@ where
self.min_quiets.clone()
}
pub fn write(&mut self, item: ET, ts_net: Instant, iqdqs: &mut InsertDeques) -> Result<WriteRes, Error> {
trace!("write {:?}", item.ts());
pub fn write(
&mut self,
item: ET,
ts_net: Instant,
tsev: TsNano,
iqdqs: &mut InsertDeques,
) -> Result<WriteRes, Error> {
let det = self.do_trace_detail;
trace_emit!(det, "write {:?}", item.ts());
// TODO
// Optimize for the common case that we only write into one of the stores.
// Make the decision first, based on ref, then clone only as required.
let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_qu)?;
let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_qu)?;
let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_qu)?;
let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?;
let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
let ret = WriteRes {
st: WriteRtRes {
accept: res_st.accept,
@@ -162,9 +172,10 @@ where
state: &mut State<ET>,
item: ET,
ts_net: Instant,
tsev: TsNano,
deque: &mut VecDeque<QueryItem>,
) -> Result<crate::ratelimitwriter::WriteRes, Error> {
Ok(state.writer.write(item, ts_net, deque)?)
Ok(state.writer.write(item, ts_net, tsev, deque)?)
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {

View File

@@ -11,6 +11,15 @@ use std::time::Instant;
use core::fmt;
pub use smallvec::SmallVec;
#[allow(unused)]
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
}
#[derive(Debug)]
pub struct EmitRes {
pub items: SmallVec<[QueryItem; 4]>,
@@ -23,7 +32,7 @@ pub trait EmittableType: fmt::Debug + Clone {
fn ts(&self) -> TsNano;
fn has_change(&self, k: &Self) -> bool;
fn byte_size(&self) -> u32;
fn into_query_item(self, ts_net: Instant, state: &mut <Self as EmittableType>::State) -> EmitRes;
fn into_query_item(self, ts_net: Instant, tsev: TsNano, state: &mut <Self as EmittableType>::State) -> EmitRes;
}
#[derive(Debug, ThisError)]
@@ -60,7 +69,8 @@ pub struct WriteRes {
#[derive(Debug)]
pub struct SeriesWriter<ET> {
sid: SeriesId,
series: SeriesId,
do_trace_detail: bool,
_t1: PhantomData<ET>,
}
@@ -68,13 +78,17 @@ impl<ET> SeriesWriter<ET>
where
ET: EmittableType,
{
pub fn new(sid: SeriesId) -> Result<Self, Error> {
let res = Self { sid, _t1: PhantomData };
pub fn new(series: SeriesId) -> Result<Self, Error> {
let res = Self {
series,
do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()),
_t1: PhantomData,
};
Ok(res)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
self.series.clone()
}
pub fn write(
@@ -82,11 +96,13 @@ where
item: ET,
state: &mut <ET as EmittableType>::State,
ts_net: Instant,
tsev: TsNano,
deque: &mut VecDeque<QueryItem>,
) -> Result<WriteRes, Error> {
let det = self.do_trace_detail;
let ts_main = item.ts();
let res = item.into_query_item(ts_net, state);
trace!("emit value for ts {:?} items len {}", ts_main, res.items.len());
let res = item.into_query_item(ts_net, tsev, state);
trace_emit!(det, "emit value for ts {:?} items len {}", ts_main, res.items.len());
for item in res.items {
deque.push_back(item);
}