From a6f6ba427705208fc94fc9bfef61adf61b4a3534 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 24 Oct 2023 11:33:12 +0200 Subject: [PATCH] Latest --- daqingest/src/daemon.rs | 7 +++++-- netfetch/src/ca/proto.rs | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 2cd87ae..f198163 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -54,6 +54,7 @@ pub struct DaemonOpts { insert_worker_count: usize, insert_scylla_sessions: usize, insert_frac: Arc, + store_workers_rate: Arc, } impl DaemonOpts { @@ -139,11 +140,11 @@ impl Daemon { } }); - let use_rate_limit_queue = false; + let use_rate_limit_queue = true; let ttls = opts.ttls.clone(); let insert_worker_opts = InsertWorkerOpts { - store_workers_rate: Arc::new(AtomicU64::new(20000000)), + store_workers_rate: opts.store_workers_rate.clone(), insert_workers_running: Arc::new(AtomicU64::new(0)), insert_frac: opts.insert_frac.clone(), array_truncate: Arc::new(AtomicU64::new(opts.array_truncate)), @@ -579,6 +580,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> } let insert_frac = Arc::new(AtomicU64::new(opts.insert_frac())); + let store_workers_rate = Arc::new(AtomicU64::new(opts.store_workers_rate())); let opts2 = DaemonOpts { backend: opts.backend().into(), @@ -597,6 +599,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> insert_worker_count: opts.insert_worker_count(), insert_scylla_sessions: opts.insert_scylla_sessions(), insert_frac: insert_frac.clone(), + store_workers_rate, }; let daemon = Daemon::new(opts2, opts.clone()).await?; let tx = daemon.tx.clone(); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index eb4e28d..f165954 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -852,6 +852,30 @@ impl CaMsg { value, payload_len: hi.payload_len() as u32, }; + // TODO quick test only + if false { + let nn = 4; + let mut blob = vec![0; nn]; + for (i, x) in blob.iter_mut().enumerate() { + *x = i as _; + } + let d = EventAddRes { + // i32 with time and status + data_type: 19, + data_count: nn as u32, + status: hi.param1, + subid: hi.param2, + value: CaEventValue { + ts, + status: ca_status, + severity: ca_severity, + data: CaDataValue::Array(CaDataArrayValue::I32(blob)), + }, + payload_len: hi.payload_len() as u32, + }; + let ty = CaMsgTy::EventAddRes(d); + return Ok(CaMsg::from_ty_ts(ty, tsnow)); + } let ty = CaMsgTy::EventAddRes(d); CaMsg::from_ty_ts(ty, tsnow) }