diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 75dde75..26410eb 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -93,7 +93,6 @@ const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const DO_RATE_CHECK: bool = false; const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60); const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1); -const USE_BIN_WRITER: bool = true; macro_rules! trace3 { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } @@ -1052,6 +1051,7 @@ pub struct CaConnOpts { // TODO make private when we don't share it anymore pub(super) insert_queue_max: usize, pub(super) array_truncate: usize, + binwriter_enable: bool, } impl CaConnOpts { @@ -1059,6 +1059,12 @@ impl CaConnOpts { self.insert_queue_max = val; self } + + pub fn binwriter_use(self, v: bool) -> Self { + let mut ret = self; + ret.binwriter_enable = v; + ret + } } impl Default for CaConnOpts { @@ -1066,6 +1072,7 @@ impl Default for CaConnOpts { Self { insert_queue_max: 20000, array_truncate: 2000000, + binwriter_enable: false, } } } @@ -1946,6 +1953,7 @@ impl CaConn { stnow, tscaproto, ch_conf.use_ioc_time(), + self.opts.binwriter_enable, mett, &mut self.rng, )?; @@ -1979,6 +1987,7 @@ impl CaConn { stnow, tscaproto, ch_conf.use_ioc_time(), + self.opts.binwriter_enable, mett, &mut self.rng, )?; @@ -2157,6 +2166,7 @@ impl CaConn { tsnow, tscaproto, ch_conf.use_ioc_time(), + self.opts.binwriter_enable, mett, &mut self.rng, )?; @@ -2250,6 +2260,7 @@ impl CaConn { tsnow, tscaproto, ch_conf.use_ioc_time(), + self.opts.binwriter_enable, mett, &mut self.rng, )?; @@ -2283,6 +2294,7 @@ impl CaConn { tsnow: Instant, tscaproto: Instant, use_ioc_time: bool, + binwriter_enable: bool, mett: &mut CaConnMetrics, rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { @@ -2301,6 +2313,7 @@ impl CaConn { stnow, tscaproto, use_ioc_time, + binwriter_enable, mett, rng, )?; @@ -2319,6 +2332,7 @@ impl CaConn { stnow: SystemTime, tscaproto: Instant, use_ioc_time: bool, + binwriter_enable: bool, mett: &mut CaConnMetrics, rng: &mut Xoshiro128PlusPlus, ) -> Result<(), Error> { @@ -2380,7 +2394,7 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - if USE_BIN_WRITER { + if binwriter_enable { binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; } { @@ -3458,7 +3472,7 @@ impl CaConn { if let ChannelState::Writable(st2) = chst { let iqdqs = &mut self.iqdqs; st2.writer.tick(iqdqs)?; - if USE_BIN_WRITER { + if self.opts.binwriter_enable { st2.binwriter.tick(iqdqs)?; } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index e1990c7..ee075b7 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -408,6 +408,7 @@ pub struct CaConnSet { thr_msg_storage_len: ThrottleTrace, cssid_latency_max: Duration, mett: stats::mett::CaConnSetMetrics, + use_binwriter: bool, } impl CaConnSet { @@ -422,6 +423,7 @@ impl CaConnSet { channel_info_query_tx: Sender, ingest_opts: CaIngestOpts, ) -> CaConnSetCtrl { + let use_binwriter = ingest_opts.binwriter_enable(); let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200); let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); let (connset_out_tx, connset_out_rx) = async_channel::bounded(200); @@ -467,6 +469,7 @@ impl CaConnSet { thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), cssid_latency_max: Duration::from_millis(2000), mett: stats::mett::CaConnSetMetrics::new(), + use_binwriter, }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -1276,7 +1279,7 @@ impl CaConnSet { fn create_ca_conn(&mut self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? - let opts = CaConnOpts::default(); + let opts = CaConnOpts::default().binwriter_use(self.use_binwriter); let addr = add.addr; let addr_v4 = if let SocketAddr::V4(x) = add.addr { x diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 9903f0b..1e479fb 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -45,6 +45,8 @@ pub struct CaIngestOpts { scylla_disable: bool, #[serde(default)] scylla_ignore_writes: bool, + #[serde(default)] + binwriter_enable: bool, } impl CaIngestOpts { @@ -160,6 +162,10 @@ impl CaIngestOpts { self.scylla_ignore_writes } + pub fn binwriter_enable(&self) -> bool { + self.binwriter_enable + } + pub fn is_valid(&self) -> bool { let confs = [&self.scylla_st, &self.scylla_mt, &self.scylla_lt, &self.scylla_st_rf1]; let has_default_hosts = self.scylla.is_some();