Factor out

This commit is contained in:
Dominik Werder
2025-07-02 12:30:37 +02:00
parent 2f20b49193
commit 0d3caea38a
+275 -245
View File
@@ -1,5 +1,6 @@
mod enumfetch;
use crate::ca::connset::CaConnSet;
use crate::conf::ChannelConfig;
use crate::metrics::status::StorageUsage;
use crate::throttletrace::ThrottleTrace;
@@ -69,6 +70,7 @@ use stats::rand_xoshiro::rand_core::SeedableRng;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic;
@@ -875,6 +877,230 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
(dt.as_secs() / 60 * 60) as u32
}
fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> {
use ca_proto::ca::proto::CaDataScalarValue;
use ca_proto::ca::proto::CaDataValue;
match data {
CaDataValue::Scalar(x) => match &x {
CaDataScalarValue::F32(..) => match &scalar_type {
ScalarType::F32 => {}
_ => {
error!("MISMATCH got f32 exp {:?}", scalar_type);
}
},
CaDataScalarValue::F64(..) => match &scalar_type {
ScalarType::F64 => {}
_ => {
error!("MISMATCH got f64 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
ScalarType::Enum => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I32(..) => match &scalar_type {
ScalarType::I32 => {}
_ => {
error!("MISMATCH got i32 exp {:?}", scalar_type);
}
},
_ => {}
},
_ => {}
}
Ok(())
}
struct EventAddIngestRefobjStage1<'a> {
iqdqs: &'a mut InsertDeques,
crst: &'a mut CreatedState,
rtwriter: &'a mut CaRtWriter,
mett: &'a mut CaConnMetrics,
rng: &'a mut Xoshiro128PlusPlus,
binwriter: Option<&'a mut BinWriter>,
}
impl<'a> EventAddIngestRefobjStage1<'a> {
fn and_channel_status_writer(self, wrst: &'a mut WriterStatus) -> EventAddIngestRefobjStage2<'a> {
EventAddIngestRefobjStage2 {
iqdqs: self.iqdqs,
crst: self.crst,
rtwriter: self.rtwriter,
mett: self.mett,
rng: self.rng,
binwriter: self.binwriter,
wrst,
}
}
}
struct EventAddIngestRefobjStage2<'a> {
iqdqs: &'a mut InsertDeques,
crst: &'a mut CreatedState,
rtwriter: &'a mut CaRtWriter,
mett: &'a mut CaConnMetrics,
rng: &'a mut Xoshiro128PlusPlus,
binwriter: Option<&'a mut BinWriter>,
//
wrst: &'a mut WriterStatus,
}
impl<'a> EventAddIngestRefobjStage2<'a> {
fn and_with_use_ioc_time(self, v: bool) -> EventAddIngestRefobj<'a> {
EventAddIngestRefobj {
iqdqs: self.iqdqs,
wrst: self.wrst,
crst: self.crst,
rtwriter: self.rtwriter,
mett: self.mett,
rng: self.rng,
binwriter: self.binwriter,
use_ioc_time: v,
}
}
}
struct EventAddIngestRefobj<'a> {
iqdqs: &'a mut InsertDeques,
wrst: &'a mut WriterStatus,
crst: &'a mut CreatedState,
rtwriter: &'a mut CaRtWriter,
mett: &'a mut CaConnMetrics,
rng: &'a mut Xoshiro128PlusPlus,
binwriter: Option<&'a mut BinWriter>,
use_ioc_time: bool,
}
impl<'a> EventAddIngestRefobj<'a> {
fn from_writable_state(
opts: &CaConnOpts,
iqdqs: &'a mut InsertDeques,
st: &'a mut WritableState,
mett: &'a mut CaConnMetrics,
rng: &'a mut Xoshiro128PlusPlus,
) -> EventAddIngestRefobjStage1<'a> {
let binwriter = if opts.binwriter_enable {
Some(&mut st.binwriter)
} else {
None
};
EventAddIngestRefobjStage1 {
iqdqs,
crst: &mut st.channel,
rtwriter: &mut st.writer,
mett,
rng,
binwriter,
}
}
fn event_add_ingest(
&mut self,
payload_len: u32,
value: CaEventValue,
tsnow: Instant,
stnow: SystemTime,
tscaproto: Instant,
) -> Result<(), Error> {
let wrst = &mut self.wrst;
let crst = &mut self.crst;
let rtwriter = &mut self.rtwriter;
let mett = &mut self.mett;
{
use ca_proto::ca::proto::CaMetaValue::*;
match &value.meta {
CaMetaTime(meta) => {
if meta.status != 0 {
let sid = rtwriter.series();
debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity);
}
}
_ => {}
}
}
trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
if crst.ts_recv_value_status_emit_next <= tsnow {
crst.ts_recv_value_status_emit_next = tsnow + CaConn::recv_value_status_emit_ivl_rng(self.rng);
// TODO was only for debugging
if false {
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
if wrst
.emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs))
.is_err()
{
mett.logic_error().inc();
}
}
}
let tsev_local = TsNano::from_system_time(stnow);
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(tsev_local.ns());
let ts_diff_dur = Duration::from_nanos(ts_diff);
mett.ca_ts_off().push_dur_100us(ts_diff_dur);
}
{
let tsev = if self.use_ioc_time {
if let Some(x) = value.ts() {
TsNano::from_ns(x)
} else {
tsev_local
}
} else {
tsev_local
};
check_ev_value_data(&value.data, &rtwriter.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let val_for_agg = value.f32_for_binning();
let wres = rtwriter.write(CaWriterValue::new(value, crst), tscaproto, tsev, self.iqdqs)?;
if wres.st.accept {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
}
if wres.mt.accept {
crst.dw_mt_last = stnow;
crst.acc_mt.push_written(payload_len);
}
if wres.lt.accept {
crst.dw_lt_last = stnow;
crst.acc_lt.push_written(payload_len);
}
if let Some(binwriter) = self.binwriter.as_mut() {
binwriter.ingest(tsev, val_for_agg, self.iqdqs)?;
}
}
if false {
// TODO record stats on drop with the new filter
{
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let _ = ema;
}
if crst.muted_before == 0 {}
crst.muted_before = 1;
}
}
Ok(())
}
}
pub type CmdResTx = Sender<Result<(), Error>>;
#[derive(Debug)]
@@ -1966,27 +2192,21 @@ impl CaConn {
monitoring_event_last: Some(ev.clone()),
last_comparisons: VecDeque::new(),
});
let crst = &mut st.channel;
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let mett = &mut self.mett;
Self::event_add_ingest(
ev.payload_len,
ev.value,
ch_wrst,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
tscaproto,
ch_conf.use_ioc_time(),
self.opts.binwriter_enable,
mett,
let binwriter = if self.opts.binwriter_enable {
Some(&mut st.binwriter)
} else {
None
};
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
&mut self.mett,
&mut self.rng,
)?;
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
}
ReadingState::Monitoring(st2) => {
match &mut st2.mon2state {
@@ -1999,28 +2219,22 @@ impl CaConn {
self.mett.recv_event_add_while_wait_on_read_notify().inc();
}
}
let crst = &mut st.channel;
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let mett = &mut self.mett;
st2.monitoring_event_last = Some(ev.clone());
Self::event_add_ingest(
ev.payload_len,
ev.value,
ch_wrst,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
tscaproto,
ch_conf.use_ioc_time(),
self.opts.binwriter_enable,
mett,
let binwriter = if self.opts.binwriter_enable {
Some(&mut st.binwriter)
} else {
None
};
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
&mut self.mett,
&mut self.rng,
)?;
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
}
ReadingState::StopMonitoringForPolling(st2) => {
// TODO count for metrics
@@ -2185,21 +2399,21 @@ impl CaConn {
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
}
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let iqdqs = &mut self.iqdqs;
let mett = &mut self.mett;
Self::read_notify_res_for_write(
ev,
ch_wrst,
let binwriter = if self.opts.binwriter_enable {
Some(&mut st.binwriter)
} else {
None
};
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
iqdqs,
stnow,
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
self.opts.binwriter_enable,
mett,
&mut self.mett,
&mut self.rng,
)?;
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
}
},
ReadingState::EnableMonitoring(_) => {
@@ -2281,19 +2495,16 @@ impl CaConn {
// timeout, and if we get nothing error out.
// TODO read-result-after-monitor-silence
if false {
Self::read_notify_res_for_write(
ev,
ch_wrst,
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
iqdqs,
stnow,
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
self.opts.binwriter_enable,
mett,
&mut self.mett,
&mut self.rng,
)?;
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
}
}
},
@@ -2315,187 +2526,6 @@ impl CaConn {
}
}
fn read_notify_res_for_write(
ev: proto::ReadNotifyRes,
wrst: &mut WriterStatus,
st: &mut WritableState,
iqdqs: &mut InsertDeques,
stnow: SystemTime,
tsnow: Instant,
tscaproto: Instant,
use_ioc_time: bool,
binwriter_enable: bool,
mett: &mut CaConnMetrics,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
let crst = &mut st.channel;
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
Self::event_add_ingest(
ev.payload_len,
ev.value,
wrst,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
tscaproto,
use_ioc_time,
binwriter_enable,
mett,
rng,
)?;
Ok(())
}
fn event_add_ingest(
payload_len: u32,
value: CaEventValue,
wrst: &mut WriterStatus,
crst: &mut CreatedState,
writer: &mut CaRtWriter,
binwriter: &mut BinWriter,
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
tscaproto: Instant,
use_ioc_time: bool,
binwriter_enable: bool,
mett: &mut CaConnMetrics,
rng: &mut Xoshiro128PlusPlus,
) -> Result<(), Error> {
{
use ca_proto::ca::proto::CaMetaValue::*;
match &value.meta {
CaMetaTime(meta) => {
if meta.status != 0 {
let sid = writer.series();
debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity);
}
}
_ => {}
}
}
trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
if crst.ts_recv_value_status_emit_next <= tsnow {
crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng);
// TODO was only for debugging
if false {
let item = ChannelStatusItem {
ts: stnow,
cssid: crst.cssid,
status: ChannelStatus::MonitoringSilenceReadUnchanged,
};
if wrst
.emit_channel_status_item(item, Self::channel_status_qu(iqdqs))
.is_err()
{
mett.logic_error().inc();
}
}
}
let tsev_local = TsNano::from_system_time(stnow);
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(tsev_local.ns());
let ts_diff_dur = Duration::from_nanos(ts_diff);
mett.ca_ts_off().push_dur_100us(ts_diff_dur);
}
{
let tsev = if use_ioc_time {
if let Some(x) = value.ts() {
TsNano::from_ns(x)
} else {
tsev_local
}
} else {
tsev_local
};
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let val_for_agg = value.f32_for_binning();
let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?;
if wres.st.accept {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
}
if wres.mt.accept {
crst.dw_mt_last = stnow;
crst.acc_mt.push_written(payload_len);
}
if wres.lt.accept {
crst.dw_lt_last = stnow;
crst.acc_lt.push_written(payload_len);
}
if binwriter_enable {
if true || wres.accept_any() {
binwriter.ingest(tsev, val_for_agg, iqdqs)?;
}
}
}
if false {
// TODO record stats on drop with the new filter
{
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let _ = ema;
}
if crst.muted_before == 0 {}
crst.muted_before = 1;
}
}
Ok(())
}
fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> {
use ca_proto::ca::proto::CaDataScalarValue;
use ca_proto::ca::proto::CaDataValue;
match data {
CaDataValue::Scalar(x) => match &x {
CaDataScalarValue::F32(..) => match &scalar_type {
ScalarType::F32 => {}
_ => {
error!("MISMATCH got f32 exp {:?}", scalar_type);
}
},
CaDataScalarValue::F64(..) => match &scalar_type {
ScalarType::F64 => {}
_ => {
error!("MISMATCH got f64 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
ScalarType::Enum => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I32(..) => match &scalar_type {
ScalarType::I32 => {}
_ => {
error!("MISMATCH got i32 exp {:?}", scalar_type);
}
},
_ => {}
},
_ => {}
}
Ok(())
}
/*
Acts more like a stream? Can be:
Pending