diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index ba6b6ca..d323e85 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,5 +1,6 @@ mod enumfetch; +use crate::ca::connset::CaConnSet; use crate::conf::ChannelConfig; use crate::metrics::status::StorageUsage; use crate::throttletrace::ThrottleTrace; @@ -69,6 +70,7 @@ use stats::rand_xoshiro::rand_core::SeedableRng; use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; +use std::marker::PhantomData; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic; @@ -875,6 +877,230 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { (dt.as_secs() / 60 * 60) as u32 } +fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> { + use ca_proto::ca::proto::CaDataScalarValue; + use ca_proto::ca::proto::CaDataValue; + match data { + CaDataValue::Scalar(x) => match &x { + CaDataScalarValue::F32(..) => match &scalar_type { + ScalarType::F32 => {} + _ => { + error!("MISMATCH got f32 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::F64(..) => match &scalar_type { + ScalarType::F64 => {} + _ => { + error!("MISMATCH got f64 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::I16(..) => match &scalar_type { + ScalarType::I16 => {} + ScalarType::Enum => {} + _ => { + error!("MISMATCH got i16 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::I32(..) => match &scalar_type { + ScalarType::I32 => {} + _ => { + error!("MISMATCH got i32 exp {:?}", scalar_type); + } + }, + _ => {} + }, + _ => {} + } + Ok(()) +} + +struct EventAddIngestRefobjStage1<'a> { + iqdqs: &'a mut InsertDeques, + crst: &'a mut CreatedState, + rtwriter: &'a mut CaRtWriter, + mett: &'a mut CaConnMetrics, + rng: &'a mut Xoshiro128PlusPlus, + binwriter: Option<&'a mut BinWriter>, +} + +impl<'a> EventAddIngestRefobjStage1<'a> { + fn and_channel_status_writer(self, wrst: &'a mut WriterStatus) -> EventAddIngestRefobjStage2<'a> { + EventAddIngestRefobjStage2 { + iqdqs: self.iqdqs, + crst: self.crst, + rtwriter: self.rtwriter, + mett: self.mett, + rng: self.rng, + binwriter: self.binwriter, + wrst, + } + } +} + +struct EventAddIngestRefobjStage2<'a> { + iqdqs: &'a mut InsertDeques, + crst: &'a mut CreatedState, + rtwriter: &'a mut CaRtWriter, + mett: &'a mut CaConnMetrics, + rng: &'a mut Xoshiro128PlusPlus, + binwriter: Option<&'a mut BinWriter>, + // + wrst: &'a mut WriterStatus, +} + +impl<'a> EventAddIngestRefobjStage2<'a> { + fn and_with_use_ioc_time(self, v: bool) -> EventAddIngestRefobj<'a> { + EventAddIngestRefobj { + iqdqs: self.iqdqs, + wrst: self.wrst, + crst: self.crst, + rtwriter: self.rtwriter, + mett: self.mett, + rng: self.rng, + binwriter: self.binwriter, + use_ioc_time: v, + } + } +} + +struct EventAddIngestRefobj<'a> { + iqdqs: &'a mut InsertDeques, + wrst: &'a mut WriterStatus, + crst: &'a mut CreatedState, + rtwriter: &'a mut CaRtWriter, + mett: &'a mut CaConnMetrics, + rng: &'a mut Xoshiro128PlusPlus, + binwriter: Option<&'a mut BinWriter>, + use_ioc_time: bool, +} + +impl<'a> EventAddIngestRefobj<'a> { + fn from_writable_state( + opts: &CaConnOpts, + iqdqs: &'a mut InsertDeques, + st: &'a mut WritableState, + mett: &'a mut CaConnMetrics, + rng: &'a mut Xoshiro128PlusPlus, + ) -> EventAddIngestRefobjStage1<'a> { + let binwriter = if opts.binwriter_enable { + Some(&mut st.binwriter) + } else { + None + }; + EventAddIngestRefobjStage1 { + iqdqs, + crst: &mut st.channel, + rtwriter: &mut st.writer, + mett, + rng, + binwriter, + } + } + + fn event_add_ingest( + &mut self, + payload_len: u32, + value: CaEventValue, + tsnow: Instant, + stnow: SystemTime, + tscaproto: Instant, + ) -> Result<(), Error> { + let wrst = &mut self.wrst; + let crst = &mut self.crst; + let rtwriter = &mut self.rtwriter; + let mett = &mut self.mett; + { + use ca_proto::ca::proto::CaMetaValue::*; + match &value.meta { + CaMetaTime(meta) => { + if meta.status != 0 { + let sid = rtwriter.series(); + debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity); + } + } + _ => {} + } + } + trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value); + crst.ts_alive_last = tsnow; + crst.ts_activity_last = tsnow; + crst.st_activity_last = stnow; + crst.item_recv_ivl_ema.tick(tsnow); + crst.recv_count += 1; + crst.recv_bytes += payload_len as u64; + crst.acc_recv.push_written(payload_len); + // TODO should attach these counters already to Writable state. + if crst.ts_recv_value_status_emit_next <= tsnow { + crst.ts_recv_value_status_emit_next = tsnow + CaConn::recv_value_status_emit_ivl_rng(self.rng); + // TODO was only for debugging + if false { + let item = ChannelStatusItem { + ts: stnow, + cssid: crst.cssid, + status: ChannelStatus::MonitoringSilenceReadUnchanged, + }; + if wrst + .emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs)) + .is_err() + { + mett.logic_error().inc(); + } + } + } + let tsev_local = TsNano::from_system_time(stnow); + { + let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; + let ts_diff = ts.abs_diff(tsev_local.ns()); + let ts_diff_dur = Duration::from_nanos(ts_diff); + mett.ca_ts_off().push_dur_100us(ts_diff_dur); + } + { + let tsev = if self.use_ioc_time { + if let Some(x) = value.ts() { + TsNano::from_ns(x) + } else { + tsev_local + } + } else { + tsev_local + }; + check_ev_value_data(&value.data, &rtwriter.scalar_type())?; + crst.muted_before = 0; + crst.insert_item_ivl_ema.tick(tsnow); + let val_for_agg = value.f32_for_binning(); + let wres = rtwriter.write(CaWriterValue::new(value, crst), tscaproto, tsev, self.iqdqs)?; + if wres.st.accept { + crst.dw_st_last = stnow; + crst.acc_st.push_written(payload_len); + } + if wres.mt.accept { + crst.dw_mt_last = stnow; + crst.acc_mt.push_written(payload_len); + } + if wres.lt.accept { + crst.dw_lt_last = stnow; + crst.acc_lt.push_written(payload_len); + } + if let Some(binwriter) = self.binwriter.as_mut() { + binwriter.ingest(tsev, val_for_agg, self.iqdqs)?; + } + } + if false { + // TODO record stats on drop with the new filter + { + if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { + crst.insert_recv_ivl_last = tsnow; + let ema = crst.insert_item_ivl_ema.ema(); + let _ = ema; + } + if crst.muted_before == 0 {} + crst.muted_before = 1; + } + } + Ok(()) + } +} + pub type CmdResTx = Sender>; #[derive(Debug)] @@ -1966,27 +2192,21 @@ impl CaConn { monitoring_event_last: Some(ev.clone()), last_comparisons: VecDeque::new(), }); - let crst = &mut st.channel; - let writer = &mut st.writer; - let binwriter = &mut st.binwriter; - let iqdqs = &mut self.iqdqs; - let mett = &mut self.mett; - Self::event_add_ingest( - ev.payload_len, - ev.value, - ch_wrst, - crst, - writer, - binwriter, - iqdqs, - tsnow, - stnow, - tscaproto, - ch_conf.use_ioc_time(), - self.opts.binwriter_enable, - mett, + let binwriter = if self.opts.binwriter_enable { + Some(&mut st.binwriter) + } else { + None + }; + let mut robj = EventAddIngestRefobj::from_writable_state( + &self.opts, + &mut self.iqdqs, + st, + &mut self.mett, &mut self.rng, - )?; + ) + .and_channel_status_writer(ch_wrst) + .and_with_use_ioc_time(ch_conf.use_ioc_time()); + robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; } ReadingState::Monitoring(st2) => { match &mut st2.mon2state { @@ -1999,28 +2219,22 @@ impl CaConn { self.mett.recv_event_add_while_wait_on_read_notify().inc(); } } - let crst = &mut st.channel; - let writer = &mut st.writer; - let binwriter = &mut st.binwriter; - let iqdqs = &mut self.iqdqs; - let mett = &mut self.mett; st2.monitoring_event_last = Some(ev.clone()); - Self::event_add_ingest( - ev.payload_len, - ev.value, - ch_wrst, - crst, - writer, - binwriter, - iqdqs, - tsnow, - stnow, - tscaproto, - ch_conf.use_ioc_time(), - self.opts.binwriter_enable, - mett, + let binwriter = if self.opts.binwriter_enable { + Some(&mut st.binwriter) + } else { + None + }; + let mut robj = EventAddIngestRefobj::from_writable_state( + &self.opts, + &mut self.iqdqs, + st, + &mut self.mett, &mut self.rng, - )?; + ) + .and_channel_status_writer(ch_wrst) + .and_with_use_ioc_time(ch_conf.use_ioc_time()); + robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; } ReadingState::StopMonitoringForPolling(st2) => { // TODO count for metrics @@ -2185,21 +2399,21 @@ impl CaConn { trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); } st2.tick = PollTickState::Idle(PollTickStateIdle { next }); - let iqdqs = &mut self.iqdqs; - let mett = &mut self.mett; - Self::read_notify_res_for_write( - ev, - ch_wrst, + let binwriter = if self.opts.binwriter_enable { + Some(&mut st.binwriter) + } else { + None + }; + let mut robj = EventAddIngestRefobj::from_writable_state( + &self.opts, + &mut self.iqdqs, st, - iqdqs, - stnow, - tsnow, - tscaproto, - ch_conf.use_ioc_time(), - self.opts.binwriter_enable, - mett, + &mut self.mett, &mut self.rng, - )?; + ) + .and_channel_status_writer(ch_wrst) + .and_with_use_ioc_time(ch_conf.use_ioc_time()); + robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; } }, ReadingState::EnableMonitoring(_) => { @@ -2281,19 +2495,16 @@ impl CaConn { // timeout, and if we get nothing error out. // TODO read-result-after-monitor-silence if false { - Self::read_notify_res_for_write( - ev, - ch_wrst, + let mut robj = EventAddIngestRefobj::from_writable_state( + &self.opts, + &mut self.iqdqs, st, - iqdqs, - stnow, - tsnow, - tscaproto, - ch_conf.use_ioc_time(), - self.opts.binwriter_enable, - mett, + &mut self.mett, &mut self.rng, - )?; + ) + .and_channel_status_writer(ch_wrst) + .and_with_use_ioc_time(ch_conf.use_ioc_time()); + robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; } } }, @@ -2315,187 +2526,6 @@ impl CaConn { } } - fn read_notify_res_for_write( - ev: proto::ReadNotifyRes, - wrst: &mut WriterStatus, - st: &mut WritableState, - iqdqs: &mut InsertDeques, - stnow: SystemTime, - tsnow: Instant, - tscaproto: Instant, - use_ioc_time: bool, - binwriter_enable: bool, - mett: &mut CaConnMetrics, - rng: &mut Xoshiro128PlusPlus, - ) -> Result<(), Error> { - let crst = &mut st.channel; - let writer = &mut st.writer; - let binwriter = &mut st.binwriter; - Self::event_add_ingest( - ev.payload_len, - ev.value, - wrst, - crst, - writer, - binwriter, - iqdqs, - tsnow, - stnow, - tscaproto, - use_ioc_time, - binwriter_enable, - mett, - rng, - )?; - Ok(()) - } - - fn event_add_ingest( - payload_len: u32, - value: CaEventValue, - wrst: &mut WriterStatus, - crst: &mut CreatedState, - writer: &mut CaRtWriter, - binwriter: &mut BinWriter, - iqdqs: &mut InsertDeques, - tsnow: Instant, - stnow: SystemTime, - tscaproto: Instant, - use_ioc_time: bool, - binwriter_enable: bool, - mett: &mut CaConnMetrics, - rng: &mut Xoshiro128PlusPlus, - ) -> Result<(), Error> { - { - use ca_proto::ca::proto::CaMetaValue::*; - match &value.meta { - CaMetaTime(meta) => { - if meta.status != 0 { - let sid = writer.series(); - debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity); - } - } - _ => {} - } - } - trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value); - crst.ts_alive_last = tsnow; - crst.ts_activity_last = tsnow; - crst.st_activity_last = stnow; - crst.item_recv_ivl_ema.tick(tsnow); - crst.recv_count += 1; - crst.recv_bytes += payload_len as u64; - crst.acc_recv.push_written(payload_len); - // TODO should attach these counters already to Writable state. - if crst.ts_recv_value_status_emit_next <= tsnow { - crst.ts_recv_value_status_emit_next = tsnow + Self::recv_value_status_emit_ivl_rng(rng); - // TODO was only for debugging - if false { - let item = ChannelStatusItem { - ts: stnow, - cssid: crst.cssid, - status: ChannelStatus::MonitoringSilenceReadUnchanged, - }; - if wrst - .emit_channel_status_item(item, Self::channel_status_qu(iqdqs)) - .is_err() - { - mett.logic_error().inc(); - } - } - } - let tsev_local = TsNano::from_system_time(stnow); - { - let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; - let ts_diff = ts.abs_diff(tsev_local.ns()); - let ts_diff_dur = Duration::from_nanos(ts_diff); - mett.ca_ts_off().push_dur_100us(ts_diff_dur); - } - { - let tsev = if use_ioc_time { - if let Some(x) = value.ts() { - TsNano::from_ns(x) - } else { - tsev_local - } - } else { - tsev_local - }; - Self::check_ev_value_data(&value.data, &writer.scalar_type())?; - crst.muted_before = 0; - crst.insert_item_ivl_ema.tick(tsnow); - let val_for_agg = value.f32_for_binning(); - let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?; - if wres.st.accept { - crst.dw_st_last = stnow; - crst.acc_st.push_written(payload_len); - } - if wres.mt.accept { - crst.dw_mt_last = stnow; - crst.acc_mt.push_written(payload_len); - } - if wres.lt.accept { - crst.dw_lt_last = stnow; - crst.acc_lt.push_written(payload_len); - } - if binwriter_enable { - if true || wres.accept_any() { - binwriter.ingest(tsev, val_for_agg, iqdqs)?; - } - } - } - if false { - // TODO record stats on drop with the new filter - { - if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { - crst.insert_recv_ivl_last = tsnow; - let ema = crst.insert_item_ivl_ema.ema(); - let _ = ema; - } - if crst.muted_before == 0 {} - crst.muted_before = 1; - } - } - Ok(()) - } - - fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> { - use ca_proto::ca::proto::CaDataScalarValue; - use ca_proto::ca::proto::CaDataValue; - match data { - CaDataValue::Scalar(x) => match &x { - CaDataScalarValue::F32(..) => match &scalar_type { - ScalarType::F32 => {} - _ => { - error!("MISMATCH got f32 exp {:?}", scalar_type); - } - }, - CaDataScalarValue::F64(..) => match &scalar_type { - ScalarType::F64 => {} - _ => { - error!("MISMATCH got f64 exp {:?}", scalar_type); - } - }, - CaDataScalarValue::I16(..) => match &scalar_type { - ScalarType::I16 => {} - ScalarType::Enum => {} - _ => { - error!("MISMATCH got i16 exp {:?}", scalar_type); - } - }, - CaDataScalarValue::I32(..) => match &scalar_type { - ScalarType::I32 => {} - _ => { - error!("MISMATCH got i32 exp {:?}", scalar_type); - } - }, - _ => {} - }, - _ => {} - } - Ok(()) - } - /* Acts more like a stream? Can be: Pending